/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
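/*
 * 510 is the largest count for which a fetched snapshot context
 * (u64 seq + u32 snap_count + snap_count u64 ids = 12 + 510 * 8 =
 * 4092 bytes) still fits within a single 4 KB page.
 */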

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
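/*
 * (5 * sizeof (int)) / 2 over-estimates the decimal digits in an int:
 * each byte contributes log10(256) ~= 2.41 digits, so 2.5 per byte is
 * a safe upper bound; the + 1 leaves room for a sign.  For a 4-byte
 * int this gives 11, enough for "-2147483648".
 */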

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
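/*
 * Illustrative example (the ids are made up): mapping snapshot
 * "snap1" of image "foo" in pool "rbd" could produce the spec
 *
 *      { 2, "rbd", "10026b8b4567", "foo", 4, "snap1" }
 *
 * while mapping the image head would use snap_id CEPH_NOSNAP and
 * snap_name RBD_SNAP_HEAD_NAME.
 */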

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
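/*
 * Typical iteration over an image request's object requests:
 *
 *      struct rbd_obj_request *obj_request;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              rbd_obj_request_submit(osdc, obj_request);
 *
 * The _safe variant walks the list in reverse and tolerates removal
 * of the current entry.
 */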

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        u64                     stripe_unit;
        u64                     stripe_count;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
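/*
 * Example: an options string containing "read_only" (or the short
 * form "ro") supplied when the image is mapped reaches this parser
 * one token at a time and sets rbd_opts->read_only to true.
 */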

/*
 * Get a ceph client with specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Removes the client from the client list under rbd_client_list_lock,
 * so the caller must not already hold that lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translating the header format from
 * the on-disk header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the object prefix copied above */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No feature support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);
        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] =
                        le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

        return 0;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}
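/*
 * Example (illustrative prefix): with object_prefix
 * "rb.0.1234.6b8b4567" and obj_order 22 (4 MB objects), image byte
 * offset 0x1400000 lies in segment 5, yielding the object name
 * "rb.0.1234.6b8b4567.000000000005".
 */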

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
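/*
 * Example: with obj_order 22, a request for offset 0x3ff000 and
 * length 0x2000 crosses a segment boundary, so the length is
 * clipped to 0x1000 here; the remainder is issued against the
 * next object.
 */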

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        /* 1ULL: obj_order can be as large as 31, so don't shift a plain int */
        return 1ULL << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
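/*
 * Example: cloning 6K starting 1K into a chain of two 4K bios yields
 * two clone bios (3K from each source bio); on return *bio_src points
 * to the second source bio and *offset is 3K.
 */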

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it is not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * the transfer count to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (obj_request_img_data_test(obj_request)) {
                struct rbd_img_request *img_request = obj_request->img_request;

                rbd_assert(write_request ==
                                img_request_write_test(img_request));
                if (write_request)
                        snapc = img_request->snapc;
        }

        /* Allocate and initialize the request, for the single op */

        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;

        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}

1599 /*
1600  * Create a copyup osd request based on the information in the
1601  * object request supplied.  A copyup request has two osd ops:
1602  * a copyup method call and a "normal" write request.
1603  */
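/*
 * Illustrative sketch of the resulting two-op array (the ops are
 * actually initialized later, in
 * rbd_img_obj_parent_read_full_callback()):
 *
 *     op 0: CEPH_OSD_OP_CALL  "rbd" "copyup"   (parent data as input)
 *     op 1: CEPH_OSD_OP_WRITE                  (the original write)
 */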
1604 static struct ceph_osd_request *
1605 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1606 {
1607         struct rbd_img_request *img_request;
1608         struct ceph_snap_context *snapc;
1609         struct rbd_device *rbd_dev;
1610         struct ceph_osd_client *osdc;
1611         struct ceph_osd_request *osd_req;
1612
1613         rbd_assert(obj_request_img_data_test(obj_request));
1614         img_request = obj_request->img_request;
1615         rbd_assert(img_request);
1616         rbd_assert(img_request_write_test(img_request));
1617
1618         /* Allocate and initialize the request, for the two ops */
1619
1620         snapc = img_request->snapc;
1621         rbd_dev = img_request->rbd_dev;
1622         osdc = &rbd_dev->rbd_client->client->osdc;
1623         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1624         if (!osd_req)
1625                 return NULL;    /* ENOMEM */
1626
1627         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1628         osd_req->r_callback = rbd_osd_req_callback;
1629         osd_req->r_priv = obj_request;
1630
1631         osd_req->r_oid_len = strlen(obj_request->object_name);
1632         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1633         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1634
1635         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1636
1637         return osd_req;
1638 }
1639
1641 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1642 {
1643         ceph_osdc_put_request(osd_req);
1644 }
1645
1646 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1647
1648 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1649                                                 u64 offset, u64 length,
1650                                                 enum obj_request_type type)
1651 {
1652         struct rbd_obj_request *obj_request;
1653         size_t size;
1654         char *name;
1655
1656         rbd_assert(obj_request_type_valid(type));
1657
1658         size = strlen(object_name) + 1;
1659         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1660         if (!obj_request)
1661                 return NULL;
1662
1663         name = (char *)(obj_request + 1);
1664         obj_request->object_name = memcpy(name, object_name, size);
1665         obj_request->offset = offset;
1666         obj_request->length = length;
1667         obj_request->flags = 0;
1668         obj_request->which = BAD_WHICH;
1669         obj_request->type = type;
1670         INIT_LIST_HEAD(&obj_request->links);
1671         init_completion(&obj_request->completion);
1672         kref_init(&obj_request->kref);
1673
1674         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1675                 offset, length, (int)type, obj_request);
1676
1677         return obj_request;
1678 }
1679
1680 static void rbd_obj_request_destroy(struct kref *kref)
1681 {
1682         struct rbd_obj_request *obj_request;
1683
1684         obj_request = container_of(kref, struct rbd_obj_request, kref);
1685
1686         dout("%s: obj %p\n", __func__, obj_request);
1687
1688         rbd_assert(obj_request->img_request == NULL);
1689         rbd_assert(obj_request->which == BAD_WHICH);
1690
1691         if (obj_request->osd_req)
1692                 rbd_osd_req_destroy(obj_request->osd_req);
1693
1694         rbd_assert(obj_request_type_valid(obj_request->type));
1695         switch (obj_request->type) {
1696         case OBJ_REQUEST_NODATA:
1697                 break;          /* Nothing to do */
1698         case OBJ_REQUEST_BIO:
1699                 if (obj_request->bio_list)
1700                         bio_chain_put(obj_request->bio_list);
1701                 break;
1702         case OBJ_REQUEST_PAGES:
1703                 if (obj_request->pages)
1704                         ceph_release_page_vector(obj_request->pages,
1705                                                 obj_request->page_count);
1706                 break;
1707         }
1708
1709         kfree(obj_request);
1710 }
1711
1712 /*
1713  * Caller is responsible for filling in the list of object requests
1714  * that comprises the image request, and the Linux request pointer
1715  * (if there is one).
1716  */
1717 static struct rbd_img_request *rbd_img_request_create(
1718                                         struct rbd_device *rbd_dev,
1719                                         u64 offset, u64 length,
1720                                         bool write_request,
1721                                         bool child_request)
1722 {
1723         struct rbd_img_request *img_request;
1724         struct ceph_snap_context *snapc = NULL;
1725
1726         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1727         if (!img_request)
1728                 return NULL;
1729
1730         if (write_request) {
1731                 down_read(&rbd_dev->header_rwsem);
1732                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1733                 up_read(&rbd_dev->header_rwsem);
1734                 if (WARN_ON(!snapc)) {
1735                         kfree(img_request);
1736                         return NULL;    /* Shouldn't happen */
1737                 }
1739         }
1740
1741         img_request->rq = NULL;
1742         img_request->rbd_dev = rbd_dev;
1743         img_request->offset = offset;
1744         img_request->length = length;
1745         img_request->flags = 0;
1746         if (write_request) {
1747                 img_request_write_set(img_request);
1748                 img_request->snapc = snapc;
1749         } else {
1750                 img_request->snap_id = rbd_dev->spec->snap_id;
1751         }
1752         if (child_request)
1753                 img_request_child_set(img_request);
1754         if (rbd_dev->parent_spec)
1755                 img_request_layered_set(img_request);
1756         spin_lock_init(&img_request->completion_lock);
1757         img_request->next_completion = 0;
1758         img_request->callback = NULL;
1759         img_request->result = 0;
1760         img_request->obj_request_count = 0;
1761         INIT_LIST_HEAD(&img_request->obj_requests);
1762         kref_init(&img_request->kref);
1763
1764         rbd_img_request_get(img_request);       /* Avoid a warning */
1765         rbd_img_request_put(img_request);       /* TEMPORARY */
1766
1767         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1768                 write_request ? "write" : "read", offset, length,
1769                 img_request);
1770
1771         return img_request;
1772 }
1773
1774 static void rbd_img_request_destroy(struct kref *kref)
1775 {
1776         struct rbd_img_request *img_request;
1777         struct rbd_obj_request *obj_request;
1778         struct rbd_obj_request *next_obj_request;
1779
1780         img_request = container_of(kref, struct rbd_img_request, kref);
1781
1782         dout("%s: img %p\n", __func__, img_request);
1783
1784         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1785                 rbd_img_obj_request_del(img_request, obj_request);
1786         rbd_assert(img_request->obj_request_count == 0);
1787
1788         if (img_request_write_test(img_request))
1789                 ceph_put_snap_context(img_request->snapc);
1790
1791         if (img_request_child_test(img_request))
1792                 rbd_obj_request_put(img_request->obj_request);
1793
1794         kfree(img_request);
1795 }
1796
1797 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1798 {
1799         struct rbd_img_request *img_request;
1800         unsigned int xferred;
1801         int result;
1802         bool more;
1803
1804         rbd_assert(obj_request_img_data_test(obj_request));
1805         img_request = obj_request->img_request;
1806
1807         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1808         xferred = (unsigned int)obj_request->xferred;
1809         result = obj_request->result;
1810         if (result) {
1811                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1812
1813                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1814                         img_request_write_test(img_request) ? "write" : "read",
1815                         obj_request->length, obj_request->img_offset,
1816                         obj_request->offset);
1817                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1818                         result, xferred);
1819                 if (!img_request->result)
1820                         img_request->result = result;
1821         }
1822
1823         /* Image object requests don't own their page array */
1824
1825         if (obj_request->type == OBJ_REQUEST_PAGES) {
1826                 obj_request->pages = NULL;
1827                 obj_request->page_count = 0;
1828         }
1829
1830         if (img_request_child_test(img_request)) {
1831                 rbd_assert(img_request->obj_request != NULL);
1832                 more = obj_request->which < img_request->obj_request_count - 1;
1833         } else {
1834                 rbd_assert(img_request->rq != NULL);
1835                 more = blk_end_request(img_request->rq, result, xferred);
1836         }
1837
1838         return more;
1839 }
1840
1841 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1842 {
1843         struct rbd_img_request *img_request;
1844         u32 which = obj_request->which;
1845         bool more = true;
1846
1847         rbd_assert(obj_request_img_data_test(obj_request));
1848         img_request = obj_request->img_request;
1849
1850         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1851         rbd_assert(img_request != NULL);
1852         rbd_assert(img_request->obj_request_count > 0);
1853         rbd_assert(which != BAD_WHICH);
1854         rbd_assert(which < img_request->obj_request_count);
1855         rbd_assert(which >= img_request->next_completion);
1856
1857         spin_lock_irq(&img_request->completion_lock);
1858         if (which != img_request->next_completion)
1859                 goto out;
1860
1861         for_each_obj_request_from(img_request, obj_request) {
1862                 rbd_assert(more);
1863                 rbd_assert(which < img_request->obj_request_count);
1864
1865                 if (!obj_request_done_test(obj_request))
1866                         break;
1867                 more = rbd_img_obj_end_request(obj_request);
1868                 which++;
1869         }
1870
1871         rbd_assert(more ^ (which == img_request->obj_request_count));
1872         img_request->next_completion = which;
1873 out:
1874         spin_unlock_irq(&img_request->completion_lock);
1875
1876         if (!more)
1877                 rbd_img_request_complete(img_request);
1878 }
1879
1880 /*
1881  * Split up an image request into one or more object requests, each
1882  * to a different object.  The "type" parameter indicates whether
1883  * "data_desc" is the pointer to the head of a list of bio
1884  * structures, or the base of a page array.  In either case this
1885  * function assumes data_desc describes memory sufficient to hold
1886  * all data described by the image request.
1887  */
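/*
 * Worked example (values assumed for illustration): with obj_order 22
 * (4 MiB objects), a 6 MiB request at image offset 3 MiB is split into
 * three object requests:
 *
 *     object N:   offset 3 MiB, length 1 MiB  (up to the object boundary)
 *     object N+1: offset 0,     length 4 MiB  (a full object)
 *     object N+2: offset 0,     length 1 MiB  (the remainder)
 */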
1888 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1889                                         enum obj_request_type type,
1890                                         void *data_desc)
1891 {
1892         struct rbd_device *rbd_dev = img_request->rbd_dev;
1893         struct rbd_obj_request *obj_request = NULL;
1894         struct rbd_obj_request *next_obj_request;
1895         bool write_request = img_request_write_test(img_request);
1896         struct bio *bio_list;
1897         unsigned int bio_offset = 0;
1898         struct page **pages;
1899         u64 img_offset;
1900         u64 resid;
1901         u16 opcode;
1902
1903         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1904                 (int)type, data_desc);
1905
1906         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1907         img_offset = img_request->offset;
1908         resid = img_request->length;
1909         rbd_assert(resid > 0);
1910
1911         if (type == OBJ_REQUEST_BIO) {
1912                 bio_list = data_desc;
1913                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1914         } else {
1915                 rbd_assert(type == OBJ_REQUEST_PAGES);
1916                 pages = data_desc;
1917         }
1918
1919         while (resid) {
1920                 struct ceph_osd_request *osd_req;
1921                 const char *object_name;
1922                 u64 offset;
1923                 u64 length;
1924
1925                 object_name = rbd_segment_name(rbd_dev, img_offset);
1926                 if (!object_name)
1927                         goto out_unwind;
1928                 offset = rbd_segment_offset(rbd_dev, img_offset);
1929                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1930                 obj_request = rbd_obj_request_create(object_name,
1931                                                 offset, length, type);
1932                 kfree(object_name);     /* object request has its own copy */
1933                 if (!obj_request)
1934                         goto out_unwind;
1935
1936                 if (type == OBJ_REQUEST_BIO) {
1937                         unsigned int clone_size;
1938
1939                         rbd_assert(length <= (u64)UINT_MAX);
1940                         clone_size = (unsigned int)length;
1941                         obj_request->bio_list =
1942                                         bio_chain_clone_range(&bio_list,
1943                                                                 &bio_offset,
1944                                                                 clone_size,
1945                                                                 GFP_ATOMIC);
1946                         if (!obj_request->bio_list)
1947                                 goto out_partial;
1948                 } else {
1949                         unsigned int page_count;
1950
1951                         obj_request->pages = pages;
1952                         page_count = (u32)calc_pages_for(offset, length);
1953                         obj_request->page_count = page_count;
1954                         if ((offset + length) & ~PAGE_MASK)
1955                                 page_count--;   /* more on last page */
1956                         pages += page_count;
1957                 }
1958
1959                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1960                                                 obj_request);
1961                 if (!osd_req)
1962                         goto out_partial;
1963                 obj_request->osd_req = osd_req;
1964                 obj_request->callback = rbd_img_obj_callback;
1965
1966                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1967                                                 0, 0);
1968                 if (type == OBJ_REQUEST_BIO)
1969                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1970                                         obj_request->bio_list, length);
1971                 else
1972                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1973                                         obj_request->pages, length,
1974                                         offset & ~PAGE_MASK, false, false);
1975
1976                 if (write_request)
1977                         rbd_osd_req_format_write(obj_request);
1978                 else
1979                         rbd_osd_req_format_read(obj_request);
1980
1981                 obj_request->img_offset = img_offset;
1982                 rbd_img_obj_request_add(img_request, obj_request);
1983
1984                 img_offset += length;
1985                 resid -= length;
1986         }
1987
1988         return 0;
1989
1990 out_partial:
1991         rbd_obj_request_put(obj_request);
1992 out_unwind:
1993         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1994                 rbd_obj_request_put(obj_request);
1995
1996         return -ENOMEM;
1997 }
1998
1999 static void
2000 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2001 {
2002         struct rbd_img_request *img_request;
2003         struct rbd_device *rbd_dev;
2004         u64 length;
2005         u32 page_count;
2006
2007         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2008         rbd_assert(obj_request_img_data_test(obj_request));
2009         img_request = obj_request->img_request;
2010         rbd_assert(img_request);
2011
2012         rbd_dev = img_request->rbd_dev;
2013         rbd_assert(rbd_dev);
2014         length = (u64)1 << rbd_dev->header.obj_order;
2015         page_count = (u32)calc_pages_for(0, length);
2016
2017         rbd_assert(obj_request->copyup_pages);
2018         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2019         obj_request->copyup_pages = NULL;
2020
2021         /*
2022          * We want the transfer count to reflect the size of the
2023          * original write request.  There is no such thing as a
2024          * successful short write, so if the request was successful
2025          * we can just set it to the originally-requested length.
2026          */
2027         if (!obj_request->result)
2028                 obj_request->xferred = obj_request->length;
2029
2030         /* Finish up with the normal image object callback */
2031
2032         rbd_img_obj_callback(obj_request);
2033 }
2034
2035 static void
2036 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2037 {
2038         struct rbd_obj_request *orig_request;
2039         struct ceph_osd_request *osd_req;
2040         struct ceph_osd_client *osdc;
2041         struct rbd_device *rbd_dev;
2042         struct page **pages;
2043         int result;
2044         u64 obj_size;
2045         u64 xferred;
2046
2047         rbd_assert(img_request_child_test(img_request));
2048
2049         /* First get what we need from the image request */
2050
2051         pages = img_request->copyup_pages;
2052         rbd_assert(pages != NULL);
2053         img_request->copyup_pages = NULL;
2054
2055         orig_request = img_request->obj_request;
2056         rbd_assert(orig_request != NULL);
2057         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2058         result = img_request->result;
2059         obj_size = img_request->length;
2060         xferred = img_request->xferred;
2061
2062         rbd_dev = img_request->rbd_dev;
2063         rbd_assert(rbd_dev);
2064         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2065
2066         rbd_img_request_put(img_request);
2067
2068         if (result)
2069                 goto out_err;
2070
2071         /* Allocate the new copyup osd request for the original request */
2072
2073         result = -ENOMEM;
2074         rbd_assert(!orig_request->osd_req);
2075         osd_req = rbd_osd_req_create_copyup(orig_request);
2076         if (!osd_req)
2077                 goto out_err;
2078         orig_request->osd_req = osd_req;
2079         orig_request->copyup_pages = pages;
2080
2081         /* Initialize the copyup op */
2082
2083         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2084         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2085                                                 false, false);
2086
2087         /* Then the original write request op */
2088
2089         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2090                                         orig_request->offset,
2091                                         orig_request->length, 0, 0);
2092         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2093                                         orig_request->length);
2094
2095         rbd_osd_req_format_write(orig_request);
2096
2097         /* All set, send it off. */
2098
2099         orig_request->callback = rbd_img_obj_copyup_callback;
2100         osdc = &rbd_dev->rbd_client->client->osdc;
2101         result = rbd_obj_request_submit(osdc, orig_request);
2102         if (!result)
2103                 return;
2104 out_err:
2105         /* Record the error code and complete the request */
2106
2107         orig_request->result = result;
2108         orig_request->xferred = 0;
2109         obj_request_done_set(orig_request);
2110         rbd_obj_request_complete(orig_request);
2111 }
2112
2113 /*
2114  * Read from the parent image the range of data that covers the
2115  * entire target of the given object request.  This is used for
2116  * satisfying a layered image write request when the target of an
2117  * object request from the image request does not exist.
2118  *
2119  * A page array big enough to hold the returned data is allocated
2120  * and supplied to rbd_img_request_fill() as the "data descriptor."
2121  * When the read completes, this page array will be transferred to
2122  * the original object request for the copyup operation.
2123  *
2124  * If an error occurs, record it as the result of the original
2125  * object request and mark it done so it gets completed.
2126  */
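/*
 * Worked example (values assumed): with 4 MiB objects, a target object
 * covering image range [8 MiB, 12 MiB) and a parent_overlap of 10 MiB,
 * only [8 MiB, 10 MiB) is defined in the parent, so the read length is
 * clamped from 4 MiB down to 2 MiB.
 */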
2127 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2128 {
2129         struct rbd_img_request *img_request = NULL;
2130         struct rbd_img_request *parent_request = NULL;
2131         struct rbd_device *rbd_dev;
2132         u64 img_offset;
2133         u64 length;
2134         struct page **pages = NULL;
2135         u32 page_count;
2136         int result;
2137
2138         rbd_assert(obj_request_img_data_test(obj_request));
2139         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2140
2141         img_request = obj_request->img_request;
2142         rbd_assert(img_request != NULL);
2143         rbd_dev = img_request->rbd_dev;
2144         rbd_assert(rbd_dev->parent != NULL);
2145
2146         /*
2147          * First things first.  The original osd request is of no
2148          * use to us any more; we'll need a new one that can hold
2149          * the two ops in a copyup request.  We'll get that later,
2150          * but for now we can release the old one.
2151          */
2152         rbd_osd_req_destroy(obj_request->osd_req);
2153         obj_request->osd_req = NULL;
2154
2155         /*
2156          * Determine the byte range covered by the object in the
2157          * child image to which the original request was to be sent.
2158          */
2159         img_offset = obj_request->img_offset - obj_request->offset;
2160         length = (u64)1 << rbd_dev->header.obj_order;
2161
2162         /*
2163          * There is no defined parent data beyond the parent
2164          * overlap, so limit what we read at that boundary if
2165          * necessary.
2166          */
2167         if (img_offset + length > rbd_dev->parent_overlap) {
2168                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2169                 length = rbd_dev->parent_overlap - img_offset;
2170         }
2171
2172         /*
2173          * Allocate a page array big enough to receive the data read
2174          * from the parent.
2175          */
2176         page_count = (u32)calc_pages_for(0, length);
2177         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2178         if (IS_ERR(pages)) {
2179                 result = PTR_ERR(pages);
2180                 pages = NULL;
2181                 goto out_err;
2182         }
2183
2184         result = -ENOMEM;
2185         parent_request = rbd_img_request_create(rbd_dev->parent,
2186                                                 img_offset, length,
2187                                                 false, true);
2188         if (!parent_request)
2189                 goto out_err;
2190         rbd_obj_request_get(obj_request);
2191         parent_request->obj_request = obj_request;
2192
2193         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2194         if (result)
2195                 goto out_err;
2196         parent_request->copyup_pages = pages;
2197
2198         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2199         result = rbd_img_request_submit(parent_request);
2200         if (!result)
2201                 return 0;
2202
2203         parent_request->copyup_pages = NULL;
2204         parent_request->obj_request = NULL;
2205         rbd_obj_request_put(obj_request);
2206 out_err:
2207         if (pages)
2208                 ceph_release_page_vector(pages, page_count);
2209         if (parent_request)
2210                 rbd_img_request_put(parent_request);
2211         obj_request->result = result;
2212         obj_request->xferred = 0;
2213         obj_request_done_set(obj_request);
2214
2215         return result;
2216 }
2217
2218 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2219 {
2220         struct rbd_obj_request *orig_request;
2221         int result;
2222
2223         rbd_assert(!obj_request_img_data_test(obj_request));
2224
2225         /*
2226          * All we need from the object request is the original
2227          * request and the result of the STAT op.  Grab those, then
2228          * we're done with the request.
2229          */
2230         orig_request = obj_request->obj_request;
2231         obj_request->obj_request = NULL;
2232         rbd_assert(orig_request);
2233         rbd_assert(orig_request->img_request);
2234
2235         result = obj_request->result;
2236         obj_request->result = 0;
2237
2238         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2239                 obj_request, orig_request, result,
2240                 obj_request->xferred, obj_request->length);
2241         rbd_obj_request_put(obj_request);
2242
2246         /*
2247          * Our only purpose here is to determine whether the object
2248          * exists, and we don't want to treat the non-existence as
2249          * an error.  If something else comes back, transfer the
2250          * error to the original request and complete it now.
2251          */
2252         if (!result) {
2253                 obj_request_existence_set(orig_request, true);
2254         } else if (result == -ENOENT) {
2255                 obj_request_existence_set(orig_request, false);
2256         } else {
2257                 orig_request->result = result;
2258                 goto out;
2259         }
2260
2261         /*
2262          * Resubmit the original request now that we have recorded
2263          * whether the target object exists.
2264          */
2265         orig_request->result = rbd_img_obj_request_submit(orig_request);
2266 out:
2267         if (orig_request->result)
2268                 rbd_obj_request_complete(orig_request);
2269         rbd_obj_request_put(orig_request);
2270 }
2271
2272 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2273 {
2274         struct rbd_obj_request *stat_request;
2275         struct rbd_device *rbd_dev;
2276         struct ceph_osd_client *osdc;
2277         struct page **pages = NULL;
2278         u32 page_count;
2279         size_t size;
2280         int ret;
2281
2282         /*
2283          * The response data for a STAT call consists of:
2284          *     le64 length;
2285          *     struct {
2286          *         le32 tv_sec;
2287          *         le32 tv_nsec;
2288          *     } mtime;
2289          */
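        /* That is 16 bytes in all, so a single page always suffices. */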
2290         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2291         page_count = (u32)calc_pages_for(0, size);
2292         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2293         if (IS_ERR(pages))
2294                 return PTR_ERR(pages);
2295
2296         ret = -ENOMEM;
2297         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2298                                                         OBJ_REQUEST_PAGES);
2299         if (!stat_request)
2300                 goto out;
2301
2302         rbd_obj_request_get(obj_request);
2303         stat_request->obj_request = obj_request;
2304         stat_request->pages = pages;
2305         stat_request->page_count = page_count;
2306
2307         rbd_assert(obj_request->img_request);
2308         rbd_dev = obj_request->img_request->rbd_dev;
2309         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2310                                                 stat_request);
2311         if (!stat_request->osd_req)
2312                 goto out;
2313         stat_request->callback = rbd_img_obj_exists_callback;
2314
2315         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2316         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2317                                         false, false);
2318         rbd_osd_req_format_read(stat_request);
2319
2320         osdc = &rbd_dev->rbd_client->client->osdc;
2321         ret = rbd_obj_request_submit(osdc, stat_request);
2322 out:
2323         if (ret)
2324                 rbd_obj_request_put(obj_request);
2325
2326         return ret;
2327 }
2328
2329 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2330 {
2331         struct rbd_img_request *img_request;
2332         struct rbd_device *rbd_dev;
2333         bool known;
2334
2335         rbd_assert(obj_request_img_data_test(obj_request));
2336
2337         img_request = obj_request->img_request;
2338         rbd_assert(img_request);
2339         rbd_dev = img_request->rbd_dev;
2340
2341         /*
2342          * Only writes to layered images need special handling.
2343          * Reads and non-layered writes are simple object requests.
2344          * Layered writes that start beyond the end of the overlap
2345          * with the parent have no parent data, so they too are
2346          * simple object requests.  Finally, if the target object is
2347          * known to already exist, its parent data has already been
2348          * copied, so a write to the object can also be handled as a
2349          * simple object request.
2350          */
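        /*
         * Condensed summary (illustrative; mirrors the test below):
         *
         *     read                                -> submit directly
         *     non-layered write                   -> submit directly
         *     layered write beyond the overlap    -> submit directly
         *     layered write, target known present -> submit directly
         *     layered write, target known absent  -> copyup (parent read)
         *     layered write, existence unknown    -> STAT, then resubmit
         */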
2351         if (!img_request_write_test(img_request) ||
2352                 !img_request_layered_test(img_request) ||
2353                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2354                 ((known = obj_request_known_test(obj_request)) &&
2355                         obj_request_exists_test(obj_request))) {
2356
2357                 struct ceph_osd_client *osdc;
2358
2359                 osdc = &rbd_dev->rbd_client->client->osdc;
2362
2363                 return rbd_obj_request_submit(osdc, obj_request);
2364         }
2365
2366         /*
2367          * It's a layered write.  The target object might exist but
2368          * we may not know that yet.  If we know it doesn't exist,
2369          * start by reading the data for the full target object from
2370          * the parent so we can use it for a copyup to the target.
2371          */
2372         if (known)
2373                 return rbd_img_obj_parent_read_full(obj_request);
2374
2375         /* We don't know whether the target exists.  Go find out. */
2376
2377         return rbd_img_obj_exists_submit(obj_request);
2378 }
2379
2380 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2381 {
2382         struct rbd_obj_request *obj_request;
2383         struct rbd_obj_request *next_obj_request;
2384
2385         dout("%s: img %p\n", __func__, img_request);
2386         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2387                 int ret;
2388
2389                 ret = rbd_img_obj_request_submit(obj_request);
2390                 if (ret)
2391                         return ret;
2392         }
2393
2394         return 0;
2395 }
2396
2397 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2398 {
2399         struct rbd_obj_request *obj_request;
2400         struct rbd_device *rbd_dev;
2401         u64 obj_end;
2402
2403         rbd_assert(img_request_child_test(img_request));
2404
2405         obj_request = img_request->obj_request;
2406         rbd_assert(obj_request);
2407         rbd_assert(obj_request->img_request);
2408
2409         obj_request->result = img_request->result;
2410         if (obj_request->result)
2411                 goto out;
2412
2413         /*
2414          * We need to zero anything beyond the parent overlap
2415          * boundary.  Since rbd_img_obj_request_read_callback()
2416          * will zero anything beyond the end of a short read, an
2417          * easy way to do this is to pretend the data from the
2418          * parent came up short--ending at the overlap boundary.
2419          */
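        /*
         * For example (values assumed): an object request covering
         * image range [8 MiB, 12 MiB) with a parent_overlap of 10 MiB
         * has its transfer count capped at 2 MiB, so everything past
         * the overlap boundary gets zeroed by the read callback.
         */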
2420         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2421         obj_end = obj_request->img_offset + obj_request->length;
2422         rbd_dev = obj_request->img_request->rbd_dev;
2423         if (obj_end > rbd_dev->parent_overlap) {
2424                 u64 xferred = 0;
2425
2426                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2427                         xferred = rbd_dev->parent_overlap -
2428                                         obj_request->img_offset;
2429
2430                 obj_request->xferred = min(img_request->xferred, xferred);
2431         } else {
2432                 obj_request->xferred = img_request->xferred;
2433         }
2434 out:
2435         rbd_img_obj_request_read_callback(obj_request);
2436         rbd_obj_request_complete(obj_request);
2437 }
2438
2439 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2440 {
2441         struct rbd_device *rbd_dev;
2442         struct rbd_img_request *img_request;
2443         int result;
2444
2445         rbd_assert(obj_request_img_data_test(obj_request));
2446         rbd_assert(obj_request->img_request != NULL);
2447         rbd_assert(obj_request->result == (s32) -ENOENT);
2448         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2449
2450         rbd_dev = obj_request->img_request->rbd_dev;
2451         rbd_assert(rbd_dev->parent != NULL);
2453         img_request = rbd_img_request_create(rbd_dev->parent,
2454                                                 obj_request->img_offset,
2455                                                 obj_request->length,
2456                                                 false, true);
2457         result = -ENOMEM;
2458         if (!img_request)
2459                 goto out_err;
2460
2461         rbd_obj_request_get(obj_request);
2462         img_request->obj_request = obj_request;
2463
2464         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2465                                         obj_request->bio_list);
2466         if (result)
2467                 goto out_err;
2468
2469         img_request->callback = rbd_img_parent_read_callback;
2470         result = rbd_img_request_submit(img_request);
2471         if (result)
2472                 goto out_err;
2473
2474         return;
2475 out_err:
2476         if (img_request)
2477                 rbd_img_request_put(img_request);
2478         obj_request->result = result;
2479         obj_request->xferred = 0;
2480         obj_request_done_set(obj_request);
2481 }
2482
2483 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2484                                    u64 ver, u64 notify_id)
2485 {
2486         struct rbd_obj_request *obj_request;
2487         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2488         int ret;
2489
2490         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2491                                                         OBJ_REQUEST_NODATA);
2492         if (!obj_request)
2493                 return -ENOMEM;
2494
2495         ret = -ENOMEM;
2496         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2497         if (!obj_request->osd_req)
2498                 goto out;
2499         obj_request->callback = rbd_obj_request_put;
2500
2501         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2502                                         notify_id, ver, 0);
2503         rbd_osd_req_format_read(obj_request);
2504
2505         ret = rbd_obj_request_submit(osdc, obj_request);
2506 out:
2507         if (ret)
2508                 rbd_obj_request_put(obj_request);
2509
2510         return ret;
2511 }
2512
2513 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2514 {
2515         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2516         u64 hver;
2517
2518         if (!rbd_dev)
2519                 return;
2520
2521         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2522                 rbd_dev->header_name, (unsigned long long) notify_id,
2523                 (unsigned int) opcode);
2524         (void)rbd_dev_refresh(rbd_dev, &hver);
2525
2526         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2527 }
2528
2529 /*
2530  * Request sync osd watch/unwatch.  The value of "start" determines
2531  * whether a watch request is being initiated or torn down.
2532  */
2533 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2534 {
2535         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2536         struct rbd_obj_request *obj_request;
2537         int ret;
2538
2539         rbd_assert(start ^ !!rbd_dev->watch_event);
2540         rbd_assert(start ^ !!rbd_dev->watch_request);
2541
2542         if (start) {
2543                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2544                                                 &rbd_dev->watch_event);
2545                 if (ret < 0)
2546                         return ret;
2547                 rbd_assert(rbd_dev->watch_event != NULL);
2548         }
2549
2550         ret = -ENOMEM;
2551         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2552                                                         OBJ_REQUEST_NODATA);
2553         if (!obj_request)
2554                 goto out_cancel;
2555
2556         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2557         if (!obj_request->osd_req)
2558                 goto out_cancel;
2559
2560         if (start)
2561                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2562         else
2563                 ceph_osdc_unregister_linger_request(osdc,
2564                                         rbd_dev->watch_request->osd_req);
2565
2566         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2567                                 rbd_dev->watch_event->cookie,
2568                                 rbd_dev->header.obj_version, start);
2569         rbd_osd_req_format_write(obj_request);
2570
2571         ret = rbd_obj_request_submit(osdc, obj_request);
2572         if (ret)
2573                 goto out_cancel;
2574         ret = rbd_obj_request_wait(obj_request);
2575         if (ret)
2576                 goto out_cancel;
2577         ret = obj_request->result;
2578         if (ret)
2579                 goto out_cancel;
2580
2581         /*
2582          * A watch request is set to linger, so the underlying osd
2583          * request won't go away until we unregister it.  We retain
2584          * a pointer to the object request during that time (in
2585          * rbd_dev->watch_request), so we'll keep a reference to
2586          * it.  We'll drop that reference (below) after we've
2587          * unregistered it.
2588          */
2589         if (start) {
2590                 rbd_dev->watch_request = obj_request;
2591
2592                 return 0;
2593         }
2594
2595         /* We have successfully torn down the watch request */
2596
2597         rbd_obj_request_put(rbd_dev->watch_request);
2598         rbd_dev->watch_request = NULL;
2599 out_cancel:
2600         /* Cancel the event if we're tearing down, or on error */
2601         ceph_osdc_cancel_event(rbd_dev->watch_event);
2602         rbd_dev->watch_event = NULL;
2603         if (obj_request)
2604                 rbd_obj_request_put(obj_request);
2605
2606         return ret;
2607 }
2608
2609 /*
2610  * Synchronous osd object method call.  Returns the number of bytes
2611  * returned in the inbound buffer, or a negative error code.
2612  */
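/*
 * A typical call (illustrative only; the real callers live elsewhere
 * in this file) might look like:
 *
 *     __le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *     __le64 size_buf;
 *
 *     ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                     "rbd", "get_size",
 *                     &snapid, sizeof (snapid),
 *                     &size_buf, sizeof (size_buf), NULL);
 */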
2613 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2614                              const char *object_name,
2615                              const char *class_name,
2616                              const char *method_name,
2617                              const void *outbound,
2618                              size_t outbound_size,
2619                              void *inbound,
2620                              size_t inbound_size,
2621                              u64 *version)
2622 {
2623         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2624         struct rbd_obj_request *obj_request;
2625         struct page **pages;
2626         u32 page_count;
2627         int ret;
2628
2629         /*
2630          * Method calls are ultimately read operations.  The result
2631          * should be placed into the inbound buffer provided.  They
2632          * may also supply outbound data--parameters for the object
2633          * method.  Currently if this is present it will be a
2634          * snapshot id.
2635          */
2636         page_count = (u32)calc_pages_for(0, inbound_size);
2637         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2638         if (IS_ERR(pages))
2639                 return PTR_ERR(pages);
2640
2641         ret = -ENOMEM;
2642         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2643                                                         OBJ_REQUEST_PAGES);
2644         if (!obj_request)
2645                 goto out;
2646
2647         obj_request->pages = pages;
2648         obj_request->page_count = page_count;
2649
2650         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2651         if (!obj_request->osd_req)
2652                 goto out;
2653
2654         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2655                                         class_name, method_name);
2656         if (outbound_size) {
2657                 struct ceph_pagelist *pagelist;
2658
2659                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2660                 if (!pagelist)
2661                         goto out;
2662
2663                 ceph_pagelist_init(pagelist);
2664                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2665                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2666                                                 pagelist);
2667         }
2668         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2669                                         obj_request->pages, inbound_size,
2670                                         0, false, false);
2671         rbd_osd_req_format_read(obj_request);
2672
2673         ret = rbd_obj_request_submit(osdc, obj_request);
2674         if (ret)
2675                 goto out;
2676         ret = rbd_obj_request_wait(obj_request);
2677         if (ret)
2678                 goto out;
2679
2680         ret = obj_request->result;
2681         if (ret < 0)
2682                 goto out;
2683
2684         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2685         ret = (int)obj_request->xferred;
2686         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2687         if (version)
2688                 *version = obj_request->version;
2689 out:
2690         if (obj_request)
2691                 rbd_obj_request_put(obj_request);
2692         else
2693                 ceph_release_page_vector(pages, page_count);
2694
2695         return ret;
2696 }
2697
2698 static void rbd_request_fn(struct request_queue *q)
2699                 __releases(q->queue_lock) __acquires(q->queue_lock)
2700 {
2701         struct rbd_device *rbd_dev = q->queuedata;
2702         bool read_only = rbd_dev->mapping.read_only;
2703         struct request *rq;
2704         int result;
2705
2706         while ((rq = blk_fetch_request(q))) {
2707                 bool write_request = rq_data_dir(rq) == WRITE;
2708                 struct rbd_img_request *img_request;
2709                 u64 offset;
2710                 u64 length;
2711
2712                 /* Ignore any non-FS requests that filter through. */
2713
2714                 if (rq->cmd_type != REQ_TYPE_FS) {
2715                         dout("%s: non-fs request type %d\n", __func__,
2716                                 (int) rq->cmd_type);
2717                         __blk_end_request_all(rq, 0);
2718                         continue;
2719                 }
2720
2721                 /* Ignore/skip any zero-length requests */
2722
2723                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2724                 length = (u64) blk_rq_bytes(rq);
2725
2726                 if (!length) {
2727                         dout("%s: zero-length request\n", __func__);
2728                         __blk_end_request_all(rq, 0);
2729                         continue;
2730                 }
2731
2732                 spin_unlock_irq(q->queue_lock);
2733
2734                 /* Disallow writes to a read-only device */
2735
2736                 if (write_request) {
2737                         result = -EROFS;
2738                         if (read_only)
2739                                 goto end_request;
2740                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2741                 }
2742
2743                 /*
2744                  * Quit early if the mapped snapshot no longer
2745                  * exists.  It's still possible the snapshot will
2746                  * have disappeared by the time our request arrives
2747                  * at the osd, but there's no sense in sending it if
2748                  * we already know.
2749                  */
2750                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2751                         dout("request for non-existent snapshot\n");
2752                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2753                         result = -ENXIO;
2754                         goto end_request;
2755                 }
2756
2757                 result = -EINVAL;
2758                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2759                         goto end_request;       /* Shouldn't happen */
2760
2761                 result = -ENOMEM;
2762                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2763                                                         write_request, false);
2764                 if (!img_request)
2765                         goto end_request;
2766
2767                 img_request->rq = rq;
2768
2769                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2770                                                 rq->bio);
2771                 if (!result)
2772                         result = rbd_img_request_submit(img_request);
2773                 if (result)
2774                         rbd_img_request_put(img_request);
2775 end_request:
2776                 spin_lock_irq(q->queue_lock);
2777                 if (result < 0) {
2778                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2779                                 write_request ? "write" : "read",
2780                                 length, offset, result);
2781
2782                         __blk_end_request_all(rq, result);
2783                 }
2784         }
2785 }
2786
2787 /*
2788  * A queue callback.  Makes sure that we don't create a bio that spans
2789  * across multiple osd objects.  One exception would be single-page
2790  * bios, which we handle later in bio_chain_clone_range().
2791  */
2792 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2793                           struct bio_vec *bvec)
2794 {
2795         struct rbd_device *rbd_dev = q->queuedata;
2796         sector_t sector_offset;
2797         sector_t sectors_per_obj;
2798         sector_t obj_sector_offset;
2799         int ret;
2800
2801         /*
2802          * Find how far into its rbd object the bio's start sector
2803          * falls; the sector is partition-relative, so offset it by
2804          * the partition start to make it device-relative first.
2805          */
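        /*
         * Example (values assumed): with obj_order 22 there are 8192
         * sectors per object; a bio starting at device sector 10000
         * is 1808 sectors into its object, leaving 6384 sectors to
         * the object boundary.
         */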
2806         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2807         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2808         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2809
2810         /*
2811          * Compute the number of bytes from that offset to the end
2812          * of the object.  Account for what's already used by the bio.
2813          */
2814         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2815         if (ret > bmd->bi_size)
2816                 ret -= bmd->bi_size;
2817         else
2818                 ret = 0;
2819
2820         /*
2821          * Don't send back more than was asked for.  And if the bio
2822          * was empty, let the whole thing through because:  "Note
2823          * that a block device *must* allow a single page to be
2824          * added to an empty bio."
2825          */
2826         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2827         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2828                 ret = (int) bvec->bv_len;
2829
2830         return ret;
2831 }
2832
2833 static void rbd_free_disk(struct rbd_device *rbd_dev)
2834 {
2835         struct gendisk *disk = rbd_dev->disk;
2836
2837         if (!disk)
2838                 return;
2839
2840         rbd_dev->disk = NULL;
2841         if (disk->flags & GENHD_FL_UP) {
2842                 del_gendisk(disk);
2843                 if (disk->queue)
2844                         blk_cleanup_queue(disk->queue);
2845         }
2846         put_disk(disk);
2847 }
2848
2849 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2850                                 const char *object_name,
2851                                 u64 offset, u64 length,
2852                                 void *buf, u64 *version)
2854 {
2855         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2856         struct rbd_obj_request *obj_request;
2857         struct page **pages = NULL;
2858         u32 page_count;
2859         size_t size;
2860         int ret;
2861
2862         page_count = (u32) calc_pages_for(offset, length);
2863         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2864         if (IS_ERR(pages))
2865                 return PTR_ERR(pages);
2866
2867         ret = -ENOMEM;
2868         obj_request = rbd_obj_request_create(object_name, offset, length,
2869                                                         OBJ_REQUEST_PAGES);
2870         if (!obj_request)
2871                 goto out;
2872
2873         obj_request->pages = pages;
2874         obj_request->page_count = page_count;
2875
2876         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2877         if (!obj_request->osd_req)
2878                 goto out;
2879
2880         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2881                                         offset, length, 0, 0);
2882         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2883                                         obj_request->pages,
2884                                         obj_request->length,
2885                                         obj_request->offset & ~PAGE_MASK,
2886                                         false, false);
2887         rbd_osd_req_format_read(obj_request);
2888
2889         ret = rbd_obj_request_submit(osdc, obj_request);
2890         if (ret)
2891                 goto out;
2892         ret = rbd_obj_request_wait(obj_request);
2893         if (ret)
2894                 goto out;
2895
2896         ret = obj_request->result;
2897         if (ret < 0)
2898                 goto out;
2899
2900         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2901         size = (size_t) obj_request->xferred;
2902         ceph_copy_from_page_vector(pages, buf, 0, size);
2903         rbd_assert(size <= (size_t) INT_MAX);
2904         ret = (int) size;
2905         if (version)
2906                 *version = obj_request->version;
2907 out:
2908         if (obj_request)
2909                 rbd_obj_request_put(obj_request);
2910         else
2911                 ceph_release_page_vector(pages, page_count);
2912
2913         return ret;
2914 }
2915
2916 /*
2917  * Read the complete header for the given rbd device.
2918  *
2919  * Returns a pointer to a dynamically-allocated buffer containing
2920  * the complete and validated header.  Caller can pass the address
2921  * of a variable that will be filled in with the version of the
2922  * header object at the time it was read.
2923  *
2924  * Returns a pointer-coded errno if a failure occurs.
2925  */
2926 static struct rbd_image_header_ondisk *
2927 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2928 {
2929         struct rbd_image_header_ondisk *ondisk = NULL;
2930         u32 snap_count = 0;
2931         u64 names_size = 0;
2932         u32 want_count;
2933         int ret;
2934
2935         /*
2936          * The complete header will include an array of its 64-bit
2937          * snapshot ids, followed by the names of those snapshots as
2938          * a contiguous block of NUL-terminated strings.  Note that
2939          * the number of snapshots could change by the time we read
2940          * it in, in which case we re-read it.
2941          */
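        /*
         * Illustrative layout of the complete header object (the
         * sizes and names shown are examples, not from the code):
         *
         *   +----------------+---------------------+------------------+
         *   | fixed ondisk   | snap_count * __le64 | names_size bytes |
         *   | header fields  | (snapshot ids)      | "one\0two\0..."  |
         *   +----------------+---------------------+------------------+
         */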
2942         do {
2943                 size_t size;
2944
2945                 kfree(ondisk);
2946
2947                 size = sizeof (*ondisk);
2948                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2949                 size += names_size;
2950                 ondisk = kmalloc(size, GFP_KERNEL);
2951                 if (!ondisk)
2952                         return ERR_PTR(-ENOMEM);
2953
2954                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2955                                        0, size, ondisk, version);
2956                 if (ret < 0)
2957                         goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        rbd_warn(rbd_dev, "short header read (want %zu got %d)",
                                size, ret);
                        ret = -ENXIO;
2962                         goto out_err;
2963                 }
2964                 if (!rbd_dev_ondisk_valid(ondisk)) {
2965                         ret = -ENXIO;
2966                         rbd_warn(rbd_dev, "invalid header");
2967                         goto out_err;
2968                 }
2969
2970                 names_size = le64_to_cpu(ondisk->snap_names_len);
2971                 want_count = snap_count;
2972                 snap_count = le32_to_cpu(ondisk->snap_count);
2973         } while (snap_count != want_count);
2974
2975         return ondisk;
2976
2977 out_err:
2978         kfree(ondisk);
2979
2980         return ERR_PTR(ret);
2981 }
2982
/*
 * Re-read the on-disk image header and convert it to the in-memory form.
 */
2986 static int rbd_read_header(struct rbd_device *rbd_dev,
2987                            struct rbd_image_header *header)
2988 {
2989         struct rbd_image_header_ondisk *ondisk;
2990         u64 ver = 0;
2991         int ret;
2992
2993         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2994         if (IS_ERR(ondisk))
2995                 return PTR_ERR(ondisk);
2996         ret = rbd_header_from_disk(header, ondisk);
2997         if (ret >= 0)
2998                 header->obj_version = ver;
2999         kfree(ondisk);
3000
3001         return ret;
3002 }
3003
3004 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3005 {
3006         struct rbd_snap *snap;
3007         struct rbd_snap *next;
3008
3009         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3010                 list_del(&snap->node);
3011                 rbd_snap_destroy(snap);
3012         }
3013 }
3014
3015 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3016 {
3017         sector_t size;
3018
3019         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3020                 return;
3021
        size = (sector_t) (rbd_dev->header.image_size / SECTOR_SIZE);
        dout("setting size to %llu sectors\n", (unsigned long long) size);
3024         rbd_dev->mapping.size = (u64) size;
3025         set_capacity(rbd_dev->disk, size);
3026 }
3027
/*
 * Re-read the v1 image header and bring the in-core copy (including
 * the snapshot list) up to date.
 */
3031 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3032 {
3033         int ret;
3034         struct rbd_image_header h;
3035
3036         ret = rbd_read_header(rbd_dev, &h);
3037         if (ret < 0)
3038                 return ret;
3039
3040         down_write(&rbd_dev->header_rwsem);
3041
3042         /* Update image size, and check for resize of mapped image */
3043         rbd_dev->header.image_size = h.image_size;
3044         rbd_update_mapping_size(rbd_dev);
3045
3046         /* rbd_dev->header.object_prefix shouldn't change */
3047         kfree(rbd_dev->header.snap_sizes);
3048         kfree(rbd_dev->header.snap_names);
3049         /* osd requests may still refer to snapc */
3050         ceph_put_snap_context(rbd_dev->header.snapc);
3051
3052         if (hver)
3053                 *hver = h.obj_version;
3054         rbd_dev->header.obj_version = h.obj_version;
3056         rbd_dev->header.snapc = h.snapc;
3057         rbd_dev->header.snap_names = h.snap_names;
3058         rbd_dev->header.snap_sizes = h.snap_sizes;
3059         /* Free the extra copy of the object prefix */
3060         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
3061         kfree(h.object_prefix);
3062
3063         ret = rbd_dev_snaps_update(rbd_dev);
3064
3065         up_write(&rbd_dev->header_rwsem);
3066
3067         return ret;
3068 }
3069
3070 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3071 {
3072         int ret;
3073
3074         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3075         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3076         if (rbd_dev->image_format == 1)
3077                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3078         else
3079                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3080         mutex_unlock(&ctl_mutex);
3081         revalidate_disk(rbd_dev->disk);
3082         if (ret)
                rbd_warn(rbd_dev, "got notification but failed to "
                        "update snaps: %d\n", ret);
3085
3086         return ret;
3087 }
3088
3089 static int rbd_init_disk(struct rbd_device *rbd_dev)
3090 {
3091         struct gendisk *disk;
3092         struct request_queue *q;
3093         u64 segment_size;
3094
3095         /* create gendisk info */
3096         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3097         if (!disk)
3098                 return -ENOMEM;
3099
3100         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3101                  rbd_dev->dev_id);
3102         disk->major = rbd_dev->major;
3103         disk->first_minor = 0;
3104         disk->fops = &rbd_bd_ops;
3105         disk->private_data = rbd_dev;
3106
3107         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3108         if (!q)
3109                 goto out_disk;
3110
3111         /* We use the default size, but let's be explicit about it. */
3112         blk_queue_physical_block_size(q, SECTOR_SIZE);
3113
3114         /* set io sizes to object size */
3115         segment_size = rbd_obj_bytes(&rbd_dev->header);
3116         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3117         blk_queue_max_segment_size(q, segment_size);
3118         blk_queue_io_min(q, segment_size);
3119         blk_queue_io_opt(q, segment_size);
3120
3121         blk_queue_merge_bvec(q, rbd_merge_bvec);
3122         disk->queue = q;
3123
3124         q->queuedata = rbd_dev;
3125
3126         rbd_dev->disk = disk;
3127
3128         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3129
3130         return 0;
3131 out_disk:
3132         put_disk(disk);
3133
3134         return -ENOMEM;
3135 }
3136
3137 /*
3138   sysfs
3139 */
3140
3141 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3142 {
3143         return container_of(dev, struct rbd_device, dev);
3144 }
3145
3146 static ssize_t rbd_size_show(struct device *dev,
3147                              struct device_attribute *attr, char *buf)
3148 {
3149         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3150         sector_t size;
3151
3152         down_read(&rbd_dev->header_rwsem);
3153         size = get_capacity(rbd_dev->disk);
3154         up_read(&rbd_dev->header_rwsem);
3155
3156         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
3157 }
3158
3159 /*
3160  * Note this shows the features for whatever's mapped, which is not
3161  * necessarily the base image.
3162  */
3163 static ssize_t rbd_features_show(struct device *dev,
3164                              struct device_attribute *attr, char *buf)
3165 {
3166         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3167
3168         return sprintf(buf, "0x%016llx\n",
3169                         (unsigned long long) rbd_dev->mapping.features);
3170 }
3171
3172 static ssize_t rbd_major_show(struct device *dev,
3173                               struct device_attribute *attr, char *buf)
3174 {
3175         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3176
3177         return sprintf(buf, "%d\n", rbd_dev->major);
3178 }
3179
3180 static ssize_t rbd_client_id_show(struct device *dev,
3181                                   struct device_attribute *attr, char *buf)
3182 {
3183         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3184
3185         return sprintf(buf, "client%lld\n",
3186                         ceph_client_id(rbd_dev->rbd_client->client));
3187 }
3188
3189 static ssize_t rbd_pool_show(struct device *dev,
3190                              struct device_attribute *attr, char *buf)
3191 {
3192         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3193
3194         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3195 }
3196
3197 static ssize_t rbd_pool_id_show(struct device *dev,
3198                              struct device_attribute *attr, char *buf)
3199 {
3200         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3201
3202         return sprintf(buf, "%llu\n",
3203                 (unsigned long long) rbd_dev->spec->pool_id);
3204 }
3205
3206 static ssize_t rbd_name_show(struct device *dev,
3207                              struct device_attribute *attr, char *buf)
3208 {
3209         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3210
3211         if (rbd_dev->spec->image_name)
3212                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3213
3214         return sprintf(buf, "(unknown)\n");
3215 }
3216
3217 static ssize_t rbd_image_id_show(struct device *dev,
3218                              struct device_attribute *attr, char *buf)
3219 {
3220         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3221
3222         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3223 }
3224
3225 /*
3226  * Shows the name of the currently-mapped snapshot (or
3227  * RBD_SNAP_HEAD_NAME for the base image).
3228  */
3229 static ssize_t rbd_snap_show(struct device *dev,
3230                              struct device_attribute *attr,
3231                              char *buf)
3232 {
3233         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3234
3235         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3236 }
3237
3238 /*
3239  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3240  * for the parent image.  If there is no parent, simply shows
3241  * "(no parent image)".
3242  */
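/*
 * Example of what reading the parent attribute might produce for a
 * cloned image (every value below is illustrative only):
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 1018e4cf92ab
 *   image_name parent-image
 *   snap_id 4
 *   snap_name base
 *   overlap 10737418240
 */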
3243 static ssize_t rbd_parent_show(struct device *dev,
3244                              struct device_attribute *attr,
3245                              char *buf)
3246 {
3247         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3248         struct rbd_spec *spec = rbd_dev->parent_spec;
3249         int count;
3250         char *bufp = buf;
3251
3252         if (!spec)
3253                 return sprintf(buf, "(no parent image)\n");
3254
3255         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3256                         (unsigned long long) spec->pool_id, spec->pool_name);
3257         if (count < 0)
3258                 return count;
3259         bufp += count;
3260
3261         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3262                         spec->image_name ? spec->image_name : "(unknown)");
3263         if (count < 0)
3264                 return count;
3265         bufp += count;
3266
3267         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3268                         (unsigned long long) spec->snap_id, spec->snap_name);
3269         if (count < 0)
3270                 return count;
3271         bufp += count;
3272
        count = sprintf(bufp, "overlap %llu\n",
                        (unsigned long long) rbd_dev->parent_overlap);
3274         if (count < 0)
3275                 return count;
3276         bufp += count;
3277
3278         return (ssize_t) (bufp - buf);
3279 }
3280
3281 static ssize_t rbd_image_refresh(struct device *dev,
3282                                  struct device_attribute *attr,
3283                                  const char *buf,
3284                                  size_t size)
3285 {
3286         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3287         int ret;
3288
3289         ret = rbd_dev_refresh(rbd_dev, NULL);
3290
3291         return ret < 0 ? ret : size;
3292 }
3293
3294 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3295 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3296 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3297 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3298 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3299 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3300 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3301 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3302 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3303 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3304 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3305
3306 static struct attribute *rbd_attrs[] = {
3307         &dev_attr_size.attr,
3308         &dev_attr_features.attr,
3309         &dev_attr_major.attr,
3310         &dev_attr_client_id.attr,
3311         &dev_attr_pool.attr,
3312         &dev_attr_pool_id.attr,
3313         &dev_attr_name.attr,
3314         &dev_attr_image_id.attr,
3315         &dev_attr_current_snap.attr,
3316         &dev_attr_parent.attr,
3317         &dev_attr_refresh.attr,
3318         NULL
3319 };
3320
3321 static struct attribute_group rbd_attr_group = {
3322         .attrs = rbd_attrs,
3323 };
3324
3325 static const struct attribute_group *rbd_attr_groups[] = {
3326         &rbd_attr_group,
3327         NULL
3328 };
3329
3330 static void rbd_sysfs_dev_release(struct device *dev)
3331 {
3332 }
3333
3334 static struct device_type rbd_device_type = {
3335         .name           = "rbd",
3336         .groups         = rbd_attr_groups,
3337         .release        = rbd_sysfs_dev_release,
3338 };
3339
3340 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3341 {
3342         kref_get(&spec->kref);
3343
3344         return spec;
3345 }
3346
3347 static void rbd_spec_free(struct kref *kref);
3348 static void rbd_spec_put(struct rbd_spec *spec)
3349 {
3350         if (spec)
3351                 kref_put(&spec->kref, rbd_spec_free);
3352 }
3353
3354 static struct rbd_spec *rbd_spec_alloc(void)
3355 {
3356         struct rbd_spec *spec;
3357
3358         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3359         if (!spec)
3360                 return NULL;
3361         kref_init(&spec->kref);
3362
3363         return spec;
3364 }
3365
3366 static void rbd_spec_free(struct kref *kref)
3367 {
3368         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3369
3370         kfree(spec->pool_name);
3371         kfree(spec->image_id);
3372         kfree(spec->image_name);
3373         kfree(spec->snap_name);
3374         kfree(spec);
3375 }
3376
3377 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3378                                 struct rbd_spec *spec)
3379 {
3380         struct rbd_device *rbd_dev;
3381
3382         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3383         if (!rbd_dev)
3384                 return NULL;
3385
3386         spin_lock_init(&rbd_dev->lock);
3387         rbd_dev->flags = 0;
3388         INIT_LIST_HEAD(&rbd_dev->node);
3389         INIT_LIST_HEAD(&rbd_dev->snaps);
3390         init_rwsem(&rbd_dev->header_rwsem);
3391
3392         rbd_dev->spec = spec;
3393         rbd_dev->rbd_client = rbdc;
3394
3395         /* Initialize the layout used for all rbd requests */
3396
3397         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3398         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3399         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3400         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3401
3402         return rbd_dev;
3403 }
3404
3405 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3406 {
3407         rbd_spec_put(rbd_dev->parent_spec);
3408         kfree(rbd_dev->header_name);
3409         rbd_put_client(rbd_dev->rbd_client);
3410         rbd_spec_put(rbd_dev->spec);
3411         kfree(rbd_dev);
3412 }
3413
3414 static void rbd_snap_destroy(struct rbd_snap *snap)
3415 {
3416         kfree(snap->name);
3417         kfree(snap);
3418 }
3419
3420 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3421                                                 const char *snap_name,
3422                                                 u64 snap_id, u64 snap_size,
3423                                                 u64 snap_features)
3424 {
3425         struct rbd_snap *snap;
3426
3427         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3428         if (!snap)
3429                 return ERR_PTR(-ENOMEM);
3430
3431         snap->name = snap_name;
3432         snap->id = snap_id;
3433         snap->size = snap_size;
3434         snap->features = snap_features;
3435
3436         return snap;
3437 }
3438
3439 /*
3440  * Returns a dynamically-allocated snapshot name if successful, or a
3441  * pointer-coded error otherwise.
3442  */
3443 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3444                 u64 *snap_size, u64 *snap_features)
3445 {
3446         char *snap_name;
        u32 i;
3448
3449         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3450
3451         /* Skip over names until we find the one we are looking for */
3452
3453         snap_name = rbd_dev->header.snap_names;
3454         for (i = 0; i < which; i++)
3455                 snap_name += strlen(snap_name) + 1;
3456
3457         snap_name = kstrdup(snap_name, GFP_KERNEL);
3458         if (!snap_name)
3459                 return ERR_PTR(-ENOMEM);
3460
3461         *snap_size = rbd_dev->header.snap_sizes[which];
3462         *snap_features = 0;     /* No features for v1 */
3463
3464         return snap_name;
3465 }
3466
3467 /*
3468  * Get the size and object order for an image snapshot, or if
3469  * snap_id is CEPH_NOSNAP, gets this information for the base
3470  * image.
3471  */
3472 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3473                                 u8 *order, u64 *snap_size)
3474 {
3475         __le64 snapid = cpu_to_le64(snap_id);
3476         int ret;
3477         struct {
3478                 u8 order;
3479                 __le64 size;
3480         } __attribute__ ((packed)) size_buf = { 0 };
3481
3482         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3483                                 "rbd", "get_size",
3484                                 &snapid, sizeof (snapid),
3485                                 &size_buf, sizeof (size_buf), NULL);
3486         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3487         if (ret < 0)
3488                 return ret;
3489         if (ret < sizeof (size_buf))
3490                 return -ERANGE;
3491
3492         if (order)
3493                 *order = size_buf.order;
3494         *snap_size = le64_to_cpu(size_buf.size);
3495
        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
                (unsigned long long)snap_id, (unsigned int)size_buf.order,
                (unsigned long long)*snap_size);
3499
3500         return 0;
3501 }
3502
3503 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3504 {
3505         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3506                                         &rbd_dev->header.obj_order,
3507                                         &rbd_dev->header.image_size);
3508 }
3509
3510 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3511 {
3512         void *reply_buf;
3513         int ret;
3514         void *p;
3515
3516         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3517         if (!reply_buf)
3518                 return -ENOMEM;
3519
3520         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3521                                 "rbd", "get_object_prefix", NULL, 0,
3522                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3523         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3524         if (ret < 0)
3525                 goto out;
3526
3527         p = reply_buf;
3528         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3529                                                 p + ret, NULL, GFP_NOIO);
        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
                rbd_dev->header.object_prefix = NULL;
        } else {
                ret = 0;
                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }
3538 out:
3539         kfree(reply_buf);
3540
3541         return ret;
3542 }
3543
3544 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3545                 u64 *snap_features)
3546 {
3547         __le64 snapid = cpu_to_le64(snap_id);
3548         struct {
3549                 __le64 features;
3550                 __le64 incompat;
3551         } __attribute__ ((packed)) features_buf = { 0 };
3552         u64 incompat;
3553         int ret;
3554
3555         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3556                                 "rbd", "get_features",
3557                                 &snapid, sizeof (snapid),
3558                                 &features_buf, sizeof (features_buf), NULL);
3559         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3560         if (ret < 0)
3561                 return ret;
3562         if (ret < sizeof (features_buf))
3563                 return -ERANGE;
3564
3565         incompat = le64_to_cpu(features_buf.incompat);
3566         if (incompat & ~RBD_FEATURES_SUPPORTED)
3567                 return -ENXIO;
3568
3569         *snap_features = le64_to_cpu(features_buf.features);
3570
3571         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3572                 (unsigned long long)snap_id,
3573                 (unsigned long long)*snap_features,
3574                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3575
3576         return 0;
3577 }
3578
3579 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3580 {
3581         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3582                                                 &rbd_dev->header.features);
3583 }
3584
3585 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3586 {
3587         struct rbd_spec *parent_spec;
3588         size_t size;
3589         void *reply_buf = NULL;
3590         __le64 snapid;
3591         void *p;
3592         void *end;
3593         char *image_id;
3594         u64 overlap;
3595         int ret;
3596
3597         parent_spec = rbd_spec_alloc();
3598         if (!parent_spec)
3599                 return -ENOMEM;
3600
3601         size = sizeof (__le64) +                                /* pool_id */
3602                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3603                 sizeof (__le64) +                               /* snap_id */
3604                 sizeof (__le64);                                /* overlap */
3605         reply_buf = kmalloc(size, GFP_KERNEL);
3606         if (!reply_buf) {
3607                 ret = -ENOMEM;
3608                 goto out_err;
3609         }
3610
3611         snapid = cpu_to_le64(CEPH_NOSNAP);
3612         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3613                                 "rbd", "get_parent",
3614                                 &snapid, sizeof (snapid),
3615                                 reply_buf, size, NULL);
3616         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3617         if (ret < 0)
3618                 goto out_err;
3619
3620         p = reply_buf;
3621         end = reply_buf + ret;
3622         ret = -ERANGE;
3623         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3624         if (parent_spec->pool_id == CEPH_NOPOOL)
3625                 goto out;       /* No parent?  No problem. */
3626
3627         /* The ceph file layout needs to fit pool id in 32 bits */
3628
3629         ret = -EIO;
3630         if (WARN_ON(parent_spec->pool_id > (u64)U32_MAX))
3631                 goto out_err;
3632
3633         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3634         if (IS_ERR(image_id)) {
3635                 ret = PTR_ERR(image_id);
3636                 goto out_err;
3637         }
3638         parent_spec->image_id = image_id;
3639         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3640         ceph_decode_64_safe(&p, end, overlap, out_err);
3641
3642         rbd_dev->parent_overlap = overlap;
3643         rbd_dev->parent_spec = parent_spec;
3644         parent_spec = NULL;     /* rbd_dev now owns this */
3645 out:
3646         ret = 0;
3647 out_err:
3648         kfree(reply_buf);
3649         rbd_spec_put(parent_spec);
3650
3651         return ret;
3652 }
3653
3654 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3655 {
3656         struct {
3657                 __le64 stripe_unit;
3658                 __le64 stripe_count;
3659         } __attribute__ ((packed)) striping_info_buf = { 0 };
3660         size_t size = sizeof (striping_info_buf);
3661         void *p;
3662         u64 obj_size;
3663         u64 stripe_unit;
3664         u64 stripe_count;
3665         int ret;
3666
3667         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3668                                 "rbd", "get_stripe_unit_count", NULL, 0,
3669                                 (char *)&striping_info_buf, size, NULL);
3670         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3671         if (ret < 0)
3672                 return ret;
3673         if (ret < size)
3674                 return -ERANGE;
3675
3676         /*
3677          * We don't actually support the "fancy striping" feature
3678          * (STRIPINGV2) yet, but if the striping sizes are the
3679          * defaults the behavior is the same as before.  So find
3680          * out, and only fail if the image has non-default values.
3681          */
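        /*
         * For example, an image created with the default object order
         * of 22 has 4 MiB objects, so a non-fancy image would report
         * stripe_unit == 4194304 and stripe_count == 1 here.
         */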
3683         obj_size = (u64)1 << rbd_dev->header.obj_order;
3684         p = &striping_info_buf;
3685         stripe_unit = ceph_decode_64(&p);
3686         if (stripe_unit != obj_size) {
3687                 rbd_warn(rbd_dev, "unsupported stripe unit "
3688                                 "(got %llu want %llu)",
3689                                 stripe_unit, obj_size);
3690                 return -EINVAL;
3691         }
3692         stripe_count = ceph_decode_64(&p);
3693         if (stripe_count != 1) {
3694                 rbd_warn(rbd_dev, "unsupported stripe count "
3695                                 "(got %llu want 1)", stripe_count);
3696                 return -EINVAL;
3697         }
3698         rbd_dev->stripe_unit = stripe_unit;
3699         rbd_dev->stripe_count = stripe_count;
3700
3701         return 0;
3702 }
3703
3704 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3705 {
3706         size_t image_id_size;
3707         char *image_id;
3708         void *p;
3709         void *end;
3710         size_t size;
3711         void *reply_buf = NULL;
3712         size_t len = 0;
3713         char *image_name = NULL;
3714         int ret;
3715
3716         rbd_assert(!rbd_dev->spec->image_name);
3717
3718         len = strlen(rbd_dev->spec->image_id);
3719         image_id_size = sizeof (__le32) + len;
3720         image_id = kmalloc(image_id_size, GFP_KERNEL);
3721         if (!image_id)
3722                 return NULL;
3723
3724         p = image_id;
3725         end = image_id + image_id_size;
3726         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3727
3728         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3729         reply_buf = kmalloc(size, GFP_KERNEL);
3730         if (!reply_buf)
3731                 goto out;
3732
3733         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3734                                 "rbd", "dir_get_name",
3735                                 image_id, image_id_size,
3736                                 reply_buf, size, NULL);
3737         if (ret < 0)
3738                 goto out;
3739         p = reply_buf;
3740         end = reply_buf + ret;
3741
3742         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3743         if (IS_ERR(image_name))
3744                 image_name = NULL;
3745         else
3746                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3747 out:
3748         kfree(reply_buf);
3749         kfree(image_id);
3750
3751         return image_name;
3752 }
3753
3754 /*
3755  * When a parent image gets probed, we only have the pool, image,
3756  * and snapshot ids but not the names of any of them.  This call
3757  * is made later to fill in those names.  It has to be done after
3758  * rbd_dev_snaps_update() has completed because some of the
3759  * information (in particular, snapshot name) is not available
3760  * until then.
3761  *
3762  * When an image being mapped (not a parent) is probed, we have the
3763  * pool name and pool id, image name and image id, and the snapshot
3764  * name.  The only thing we're missing is the snapshot id.
3765  */
3766 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3767 {
3768         struct ceph_osd_client *osdc;
3769         const char *name;
3770         void *reply_buf = NULL;
3771         int ret;
3772
3773         /*
3774          * An image being mapped will have the pool name (etc.), but
3775          * we need to look up the snapshot id.
3776          */
3777         if (rbd_dev->spec->pool_name) {
3778                 if (strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3779                         struct rbd_snap *snap;
3780
3781                         snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
3782                         if (!snap)
3783                                 return -ENOENT;
3784                         rbd_dev->spec->snap_id = snap->id;
3785                 } else {
3786                         rbd_dev->spec->snap_id = CEPH_NOSNAP;
3787                 }
3788
3789                 return 0;
3790         }
3791
3792         /* Look up the pool name */
3793
3794         osdc = &rbd_dev->rbd_client->client->osdc;
3795         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3796         if (!name) {
3797                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3798                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3799                 return -EIO;
3800         }
3801
3802         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3803         if (!rbd_dev->spec->pool_name)
3804                 return -ENOMEM;
3805
3806         /* Fetch the image name; tolerate failure here */
3807
3808         name = rbd_dev_image_name(rbd_dev);
3809         if (name)
3810                 rbd_dev->spec->image_name = (char *)name;
3811         else
3812                 rbd_warn(rbd_dev, "unable to get image name");
3813
3814         /* Look up the snapshot name. */
3815
3816         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3817         if (!name) {
3818                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3819                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3820                 ret = -EIO;
3821                 goto out_err;
3822         }
        rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
        if (!rbd_dev->spec->snap_name) {
                ret = -ENOMEM;
                goto out_err;
        }
3826
3827         return 0;
3828 out_err:
3829         kfree(reply_buf);
3830         kfree(rbd_dev->spec->pool_name);
3831         rbd_dev->spec->pool_name = NULL;
3832
3833         return ret;
3834 }
3835
3836 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3837 {
3838         size_t size;
3839         int ret;
3840         void *reply_buf;
3841         void *p;
3842         void *end;
3843         u64 seq;
3844         u32 snap_count;
3845         struct ceph_snap_context *snapc;
3846         u32 i;
3847
3848         /*
3849          * We'll need room for the seq value (maximum snapshot id),
3850          * snapshot count, and array of that many snapshot ids.
3851          * For now we have a fixed upper limit on the number we're
3852          * prepared to receive.
3853          */
3854         size = sizeof (__le64) + sizeof (__le32) +
3855                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3856         reply_buf = kzalloc(size, GFP_KERNEL);
3857         if (!reply_buf)
3858                 return -ENOMEM;
3859
3860         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3861                                 "rbd", "get_snapcontext", NULL, 0,
3862                                 reply_buf, size, ver);
3863         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3864         if (ret < 0)
3865                 goto out;
3866
3867         p = reply_buf;
3868         end = reply_buf + ret;
3869         ret = -ERANGE;
3870         ceph_decode_64_safe(&p, end, seq, out);
3871         ceph_decode_32_safe(&p, end, snap_count, out);
3872
3873         /*
3874          * Make sure the reported number of snapshot ids wouldn't go
3875          * beyond the end of our buffer.  But before checking that,
3876          * make sure the computed size of the snapshot context we
3877          * allocate is representable in a size_t.
3878          */
3879         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3880                                  / sizeof (u64)) {
3881                 ret = -EINVAL;
3882                 goto out;
3883         }
3884         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3885                 goto out;
3886
3887         size = sizeof (struct ceph_snap_context) +
3888                                 snap_count * sizeof (snapc->snaps[0]);
3889         snapc = kmalloc(size, GFP_KERNEL);
3890         if (!snapc) {
3891                 ret = -ENOMEM;
3892                 goto out;
3893         }
3894         ret = 0;
3895
3896         atomic_set(&snapc->nref, 1);
3897         snapc->seq = seq;
3898         snapc->num_snaps = snap_count;
3899         for (i = 0; i < snap_count; i++)
3900                 snapc->snaps[i] = ceph_decode_64(&p);
3901
        ceph_put_snap_context(rbd_dev->header.snapc);
        rbd_dev->header.snapc = snapc;
3903
3904         dout("  snap context seq = %llu, snap_count = %u\n",
3905                 (unsigned long long)seq, (unsigned int)snap_count);
3906 out:
3907         kfree(reply_buf);
3908
3909         return ret;
3910 }
3911
3912 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3913 {
3914         size_t size;
3915         void *reply_buf;
3916         __le64 snap_id;
3917         int ret;
3918         void *p;
3919         void *end;
3920         char *snap_name;
3921
3922         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3923         reply_buf = kmalloc(size, GFP_KERNEL);
3924         if (!reply_buf)
3925                 return ERR_PTR(-ENOMEM);
3926
3927         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3928         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3929         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3930                                 "rbd", "get_snapshot_name",
3931                                 &snap_id, sizeof (snap_id),
3932                                 reply_buf, size, NULL);
3933         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3934         if (ret < 0) {
3935                 snap_name = ERR_PTR(ret);
3936                 goto out;
3937         }
3938
3939         p = reply_buf;
3940         end = reply_buf + ret;
3941         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3942         if (IS_ERR(snap_name))
3943                 goto out;
3944
3945         dout("  snap_id 0x%016llx snap_name = %s\n",
3946                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3947 out:
3948         kfree(reply_buf);
3949
3950         return snap_name;
3951 }
3952
3953 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3954                 u64 *snap_size, u64 *snap_features)
3955 {
3956         u64 snap_id;
3957         u64 size;
3958         u64 features;
3959         char *snap_name;
3960         int ret;
3961
3962         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3963         snap_id = rbd_dev->header.snapc->snaps[which];
3964         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3965         if (ret)
3966                 goto out_err;
3967
3968         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3969         if (ret)
3970                 goto out_err;
3971
3972         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3973         if (!IS_ERR(snap_name)) {
3974                 *snap_size = size;
3975                 *snap_features = features;
3976         }
3977
3978         return snap_name;
3979 out_err:
3980         return ERR_PTR(ret);
3981 }
3982
3983 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3984                 u64 *snap_size, u64 *snap_features)
3985 {
3986         if (rbd_dev->image_format == 1)
3987                 return rbd_dev_v1_snap_info(rbd_dev, which,
3988                                         snap_size, snap_features);
3989         if (rbd_dev->image_format == 2)
3990                 return rbd_dev_v2_snap_info(rbd_dev, which,
3991                                         snap_size, snap_features);
3992         return ERR_PTR(-EINVAL);
3993 }
3994
3995 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3996 {
3997         int ret;
3998         __u8 obj_order;
3999
4000         down_write(&rbd_dev->header_rwsem);
4001
4002         /* Grab old order first, to see if it changes */
4003
        obj_order = rbd_dev->header.obj_order;
4005         ret = rbd_dev_v2_image_size(rbd_dev);
4006         if (ret)
4007                 goto out;
4008         if (rbd_dev->header.obj_order != obj_order) {
4009                 ret = -EIO;
4010                 goto out;
4011         }
4012         rbd_update_mapping_size(rbd_dev);
4013
4014         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4015         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4016         if (ret)
4017                 goto out;
4018         ret = rbd_dev_snaps_update(rbd_dev);
4019         dout("rbd_dev_snaps_update returned %d\n", ret);
4020         if (ret)
4021                 goto out;
4022 out:
4023         up_write(&rbd_dev->header_rwsem);
4024
4025         return ret;
4026 }
4027
4028 /*
4029  * Scan the rbd device's current snapshot list and compare it to the
4030  * newly-received snapshot context.  Remove any existing snapshots
4031  * not present in the new snapshot context.  Add a new snapshot for
4032  * any snaphots in the snapshot context not in the current list.
 * any snapshots in the snapshot context not in the current list.
4034  * about.
4035  *
4036  * Assumes the snapshots in the snapshot context are sorted by
4037  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4038  * are also maintained in that order.)
4039  *
 * Note that any error that occurs while updating the snapshot list
4041  * aborts the update, and the entire list is cleared.  The snapshot
4042  * list becomes inconsistent at that point anyway, so it might as
4043  * well be empty.
4044  */
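/*
 * Worked example (hypothetical ids): if the current list holds
 * {12, 10, 4} and the new context holds {12, 11, 4}, then 12 is
 * verified, 11 is inserted ahead of 10, 10 is removed, and 4 is
 * verified, leaving {12, 11, 4}.
 */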
4045 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4046 {
4047         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4048         const u32 snap_count = snapc->num_snaps;
4049         struct list_head *head = &rbd_dev->snaps;
4050         struct list_head *links = head->next;
4051         u32 index = 0;
4052         int ret = 0;
4053
4054         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4055         while (index < snap_count || links != head) {
4056                 u64 snap_id;
4057                 struct rbd_snap *snap;
4058                 char *snap_name;
4059                 u64 snap_size = 0;
4060                 u64 snap_features = 0;
4061
4062                 snap_id = index < snap_count ? snapc->snaps[index]
4063                                              : CEPH_NOSNAP;
4064                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4065                                      : NULL;
4066                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4067
4068                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4069                         struct list_head *next = links->next;
4070
4071                         /*
4072                          * A previously-existing snapshot is not in
4073                          * the new snap context.
4074                          *
4075                          * If the now-missing snapshot is the one
4076                          * the image represents, clear its existence
4077                          * flag so we can avoid sending any more
4078                          * requests to it.
4079                          */
4080                         if (rbd_dev->spec->snap_id == snap->id)
4081                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4082                         dout("removing %ssnap id %llu\n",
4083                                 rbd_dev->spec->snap_id == snap->id ?
4084                                                         "mapped " : "",
4085                                 (unsigned long long)snap->id);
4086
4087                         list_del(&snap->node);
4088                         rbd_snap_destroy(snap);
4089
4090                         /* Done with this list entry; advance */
4091
4092                         links = next;
4093                         continue;
4094                 }
4095
4096                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4097                                         &snap_size, &snap_features);
4098                 if (IS_ERR(snap_name)) {
4099                         ret = PTR_ERR(snap_name);
4100                         dout("failed to get snap info, error %d\n", ret);
4101                         goto out_err;
4102                 }
4103
                dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4105                         (unsigned long long)snap_id);
4106                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4107                         struct rbd_snap *new_snap;
4108
4109                         /* We haven't seen this snapshot before */
4110
4111                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4112                                         snap_id, snap_size, snap_features);
4113                         if (IS_ERR(new_snap)) {
4114                                 ret = PTR_ERR(new_snap);
4115                                 dout("  failed to add dev, error %d\n", ret);
4116                                 goto out_err;
4117                         }
4118
4119                         /* New goes before existing, or at end of list */
4120
                        dout("  added dev%s\n", snap ? "" : " at end");
4122                         if (snap)
4123                                 list_add_tail(&new_snap->node, &snap->node);
4124                         else
4125                                 list_add_tail(&new_snap->node, head);
4126                 } else {
4127                         /* Already have this one */
4128
4129                         dout("  already present\n");
4130
                        rbd_assert(snap->size == snap_size);
                        rbd_assert(!strcmp(snap->name, snap_name));
                        rbd_assert(snap->features == snap_features);
                        kfree(snap_name);       /* not stored; don't leak it */
4134
4135                         /* Done with this list entry; advance */
4136
4137                         links = links->next;
4138                 }
4139
4140                 /* Advance to the next entry in the snapshot context */
4141
4142                 index++;
4143         }
4144         dout("%s: done\n", __func__);
4145
4146         return 0;
4147 out_err:
4148         rbd_remove_all_snaps(rbd_dev);
4149
4150         return ret;
4151 }
4152
4153 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4154 {
4155         struct device *dev;
4156         int ret;
4157
4158         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4159
4160         dev = &rbd_dev->dev;
4161         dev->bus = &rbd_bus_type;
4162         dev->type = &rbd_device_type;
4163         dev->parent = &rbd_root_dev;
4164         dev->release = rbd_dev_release;
4165         dev_set_name(dev, "%d", rbd_dev->dev_id);
4166         ret = device_register(dev);
4167
4168         mutex_unlock(&ctl_mutex);
4169
4170         return ret;
4171 }
4172
4173 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4174 {
4175         device_unregister(&rbd_dev->dev);
4176 }
4177
4178 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4179
4180 /*
4181  * Get a unique rbd identifier for the given new rbd_dev, and add
4182  * the rbd_dev to the global list.  The minimum rbd id is 1.
4183  */
4184 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4185 {
4186         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4187
4188         spin_lock(&rbd_dev_list_lock);
4189         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4190         spin_unlock(&rbd_dev_list_lock);
4191         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4192                 (unsigned long long) rbd_dev->dev_id);
4193 }
4194
4195 /*
4196  * Remove an rbd_dev from the global list, and record that its
4197  * identifier is no longer in use.
4198  */
4199 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4200 {
4201         struct list_head *tmp;
4202         int rbd_id = rbd_dev->dev_id;
4203         int max_id;
4204
4205         rbd_assert(rbd_id > 0);
4206
4207         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4208                 (unsigned long long) rbd_dev->dev_id);
4209         spin_lock(&rbd_dev_list_lock);
4210         list_del_init(&rbd_dev->node);
4211
4212         /*
4213          * If the id being "put" is not the current maximum, there
4214          * is nothing special we need to do.
4215          */
4216         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4217                 spin_unlock(&rbd_dev_list_lock);
4218                 return;
4219         }
4220
4221         /*
4222          * We need to update the current maximum id.  Search the
4223          * list to find out what it is.  We're more likely to find
4224          * the maximum at the end, so search the list backward.
4225          */
4226         max_id = 0;
4227         list_for_each_prev(tmp, &rbd_dev_list) {
4228                 struct rbd_device *rbd_dev;
4229
4230                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4231                 if (rbd_dev->dev_id > max_id)
4232                         max_id = rbd_dev->dev_id;
4233         }
4234         spin_unlock(&rbd_dev_list_lock);
4235
4236         /*
4237          * The max id could have been updated by rbd_dev_id_get(), in
4238          * which case it now accurately reflects the new maximum.
4239          * Be careful not to overwrite the maximum value in that
4240          * case.
4241          */
4242         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4243         dout("  max dev id has been reset\n");
4244 }
4245
4246 /*
4247  * Skips over white space at *buf, and updates *buf to point to the
4248  * first found non-space character (if any). Returns the length of
4249  * the token (string of non-white space characters) found.  Note
4250  * that *buf must be terminated with '\0'.
4251  */
4252 static inline size_t next_token(const char **buf)
4253 {
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
4258         const char *spaces = " \f\n\r\t\v";
4259
4260         *buf += strspn(*buf, spaces);   /* Find start of token */
4261
4262         return strcspn(*buf, spaces);   /* Return token length */
4263 }
4264
4265 /*
4266  * Finds the next token in *buf, and if the provided token buffer is
4267  * big enough, copies the found token into it.  The result, if
4268  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4269  * must be terminated with '\0' on entry.
4270  *
4271  * Returns the length of the token found (not including the '\0').
4272  * Return value will be 0 if no token is found, and it will be >=
4273  * token_size if the token would not fit.
4274  *
4275  * The *buf pointer will be updated to point beyond the end of the
4276  * found token.  Note that this occurs even if the token buffer is
4277  * too small to hold it.
4278  */
4279 static inline size_t copy_token(const char **buf,
4280                                 char *token,
4281                                 size_t token_size)
4282 {
4283         size_t len;
4284
4285         len = next_token(buf);
4286         if (len < token_size) {
4287                 memcpy(token, *buf, len);
4288                 *(token + len) = '\0';
4289         }
4290         *buf += len;
4291
4292         return len;
4293 }
4294
4295 /*
4296  * Finds the next token in *buf, dynamically allocates a buffer big
4297  * enough to hold a copy of it, and copies the token into the new
4298  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4299  * that a duplicate buffer is created even for a zero-length token.
4300  *
4301  * Returns a pointer to the newly-allocated duplicate, or a null
4302  * pointer if memory for the duplicate was not available.  If
4303  * the lenp argument is a non-null pointer, the length of the token
4304  * (not including the '\0') is returned in *lenp.
4305  *
4306  * If successful, the *buf pointer will be updated to point beyond
4307  * the end of the found token.
4308  *
4309  * Note: uses GFP_KERNEL for allocation.
4310  */
4311 static inline char *dup_token(const char **buf, size_t *lenp)
4312 {
4313         char *dup;
4314         size_t len;
4315
4316         len = next_token(buf);
4317         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4318         if (!dup)
4319                 return NULL;
4320         *(dup + len) = '\0';
4321         *buf += len;
4322
4323         if (lenp)
4324                 *lenp = len;
4325
4326         return dup;
4327 }
4328
4329 /*
4330  * Parse the options provided for an "rbd add" (i.e., rbd image
4331  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4332  * and the data written is passed here via a NUL-terminated buffer.
4333  * Returns 0 if successful or an error code otherwise.
4334  *
4335  * The information extracted from these options is recorded in
4336  * the other parameters which return dynamically-allocated
4337  * structures:
4338  *  ceph_opts
4339  *      The address of a pointer that will refer to a ceph options
4340  *      structure.  Caller must release the returned pointer using
4341  *      ceph_destroy_options() when it is no longer needed.
4342  *  rbd_opts
4343  *      Address of an rbd options pointer.  Fully initialized by
4344  *      this function; caller must release with kfree().
4345  *  spec
4346  *      Address of an rbd image specification pointer.  Fully
4347  *      initialized by this function based on parsed options.
4348  *      Caller must release with rbd_spec_put().
4349  *
4350  * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4352  * where:
4353  *  <mon_addrs>
4354  *      A comma-separated list of one or more monitor addresses.
4355  *      A monitor address is an ip address, optionally followed
4356  *      by a port number (separated by a colon).
4357  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4358  *  <options>
4359  *      A comma-separated list of ceph and/or rbd options.
4360  *  <pool_name>
4361  *      The name of the rados pool containing the rbd image.
4362  *  <image_name>
4363  *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
4369  */
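/*
 * So a mapping request might look like this (the monitor address,
 * credentials, and names below are illustrative only):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage mysnap" \
 *         > /sys/bus/rbd/add
 */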
4370 static int rbd_add_parse_args(const char *buf,
4371                                 struct ceph_options **ceph_opts,
4372                                 struct rbd_options **opts,
4373                                 struct rbd_spec **rbd_spec)
4374 {
4375         size_t len;
4376         char *options;
4377         const char *mon_addrs;
4378         char *snap_name;
4379         size_t mon_addrs_size;
4380         struct rbd_spec *spec = NULL;
4381         struct rbd_options *rbd_opts = NULL;
4382         struct ceph_options *copts;
4383         int ret;
4384
4385         /* The first four tokens are required */
4386
4387         len = next_token(&buf);
4388         if (!len) {
4389                 rbd_warn(NULL, "no monitor address(es) provided");
4390                 return -EINVAL;
4391         }
4392         mon_addrs = buf;
4393         mon_addrs_size = len + 1;
4394         buf += len;
4395
4396         ret = -EINVAL;
4397         options = dup_token(&buf, NULL);
4398         if (!options)
4399                 return -ENOMEM;
4400         if (!*options) {
4401                 rbd_warn(NULL, "no options provided");
4402                 goto out_err;
4403         }
4404
4405         spec = rbd_spec_alloc();
4406         if (!spec)
4407                 goto out_mem;
4408
4409         spec->pool_name = dup_token(&buf, NULL);
4410         if (!spec->pool_name)
4411                 goto out_mem;
4412         if (!*spec->pool_name) {
4413                 rbd_warn(NULL, "no pool name provided");
4414                 goto out_err;
4415         }
4416
4417         spec->image_name = dup_token(&buf, NULL);
4418         if (!spec->image_name)
4419                 goto out_mem;
4420         if (!*spec->image_name) {
4421                 rbd_warn(NULL, "no image name provided");
4422                 goto out_err;
4423         }
4424
4425         /*
4426          * Snapshot name is optional; default is to use "-"
4427          * (indicating the head/no snapshot).
4428          */
4429         len = next_token(&buf);
4430         if (!len) {
4431                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4432                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4433         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4434                 ret = -ENAMETOOLONG;
4435                 goto out_err;
4436         }
4437         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4438         if (!snap_name)
4439                 goto out_mem;
4440         *(snap_name + len) = '\0';
4441         spec->snap_name = snap_name;
4442
4443         /* Initialize all rbd options to the defaults */
4444
4445         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4446         if (!rbd_opts)
4447                 goto out_mem;
4448
4449         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4450
4451         copts = ceph_parse_options(options, mon_addrs,
4452                                         mon_addrs + mon_addrs_size - 1,
4453                                         parse_rbd_opts_token, rbd_opts);
4454         if (IS_ERR(copts)) {
4455                 ret = PTR_ERR(copts);
4456                 goto out_err;
4457         }
4458         kfree(options);
4459
4460         *ceph_opts = copts;
4461         *opts = rbd_opts;
4462         *rbd_spec = spec;
4463
4464         return 0;
4465 out_mem:
4466         ret = -ENOMEM;
4467 out_err:
4468         kfree(rbd_opts);
4469         rbd_spec_put(spec);
4470         kfree(options);
4471
4472         return ret;
4473 }
4474
4475 /*
4476  * An rbd format 2 image has a unique identifier, distinct from the
4477  * name given to it by the user.  Internally, that identifier is
4478  * what's used to specify the names of objects related to the image.
4479  *
4480  * A special "rbd id" object is used to map an rbd image name to its
4481  * id.  If that object doesn't exist, then there is no v2 rbd image
4482  * with the supplied name.
4483  *
4484  * This function fills in the given rbd_dev's image_id field if
4485  * it can be determined, and in that case returns 0.  If an error
4486  * occurs, a negative errno is returned and the rbd_dev's
4487  * image_id field is left unchanged (and should be NULL).
4488  */
4489 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4490 {
4491         int ret;
4492         size_t size;
4493         char *object_name;
4494         void *response;
4495         char *image_id;
4496
4497         /*
4498          * When probing a parent image, the image id is already
4499          * known (and the image name likely is not).  There's no
4500          * need to fetch the image id again in this case.  We
4501          * do still need to set the image format though.
4502          */
4503         if (rbd_dev->spec->image_id) {
4504                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4505
4506                 return 0;
4507         }
4508
4509         /*
4510          * First, see if the format 2 image id file exists, and if
4511          * so, get the image's persistent id from it.
4512          */
4513         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4514         object_name = kmalloc(size, GFP_NOIO);
4515         if (!object_name)
4516                 return -ENOMEM;
4517         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4518         dout("rbd id object name is %s\n", object_name);
4519
4520         /* Response will be an encoded string, which includes a length */
4521
4522         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4523         response = kzalloc(size, GFP_NOIO);
4524         if (!response) {
4525                 ret = -ENOMEM;
4526                 goto out;
4527         }
4528
4529         /* If it doesn't exist we'll assume it's a format 1 image */
4530
4531         ret = rbd_obj_method_sync(rbd_dev, object_name,
4532                                 "rbd", "get_id", NULL, 0,
4533                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4534         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4535         if (ret == -ENOENT) {
4536                 image_id = kstrdup("", GFP_KERNEL);
4537                 ret = image_id ? 0 : -ENOMEM;
4538                 if (!ret)
4539                         rbd_dev->image_format = 1;
4540         } else if (ret > (int) sizeof (__le32)) {
4541                 void *p = response;
4542
4543                 image_id = ceph_extract_encoded_string(&p, p + ret,
4544                                                 NULL, GFP_NOIO);
4545                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4546                 if (!ret)
4547                         rbd_dev->image_format = 2;
4548         } else if (ret >= 0) {
4549                 ret = -EINVAL;
4550         }
4551
4552         if (!ret) {
4553                 rbd_dev->spec->image_id = image_id;
4554                 dout("image_id is %s\n", image_id);
4555         }
4556 out:
4557         kfree(response);
4558         kfree(object_name);
4559
4560         return ret;
4561 }
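
/*
 * A sketch of the lookup above, assuming RBD_ID_PREFIX is "rbd_id."
 * (per rbd_types.h) and a hypothetical image named "myimage": the id
 * object would be "rbd_id.myimage", and the "get_id" class method on
 * that object returns the image id as a length-prefixed string,
 *
 *      a __le32 length followed by that many bytes of id,
 *
 * which ceph_extract_encoded_string() converts into a NUL-terminated
 * kernel string.
 */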
4562
4563 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4564 {
4565         int ret;
4566         size_t size;
4567
4568         /* Record the header object name for this rbd image. */
4569
4570         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4571         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4572         if (!rbd_dev->header_name) {
4573                 ret = -ENOMEM;
4574                 goto out_err;
4575         }
4576         sprintf(rbd_dev->header_name, "%s%s",
4577                 rbd_dev->spec->image_name, RBD_SUFFIX);
4578
4579         /* Populate rbd image metadata */
4580
4581         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4582         if (ret < 0)
4583                 goto out_err;
4584
4585         /* Version 1 images have no parent (no layering) */
4586
4587         rbd_dev->parent_spec = NULL;
4588         rbd_dev->parent_overlap = 0;
4589
4590         dout("discovered version 1 image, header name is %s\n",
4591                 rbd_dev->header_name);
4592
4593         return 0;
4594
4595 out_err:
4596         kfree(rbd_dev->header_name);
4597         rbd_dev->header_name = NULL;
4598         kfree(rbd_dev->spec->image_id);
4599         rbd_dev->spec->image_id = NULL;
4600
4601         return ret;
4602 }
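
/*
 * Illustration (hypothetical image name, and assuming RBD_SUFFIX is
 * ".rbd" per rbd_types.h): a format 1 image named "myimage" has the
 * header object "myimage.rbd".
 */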
4603
4604 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4605 {
4606         size_t size;
4607         int ret;
4608         u64 ver = 0;
4609
4610         /*
4611          * Image id was filled in by the caller.  Record the header
4612          * object name for this rbd image.
4613          */
4614         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4615         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4616         if (!rbd_dev->header_name)
4617                 return -ENOMEM;
4618         sprintf(rbd_dev->header_name, "%s%s",
4619                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4620
4621         /* Get the size and object order for the image */
4622         ret = rbd_dev_v2_image_size(rbd_dev);
4623         if (ret)
4624                 goto out_err;
4625
4626         /* Get the object prefix (a.k.a. block_name) for the image */
4627
4628         ret = rbd_dev_v2_object_prefix(rbd_dev);
4629         if (ret)
4630                 goto out_err;
4631
4632         /* Get and check the features for the image */
4633
4634         ret = rbd_dev_v2_features(rbd_dev);
4635         if (ret)
4636                 goto out_err;
4637
4638         /* If the image supports layering, get the parent info */
4639
4640         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4641                 ret = rbd_dev_v2_parent_info(rbd_dev);
4642                 if (ret)
4643                         goto out_err;
4644                 rbd_warn(rbd_dev, "WARNING: kernel support for "
4645                                         "layered rbd images is EXPERIMENTAL!");
4646         }
4647
4648         /* If the image supports fancy striping, get its parameters */
4649
4650         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4651                 ret = rbd_dev_v2_striping_info(rbd_dev);
4652                 if (ret < 0)
4653                         goto out_err;
4654         }
4655
4656         /* crypto and compression type aren't (yet) supported for v2 images */
4657
4658         rbd_dev->header.crypt_type = 0;
4659         rbd_dev->header.comp_type = 0;
4660
4661         /* Get the snapshot context, plus the header version */
4662
4663         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4664         if (ret)
4665                 goto out_err;
4666         rbd_dev->header.obj_version = ver;
4667
4668         dout("discovered version 2 image, header name is %s\n",
4669                 rbd_dev->header_name);
4670
4671         return 0;
4672 out_err:
4673         rbd_dev->parent_overlap = 0;
4674         rbd_spec_put(rbd_dev->parent_spec);
4675         rbd_dev->parent_spec = NULL;
4676         kfree(rbd_dev->header_name);
4677         rbd_dev->header_name = NULL;
4678         kfree(rbd_dev->header.object_prefix);
4679         rbd_dev->header.object_prefix = NULL;
4680
4681         return ret;
4682 }
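
/*
 * Illustration (assuming RBD_HEADER_PREFIX is "rbd_header." per
 * rbd_types.h, with a made-up image id): a format 2 image whose id is
 * "1028ae8944a" has the header object "rbd_header.1028ae8944a"; v2
 * object names are derived from the id, never the user-visible name.
 */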
4683
4684 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4685 {
4686         struct rbd_device *parent = NULL;
4687         struct rbd_spec *parent_spec = NULL;
4688         struct rbd_client *rbdc = NULL;
4689         int ret;
4690
4691         /* no need to lock here, as rbd_dev is not registered yet */
4692         ret = rbd_dev_snaps_update(rbd_dev);
4693         if (ret)
4694                 return ret;
4695
4696         ret = rbd_dev_probe_update_spec(rbd_dev);
4697         if (ret)
4698                 goto err_out_snaps;
4699
4700         ret = rbd_dev_set_mapping(rbd_dev);
4701         if (ret)
4702                 goto err_out_snaps;
4703
4704         /* generate unique id: find highest unique id, add one */
4705         rbd_dev_id_get(rbd_dev);
4706
4707         /* Fill in the device name, now that we have its id. */
4708         BUILD_BUG_ON(DEV_NAME_LEN
4709                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4710         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4711
4712         /* Get our block major device number. */
4713
4714         ret = register_blkdev(0, rbd_dev->name);
4715         if (ret < 0)
4716                 goto err_out_id;
4717         rbd_dev->major = ret;
4718
4719         /* Set up the blkdev mapping. */
4720
4721         ret = rbd_init_disk(rbd_dev);
4722         if (ret)
4723                 goto err_out_blkdev;
4724
4725         ret = rbd_bus_add_dev(rbd_dev);
4726         if (ret)
4727                 goto err_out_disk;
4728
4729         /*
4730          * At this point cleanup in the event of an error is the job
4731          * of the sysfs code (initiated by rbd_bus_del_dev()).
4732          */
4733         /* Probe the parent if there is one */
4734
4735         if (rbd_dev->parent_spec) {
4736                 /*
4737                  * We need to pass a reference to the client and the
4738                  * parent spec when creating the parent rbd_dev.
4739                  * Images related by parent/child relationships
4740                  * always share both.
4741                  */
4742                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4743                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4744
4745                 parent = rbd_dev_create(rbdc, parent_spec);
4746                 if (!parent) {
4747                         ret = -ENOMEM;
4748                         goto err_out_spec;
4749                 }
4750                 rbdc = NULL;            /* parent now owns reference */
4751                 parent_spec = NULL;     /* parent now owns reference */
4752                 ret = rbd_dev_probe(parent);
4753                 if (ret < 0)
4754                         goto err_out_parent;
4755                 rbd_dev->parent = parent;
4756         }
4757
4758         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4759         if (ret)
4760                 goto err_out_bus;
4761
4762         /* Everything's ready.  Announce the disk to the world. */
4763
4764         add_disk(rbd_dev->disk);
4765
4766         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4767                 (unsigned long long) rbd_dev->mapping.size);
4768
4769         return ret;
4770
4771 err_out_parent:
4772         rbd_dev_destroy(parent);
4773 err_out_spec:
4774         rbd_spec_put(parent_spec);
4775         rbd_put_client(rbdc);
4776 err_out_bus:
4777         /* this will also clean up the rest of the rbd_dev state */
4778
4779         rbd_bus_del_dev(rbd_dev);
4780
4781         return ret;
4782 err_out_disk:
4783         rbd_free_disk(rbd_dev);
4784 err_out_blkdev:
4785         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4786 err_out_id:
4787         rbd_dev_id_put(rbd_dev);
4788 err_out_snaps:
4789         rbd_remove_all_snaps(rbd_dev);
4790
4791         return ret;
4792 }
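
/*
 * A note on ordering above: rbd_dev_probe_finish() acquires resources
 * in the sequence id -> blkdev major -> disk -> sysfs device ->
 * parent -> watch, and the error labels unwind them in reverse.  Once
 * rbd_bus_add_dev() has succeeded, later failures funnel through
 * rbd_bus_del_dev(), whose sysfs release path performs the remaining
 * cleanup.
 */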
4793
4794 /*
4795  * Probe for the existence of the header object for the given rbd
4796  * device.  For format 2 images this includes determining the image
4797  * id.
4798  */
4799 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4800 {
4801         int ret;
4802
4803         /*
4804          * Get the id from the image id object.  If it's not a
4805          * format 2 image, we'll get ENOENT back, and we'll assume
4806          * it's a format 1 image.
4807          */
4808         ret = rbd_dev_image_id(rbd_dev);
4809         if (ret)
4810                 return ret;
4811         rbd_assert(rbd_dev->spec->image_id);
4812         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4813
4814         if (rbd_dev->image_format == 1)
4815                 ret = rbd_dev_v1_probe(rbd_dev);
4816         else
4817                 ret = rbd_dev_v2_probe(rbd_dev);
4818         if (ret)
4819                 goto out_err;
4820
4821         ret = rbd_dev_probe_finish(rbd_dev);
4822         if (ret)
4823                 rbd_header_free(&rbd_dev->header);
4824
4825         return ret;
4826 out_err:
4827         kfree(rbd_dev->spec->image_id);
4828         rbd_dev->spec->image_id = NULL;
4829
4830         dout("probe failed, returning %d\n", ret);
4831
4832         return ret;
4833 }
4834
4835 static ssize_t rbd_add(struct bus_type *bus,
4836                        const char *buf,
4837                        size_t count)
4838 {
4839         struct rbd_device *rbd_dev = NULL;
4840         struct ceph_options *ceph_opts = NULL;
4841         struct rbd_options *rbd_opts = NULL;
4842         struct rbd_spec *spec = NULL;
4843         struct rbd_client *rbdc;
4844         struct ceph_osd_client *osdc;
4845         int rc = -ENOMEM;
4846
4847         if (!try_module_get(THIS_MODULE))
4848                 return -ENODEV;
4849
4850         /* parse add command */
4851         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4852         if (rc < 0)
4853                 goto err_out_module;
4854
4855         rbdc = rbd_get_client(ceph_opts);
4856         if (IS_ERR(rbdc)) {
4857                 rc = PTR_ERR(rbdc);
4858                 goto err_out_args;
4859         }
4860         ceph_opts = NULL;       /* rbd_dev client now owns this */
4861
4862         /* pick the pool */
4863         osdc = &rbdc->client->osdc;
4864         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4865         if (rc < 0)
4866                 goto err_out_client;
4867         spec->pool_id = (u64) rc;
4868
4869         /* The ceph file layout needs the pool id to fit in 32 bits */
4870
4871         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4872                 rc = -EIO;
4873                 goto err_out_client;
4874         }
4875
4876         rbd_dev = rbd_dev_create(rbdc, spec);
4877         if (!rbd_dev)
4878                 goto err_out_client;
4879         rbdc = NULL;            /* rbd_dev now owns this */
4880         spec = NULL;            /* rbd_dev now owns this */
4881
4882         rbd_dev->mapping.read_only = rbd_opts->read_only;
4883         kfree(rbd_opts);
4884         rbd_opts = NULL;        /* done with this */
4885
4886         rc = rbd_dev_probe(rbd_dev);
4887         if (rc < 0)
4888                 goto err_out_rbd_dev;
4889
4890         return count;
4891 err_out_rbd_dev:
4892         rbd_dev_destroy(rbd_dev);
4893 err_out_client:
4894         rbd_put_client(rbdc);
4895 err_out_args:
4896         if (ceph_opts)
4897                 ceph_destroy_options(ceph_opts);
4898         kfree(rbd_opts);
4899         rbd_spec_put(spec);
4900 err_out_module:
4901         module_put(THIS_MODULE);
4902
4903         dout("Error adding device %s\n", buf);
4904
4905         return (ssize_t) rc;
4906 }
4907
4908 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4909 {
4910         struct list_head *tmp;
4911         struct rbd_device *rbd_dev;
4912
4913         spin_lock(&rbd_dev_list_lock);
4914         list_for_each(tmp, &rbd_dev_list) {
4915                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4916                 if (rbd_dev->dev_id == dev_id) {
4917                         spin_unlock(&rbd_dev_list_lock);
4918                         return rbd_dev;
4919                 }
4920         }
4921         spin_unlock(&rbd_dev_list_lock);
4922         return NULL;
4923 }
4924
4925 static void rbd_dev_release(struct device *dev)
4926 {
4927         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4928
4929         if (rbd_dev->watch_event)
4930                 rbd_dev_header_watch_sync(rbd_dev, 0);
4931
4932         /* clean up and free blkdev */
4933         rbd_free_disk(rbd_dev);
4934         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4935
4936         /* release allocated disk header fields */
4937         rbd_header_free(&rbd_dev->header);
4938
4939         /* done with the id, and with the rbd_dev */
4940         rbd_dev_id_put(rbd_dev);
4941         rbd_assert(rbd_dev->rbd_client != NULL);
4942         rbd_dev_destroy(rbd_dev);
4943
4944         /* release module ref */
4945         module_put(THIS_MODULE);
4946 }
4947
4948 static void __rbd_remove(struct rbd_device *rbd_dev)
4949 {
4950         rbd_remove_all_snaps(rbd_dev);
4951         rbd_bus_del_dev(rbd_dev);
4952 }
4953
4954 static ssize_t rbd_remove(struct bus_type *bus,
4955                           const char *buf,
4956                           size_t count)
4957 {
4958         struct rbd_device *rbd_dev = NULL;
4959         int target_id, rc;
4960         unsigned long ul;
4961         int ret = count;
4962
4963         rc = strict_strtoul(buf, 10, &ul);
4964         if (rc)
4965                 return rc;
4966
4967         /* convert to int; abort if we lost anything in the conversion */
4968         target_id = (int) ul;
4969         if (target_id != ul)
4970                 return -EINVAL;
4971
4972         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4973
4974         rbd_dev = __rbd_get_dev(target_id);
4975         if (!rbd_dev) {
4976                 ret = -ENOENT;
4977                 goto done;
4978         }
4979
4980         spin_lock_irq(&rbd_dev->lock);
4981         if (rbd_dev->open_count)
4982                 ret = -EBUSY;
4983         else
4984                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4985         spin_unlock_irq(&rbd_dev->lock);
4986         if (ret < 0)
4987                 goto done;
4988
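        /*
         * A worked example of the loop below, for a hypothetical
         * chain child -> parent -> grandparent: the first pass walks
         * to the grandparent (which has no parent of its own) and
         * removes it, the second pass removes the parent, and the
         * mapped child itself is removed after the loop.
         */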
4989         while (rbd_dev->parent_spec) {
4990                 struct rbd_device *first = rbd_dev;
4991                 struct rbd_device *second = first->parent;
4992                 struct rbd_device *third;
4993
4994                 /*
4995                  * Walk to the topmost ancestor (the parent with
4996                  * no parent of its own) and remove it.
4997                  */
4998                 while (second && (third = second->parent)) {
4999                         first = second;
5000                         second = third;
5001                 }
5002                 __rbd_remove(second);
5003                 rbd_spec_put(first->parent_spec);
5004                 first->parent_spec = NULL;
5005                 first->parent_overlap = 0;
5006                 first->parent = NULL;
5007         }
5008         __rbd_remove(rbd_dev);
5009
5010 done:
5011         mutex_unlock(&ctl_mutex);
5012
5013         return ret;
5014 }
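
/*
 * A hedged usage example: rbd_remove() takes the decimal device id
 * (the N in "rbdN"), so unmapping device rbd0 might look like
 *
 *      # echo 0 > /sys/bus/rbd/remove
 */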
5015
5016 /*
5017  * create control files in sysfs
5018  * /sys/bus/rbd/...
5019  */
5020 static int rbd_sysfs_init(void)
5021 {
5022         int ret;
5023
5024         ret = device_register(&rbd_root_dev);
5025         if (ret < 0)
5026                 return ret;
5027
5028         ret = bus_register(&rbd_bus_type);
5029         if (ret < 0)
5030                 device_unregister(&rbd_root_dev);
5031
5032         return ret;
5033 }
5034
5035 static void rbd_sysfs_cleanup(void)
5036 {
5037         bus_unregister(&rbd_bus_type);
5038         device_unregister(&rbd_root_dev);
5039 }
5040
5041 static int __init rbd_init(void)
5042 {
5043         int rc;
5044
5045         if (!libceph_compatible(NULL)) {
5046                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5047
5048                 return -EINVAL;
5049         }
5050         rc = rbd_sysfs_init();
5051         if (rc)
5052                 return rc;
5053         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5054         return 0;
5055 }
5056
5057 static void __exit rbd_exit(void)
5058 {
5059         rbd_sysfs_cleanup();
5060 }
5061
5062 module_init(rbd_init);
5063 module_exit(rbd_exit);
5064
5065 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5066 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5067 MODULE_DESCRIPTION("rados block device");
5068
5069 /* following authorship retained from original osdblk.c */
5070 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5071
5072 MODULE_LICENSE("GPL");