]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: stop tracking header object version
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
88  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89  * enough to hold all possible device names.
90  */
91 #define DEV_NAME_LEN            32
92 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 stripe_unit;
112         u64 stripe_count;
113 };
114
115 /*
116  * An rbd image specification.
117  *
118  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
119  * identify an image.  Each rbd_dev structure includes a pointer to
120  * an rbd_spec structure that encapsulates this identity.
121  *
122  * Each of the id's in an rbd_spec has an associated name.  For a
123  * user-mapped image, the names are supplied and the id's associated
124  * with them are looked up.  For a layered image, a parent image is
125  * defined by the tuple, and the names are looked up.
126  *
127  * An rbd_dev structure contains a parent_spec pointer which is
128  * non-null if the image it represents is a child in a layered
129  * image.  This pointer will refer to the rbd_spec structure used
130  * by the parent rbd_dev for its own identity (i.e., the structure
131  * is shared between the parent and child).
132  *
133  * Since these structures are populated once, during the discovery
134  * phase of image construction, they are effectively immutable so
135  * we make no effort to synchronize access to them.
136  *
137  * Note that code herein does not assume the image name is known (it
138  * could be a null pointer).
139  */
140 struct rbd_spec {
141         u64             pool_id;
142         const char      *pool_name;
143
144         const char      *image_id;
145         const char      *image_name;
146
147         u64             snap_id;
148         const char      *snap_name;
149
150         struct kref     kref;
151 };
152
153 /*
154  * an instance of the client.  multiple devices may share an rbd client.
155  */
156 struct rbd_client {
157         struct ceph_client      *client;
158         struct kref             kref;
159         struct list_head        node;
160 };
161
162 struct rbd_img_request;
163 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
164
165 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
166
167 struct rbd_obj_request;
168 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
169
170 enum obj_request_type {
171         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
172 };
173
174 enum obj_req_flags {
175         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
176         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
177         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
178         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
179 };
180
181 struct rbd_obj_request {
182         const char              *object_name;
183         u64                     offset;         /* object start byte */
184         u64                     length;         /* bytes from offset */
185         unsigned long           flags;
186
187         /*
188          * An object request associated with an image will have its
189          * img_data flag set; a standalone object request will not.
190          *
191          * A standalone object request will have which == BAD_WHICH
192          * and a null obj_request pointer.
193          *
194          * An object request initiated in support of a layered image
195          * object (to check for its existence before a write) will
196          * have which == BAD_WHICH and a non-null obj_request pointer.
197          *
198          * Finally, an object request for rbd image data will have
199          * which != BAD_WHICH, and will have a non-null img_request
200          * pointer.  The value of which will be in the range
201          * 0..(img_request->obj_request_count-1).
202          */
203         union {
204                 struct rbd_obj_request  *obj_request;   /* STAT op */
205                 struct {
206                         struct rbd_img_request  *img_request;
207                         u64                     img_offset;
208                         /* links for img_request->obj_requests list */
209                         struct list_head        links;
210                 };
211         };
212         u32                     which;          /* posn image request list */
213
214         enum obj_request_type   type;
215         union {
216                 struct bio      *bio_list;
217                 struct {
218                         struct page     **pages;
219                         u32             page_count;
220                 };
221         };
222         struct page             **copyup_pages;
223
224         struct ceph_osd_request *osd_req;
225
226         u64                     xferred;        /* bytes transferred */
227         u64                     version;
228         int                     result;
229
230         rbd_obj_callback_t      callback;
231         struct completion       completion;
232
233         struct kref             kref;
234 };
235
236 enum img_req_flags {
237         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
238         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
239         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
240 };
241
242 struct rbd_img_request {
243         struct rbd_device       *rbd_dev;
244         u64                     offset; /* starting image byte offset */
245         u64                     length; /* byte count from offset */
246         unsigned long           flags;
247         union {
248                 u64                     snap_id;        /* for reads */
249                 struct ceph_snap_context *snapc;        /* for writes */
250         };
251         union {
252                 struct request          *rq;            /* block request */
253                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
254         };
255         struct page             **copyup_pages;
256         spinlock_t              completion_lock;/* protects next_completion */
257         u32                     next_completion;
258         rbd_img_callback_t      callback;
259         u64                     xferred;/* aggregate bytes transferred */
260         int                     result; /* first nonzero obj_request result */
261
262         u32                     obj_request_count;
263         struct list_head        obj_requests;   /* rbd_obj_request structs */
264
265         struct kref             kref;
266 };
267
268 #define for_each_obj_request(ireq, oreq) \
269         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
270 #define for_each_obj_request_from(ireq, oreq) \
271         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
272 #define for_each_obj_request_safe(ireq, oreq, n) \
273         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
274
275 struct rbd_snap {
276         const char              *name;
277         u64                     size;
278         struct list_head        node;
279         u64                     id;
280         u64                     features;
281 };
282
283 struct rbd_mapping {
284         u64                     size;
285         u64                     features;
286         bool                    read_only;
287 };
288
289 /*
290  * a single device
291  */
292 struct rbd_device {
293         int                     dev_id;         /* blkdev unique id */
294
295         int                     major;          /* blkdev assigned major */
296         struct gendisk          *disk;          /* blkdev's gendisk and rq */
297
298         u32                     image_format;   /* Either 1 or 2 */
299         struct rbd_client       *rbd_client;
300
301         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
302
303         spinlock_t              lock;           /* queue, flags, open_count */
304
305         struct rbd_image_header header;
306         unsigned long           flags;          /* possibly lock protected */
307         struct rbd_spec         *spec;
308
309         char                    *header_name;
310
311         struct ceph_file_layout layout;
312
313         struct ceph_osd_event   *watch_event;
314         struct rbd_obj_request  *watch_request;
315
316         struct rbd_spec         *parent_spec;
317         u64                     parent_overlap;
318         struct rbd_device       *parent;
319
320         /* protects updating the header */
321         struct rw_semaphore     header_rwsem;
322
323         struct rbd_mapping      mapping;
324
325         struct list_head        node;
326
327         /* list of snapshots */
328         struct list_head        snaps;
329
330         /* sysfs related */
331         struct device           dev;
332         unsigned long           open_count;     /* protected by lock */
333 };
334
335 /*
336  * Flag bits for rbd_dev->flags.  If atomicity is required,
337  * rbd_dev->lock is used to protect access.
338  *
339  * Currently, only the "removing" flag (which is coupled with the
340  * "open_count" field) requires atomic access.
341  */
342 enum rbd_dev_flags {
343         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
344         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
345 };
346
347 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
348
349 static LIST_HEAD(rbd_dev_list);    /* devices */
350 static DEFINE_SPINLOCK(rbd_dev_list_lock);
351
352 static LIST_HEAD(rbd_client_list);              /* clients */
353 static DEFINE_SPINLOCK(rbd_client_list_lock);
354
355 static int rbd_img_request_submit(struct rbd_img_request *img_request);
356
357 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
358
359 static void rbd_dev_device_release(struct device *dev);
360 static void rbd_snap_destroy(struct rbd_snap *snap);
361
362 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
363                        size_t count);
364 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
365                           size_t count);
366 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
367
368 static struct bus_attribute rbd_bus_attrs[] = {
369         __ATTR(add, S_IWUSR, NULL, rbd_add),
370         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
371         __ATTR_NULL
372 };
373
374 static struct bus_type rbd_bus_type = {
375         .name           = "rbd",
376         .bus_attrs      = rbd_bus_attrs,
377 };
378
379 static void rbd_root_dev_release(struct device *dev)
380 {
381 }
382
383 static struct device rbd_root_dev = {
384         .init_name =    "rbd",
385         .release =      rbd_root_dev_release,
386 };
387
388 static __printf(2, 3)
389 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
390 {
391         struct va_format vaf;
392         va_list args;
393
394         va_start(args, fmt);
395         vaf.fmt = fmt;
396         vaf.va = &args;
397
398         if (!rbd_dev)
399                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
400         else if (rbd_dev->disk)
401                 printk(KERN_WARNING "%s: %s: %pV\n",
402                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
403         else if (rbd_dev->spec && rbd_dev->spec->image_name)
404                 printk(KERN_WARNING "%s: image %s: %pV\n",
405                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
406         else if (rbd_dev->spec && rbd_dev->spec->image_id)
407                 printk(KERN_WARNING "%s: id %s: %pV\n",
408                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
409         else    /* punt */
410                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
411                         RBD_DRV_NAME, rbd_dev, &vaf);
412         va_end(args);
413 }
414
415 #ifdef RBD_DEBUG
416 #define rbd_assert(expr)                                                \
417                 if (unlikely(!(expr))) {                                \
418                         printk(KERN_ERR "\nAssertion failure in %s() "  \
419                                                 "at line %d:\n\n"       \
420                                         "\trbd_assert(%s);\n\n",        \
421                                         __func__, __LINE__, #expr);     \
422                         BUG();                                          \
423                 }
424 #else /* !RBD_DEBUG */
425 #  define rbd_assert(expr)      ((void) 0)
426 #endif /* !RBD_DEBUG */
427
428 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
429 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
430 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
431
432 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
433 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
434
435 static int rbd_open(struct block_device *bdev, fmode_t mode)
436 {
437         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
438         bool removing = false;
439
440         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
441                 return -EROFS;
442
443         spin_lock_irq(&rbd_dev->lock);
444         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
445                 removing = true;
446         else
447                 rbd_dev->open_count++;
448         spin_unlock_irq(&rbd_dev->lock);
449         if (removing)
450                 return -ENOENT;
451
452         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
453         (void) get_device(&rbd_dev->dev);
454         set_device_ro(bdev, rbd_dev->mapping.read_only);
455         mutex_unlock(&ctl_mutex);
456
457         return 0;
458 }
459
460 static int rbd_release(struct gendisk *disk, fmode_t mode)
461 {
462         struct rbd_device *rbd_dev = disk->private_data;
463         unsigned long open_count_before;
464
465         spin_lock_irq(&rbd_dev->lock);
466         open_count_before = rbd_dev->open_count--;
467         spin_unlock_irq(&rbd_dev->lock);
468         rbd_assert(open_count_before > 0);
469
470         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
471         put_device(&rbd_dev->dev);
472         mutex_unlock(&ctl_mutex);
473
474         return 0;
475 }
476
477 static const struct block_device_operations rbd_bd_ops = {
478         .owner                  = THIS_MODULE,
479         .open                   = rbd_open,
480         .release                = rbd_release,
481 };
482
483 /*
484  * Initialize an rbd client instance.
485  * We own *ceph_opts.
486  */
487 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
488 {
489         struct rbd_client *rbdc;
490         int ret = -ENOMEM;
491
492         dout("%s:\n", __func__);
493         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
494         if (!rbdc)
495                 goto out_opt;
496
497         kref_init(&rbdc->kref);
498         INIT_LIST_HEAD(&rbdc->node);
499
500         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
501
502         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
503         if (IS_ERR(rbdc->client))
504                 goto out_mutex;
505         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
506
507         ret = ceph_open_session(rbdc->client);
508         if (ret < 0)
509                 goto out_err;
510
511         spin_lock(&rbd_client_list_lock);
512         list_add_tail(&rbdc->node, &rbd_client_list);
513         spin_unlock(&rbd_client_list_lock);
514
515         mutex_unlock(&ctl_mutex);
516         dout("%s: rbdc %p\n", __func__, rbdc);
517
518         return rbdc;
519
520 out_err:
521         ceph_destroy_client(rbdc->client);
522 out_mutex:
523         mutex_unlock(&ctl_mutex);
524         kfree(rbdc);
525 out_opt:
526         if (ceph_opts)
527                 ceph_destroy_options(ceph_opts);
528         dout("%s: error %d\n", __func__, ret);
529
530         return ERR_PTR(ret);
531 }
532
533 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
534 {
535         kref_get(&rbdc->kref);
536
537         return rbdc;
538 }
539
540 /*
541  * Find a ceph client with specific addr and configuration.  If
542  * found, bump its reference count.
543  */
544 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
545 {
546         struct rbd_client *client_node;
547         bool found = false;
548
549         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
550                 return NULL;
551
552         spin_lock(&rbd_client_list_lock);
553         list_for_each_entry(client_node, &rbd_client_list, node) {
554                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
555                         __rbd_get_client(client_node);
556
557                         found = true;
558                         break;
559                 }
560         }
561         spin_unlock(&rbd_client_list_lock);
562
563         return found ? client_node : NULL;
564 }
565
566 /*
567  * mount options
568  */
569 enum {
570         Opt_last_int,
571         /* int args above */
572         Opt_last_string,
573         /* string args above */
574         Opt_read_only,
575         Opt_read_write,
576         /* Boolean args above */
577         Opt_last_bool,
578 };
579
580 static match_table_t rbd_opts_tokens = {
581         /* int args above */
582         /* string args above */
583         {Opt_read_only, "read_only"},
584         {Opt_read_only, "ro"},          /* Alternate spelling */
585         {Opt_read_write, "read_write"},
586         {Opt_read_write, "rw"},         /* Alternate spelling */
587         /* Boolean args above */
588         {-1, NULL}
589 };
590
591 struct rbd_options {
592         bool    read_only;
593 };
594
595 #define RBD_READ_ONLY_DEFAULT   false
596
597 static int parse_rbd_opts_token(char *c, void *private)
598 {
599         struct rbd_options *rbd_opts = private;
600         substring_t argstr[MAX_OPT_ARGS];
601         int token, intval, ret;
602
603         token = match_token(c, rbd_opts_tokens, argstr);
604         if (token < 0)
605                 return -EINVAL;
606
607         if (token < Opt_last_int) {
608                 ret = match_int(&argstr[0], &intval);
609                 if (ret < 0) {
610                         pr_err("bad mount option arg (not int) "
611                                "at '%s'\n", c);
612                         return ret;
613                 }
614                 dout("got int token %d val %d\n", token, intval);
615         } else if (token > Opt_last_int && token < Opt_last_string) {
616                 dout("got string token %d val %s\n", token,
617                      argstr[0].from);
618         } else if (token > Opt_last_string && token < Opt_last_bool) {
619                 dout("got Boolean token %d\n", token);
620         } else {
621                 dout("got token %d\n", token);
622         }
623
624         switch (token) {
625         case Opt_read_only:
626                 rbd_opts->read_only = true;
627                 break;
628         case Opt_read_write:
629                 rbd_opts->read_only = false;
630                 break;
631         default:
632                 rbd_assert(false);
633                 break;
634         }
635         return 0;
636 }
637
638 /*
639  * Get a ceph client with specific addr and configuration, if one does
640  * not exist create it.
641  */
642 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
643 {
644         struct rbd_client *rbdc;
645
646         rbdc = rbd_client_find(ceph_opts);
647         if (rbdc)       /* using an existing client */
648                 ceph_destroy_options(ceph_opts);
649         else
650                 rbdc = rbd_client_create(ceph_opts);
651
652         return rbdc;
653 }
654
655 /*
656  * Destroy ceph client
657  *
658  * Caller must hold rbd_client_list_lock.
659  */
660 static void rbd_client_release(struct kref *kref)
661 {
662         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
663
664         dout("%s: rbdc %p\n", __func__, rbdc);
665         spin_lock(&rbd_client_list_lock);
666         list_del(&rbdc->node);
667         spin_unlock(&rbd_client_list_lock);
668
669         ceph_destroy_client(rbdc->client);
670         kfree(rbdc);
671 }
672
673 /*
674  * Drop reference to ceph client node. If it's not referenced anymore, release
675  * it.
676  */
677 static void rbd_put_client(struct rbd_client *rbdc)
678 {
679         if (rbdc)
680                 kref_put(&rbdc->kref, rbd_client_release);
681 }
682
683 static bool rbd_image_format_valid(u32 image_format)
684 {
685         return image_format == 1 || image_format == 2;
686 }
687
688 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
689 {
690         size_t size;
691         u32 snap_count;
692
693         /* The header has to start with the magic rbd header text */
694         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
695                 return false;
696
697         /* The bio layer requires at least sector-sized I/O */
698
699         if (ondisk->options.order < SECTOR_SHIFT)
700                 return false;
701
702         /* If we use u64 in a few spots we may be able to loosen this */
703
704         if (ondisk->options.order > 8 * sizeof (int) - 1)
705                 return false;
706
707         /*
708          * The size of a snapshot header has to fit in a size_t, and
709          * that limits the number of snapshots.
710          */
711         snap_count = le32_to_cpu(ondisk->snap_count);
712         size = SIZE_MAX - sizeof (struct ceph_snap_context);
713         if (snap_count > size / sizeof (__le64))
714                 return false;
715
716         /*
717          * Not only that, but the size of the entire the snapshot
718          * header must also be representable in a size_t.
719          */
720         size -= snap_count * sizeof (__le64);
721         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
722                 return false;
723
724         return true;
725 }
726
727 /*
728  * Create a new header structure, translate header format from the on-disk
729  * header.
730  */
731 static int rbd_header_from_disk(struct rbd_image_header *header,
732                                  struct rbd_image_header_ondisk *ondisk)
733 {
734         u32 snap_count;
735         size_t len;
736         size_t size;
737         u32 i;
738
739         memset(header, 0, sizeof (*header));
740
741         snap_count = le32_to_cpu(ondisk->snap_count);
742
743         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
745         if (!header->object_prefix)
746                 return -ENOMEM;
747         memcpy(header->object_prefix, ondisk->object_prefix, len);
748         header->object_prefix[len] = '\0';
749
750         if (snap_count) {
751                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
752
753                 /* Save a copy of the snapshot names */
754
755                 if (snap_names_len > (u64) SIZE_MAX)
756                         return -EIO;
757                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
758                 if (!header->snap_names)
759                         goto out_err;
760                 /*
761                  * Note that rbd_dev_v1_header_read() guarantees
762                  * the ondisk buffer we're working with has
763                  * snap_names_len bytes beyond the end of the
764                  * snapshot id array, this memcpy() is safe.
765                  */
766                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
767                         snap_names_len);
768
769                 /* Record each snapshot's size */
770
771                 size = snap_count * sizeof (*header->snap_sizes);
772                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
773                 if (!header->snap_sizes)
774                         goto out_err;
775                 for (i = 0; i < snap_count; i++)
776                         header->snap_sizes[i] =
777                                 le64_to_cpu(ondisk->snaps[i].image_size);
778         } else {
779                 header->snap_names = NULL;
780                 header->snap_sizes = NULL;
781         }
782
783         header->features = 0;   /* No features support in v1 images */
784         header->obj_order = ondisk->options.order;
785         header->crypt_type = ondisk->options.crypt_type;
786         header->comp_type = ondisk->options.comp_type;
787
788         /* Allocate and fill in the snapshot context */
789
790         header->image_size = le64_to_cpu(ondisk->image_size);
791
792         header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
793         if (!header->snapc)
794                 goto out_err;
795         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
796         for (i = 0; i < snap_count; i++)
797                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
798
799         return 0;
800
801 out_err:
802         kfree(header->snap_sizes);
803         header->snap_sizes = NULL;
804         kfree(header->snap_names);
805         header->snap_names = NULL;
806         kfree(header->object_prefix);
807         header->object_prefix = NULL;
808
809         return -ENOMEM;
810 }
811
812 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
813 {
814         struct rbd_snap *snap;
815
816         if (snap_id == CEPH_NOSNAP)
817                 return RBD_SNAP_HEAD_NAME;
818
819         list_for_each_entry(snap, &rbd_dev->snaps, node)
820                 if (snap_id == snap->id)
821                         return snap->name;
822
823         return NULL;
824 }
825
826 static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
827                                         const char *snap_name)
828 {
829         struct rbd_snap *snap;
830
831         list_for_each_entry(snap, &rbd_dev->snaps, node)
832                 if (!strcmp(snap_name, snap->name))
833                         return snap;
834
835         return NULL;
836 }
837
838 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
839 {
840         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
841                     sizeof (RBD_SNAP_HEAD_NAME))) {
842                 rbd_dev->mapping.size = rbd_dev->header.image_size;
843                 rbd_dev->mapping.features = rbd_dev->header.features;
844         } else {
845                 struct rbd_snap *snap;
846
847                 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
848                 if (!snap)
849                         return -ENOENT;
850                 rbd_dev->mapping.size = snap->size;
851                 rbd_dev->mapping.features = snap->features;
852                 rbd_dev->mapping.read_only = true;
853         }
854
855         return 0;
856 }
857
858 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
859 {
860         rbd_dev->mapping.size = 0;
861         rbd_dev->mapping.features = 0;
862         rbd_dev->mapping.read_only = true;
863 }
864
865 static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
866 {
867         rbd_dev->mapping.size = 0;
868         rbd_dev->mapping.features = 0;
869         rbd_dev->mapping.read_only = true;
870 }
871
872 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
873 {
874         char *name;
875         u64 segment;
876         int ret;
877
878         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
879         if (!name)
880                 return NULL;
881         segment = offset >> rbd_dev->header.obj_order;
882         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
883                         rbd_dev->header.object_prefix, segment);
884         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
885                 pr_err("error formatting segment name for #%llu (%d)\n",
886                         segment, ret);
887                 kfree(name);
888                 name = NULL;
889         }
890
891         return name;
892 }
893
894 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
895 {
896         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
897
898         return offset & (segment_size - 1);
899 }
900
901 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
902                                 u64 offset, u64 length)
903 {
904         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
905
906         offset &= segment_size - 1;
907
908         rbd_assert(length <= U64_MAX - offset);
909         if (offset + length > segment_size)
910                 length = segment_size - offset;
911
912         return length;
913 }
914
915 /*
916  * returns the size of an object in the image
917  */
918 static u64 rbd_obj_bytes(struct rbd_image_header *header)
919 {
920         return 1 << header->obj_order;
921 }
922
923 /*
924  * bio helpers
925  */
926
927 static void bio_chain_put(struct bio *chain)
928 {
929         struct bio *tmp;
930
931         while (chain) {
932                 tmp = chain;
933                 chain = chain->bi_next;
934                 bio_put(tmp);
935         }
936 }
937
938 /*
939  * zeros a bio chain, starting at specific offset
940  */
941 static void zero_bio_chain(struct bio *chain, int start_ofs)
942 {
943         struct bio_vec *bv;
944         unsigned long flags;
945         void *buf;
946         int i;
947         int pos = 0;
948
949         while (chain) {
950                 bio_for_each_segment(bv, chain, i) {
951                         if (pos + bv->bv_len > start_ofs) {
952                                 int remainder = max(start_ofs - pos, 0);
953                                 buf = bvec_kmap_irq(bv, &flags);
954                                 memset(buf + remainder, 0,
955                                        bv->bv_len - remainder);
956                                 bvec_kunmap_irq(buf, &flags);
957                         }
958                         pos += bv->bv_len;
959                 }
960
961                 chain = chain->bi_next;
962         }
963 }
964
965 /*
966  * similar to zero_bio_chain(), zeros data defined by a page array,
967  * starting at the given byte offset from the start of the array and
968  * continuing up to the given end offset.  The pages array is
969  * assumed to be big enough to hold all bytes up to the end.
970  */
971 static void zero_pages(struct page **pages, u64 offset, u64 end)
972 {
973         struct page **page = &pages[offset >> PAGE_SHIFT];
974
975         rbd_assert(end > offset);
976         rbd_assert(end - offset <= (u64)SIZE_MAX);
977         while (offset < end) {
978                 size_t page_offset;
979                 size_t length;
980                 unsigned long flags;
981                 void *kaddr;
982
983                 page_offset = (size_t)(offset & ~PAGE_MASK);
984                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
985                 local_irq_save(flags);
986                 kaddr = kmap_atomic(*page);
987                 memset(kaddr + page_offset, 0, length);
988                 kunmap_atomic(kaddr);
989                 local_irq_restore(flags);
990
991                 offset += length;
992                 page++;
993         }
994 }
995
996 /*
997  * Clone a portion of a bio, starting at the given byte offset
998  * and continuing for the number of bytes indicated.
999  */
1000 static struct bio *bio_clone_range(struct bio *bio_src,
1001                                         unsigned int offset,
1002                                         unsigned int len,
1003                                         gfp_t gfpmask)
1004 {
1005         struct bio_vec *bv;
1006         unsigned int resid;
1007         unsigned short idx;
1008         unsigned int voff;
1009         unsigned short end_idx;
1010         unsigned short vcnt;
1011         struct bio *bio;
1012
1013         /* Handle the easy case for the caller */
1014
1015         if (!offset && len == bio_src->bi_size)
1016                 return bio_clone(bio_src, gfpmask);
1017
1018         if (WARN_ON_ONCE(!len))
1019                 return NULL;
1020         if (WARN_ON_ONCE(len > bio_src->bi_size))
1021                 return NULL;
1022         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1023                 return NULL;
1024
1025         /* Find first affected segment... */
1026
1027         resid = offset;
1028         __bio_for_each_segment(bv, bio_src, idx, 0) {
1029                 if (resid < bv->bv_len)
1030                         break;
1031                 resid -= bv->bv_len;
1032         }
1033         voff = resid;
1034
1035         /* ...and the last affected segment */
1036
1037         resid += len;
1038         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1039                 if (resid <= bv->bv_len)
1040                         break;
1041                 resid -= bv->bv_len;
1042         }
1043         vcnt = end_idx - idx + 1;
1044
1045         /* Build the clone */
1046
1047         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1048         if (!bio)
1049                 return NULL;    /* ENOMEM */
1050
1051         bio->bi_bdev = bio_src->bi_bdev;
1052         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1053         bio->bi_rw = bio_src->bi_rw;
1054         bio->bi_flags |= 1 << BIO_CLONED;
1055
1056         /*
1057          * Copy over our part of the bio_vec, then update the first
1058          * and last (or only) entries.
1059          */
1060         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1061                         vcnt * sizeof (struct bio_vec));
1062         bio->bi_io_vec[0].bv_offset += voff;
1063         if (vcnt > 1) {
1064                 bio->bi_io_vec[0].bv_len -= voff;
1065                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1066         } else {
1067                 bio->bi_io_vec[0].bv_len = len;
1068         }
1069
1070         bio->bi_vcnt = vcnt;
1071         bio->bi_size = len;
1072         bio->bi_idx = 0;
1073
1074         return bio;
1075 }
1076
1077 /*
1078  * Clone a portion of a bio chain, starting at the given byte offset
1079  * into the first bio in the source chain and continuing for the
1080  * number of bytes indicated.  The result is another bio chain of
1081  * exactly the given length, or a null pointer on error.
1082  *
1083  * The bio_src and offset parameters are both in-out.  On entry they
1084  * refer to the first source bio and the offset into that bio where
1085  * the start of data to be cloned is located.
1086  *
1087  * On return, bio_src is updated to refer to the bio in the source
1088  * chain that contains first un-cloned byte, and *offset will
1089  * contain the offset of that byte within that bio.
1090  */
1091 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1092                                         unsigned int *offset,
1093                                         unsigned int len,
1094                                         gfp_t gfpmask)
1095 {
1096         struct bio *bi = *bio_src;
1097         unsigned int off = *offset;
1098         struct bio *chain = NULL;
1099         struct bio **end;
1100
1101         /* Build up a chain of clone bios up to the limit */
1102
1103         if (!bi || off >= bi->bi_size || !len)
1104                 return NULL;            /* Nothing to clone */
1105
1106         end = &chain;
1107         while (len) {
1108                 unsigned int bi_size;
1109                 struct bio *bio;
1110
1111                 if (!bi) {
1112                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1113                         goto out_err;   /* EINVAL; ran out of bio's */
1114                 }
1115                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1116                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1117                 if (!bio)
1118                         goto out_err;   /* ENOMEM */
1119
1120                 *end = bio;
1121                 end = &bio->bi_next;
1122
1123                 off += bi_size;
1124                 if (off == bi->bi_size) {
1125                         bi = bi->bi_next;
1126                         off = 0;
1127                 }
1128                 len -= bi_size;
1129         }
1130         *bio_src = bi;
1131         *offset = off;
1132
1133         return chain;
1134 out_err:
1135         bio_chain_put(chain);
1136
1137         return NULL;
1138 }
1139
1140 /*
1141  * The default/initial value for all object request flags is 0.  For
1142  * each flag, once its value is set to 1 it is never reset to 0
1143  * again.
1144  */
1145 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1146 {
1147         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1148                 struct rbd_device *rbd_dev;
1149
1150                 rbd_dev = obj_request->img_request->rbd_dev;
1151                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1152                         obj_request);
1153         }
1154 }
1155
1156 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1157 {
1158         smp_mb();
1159         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1160 }
1161
1162 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1163 {
1164         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1165                 struct rbd_device *rbd_dev = NULL;
1166
1167                 if (obj_request_img_data_test(obj_request))
1168                         rbd_dev = obj_request->img_request->rbd_dev;
1169                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1170                         obj_request);
1171         }
1172 }
1173
1174 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1175 {
1176         smp_mb();
1177         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1178 }
1179
1180 /*
1181  * This sets the KNOWN flag after (possibly) setting the EXISTS
1182  * flag.  The latter is set based on the "exists" value provided.
1183  *
1184  * Note that for our purposes once an object exists it never goes
1185  * away again.  It's possible that the response from two existence
1186  * checks are separated by the creation of the target object, and
1187  * the first ("doesn't exist") response arrives *after* the second
1188  * ("does exist").  In that case we ignore the second one.
1189  */
1190 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1191                                 bool exists)
1192 {
1193         if (exists)
1194                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1195         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1196         smp_mb();
1197 }
1198
1199 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1200 {
1201         smp_mb();
1202         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1203 }
1204
1205 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1206 {
1207         smp_mb();
1208         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1209 }
1210
1211 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1212 {
1213         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1214                 atomic_read(&obj_request->kref.refcount));
1215         kref_get(&obj_request->kref);
1216 }
1217
1218 static void rbd_obj_request_destroy(struct kref *kref);
1219 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1220 {
1221         rbd_assert(obj_request != NULL);
1222         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1223                 atomic_read(&obj_request->kref.refcount));
1224         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1225 }
1226
1227 static void rbd_img_request_get(struct rbd_img_request *img_request)
1228 {
1229         dout("%s: img %p (was %d)\n", __func__, img_request,
1230                 atomic_read(&img_request->kref.refcount));
1231         kref_get(&img_request->kref);
1232 }
1233
1234 static void rbd_img_request_destroy(struct kref *kref);
1235 static void rbd_img_request_put(struct rbd_img_request *img_request)
1236 {
1237         rbd_assert(img_request != NULL);
1238         dout("%s: img %p (was %d)\n", __func__, img_request,
1239                 atomic_read(&img_request->kref.refcount));
1240         kref_put(&img_request->kref, rbd_img_request_destroy);
1241 }
1242
1243 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1244                                         struct rbd_obj_request *obj_request)
1245 {
1246         rbd_assert(obj_request->img_request == NULL);
1247
1248         /* Image request now owns object's original reference */
1249         obj_request->img_request = img_request;
1250         obj_request->which = img_request->obj_request_count;
1251         rbd_assert(!obj_request_img_data_test(obj_request));
1252         obj_request_img_data_set(obj_request);
1253         rbd_assert(obj_request->which != BAD_WHICH);
1254         img_request->obj_request_count++;
1255         list_add_tail(&obj_request->links, &img_request->obj_requests);
1256         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1257                 obj_request->which);
1258 }
1259
1260 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1261                                         struct rbd_obj_request *obj_request)
1262 {
1263         rbd_assert(obj_request->which != BAD_WHICH);
1264
1265         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1266                 obj_request->which);
1267         list_del(&obj_request->links);
1268         rbd_assert(img_request->obj_request_count > 0);
1269         img_request->obj_request_count--;
1270         rbd_assert(obj_request->which == img_request->obj_request_count);
1271         obj_request->which = BAD_WHICH;
1272         rbd_assert(obj_request_img_data_test(obj_request));
1273         rbd_assert(obj_request->img_request == img_request);
1274         obj_request->img_request = NULL;
1275         obj_request->callback = NULL;
1276         rbd_obj_request_put(obj_request);
1277 }
1278
1279 static bool obj_request_type_valid(enum obj_request_type type)
1280 {
1281         switch (type) {
1282         case OBJ_REQUEST_NODATA:
1283         case OBJ_REQUEST_BIO:
1284         case OBJ_REQUEST_PAGES:
1285                 return true;
1286         default:
1287                 return false;
1288         }
1289 }
1290
1291 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1292                                 struct rbd_obj_request *obj_request)
1293 {
1294         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1295
1296         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1297 }
1298
1299 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1300 {
1301
1302         dout("%s: img %p\n", __func__, img_request);
1303
1304         /*
1305          * If no error occurred, compute the aggregate transfer
1306          * count for the image request.  We could instead use
1307          * atomic64_cmpxchg() to update it as each object request
1308          * completes; not clear which way is better off hand.
1309          */
1310         if (!img_request->result) {
1311                 struct rbd_obj_request *obj_request;
1312                 u64 xferred = 0;
1313
1314                 for_each_obj_request(img_request, obj_request)
1315                         xferred += obj_request->xferred;
1316                 img_request->xferred = xferred;
1317         }
1318
1319         if (img_request->callback)
1320                 img_request->callback(img_request);
1321         else
1322                 rbd_img_request_put(img_request);
1323 }
1324
1325 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1326
1327 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1328 {
1329         dout("%s: obj %p\n", __func__, obj_request);
1330
1331         return wait_for_completion_interruptible(&obj_request->completion);
1332 }
1333
1334 /*
1335  * The default/initial value for all image request flags is 0.  Each
1336  * is conditionally set to 1 at image request initialization time
1337  * and currently never change thereafter.
1338  */
1339 static void img_request_write_set(struct rbd_img_request *img_request)
1340 {
1341         set_bit(IMG_REQ_WRITE, &img_request->flags);
1342         smp_mb();
1343 }
1344
1345 static bool img_request_write_test(struct rbd_img_request *img_request)
1346 {
1347         smp_mb();
1348         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1349 }
1350
1351 static void img_request_child_set(struct rbd_img_request *img_request)
1352 {
1353         set_bit(IMG_REQ_CHILD, &img_request->flags);
1354         smp_mb();
1355 }
1356
1357 static bool img_request_child_test(struct rbd_img_request *img_request)
1358 {
1359         smp_mb();
1360         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1361 }
1362
1363 static void img_request_layered_set(struct rbd_img_request *img_request)
1364 {
1365         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1366         smp_mb();
1367 }
1368
1369 static bool img_request_layered_test(struct rbd_img_request *img_request)
1370 {
1371         smp_mb();
1372         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1373 }
1374
1375 static void
1376 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1377 {
1378         u64 xferred = obj_request->xferred;
1379         u64 length = obj_request->length;
1380
1381         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1382                 obj_request, obj_request->img_request, obj_request->result,
1383                 xferred, length);
1384         /*
1385          * ENOENT means a hole in the image.  We zero-fill the
1386          * entire length of the request.  A short read also implies
1387          * zero-fill to the end of the request.  Either way we
1388          * update the xferred count to indicate the whole request
1389          * was satisfied.
1390          */
1391         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1392         if (obj_request->result == -ENOENT) {
1393                 if (obj_request->type == OBJ_REQUEST_BIO)
1394                         zero_bio_chain(obj_request->bio_list, 0);
1395                 else
1396                         zero_pages(obj_request->pages, 0, length);
1397                 obj_request->result = 0;
1398                 obj_request->xferred = length;
1399         } else if (xferred < length && !obj_request->result) {
1400                 if (obj_request->type == OBJ_REQUEST_BIO)
1401                         zero_bio_chain(obj_request->bio_list, xferred);
1402                 else
1403                         zero_pages(obj_request->pages, xferred, length);
1404                 obj_request->xferred = length;
1405         }
1406         obj_request_done_set(obj_request);
1407 }
1408
1409 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1410 {
1411         dout("%s: obj %p cb %p\n", __func__, obj_request,
1412                 obj_request->callback);
1413         if (obj_request->callback)
1414                 obj_request->callback(obj_request);
1415         else
1416                 complete_all(&obj_request->completion);
1417 }
1418
1419 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1420 {
1421         dout("%s: obj %p\n", __func__, obj_request);
1422         obj_request_done_set(obj_request);
1423 }
1424
1425 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1426 {
1427         struct rbd_img_request *img_request = NULL;
1428         struct rbd_device *rbd_dev = NULL;
1429         bool layered = false;
1430
1431         if (obj_request_img_data_test(obj_request)) {
1432                 img_request = obj_request->img_request;
1433                 layered = img_request && img_request_layered_test(img_request);
1434                 rbd_dev = img_request->rbd_dev;
1435         }
1436
1437         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1438                 obj_request, img_request, obj_request->result,
1439                 obj_request->xferred, obj_request->length);
1440         if (layered && obj_request->result == -ENOENT &&
1441                         obj_request->img_offset < rbd_dev->parent_overlap)
1442                 rbd_img_parent_read(obj_request);
1443         else if (img_request)
1444                 rbd_img_obj_request_read_callback(obj_request);
1445         else
1446                 obj_request_done_set(obj_request);
1447 }
1448
1449 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1450 {
1451         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1452                 obj_request->result, obj_request->length);
1453         /*
1454          * There is no such thing as a successful short write.  Set
1455          * it to our originally-requested length.
1456          */
1457         obj_request->xferred = obj_request->length;
1458         obj_request_done_set(obj_request);
1459 }
1460
1461 /*
1462  * For a simple stat call there's nothing to do.  We'll do more if
1463  * this is part of a write sequence for a layered image.
1464  */
1465 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1466 {
1467         dout("%s: obj %p\n", __func__, obj_request);
1468         obj_request_done_set(obj_request);
1469 }
1470
1471 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1472                                 struct ceph_msg *msg)
1473 {
1474         struct rbd_obj_request *obj_request = osd_req->r_priv;
1475         u16 opcode;
1476
1477         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1478         rbd_assert(osd_req == obj_request->osd_req);
1479         if (obj_request_img_data_test(obj_request)) {
1480                 rbd_assert(obj_request->img_request);
1481                 rbd_assert(obj_request->which != BAD_WHICH);
1482         } else {
1483                 rbd_assert(obj_request->which == BAD_WHICH);
1484         }
1485
1486         if (osd_req->r_result < 0)
1487                 obj_request->result = osd_req->r_result;
1488         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1489
1490         BUG_ON(osd_req->r_num_ops > 2);
1491
1492         /*
1493          * We support a 64-bit length, but ultimately it has to be
1494          * passed to blk_end_request(), which takes an unsigned int.
1495          */
1496         obj_request->xferred = osd_req->r_reply_op_len[0];
1497         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1498         opcode = osd_req->r_ops[0].op;
1499         switch (opcode) {
1500         case CEPH_OSD_OP_READ:
1501                 rbd_osd_read_callback(obj_request);
1502                 break;
1503         case CEPH_OSD_OP_WRITE:
1504                 rbd_osd_write_callback(obj_request);
1505                 break;
1506         case CEPH_OSD_OP_STAT:
1507                 rbd_osd_stat_callback(obj_request);
1508                 break;
1509         case CEPH_OSD_OP_CALL:
1510         case CEPH_OSD_OP_NOTIFY_ACK:
1511         case CEPH_OSD_OP_WATCH:
1512                 rbd_osd_trivial_callback(obj_request);
1513                 break;
1514         default:
1515                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1516                         obj_request->object_name, (unsigned short) opcode);
1517                 break;
1518         }
1519
1520         if (obj_request_done_test(obj_request))
1521                 rbd_obj_request_complete(obj_request);
1522 }
1523
1524 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1525 {
1526         struct rbd_img_request *img_request = obj_request->img_request;
1527         struct ceph_osd_request *osd_req = obj_request->osd_req;
1528         u64 snap_id;
1529
1530         rbd_assert(osd_req != NULL);
1531
1532         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1533         ceph_osdc_build_request(osd_req, obj_request->offset,
1534                         NULL, snap_id, NULL);
1535 }
1536
1537 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1538 {
1539         struct rbd_img_request *img_request = obj_request->img_request;
1540         struct ceph_osd_request *osd_req = obj_request->osd_req;
1541         struct ceph_snap_context *snapc;
1542         struct timespec mtime = CURRENT_TIME;
1543
1544         rbd_assert(osd_req != NULL);
1545
1546         snapc = img_request ? img_request->snapc : NULL;
1547         ceph_osdc_build_request(osd_req, obj_request->offset,
1548                         snapc, CEPH_NOSNAP, &mtime);
1549 }
1550
1551 static struct ceph_osd_request *rbd_osd_req_create(
1552                                         struct rbd_device *rbd_dev,
1553                                         bool write_request,
1554                                         struct rbd_obj_request *obj_request)
1555 {
1556         struct ceph_snap_context *snapc = NULL;
1557         struct ceph_osd_client *osdc;
1558         struct ceph_osd_request *osd_req;
1559
1560         if (obj_request_img_data_test(obj_request)) {
1561                 struct rbd_img_request *img_request = obj_request->img_request;
1562
1563                 rbd_assert(write_request ==
1564                                 img_request_write_test(img_request));
1565                 if (write_request)
1566                         snapc = img_request->snapc;
1567         }
1568
1569         /* Allocate and initialize the request, for the single op */
1570
1571         osdc = &rbd_dev->rbd_client->client->osdc;
1572         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1573         if (!osd_req)
1574                 return NULL;    /* ENOMEM */
1575
1576         if (write_request)
1577                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1578         else
1579                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1580
1581         osd_req->r_callback = rbd_osd_req_callback;
1582         osd_req->r_priv = obj_request;
1583
1584         osd_req->r_oid_len = strlen(obj_request->object_name);
1585         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1586         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1587
1588         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1589
1590         return osd_req;
1591 }
1592
1593 /*
1594  * Create a copyup osd request based on the information in the
1595  * object request supplied.  A copyup request has two osd ops,
1596  * a copyup method call, and a "normal" write request.
1597  */
1598 static struct ceph_osd_request *
1599 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1600 {
1601         struct rbd_img_request *img_request;
1602         struct ceph_snap_context *snapc;
1603         struct rbd_device *rbd_dev;
1604         struct ceph_osd_client *osdc;
1605         struct ceph_osd_request *osd_req;
1606
1607         rbd_assert(obj_request_img_data_test(obj_request));
1608         img_request = obj_request->img_request;
1609         rbd_assert(img_request);
1610         rbd_assert(img_request_write_test(img_request));
1611
1612         /* Allocate and initialize the request, for the two ops */
1613
1614         snapc = img_request->snapc;
1615         rbd_dev = img_request->rbd_dev;
1616         osdc = &rbd_dev->rbd_client->client->osdc;
1617         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1618         if (!osd_req)
1619                 return NULL;    /* ENOMEM */
1620
1621         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1622         osd_req->r_callback = rbd_osd_req_callback;
1623         osd_req->r_priv = obj_request;
1624
1625         osd_req->r_oid_len = strlen(obj_request->object_name);
1626         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1627         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1628
1629         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1630
1631         return osd_req;
1632 }
1633
1634
1635 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1636 {
1637         ceph_osdc_put_request(osd_req);
1638 }
1639
1640 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1641
1642 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1643                                                 u64 offset, u64 length,
1644                                                 enum obj_request_type type)
1645 {
1646         struct rbd_obj_request *obj_request;
1647         size_t size;
1648         char *name;
1649
1650         rbd_assert(obj_request_type_valid(type));
1651
1652         size = strlen(object_name) + 1;
1653         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1654         if (!obj_request)
1655                 return NULL;
1656
1657         name = (char *)(obj_request + 1);
1658         obj_request->object_name = memcpy(name, object_name, size);
1659         obj_request->offset = offset;
1660         obj_request->length = length;
1661         obj_request->flags = 0;
1662         obj_request->which = BAD_WHICH;
1663         obj_request->type = type;
1664         INIT_LIST_HEAD(&obj_request->links);
1665         init_completion(&obj_request->completion);
1666         kref_init(&obj_request->kref);
1667
1668         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1669                 offset, length, (int)type, obj_request);
1670
1671         return obj_request;
1672 }
1673
1674 static void rbd_obj_request_destroy(struct kref *kref)
1675 {
1676         struct rbd_obj_request *obj_request;
1677
1678         obj_request = container_of(kref, struct rbd_obj_request, kref);
1679
1680         dout("%s: obj %p\n", __func__, obj_request);
1681
1682         rbd_assert(obj_request->img_request == NULL);
1683         rbd_assert(obj_request->which == BAD_WHICH);
1684
1685         if (obj_request->osd_req)
1686                 rbd_osd_req_destroy(obj_request->osd_req);
1687
1688         rbd_assert(obj_request_type_valid(obj_request->type));
1689         switch (obj_request->type) {
1690         case OBJ_REQUEST_NODATA:
1691                 break;          /* Nothing to do */
1692         case OBJ_REQUEST_BIO:
1693                 if (obj_request->bio_list)
1694                         bio_chain_put(obj_request->bio_list);
1695                 break;
1696         case OBJ_REQUEST_PAGES:
1697                 if (obj_request->pages)
1698                         ceph_release_page_vector(obj_request->pages,
1699                                                 obj_request->page_count);
1700                 break;
1701         }
1702
1703         kfree(obj_request);
1704 }
1705
1706 /*
1707  * Caller is responsible for filling in the list of object requests
1708  * that comprises the image request, and the Linux request pointer
1709  * (if there is one).
1710  */
1711 static struct rbd_img_request *rbd_img_request_create(
1712                                         struct rbd_device *rbd_dev,
1713                                         u64 offset, u64 length,
1714                                         bool write_request,
1715                                         bool child_request)
1716 {
1717         struct rbd_img_request *img_request;
1718
1719         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1720         if (!img_request)
1721                 return NULL;
1722
1723         if (write_request) {
1724                 down_read(&rbd_dev->header_rwsem);
1725                 ceph_get_snap_context(rbd_dev->header.snapc);
1726                 up_read(&rbd_dev->header_rwsem);
1727         }
1728
1729         img_request->rq = NULL;
1730         img_request->rbd_dev = rbd_dev;
1731         img_request->offset = offset;
1732         img_request->length = length;
1733         img_request->flags = 0;
1734         if (write_request) {
1735                 img_request_write_set(img_request);
1736                 img_request->snapc = rbd_dev->header.snapc;
1737         } else {
1738                 img_request->snap_id = rbd_dev->spec->snap_id;
1739         }
1740         if (child_request)
1741                 img_request_child_set(img_request);
1742         if (rbd_dev->parent_spec)
1743                 img_request_layered_set(img_request);
1744         spin_lock_init(&img_request->completion_lock);
1745         img_request->next_completion = 0;
1746         img_request->callback = NULL;
1747         img_request->result = 0;
1748         img_request->obj_request_count = 0;
1749         INIT_LIST_HEAD(&img_request->obj_requests);
1750         kref_init(&img_request->kref);
1751
1752         rbd_img_request_get(img_request);       /* Avoid a warning */
1753         rbd_img_request_put(img_request);       /* TEMPORARY */
1754
1755         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1756                 write_request ? "write" : "read", offset, length,
1757                 img_request);
1758
1759         return img_request;
1760 }
1761
1762 static void rbd_img_request_destroy(struct kref *kref)
1763 {
1764         struct rbd_img_request *img_request;
1765         struct rbd_obj_request *obj_request;
1766         struct rbd_obj_request *next_obj_request;
1767
1768         img_request = container_of(kref, struct rbd_img_request, kref);
1769
1770         dout("%s: img %p\n", __func__, img_request);
1771
1772         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1773                 rbd_img_obj_request_del(img_request, obj_request);
1774         rbd_assert(img_request->obj_request_count == 0);
1775
1776         if (img_request_write_test(img_request))
1777                 ceph_put_snap_context(img_request->snapc);
1778
1779         if (img_request_child_test(img_request))
1780                 rbd_obj_request_put(img_request->obj_request);
1781
1782         kfree(img_request);
1783 }
1784
1785 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1786 {
1787         struct rbd_img_request *img_request;
1788         unsigned int xferred;
1789         int result;
1790         bool more;
1791
1792         rbd_assert(obj_request_img_data_test(obj_request));
1793         img_request = obj_request->img_request;
1794
1795         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1796         xferred = (unsigned int)obj_request->xferred;
1797         result = obj_request->result;
1798         if (result) {
1799                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1800
1801                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1802                         img_request_write_test(img_request) ? "write" : "read",
1803                         obj_request->length, obj_request->img_offset,
1804                         obj_request->offset);
1805                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1806                         result, xferred);
1807                 if (!img_request->result)
1808                         img_request->result = result;
1809         }
1810
1811         /* Image object requests don't own their page array */
1812
1813         if (obj_request->type == OBJ_REQUEST_PAGES) {
1814                 obj_request->pages = NULL;
1815                 obj_request->page_count = 0;
1816         }
1817
1818         if (img_request_child_test(img_request)) {
1819                 rbd_assert(img_request->obj_request != NULL);
1820                 more = obj_request->which < img_request->obj_request_count - 1;
1821         } else {
1822                 rbd_assert(img_request->rq != NULL);
1823                 more = blk_end_request(img_request->rq, result, xferred);
1824         }
1825
1826         return more;
1827 }
1828
1829 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1830 {
1831         struct rbd_img_request *img_request;
1832         u32 which = obj_request->which;
1833         bool more = true;
1834
1835         rbd_assert(obj_request_img_data_test(obj_request));
1836         img_request = obj_request->img_request;
1837
1838         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1839         rbd_assert(img_request != NULL);
1840         rbd_assert(img_request->obj_request_count > 0);
1841         rbd_assert(which != BAD_WHICH);
1842         rbd_assert(which < img_request->obj_request_count);
1843         rbd_assert(which >= img_request->next_completion);
1844
1845         spin_lock_irq(&img_request->completion_lock);
1846         if (which != img_request->next_completion)
1847                 goto out;
1848
1849         for_each_obj_request_from(img_request, obj_request) {
1850                 rbd_assert(more);
1851                 rbd_assert(which < img_request->obj_request_count);
1852
1853                 if (!obj_request_done_test(obj_request))
1854                         break;
1855                 more = rbd_img_obj_end_request(obj_request);
1856                 which++;
1857         }
1858
1859         rbd_assert(more ^ (which == img_request->obj_request_count));
1860         img_request->next_completion = which;
1861 out:
1862         spin_unlock_irq(&img_request->completion_lock);
1863
1864         if (!more)
1865                 rbd_img_request_complete(img_request);
1866 }
1867
1868 /*
1869  * Split up an image request into one or more object requests, each
1870  * to a different object.  The "type" parameter indicates whether
1871  * "data_desc" is the pointer to the head of a list of bio
1872  * structures, or the base of a page array.  In either case this
1873  * function assumes data_desc describes memory sufficient to hold
1874  * all data described by the image request.
1875  */
1876 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1877                                         enum obj_request_type type,
1878                                         void *data_desc)
1879 {
1880         struct rbd_device *rbd_dev = img_request->rbd_dev;
1881         struct rbd_obj_request *obj_request = NULL;
1882         struct rbd_obj_request *next_obj_request;
1883         bool write_request = img_request_write_test(img_request);
1884         struct bio *bio_list;
1885         unsigned int bio_offset = 0;
1886         struct page **pages;
1887         u64 img_offset;
1888         u64 resid;
1889         u16 opcode;
1890
1891         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1892                 (int)type, data_desc);
1893
1894         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1895         img_offset = img_request->offset;
1896         resid = img_request->length;
1897         rbd_assert(resid > 0);
1898
1899         if (type == OBJ_REQUEST_BIO) {
1900                 bio_list = data_desc;
1901                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1902         } else {
1903                 rbd_assert(type == OBJ_REQUEST_PAGES);
1904                 pages = data_desc;
1905         }
1906
1907         while (resid) {
1908                 struct ceph_osd_request *osd_req;
1909                 const char *object_name;
1910                 u64 offset;
1911                 u64 length;
1912
1913                 object_name = rbd_segment_name(rbd_dev, img_offset);
1914                 if (!object_name)
1915                         goto out_unwind;
1916                 offset = rbd_segment_offset(rbd_dev, img_offset);
1917                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1918                 obj_request = rbd_obj_request_create(object_name,
1919                                                 offset, length, type);
1920                 kfree(object_name);     /* object request has its own copy */
1921                 if (!obj_request)
1922                         goto out_unwind;
1923
1924                 if (type == OBJ_REQUEST_BIO) {
1925                         unsigned int clone_size;
1926
1927                         rbd_assert(length <= (u64)UINT_MAX);
1928                         clone_size = (unsigned int)length;
1929                         obj_request->bio_list =
1930                                         bio_chain_clone_range(&bio_list,
1931                                                                 &bio_offset,
1932                                                                 clone_size,
1933                                                                 GFP_ATOMIC);
1934                         if (!obj_request->bio_list)
1935                                 goto out_partial;
1936                 } else {
1937                         unsigned int page_count;
1938
1939                         obj_request->pages = pages;
1940                         page_count = (u32)calc_pages_for(offset, length);
1941                         obj_request->page_count = page_count;
1942                         if ((offset + length) & ~PAGE_MASK)
1943                                 page_count--;   /* more on last page */
1944                         pages += page_count;
1945                 }
1946
1947                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1948                                                 obj_request);
1949                 if (!osd_req)
1950                         goto out_partial;
1951                 obj_request->osd_req = osd_req;
1952                 obj_request->callback = rbd_img_obj_callback;
1953
1954                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1955                                                 0, 0);
1956                 if (type == OBJ_REQUEST_BIO)
1957                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1958                                         obj_request->bio_list, length);
1959                 else
1960                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1961                                         obj_request->pages, length,
1962                                         offset & ~PAGE_MASK, false, false);
1963
1964                 if (write_request)
1965                         rbd_osd_req_format_write(obj_request);
1966                 else
1967                         rbd_osd_req_format_read(obj_request);
1968
1969                 obj_request->img_offset = img_offset;
1970                 rbd_img_obj_request_add(img_request, obj_request);
1971
1972                 img_offset += length;
1973                 resid -= length;
1974         }
1975
1976         return 0;
1977
1978 out_partial:
1979         rbd_obj_request_put(obj_request);
1980 out_unwind:
1981         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1982                 rbd_obj_request_put(obj_request);
1983
1984         return -ENOMEM;
1985 }
1986
1987 static void
1988 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1989 {
1990         struct rbd_img_request *img_request;
1991         struct rbd_device *rbd_dev;
1992         u64 length;
1993         u32 page_count;
1994
1995         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1996         rbd_assert(obj_request_img_data_test(obj_request));
1997         img_request = obj_request->img_request;
1998         rbd_assert(img_request);
1999
2000         rbd_dev = img_request->rbd_dev;
2001         rbd_assert(rbd_dev);
2002         length = (u64)1 << rbd_dev->header.obj_order;
2003         page_count = (u32)calc_pages_for(0, length);
2004
2005         rbd_assert(obj_request->copyup_pages);
2006         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2007         obj_request->copyup_pages = NULL;
2008
2009         /*
2010          * We want the transfer count to reflect the size of the
2011          * original write request.  There is no such thing as a
2012          * successful short write, so if the request was successful
2013          * we can just set it to the originally-requested length.
2014          */
2015         if (!obj_request->result)
2016                 obj_request->xferred = obj_request->length;
2017
2018         /* Finish up with the normal image object callback */
2019
2020         rbd_img_obj_callback(obj_request);
2021 }
2022
2023 static void
2024 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2025 {
2026         struct rbd_obj_request *orig_request;
2027         struct ceph_osd_request *osd_req;
2028         struct ceph_osd_client *osdc;
2029         struct rbd_device *rbd_dev;
2030         struct page **pages;
2031         int result;
2032         u64 obj_size;
2033         u64 xferred;
2034
2035         rbd_assert(img_request_child_test(img_request));
2036
2037         /* First get what we need from the image request */
2038
2039         pages = img_request->copyup_pages;
2040         rbd_assert(pages != NULL);
2041         img_request->copyup_pages = NULL;
2042
2043         orig_request = img_request->obj_request;
2044         rbd_assert(orig_request != NULL);
2045         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2046         result = img_request->result;
2047         obj_size = img_request->length;
2048         xferred = img_request->xferred;
2049
2050         rbd_dev = img_request->rbd_dev;
2051         rbd_assert(rbd_dev);
2052         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2053
2054         rbd_img_request_put(img_request);
2055
2056         if (result)
2057                 goto out_err;
2058
2059         /* Allocate the new copyup osd request for the original request */
2060
2061         result = -ENOMEM;
2062         rbd_assert(!orig_request->osd_req);
2063         osd_req = rbd_osd_req_create_copyup(orig_request);
2064         if (!osd_req)
2065                 goto out_err;
2066         orig_request->osd_req = osd_req;
2067         orig_request->copyup_pages = pages;
2068
2069         /* Initialize the copyup op */
2070
2071         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2072         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2073                                                 false, false);
2074
2075         /* Then the original write request op */
2076
2077         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2078                                         orig_request->offset,
2079                                         orig_request->length, 0, 0);
2080         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2081                                         orig_request->length);
2082
2083         rbd_osd_req_format_write(orig_request);
2084
2085         /* All set, send it off. */
2086
2087         orig_request->callback = rbd_img_obj_copyup_callback;
2088         osdc = &rbd_dev->rbd_client->client->osdc;
2089         result = rbd_obj_request_submit(osdc, orig_request);
2090         if (!result)
2091                 return;
2092 out_err:
2093         /* Record the error code and complete the request */
2094
2095         orig_request->result = result;
2096         orig_request->xferred = 0;
2097         obj_request_done_set(orig_request);
2098         rbd_obj_request_complete(orig_request);
2099 }
2100
2101 /*
2102  * Read from the parent image the range of data that covers the
2103  * entire target of the given object request.  This is used for
2104  * satisfying a layered image write request when the target of an
2105  * object request from the image request does not exist.
2106  *
2107  * A page array big enough to hold the returned data is allocated
2108  * and supplied to rbd_img_request_fill() as the "data descriptor."
2109  * When the read completes, this page array will be transferred to
2110  * the original object request for the copyup operation.
2111  *
2112  * If an error occurs, record it as the result of the original
2113  * object request and mark it done so it gets completed.
2114  */
2115 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2116 {
2117         struct rbd_img_request *img_request = NULL;
2118         struct rbd_img_request *parent_request = NULL;
2119         struct rbd_device *rbd_dev;
2120         u64 img_offset;
2121         u64 length;
2122         struct page **pages = NULL;
2123         u32 page_count;
2124         int result;
2125
2126         rbd_assert(obj_request_img_data_test(obj_request));
2127         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2128
2129         img_request = obj_request->img_request;
2130         rbd_assert(img_request != NULL);
2131         rbd_dev = img_request->rbd_dev;
2132         rbd_assert(rbd_dev->parent != NULL);
2133
2134         /*
2135          * First things first.  The original osd request is of no
2136          * use to use any more, we'll need a new one that can hold
2137          * the two ops in a copyup request.  We'll get that later,
2138          * but for now we can release the old one.
2139          */
2140         rbd_osd_req_destroy(obj_request->osd_req);
2141         obj_request->osd_req = NULL;
2142
2143         /*
2144          * Determine the byte range covered by the object in the
2145          * child image to which the original request was to be sent.
2146          */
2147         img_offset = obj_request->img_offset - obj_request->offset;
2148         length = (u64)1 << rbd_dev->header.obj_order;
2149
2150         /*
2151          * There is no defined parent data beyond the parent
2152          * overlap, so limit what we read at that boundary if
2153          * necessary.
2154          */
2155         if (img_offset + length > rbd_dev->parent_overlap) {
2156                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2157                 length = rbd_dev->parent_overlap - img_offset;
2158         }
2159
2160         /*
2161          * Allocate a page array big enough to receive the data read
2162          * from the parent.
2163          */
2164         page_count = (u32)calc_pages_for(0, length);
2165         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2166         if (IS_ERR(pages)) {
2167                 result = PTR_ERR(pages);
2168                 pages = NULL;
2169                 goto out_err;
2170         }
2171
2172         result = -ENOMEM;
2173         parent_request = rbd_img_request_create(rbd_dev->parent,
2174                                                 img_offset, length,
2175                                                 false, true);
2176         if (!parent_request)
2177                 goto out_err;
2178         rbd_obj_request_get(obj_request);
2179         parent_request->obj_request = obj_request;
2180
2181         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2182         if (result)
2183                 goto out_err;
2184         parent_request->copyup_pages = pages;
2185
2186         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2187         result = rbd_img_request_submit(parent_request);
2188         if (!result)
2189                 return 0;
2190
2191         parent_request->copyup_pages = NULL;
2192         parent_request->obj_request = NULL;
2193         rbd_obj_request_put(obj_request);
2194 out_err:
2195         if (pages)
2196                 ceph_release_page_vector(pages, page_count);
2197         if (parent_request)
2198                 rbd_img_request_put(parent_request);
2199         obj_request->result = result;
2200         obj_request->xferred = 0;
2201         obj_request_done_set(obj_request);
2202
2203         return result;
2204 }
2205
2206 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2207 {
2208         struct rbd_obj_request *orig_request;
2209         int result;
2210
2211         rbd_assert(!obj_request_img_data_test(obj_request));
2212
2213         /*
2214          * All we need from the object request is the original
2215          * request and the result of the STAT op.  Grab those, then
2216          * we're done with the request.
2217          */
2218         orig_request = obj_request->obj_request;
2219         obj_request->obj_request = NULL;
2220         rbd_assert(orig_request);
2221         rbd_assert(orig_request->img_request);
2222
2223         result = obj_request->result;
2224         obj_request->result = 0;
2225
2226         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2227                 obj_request, orig_request, result,
2228                 obj_request->xferred, obj_request->length);
2229         rbd_obj_request_put(obj_request);
2230
2231         rbd_assert(orig_request);
2232         rbd_assert(orig_request->img_request);
2233
2234         /*
2235          * Our only purpose here is to determine whether the object
2236          * exists, and we don't want to treat the non-existence as
2237          * an error.  If something else comes back, transfer the
2238          * error to the original request and complete it now.
2239          */
2240         if (!result) {
2241                 obj_request_existence_set(orig_request, true);
2242         } else if (result == -ENOENT) {
2243                 obj_request_existence_set(orig_request, false);
2244         } else if (result) {
2245                 orig_request->result = result;
2246                 goto out;
2247         }
2248
2249         /*
2250          * Resubmit the original request now that we have recorded
2251          * whether the target object exists.
2252          */
2253         orig_request->result = rbd_img_obj_request_submit(orig_request);
2254 out:
2255         if (orig_request->result)
2256                 rbd_obj_request_complete(orig_request);
2257         rbd_obj_request_put(orig_request);
2258 }
2259
2260 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2261 {
2262         struct rbd_obj_request *stat_request;
2263         struct rbd_device *rbd_dev;
2264         struct ceph_osd_client *osdc;
2265         struct page **pages = NULL;
2266         u32 page_count;
2267         size_t size;
2268         int ret;
2269
2270         /*
2271          * The response data for a STAT call consists of:
2272          *     le64 length;
2273          *     struct {
2274          *         le32 tv_sec;
2275          *         le32 tv_nsec;
2276          *     } mtime;
2277          */
2278         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2279         page_count = (u32)calc_pages_for(0, size);
2280         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2281         if (IS_ERR(pages))
2282                 return PTR_ERR(pages);
2283
2284         ret = -ENOMEM;
2285         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2286                                                         OBJ_REQUEST_PAGES);
2287         if (!stat_request)
2288                 goto out;
2289
2290         rbd_obj_request_get(obj_request);
2291         stat_request->obj_request = obj_request;
2292         stat_request->pages = pages;
2293         stat_request->page_count = page_count;
2294
2295         rbd_assert(obj_request->img_request);
2296         rbd_dev = obj_request->img_request->rbd_dev;
2297         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2298                                                 stat_request);
2299         if (!stat_request->osd_req)
2300                 goto out;
2301         stat_request->callback = rbd_img_obj_exists_callback;
2302
2303         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2304         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2305                                         false, false);
2306         rbd_osd_req_format_read(stat_request);
2307
2308         osdc = &rbd_dev->rbd_client->client->osdc;
2309         ret = rbd_obj_request_submit(osdc, stat_request);
2310 out:
2311         if (ret)
2312                 rbd_obj_request_put(obj_request);
2313
2314         return ret;
2315 }
2316
2317 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2318 {
2319         struct rbd_img_request *img_request;
2320         struct rbd_device *rbd_dev;
2321         bool known;
2322
2323         rbd_assert(obj_request_img_data_test(obj_request));
2324
2325         img_request = obj_request->img_request;
2326         rbd_assert(img_request);
2327         rbd_dev = img_request->rbd_dev;
2328
2329         /*
2330          * Only writes to layered images need special handling.
2331          * Reads and non-layered writes are simple object requests.
2332          * Layered writes that start beyond the end of the overlap
2333          * with the parent have no parent data, so they too are
2334          * simple object requests.  Finally, if the target object is
2335          * known to already exist, its parent data has already been
2336          * copied, so a write to the object can also be handled as a
2337          * simple object request.
2338          */
2339         if (!img_request_write_test(img_request) ||
2340                 !img_request_layered_test(img_request) ||
2341                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2342                 ((known = obj_request_known_test(obj_request)) &&
2343                         obj_request_exists_test(obj_request))) {
2344
2345                 struct rbd_device *rbd_dev;
2346                 struct ceph_osd_client *osdc;
2347
2348                 rbd_dev = obj_request->img_request->rbd_dev;
2349                 osdc = &rbd_dev->rbd_client->client->osdc;
2350
2351                 return rbd_obj_request_submit(osdc, obj_request);
2352         }
2353
2354         /*
2355          * It's a layered write.  The target object might exist but
2356          * we may not know that yet.  If we know it doesn't exist,
2357          * start by reading the data for the full target object from
2358          * the parent so we can use it for a copyup to the target.
2359          */
2360         if (known)
2361                 return rbd_img_obj_parent_read_full(obj_request);
2362
2363         /* We don't know whether the target exists.  Go find out. */
2364
2365         return rbd_img_obj_exists_submit(obj_request);
2366 }
2367
2368 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2369 {
2370         struct rbd_obj_request *obj_request;
2371         struct rbd_obj_request *next_obj_request;
2372
2373         dout("%s: img %p\n", __func__, img_request);
2374         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2375                 int ret;
2376
2377                 ret = rbd_img_obj_request_submit(obj_request);
2378                 if (ret)
2379                         return ret;
2380         }
2381
2382         return 0;
2383 }
2384
2385 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2386 {
2387         struct rbd_obj_request *obj_request;
2388         struct rbd_device *rbd_dev;
2389         u64 obj_end;
2390
2391         rbd_assert(img_request_child_test(img_request));
2392
2393         obj_request = img_request->obj_request;
2394         rbd_assert(obj_request);
2395         rbd_assert(obj_request->img_request);
2396
2397         obj_request->result = img_request->result;
2398         if (obj_request->result)
2399                 goto out;
2400
2401         /*
2402          * We need to zero anything beyond the parent overlap
2403          * boundary.  Since rbd_img_obj_request_read_callback()
2404          * will zero anything beyond the end of a short read, an
2405          * easy way to do this is to pretend the data from the
2406          * parent came up short--ending at the overlap boundary.
2407          */
2408         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2409         obj_end = obj_request->img_offset + obj_request->length;
2410         rbd_dev = obj_request->img_request->rbd_dev;
2411         if (obj_end > rbd_dev->parent_overlap) {
2412                 u64 xferred = 0;
2413
2414                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2415                         xferred = rbd_dev->parent_overlap -
2416                                         obj_request->img_offset;
2417
2418                 obj_request->xferred = min(img_request->xferred, xferred);
2419         } else {
2420                 obj_request->xferred = img_request->xferred;
2421         }
2422 out:
2423         rbd_img_obj_request_read_callback(obj_request);
2424         rbd_obj_request_complete(obj_request);
2425 }
2426
2427 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2428 {
2429         struct rbd_device *rbd_dev;
2430         struct rbd_img_request *img_request;
2431         int result;
2432
2433         rbd_assert(obj_request_img_data_test(obj_request));
2434         rbd_assert(obj_request->img_request != NULL);
2435         rbd_assert(obj_request->result == (s32) -ENOENT);
2436         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2437
2438         rbd_dev = obj_request->img_request->rbd_dev;
2439         rbd_assert(rbd_dev->parent != NULL);
2440         /* rbd_read_finish(obj_request, obj_request->length); */
2441         img_request = rbd_img_request_create(rbd_dev->parent,
2442                                                 obj_request->img_offset,
2443                                                 obj_request->length,
2444                                                 false, true);
2445         result = -ENOMEM;
2446         if (!img_request)
2447                 goto out_err;
2448
2449         rbd_obj_request_get(obj_request);
2450         img_request->obj_request = obj_request;
2451
2452         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2453                                         obj_request->bio_list);
2454         if (result)
2455                 goto out_err;
2456
2457         img_request->callback = rbd_img_parent_read_callback;
2458         result = rbd_img_request_submit(img_request);
2459         if (result)
2460                 goto out_err;
2461
2462         return;
2463 out_err:
2464         if (img_request)
2465                 rbd_img_request_put(img_request);
2466         obj_request->result = result;
2467         obj_request->xferred = 0;
2468         obj_request_done_set(obj_request);
2469 }
2470
2471 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2472                                    u64 ver, u64 notify_id)
2473 {
2474         struct rbd_obj_request *obj_request;
2475         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2476         int ret;
2477
2478         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2479                                                         OBJ_REQUEST_NODATA);
2480         if (!obj_request)
2481                 return -ENOMEM;
2482
2483         ret = -ENOMEM;
2484         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2485         if (!obj_request->osd_req)
2486                 goto out;
2487         obj_request->callback = rbd_obj_request_put;
2488
2489         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2490                                         notify_id, ver, 0);
2491         rbd_osd_req_format_read(obj_request);
2492
2493         ret = rbd_obj_request_submit(osdc, obj_request);
2494 out:
2495         if (ret)
2496                 rbd_obj_request_put(obj_request);
2497
2498         return ret;
2499 }
2500
2501 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2502 {
2503         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2504         u64 hver;
2505
2506         if (!rbd_dev)
2507                 return;
2508
2509         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2510                 rbd_dev->header_name, (unsigned long long) notify_id,
2511                 (unsigned int) opcode);
2512         (void)rbd_dev_refresh(rbd_dev, &hver);
2513
2514         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2515 }
2516
2517 /*
2518  * Request sync osd watch/unwatch.  The value of "start" determines
2519  * whether a watch request is being initiated or torn down.
2520  */
2521 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2522 {
2523         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2524         struct rbd_obj_request *obj_request;
2525         int ret;
2526
2527         rbd_assert(start ^ !!rbd_dev->watch_event);
2528         rbd_assert(start ^ !!rbd_dev->watch_request);
2529
2530         if (start) {
2531                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2532                                                 &rbd_dev->watch_event);
2533                 if (ret < 0)
2534                         return ret;
2535                 rbd_assert(rbd_dev->watch_event != NULL);
2536         }
2537
2538         ret = -ENOMEM;
2539         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2540                                                         OBJ_REQUEST_NODATA);
2541         if (!obj_request)
2542                 goto out_cancel;
2543
2544         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2545         if (!obj_request->osd_req)
2546                 goto out_cancel;
2547
2548         if (start)
2549                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2550         else
2551                 ceph_osdc_unregister_linger_request(osdc,
2552                                         rbd_dev->watch_request->osd_req);
2553
2554         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2555                                 rbd_dev->watch_event->cookie, 0, start);
2556         rbd_osd_req_format_write(obj_request);
2557
2558         ret = rbd_obj_request_submit(osdc, obj_request);
2559         if (ret)
2560                 goto out_cancel;
2561         ret = rbd_obj_request_wait(obj_request);
2562         if (ret)
2563                 goto out_cancel;
2564         ret = obj_request->result;
2565         if (ret)
2566                 goto out_cancel;
2567
2568         /*
2569          * A watch request is set to linger, so the underlying osd
2570          * request won't go away until we unregister it.  We retain
2571          * a pointer to the object request during that time (in
2572          * rbd_dev->watch_request), so we'll keep a reference to
2573          * it.  We'll drop that reference (below) after we've
2574          * unregistered it.
2575          */
2576         if (start) {
2577                 rbd_dev->watch_request = obj_request;
2578
2579                 return 0;
2580         }
2581
2582         /* We have successfully torn down the watch request */
2583
2584         rbd_obj_request_put(rbd_dev->watch_request);
2585         rbd_dev->watch_request = NULL;
2586 out_cancel:
2587         /* Cancel the event if we're tearing down, or on error */
2588         ceph_osdc_cancel_event(rbd_dev->watch_event);
2589         rbd_dev->watch_event = NULL;
2590         if (obj_request)
2591                 rbd_obj_request_put(obj_request);
2592
2593         return ret;
2594 }
2595
2596 /*
2597  * Synchronous osd object method call.  Returns the number of bytes
2598  * returned in the outbound buffer, or a negative error code.
2599  */
2600 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2601                              const char *object_name,
2602                              const char *class_name,
2603                              const char *method_name,
2604                              const void *outbound,
2605                              size_t outbound_size,
2606                              void *inbound,
2607                              size_t inbound_size,
2608                              u64 *version)
2609 {
2610         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2611         struct rbd_obj_request *obj_request;
2612         struct page **pages;
2613         u32 page_count;
2614         int ret;
2615
2616         /*
2617          * Method calls are ultimately read operations.  The result
2618          * should placed into the inbound buffer provided.  They
2619          * also supply outbound data--parameters for the object
2620          * method.  Currently if this is present it will be a
2621          * snapshot id.
2622          */
2623         page_count = (u32)calc_pages_for(0, inbound_size);
2624         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2625         if (IS_ERR(pages))
2626                 return PTR_ERR(pages);
2627
2628         ret = -ENOMEM;
2629         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2630                                                         OBJ_REQUEST_PAGES);
2631         if (!obj_request)
2632                 goto out;
2633
2634         obj_request->pages = pages;
2635         obj_request->page_count = page_count;
2636
2637         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2638         if (!obj_request->osd_req)
2639                 goto out;
2640
2641         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2642                                         class_name, method_name);
2643         if (outbound_size) {
2644                 struct ceph_pagelist *pagelist;
2645
2646                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2647                 if (!pagelist)
2648                         goto out;
2649
2650                 ceph_pagelist_init(pagelist);
2651                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2652                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2653                                                 pagelist);
2654         }
2655         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2656                                         obj_request->pages, inbound_size,
2657                                         0, false, false);
2658         rbd_osd_req_format_read(obj_request);
2659
2660         ret = rbd_obj_request_submit(osdc, obj_request);
2661         if (ret)
2662                 goto out;
2663         ret = rbd_obj_request_wait(obj_request);
2664         if (ret)
2665                 goto out;
2666
2667         ret = obj_request->result;
2668         if (ret < 0)
2669                 goto out;
2670
2671         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2672         ret = (int)obj_request->xferred;
2673         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2674         if (version)
2675                 *version = obj_request->version;
2676 out:
2677         if (obj_request)
2678                 rbd_obj_request_put(obj_request);
2679         else
2680                 ceph_release_page_vector(pages, page_count);
2681
2682         return ret;
2683 }
2684
2685 static void rbd_request_fn(struct request_queue *q)
2686                 __releases(q->queue_lock) __acquires(q->queue_lock)
2687 {
2688         struct rbd_device *rbd_dev = q->queuedata;
2689         bool read_only = rbd_dev->mapping.read_only;
2690         struct request *rq;
2691         int result;
2692
2693         while ((rq = blk_fetch_request(q))) {
2694                 bool write_request = rq_data_dir(rq) == WRITE;
2695                 struct rbd_img_request *img_request;
2696                 u64 offset;
2697                 u64 length;
2698
2699                 /* Ignore any non-FS requests that filter through. */
2700
2701                 if (rq->cmd_type != REQ_TYPE_FS) {
2702                         dout("%s: non-fs request type %d\n", __func__,
2703                                 (int) rq->cmd_type);
2704                         __blk_end_request_all(rq, 0);
2705                         continue;
2706                 }
2707
2708                 /* Ignore/skip any zero-length requests */
2709
2710                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2711                 length = (u64) blk_rq_bytes(rq);
2712
2713                 if (!length) {
2714                         dout("%s: zero-length request\n", __func__);
2715                         __blk_end_request_all(rq, 0);
2716                         continue;
2717                 }
2718
2719                 spin_unlock_irq(q->queue_lock);
2720
2721                 /* Disallow writes to a read-only device */
2722
2723                 if (write_request) {
2724                         result = -EROFS;
2725                         if (read_only)
2726                                 goto end_request;
2727                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2728                 }
2729
2730                 /*
2731                  * Quit early if the mapped snapshot no longer
2732                  * exists.  It's still possible the snapshot will
2733                  * have disappeared by the time our request arrives
2734                  * at the osd, but there's no sense in sending it if
2735                  * we already know.
2736                  */
2737                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2738                         dout("request for non-existent snapshot");
2739                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2740                         result = -ENXIO;
2741                         goto end_request;
2742                 }
2743
2744                 result = -EINVAL;
2745                 if (offset && length > U64_MAX - offset + 1) {
2746                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2747                                 offset, length);
2748                         goto end_request;       /* Shouldn't happen */
2749                 }
2750
2751                 result = -ENOMEM;
2752                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2753                                                         write_request, false);
2754                 if (!img_request)
2755                         goto end_request;
2756
2757                 img_request->rq = rq;
2758
2759                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2760                                                 rq->bio);
2761                 if (!result)
2762                         result = rbd_img_request_submit(img_request);
2763                 if (result)
2764                         rbd_img_request_put(img_request);
2765 end_request:
2766                 spin_lock_irq(q->queue_lock);
2767                 if (result < 0) {
2768                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2769                                 write_request ? "write" : "read",
2770                                 length, offset, result);
2771
2772                         __blk_end_request_all(rq, result);
2773                 }
2774         }
2775 }
2776
2777 /*
2778  * a queue callback. Makes sure that we don't create a bio that spans across
2779  * multiple osd objects. One exception would be with a single page bios,
2780  * which we handle later at bio_chain_clone_range()
2781  */
2782 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2783                           struct bio_vec *bvec)
2784 {
2785         struct rbd_device *rbd_dev = q->queuedata;
2786         sector_t sector_offset;
2787         sector_t sectors_per_obj;
2788         sector_t obj_sector_offset;
2789         int ret;
2790
2791         /*
2792          * Find how far into its rbd object the partition-relative
2793          * bio start sector is to offset relative to the enclosing
2794          * device.
2795          */
2796         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2797         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2798         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2799
2800         /*
2801          * Compute the number of bytes from that offset to the end
2802          * of the object.  Account for what's already used by the bio.
2803          */
2804         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2805         if (ret > bmd->bi_size)
2806                 ret -= bmd->bi_size;
2807         else
2808                 ret = 0;
2809
2810         /*
2811          * Don't send back more than was asked for.  And if the bio
2812          * was empty, let the whole thing through because:  "Note
2813          * that a block device *must* allow a single page to be
2814          * added to an empty bio."
2815          */
2816         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2817         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2818                 ret = (int) bvec->bv_len;
2819
2820         return ret;
2821 }
2822
2823 static void rbd_free_disk(struct rbd_device *rbd_dev)
2824 {
2825         struct gendisk *disk = rbd_dev->disk;
2826
2827         if (!disk)
2828                 return;
2829
2830         rbd_dev->disk = NULL;
2831         if (disk->flags & GENHD_FL_UP) {
2832                 del_gendisk(disk);
2833                 if (disk->queue)
2834                         blk_cleanup_queue(disk->queue);
2835         }
2836         put_disk(disk);
2837 }
2838
2839 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2840                                 const char *object_name,
2841                                 u64 offset, u64 length,
2842                                 void *buf, u64 *version)
2843
2844 {
2845         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2846         struct rbd_obj_request *obj_request;
2847         struct page **pages = NULL;
2848         u32 page_count;
2849         size_t size;
2850         int ret;
2851
2852         page_count = (u32) calc_pages_for(offset, length);
2853         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2854         if (IS_ERR(pages))
2855                 ret = PTR_ERR(pages);
2856
2857         ret = -ENOMEM;
2858         obj_request = rbd_obj_request_create(object_name, offset, length,
2859                                                         OBJ_REQUEST_PAGES);
2860         if (!obj_request)
2861                 goto out;
2862
2863         obj_request->pages = pages;
2864         obj_request->page_count = page_count;
2865
2866         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2867         if (!obj_request->osd_req)
2868                 goto out;
2869
2870         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2871                                         offset, length, 0, 0);
2872         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2873                                         obj_request->pages,
2874                                         obj_request->length,
2875                                         obj_request->offset & ~PAGE_MASK,
2876                                         false, false);
2877         rbd_osd_req_format_read(obj_request);
2878
2879         ret = rbd_obj_request_submit(osdc, obj_request);
2880         if (ret)
2881                 goto out;
2882         ret = rbd_obj_request_wait(obj_request);
2883         if (ret)
2884                 goto out;
2885
2886         ret = obj_request->result;
2887         if (ret < 0)
2888                 goto out;
2889
2890         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2891         size = (size_t) obj_request->xferred;
2892         ceph_copy_from_page_vector(pages, buf, 0, size);
2893         rbd_assert(size <= (size_t) INT_MAX);
2894         ret = (int) size;
2895         if (version)
2896                 *version = obj_request->version;
2897 out:
2898         if (obj_request)
2899                 rbd_obj_request_put(obj_request);
2900         else
2901                 ceph_release_page_vector(pages, page_count);
2902
2903         return ret;
2904 }
2905
2906 /*
2907  * Read the complete header for the given rbd device.
2908  *
2909  * Returns a pointer to a dynamically-allocated buffer containing
2910  * the complete and validated header.  Caller can pass the address
2911  * of a variable that will be filled in with the version of the
2912  * header object at the time it was read.
2913  *
2914  * Returns a pointer-coded errno if a failure occurs.
2915  */
2916 static struct rbd_image_header_ondisk *
2917 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2918 {
2919         struct rbd_image_header_ondisk *ondisk = NULL;
2920         u32 snap_count = 0;
2921         u64 names_size = 0;
2922         u32 want_count;
2923         int ret;
2924
2925         /*
2926          * The complete header will include an array of its 64-bit
2927          * snapshot ids, followed by the names of those snapshots as
2928          * a contiguous block of NUL-terminated strings.  Note that
2929          * the number of snapshots could change by the time we read
2930          * it in, in which case we re-read it.
2931          */
2932         do {
2933                 size_t size;
2934
2935                 kfree(ondisk);
2936
2937                 size = sizeof (*ondisk);
2938                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2939                 size += names_size;
2940                 ondisk = kmalloc(size, GFP_KERNEL);
2941                 if (!ondisk)
2942                         return ERR_PTR(-ENOMEM);
2943
2944                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2945                                        0, size, ondisk, version);
2946                 if (ret < 0)
2947                         goto out_err;
2948                 if ((size_t)ret < size) {
2949                         ret = -ENXIO;
2950                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2951                                 size, ret);
2952                         goto out_err;
2953                 }
2954                 if (!rbd_dev_ondisk_valid(ondisk)) {
2955                         ret = -ENXIO;
2956                         rbd_warn(rbd_dev, "invalid header");
2957                         goto out_err;
2958                 }
2959
2960                 names_size = le64_to_cpu(ondisk->snap_names_len);
2961                 want_count = snap_count;
2962                 snap_count = le32_to_cpu(ondisk->snap_count);
2963         } while (snap_count != want_count);
2964
2965         return ondisk;
2966
2967 out_err:
2968         kfree(ondisk);
2969
2970         return ERR_PTR(ret);
2971 }
2972
2973 /*
2974  * reload the ondisk the header
2975  */
2976 static int rbd_read_header(struct rbd_device *rbd_dev,
2977                            struct rbd_image_header *header)
2978 {
2979         struct rbd_image_header_ondisk *ondisk;
2980         u64 ver = 0;
2981         int ret;
2982
2983         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2984         if (IS_ERR(ondisk))
2985                 return PTR_ERR(ondisk);
2986         ret = rbd_header_from_disk(header, ondisk);
2987         kfree(ondisk);
2988
2989         return ret;
2990 }
2991
2992 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2993 {
2994         struct rbd_snap *snap;
2995         struct rbd_snap *next;
2996
2997         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
2998                 list_del(&snap->node);
2999                 rbd_snap_destroy(snap);
3000         }
3001 }
3002
3003 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3004 {
3005         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3006                 return;
3007
3008         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3009                 sector_t size;
3010
3011                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3012                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3013                 dout("setting size to %llu sectors", (unsigned long long)size);
3014                 set_capacity(rbd_dev->disk, size);
3015         }
3016 }
3017
3018 /*
3019  * only read the first part of the ondisk header, without the snaps info
3020  */
3021 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3022 {
3023         int ret;
3024         struct rbd_image_header h;
3025
3026         ret = rbd_read_header(rbd_dev, &h);
3027         if (ret < 0)
3028                 return ret;
3029
3030         down_write(&rbd_dev->header_rwsem);
3031
3032         /* Update image size, and check for resize of mapped image */
3033         rbd_dev->header.image_size = h.image_size;
3034         rbd_update_mapping_size(rbd_dev);
3035
3036         /* rbd_dev->header.object_prefix shouldn't change */
3037         kfree(rbd_dev->header.snap_sizes);
3038         kfree(rbd_dev->header.snap_names);
3039         /* osd requests may still refer to snapc */
3040         ceph_put_snap_context(rbd_dev->header.snapc);
3041
3042         rbd_dev->header.image_size = h.image_size;
3043         rbd_dev->header.snapc = h.snapc;
3044         rbd_dev->header.snap_names = h.snap_names;
3045         rbd_dev->header.snap_sizes = h.snap_sizes;
3046         /* Free the extra copy of the object prefix */
3047         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3048                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3049         kfree(h.object_prefix);
3050
3051         ret = rbd_dev_snaps_update(rbd_dev);
3052
3053         up_write(&rbd_dev->header_rwsem);
3054
3055         return ret;
3056 }
3057
3058 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3059 {
3060         u64 image_size;
3061         int ret;
3062
3063         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3064         image_size = rbd_dev->header.image_size;
3065         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3066         if (rbd_dev->image_format == 1)
3067                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3068         else
3069                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3070         mutex_unlock(&ctl_mutex);
3071         if (ret)
3072                 rbd_warn(rbd_dev, "got notification but failed to "
3073                            " update snaps: %d\n", ret);
3074         if (image_size != rbd_dev->header.image_size)
3075                 revalidate_disk(rbd_dev->disk);
3076
3077         return ret;
3078 }
3079
3080 static int rbd_init_disk(struct rbd_device *rbd_dev)
3081 {
3082         struct gendisk *disk;
3083         struct request_queue *q;
3084         u64 segment_size;
3085
3086         /* create gendisk info */
3087         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3088         if (!disk)
3089                 return -ENOMEM;
3090
3091         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3092                  rbd_dev->dev_id);
3093         disk->major = rbd_dev->major;
3094         disk->first_minor = 0;
3095         disk->fops = &rbd_bd_ops;
3096         disk->private_data = rbd_dev;
3097
3098         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3099         if (!q)
3100                 goto out_disk;
3101
3102         /* We use the default size, but let's be explicit about it. */
3103         blk_queue_physical_block_size(q, SECTOR_SIZE);
3104
3105         /* set io sizes to object size */
3106         segment_size = rbd_obj_bytes(&rbd_dev->header);
3107         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3108         blk_queue_max_segment_size(q, segment_size);
3109         blk_queue_io_min(q, segment_size);
3110         blk_queue_io_opt(q, segment_size);
3111
3112         blk_queue_merge_bvec(q, rbd_merge_bvec);
3113         disk->queue = q;
3114
3115         q->queuedata = rbd_dev;
3116
3117         rbd_dev->disk = disk;
3118
3119         return 0;
3120 out_disk:
3121         put_disk(disk);
3122
3123         return -ENOMEM;
3124 }
3125
3126 /*
3127   sysfs
3128 */
3129
3130 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3131 {
3132         return container_of(dev, struct rbd_device, dev);
3133 }
3134
3135 static ssize_t rbd_size_show(struct device *dev,
3136                              struct device_attribute *attr, char *buf)
3137 {
3138         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3139
3140         return sprintf(buf, "%llu\n",
3141                 (unsigned long long)rbd_dev->mapping.size);
3142 }
3143
3144 /*
3145  * Note this shows the features for whatever's mapped, which is not
3146  * necessarily the base image.
3147  */
3148 static ssize_t rbd_features_show(struct device *dev,
3149                              struct device_attribute *attr, char *buf)
3150 {
3151         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3152
3153         return sprintf(buf, "0x%016llx\n",
3154                         (unsigned long long)rbd_dev->mapping.features);
3155 }
3156
3157 static ssize_t rbd_major_show(struct device *dev,
3158                               struct device_attribute *attr, char *buf)
3159 {
3160         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3161
3162         if (rbd_dev->major)
3163                 return sprintf(buf, "%d\n", rbd_dev->major);
3164
3165         return sprintf(buf, "(none)\n");
3166
3167 }
3168
3169 static ssize_t rbd_client_id_show(struct device *dev,
3170                                   struct device_attribute *attr, char *buf)
3171 {
3172         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3173
3174         return sprintf(buf, "client%lld\n",
3175                         ceph_client_id(rbd_dev->rbd_client->client));
3176 }
3177
3178 static ssize_t rbd_pool_show(struct device *dev,
3179                              struct device_attribute *attr, char *buf)
3180 {
3181         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3182
3183         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3184 }
3185
3186 static ssize_t rbd_pool_id_show(struct device *dev,
3187                              struct device_attribute *attr, char *buf)
3188 {
3189         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3190
3191         return sprintf(buf, "%llu\n",
3192                         (unsigned long long) rbd_dev->spec->pool_id);
3193 }
3194
3195 static ssize_t rbd_name_show(struct device *dev,
3196                              struct device_attribute *attr, char *buf)
3197 {
3198         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3199
3200         if (rbd_dev->spec->image_name)
3201                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3202
3203         return sprintf(buf, "(unknown)\n");
3204 }
3205
3206 static ssize_t rbd_image_id_show(struct device *dev,
3207                              struct device_attribute *attr, char *buf)
3208 {
3209         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3210
3211         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3212 }
3213
3214 /*
3215  * Shows the name of the currently-mapped snapshot (or
3216  * RBD_SNAP_HEAD_NAME for the base image).
3217  */
3218 static ssize_t rbd_snap_show(struct device *dev,
3219                              struct device_attribute *attr,
3220                              char *buf)
3221 {
3222         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3223
3224         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3225 }
3226
3227 /*
3228  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3229  * for the parent image.  If there is no parent, simply shows
3230  * "(no parent image)".
3231  */
3232 static ssize_t rbd_parent_show(struct device *dev,
3233                              struct device_attribute *attr,
3234                              char *buf)
3235 {
3236         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3237         struct rbd_spec *spec = rbd_dev->parent_spec;
3238         int count;
3239         char *bufp = buf;
3240
3241         if (!spec)
3242                 return sprintf(buf, "(no parent image)\n");
3243
3244         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3245                         (unsigned long long) spec->pool_id, spec->pool_name);
3246         if (count < 0)
3247                 return count;
3248         bufp += count;
3249
3250         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3251                         spec->image_name ? spec->image_name : "(unknown)");
3252         if (count < 0)
3253                 return count;
3254         bufp += count;
3255
3256         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3257                         (unsigned long long) spec->snap_id, spec->snap_name);
3258         if (count < 0)
3259                 return count;
3260         bufp += count;
3261
3262         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3263         if (count < 0)
3264                 return count;
3265         bufp += count;
3266
3267         return (ssize_t) (bufp - buf);
3268 }
3269
3270 static ssize_t rbd_image_refresh(struct device *dev,
3271                                  struct device_attribute *attr,
3272                                  const char *buf,
3273                                  size_t size)
3274 {
3275         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3276         int ret;
3277
3278         ret = rbd_dev_refresh(rbd_dev, NULL);
3279
3280         return ret < 0 ? ret : size;
3281 }
3282
3283 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3284 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3285 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3286 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3287 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3288 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3289 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3290 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3291 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3292 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3293 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3294
3295 static struct attribute *rbd_attrs[] = {
3296         &dev_attr_size.attr,
3297         &dev_attr_features.attr,
3298         &dev_attr_major.attr,
3299         &dev_attr_client_id.attr,
3300         &dev_attr_pool.attr,
3301         &dev_attr_pool_id.attr,
3302         &dev_attr_name.attr,
3303         &dev_attr_image_id.attr,
3304         &dev_attr_current_snap.attr,
3305         &dev_attr_parent.attr,
3306         &dev_attr_refresh.attr,
3307         NULL
3308 };
3309
3310 static struct attribute_group rbd_attr_group = {
3311         .attrs = rbd_attrs,
3312 };
3313
3314 static const struct attribute_group *rbd_attr_groups[] = {
3315         &rbd_attr_group,
3316         NULL
3317 };
3318
3319 static void rbd_sysfs_dev_release(struct device *dev)
3320 {
3321 }
3322
3323 static struct device_type rbd_device_type = {
3324         .name           = "rbd",
3325         .groups         = rbd_attr_groups,
3326         .release        = rbd_sysfs_dev_release,
3327 };
3328
3329 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3330 {
3331         kref_get(&spec->kref);
3332
3333         return spec;
3334 }
3335
3336 static void rbd_spec_free(struct kref *kref);
3337 static void rbd_spec_put(struct rbd_spec *spec)
3338 {
3339         if (spec)
3340                 kref_put(&spec->kref, rbd_spec_free);
3341 }
3342
3343 static struct rbd_spec *rbd_spec_alloc(void)
3344 {
3345         struct rbd_spec *spec;
3346
3347         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3348         if (!spec)
3349                 return NULL;
3350         kref_init(&spec->kref);
3351
3352         return spec;
3353 }
3354
3355 static void rbd_spec_free(struct kref *kref)
3356 {
3357         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3358
3359         kfree(spec->pool_name);
3360         kfree(spec->image_id);
3361         kfree(spec->image_name);
3362         kfree(spec->snap_name);
3363         kfree(spec);
3364 }
3365
3366 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3367                                 struct rbd_spec *spec)
3368 {
3369         struct rbd_device *rbd_dev;
3370
3371         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3372         if (!rbd_dev)
3373                 return NULL;
3374
3375         spin_lock_init(&rbd_dev->lock);
3376         rbd_dev->flags = 0;
3377         INIT_LIST_HEAD(&rbd_dev->node);
3378         INIT_LIST_HEAD(&rbd_dev->snaps);
3379         init_rwsem(&rbd_dev->header_rwsem);
3380
3381         rbd_dev->spec = spec;
3382         rbd_dev->rbd_client = rbdc;
3383
3384         /* Initialize the layout used for all rbd requests */
3385
3386         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3387         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3388         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3389         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3390
3391         return rbd_dev;
3392 }
3393
3394 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3395 {
3396         rbd_put_client(rbd_dev->rbd_client);
3397         rbd_spec_put(rbd_dev->spec);
3398         kfree(rbd_dev);
3399 }
3400
3401 static void rbd_snap_destroy(struct rbd_snap *snap)
3402 {
3403         kfree(snap->name);
3404         kfree(snap);
3405 }
3406
3407 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3408                                                 const char *snap_name,
3409                                                 u64 snap_id, u64 snap_size,
3410                                                 u64 snap_features)
3411 {
3412         struct rbd_snap *snap;
3413
3414         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3415         if (!snap)
3416                 return ERR_PTR(-ENOMEM);
3417
3418         snap->name = snap_name;
3419         snap->id = snap_id;
3420         snap->size = snap_size;
3421         snap->features = snap_features;
3422
3423         return snap;
3424 }
3425
3426 /*
3427  * Returns a dynamically-allocated snapshot name if successful, or a
3428  * pointer-coded error otherwise.
3429  */
3430 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3431                 u64 *snap_size, u64 *snap_features)
3432 {
3433         const char *snap_name;
3434         int i;
3435
3436         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3437
3438         /* Skip over names until we find the one we are looking for */
3439
3440         snap_name = rbd_dev->header.snap_names;
3441         for (i = 0; i < which; i++)
3442                 snap_name += strlen(snap_name) + 1;
3443
3444         snap_name = kstrdup(snap_name, GFP_KERNEL);
3445         if (!snap_name)
3446                 return ERR_PTR(-ENOMEM);
3447
3448         *snap_size = rbd_dev->header.snap_sizes[which];
3449         *snap_features = 0;     /* No features for v1 */
3450
3451         return snap_name;
3452 }
3453
3454 /*
3455  * Get the size and object order for an image snapshot, or if
3456  * snap_id is CEPH_NOSNAP, gets this information for the base
3457  * image.
3458  */
3459 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3460                                 u8 *order, u64 *snap_size)
3461 {
3462         __le64 snapid = cpu_to_le64(snap_id);
3463         int ret;
3464         struct {
3465                 u8 order;
3466                 __le64 size;
3467         } __attribute__ ((packed)) size_buf = { 0 };
3468
3469         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3470                                 "rbd", "get_size",
3471                                 &snapid, sizeof (snapid),
3472                                 &size_buf, sizeof (size_buf), NULL);
3473         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3474         if (ret < 0)
3475                 return ret;
3476         if (ret < sizeof (size_buf))
3477                 return -ERANGE;
3478
3479         if (order)
3480                 *order = size_buf.order;
3481         *snap_size = le64_to_cpu(size_buf.size);
3482
3483         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3484                 (unsigned long long)snap_id, (unsigned int)*order,
3485                 (unsigned long long)*snap_size);
3486
3487         return 0;
3488 }
3489
3490 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3491 {
3492         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3493                                         &rbd_dev->header.obj_order,
3494                                         &rbd_dev->header.image_size);
3495 }
3496
3497 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3498 {
3499         void *reply_buf;
3500         int ret;
3501         void *p;
3502
3503         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3504         if (!reply_buf)
3505                 return -ENOMEM;
3506
3507         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3508                                 "rbd", "get_object_prefix", NULL, 0,
3509                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3510         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3511         if (ret < 0)
3512                 goto out;
3513
3514         p = reply_buf;
3515         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3516                                                 p + ret, NULL, GFP_NOIO);
3517         ret = 0;
3518
3519         if (IS_ERR(rbd_dev->header.object_prefix)) {
3520                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3521                 rbd_dev->header.object_prefix = NULL;
3522         } else {
3523                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3524         }
3525 out:
3526         kfree(reply_buf);
3527
3528         return ret;
3529 }
3530
3531 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3532                 u64 *snap_features)
3533 {
3534         __le64 snapid = cpu_to_le64(snap_id);
3535         struct {
3536                 __le64 features;
3537                 __le64 incompat;
3538         } __attribute__ ((packed)) features_buf = { 0 };
3539         u64 incompat;
3540         int ret;
3541
3542         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3543                                 "rbd", "get_features",
3544                                 &snapid, sizeof (snapid),
3545                                 &features_buf, sizeof (features_buf), NULL);
3546         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3547         if (ret < 0)
3548                 return ret;
3549         if (ret < sizeof (features_buf))
3550                 return -ERANGE;
3551
3552         incompat = le64_to_cpu(features_buf.incompat);
3553         if (incompat & ~RBD_FEATURES_SUPPORTED)
3554                 return -ENXIO;
3555
3556         *snap_features = le64_to_cpu(features_buf.features);
3557
3558         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3559                 (unsigned long long)snap_id,
3560                 (unsigned long long)*snap_features,
3561                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3562
3563         return 0;
3564 }
3565
3566 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3567 {
3568         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3569                                                 &rbd_dev->header.features);
3570 }
3571
3572 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3573 {
3574         struct rbd_spec *parent_spec;
3575         size_t size;
3576         void *reply_buf = NULL;
3577         __le64 snapid;
3578         void *p;
3579         void *end;
3580         char *image_id;
3581         u64 overlap;
3582         int ret;
3583
3584         parent_spec = rbd_spec_alloc();
3585         if (!parent_spec)
3586                 return -ENOMEM;
3587
3588         size = sizeof (__le64) +                                /* pool_id */
3589                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3590                 sizeof (__le64) +                               /* snap_id */
3591                 sizeof (__le64);                                /* overlap */
3592         reply_buf = kmalloc(size, GFP_KERNEL);
3593         if (!reply_buf) {
3594                 ret = -ENOMEM;
3595                 goto out_err;
3596         }
3597
3598         snapid = cpu_to_le64(CEPH_NOSNAP);
3599         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3600                                 "rbd", "get_parent",
3601                                 &snapid, sizeof (snapid),
3602                                 reply_buf, size, NULL);
3603         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3604         if (ret < 0)
3605                 goto out_err;
3606
3607         p = reply_buf;
3608         end = reply_buf + ret;
3609         ret = -ERANGE;
3610         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3611         if (parent_spec->pool_id == CEPH_NOPOOL)
3612                 goto out;       /* No parent?  No problem. */
3613
3614         /* The ceph file layout needs to fit pool id in 32 bits */
3615
3616         ret = -EIO;
3617         if (parent_spec->pool_id > (u64)U32_MAX) {
3618                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3619                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3620                 goto out_err;
3621         }
3622
3623         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3624         if (IS_ERR(image_id)) {
3625                 ret = PTR_ERR(image_id);
3626                 goto out_err;
3627         }
3628         parent_spec->image_id = image_id;
3629         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3630         ceph_decode_64_safe(&p, end, overlap, out_err);
3631
3632         rbd_dev->parent_overlap = overlap;
3633         rbd_dev->parent_spec = parent_spec;
3634         parent_spec = NULL;     /* rbd_dev now owns this */
3635 out:
3636         ret = 0;
3637 out_err:
3638         kfree(reply_buf);
3639         rbd_spec_put(parent_spec);
3640
3641         return ret;
3642 }
3643
3644 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3645 {
3646         struct {
3647                 __le64 stripe_unit;
3648                 __le64 stripe_count;
3649         } __attribute__ ((packed)) striping_info_buf = { 0 };
3650         size_t size = sizeof (striping_info_buf);
3651         void *p;
3652         u64 obj_size;
3653         u64 stripe_unit;
3654         u64 stripe_count;
3655         int ret;
3656
3657         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3658                                 "rbd", "get_stripe_unit_count", NULL, 0,
3659                                 (char *)&striping_info_buf, size, NULL);
3660         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3661         if (ret < 0)
3662                 return ret;
3663         if (ret < size)
3664                 return -ERANGE;
3665
3666         /*
3667          * We don't actually support the "fancy striping" feature
3668          * (STRIPINGV2) yet, but if the striping sizes are the
3669          * defaults the behavior is the same as before.  So find
3670          * out, and only fail if the image has non-default values.
3671          */
3672         ret = -EINVAL;
3673         obj_size = (u64)1 << rbd_dev->header.obj_order;
3674         p = &striping_info_buf;
3675         stripe_unit = ceph_decode_64(&p);
3676         if (stripe_unit != obj_size) {
3677                 rbd_warn(rbd_dev, "unsupported stripe unit "
3678                                 "(got %llu want %llu)",
3679                                 stripe_unit, obj_size);
3680                 return -EINVAL;
3681         }
3682         stripe_count = ceph_decode_64(&p);
3683         if (stripe_count != 1) {
3684                 rbd_warn(rbd_dev, "unsupported stripe count "
3685                                 "(got %llu want 1)", stripe_count);
3686                 return -EINVAL;
3687         }
3688         rbd_dev->header.stripe_unit = stripe_unit;
3689         rbd_dev->header.stripe_count = stripe_count;
3690
3691         return 0;
3692 }
3693
3694 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3695 {
3696         size_t image_id_size;
3697         char *image_id;
3698         void *p;
3699         void *end;
3700         size_t size;
3701         void *reply_buf = NULL;
3702         size_t len = 0;
3703         char *image_name = NULL;
3704         int ret;
3705
3706         rbd_assert(!rbd_dev->spec->image_name);
3707
3708         len = strlen(rbd_dev->spec->image_id);
3709         image_id_size = sizeof (__le32) + len;
3710         image_id = kmalloc(image_id_size, GFP_KERNEL);
3711         if (!image_id)
3712                 return NULL;
3713
3714         p = image_id;
3715         end = image_id + image_id_size;
3716         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3717
3718         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3719         reply_buf = kmalloc(size, GFP_KERNEL);
3720         if (!reply_buf)
3721                 goto out;
3722
3723         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3724                                 "rbd", "dir_get_name",
3725                                 image_id, image_id_size,
3726                                 reply_buf, size, NULL);
3727         if (ret < 0)
3728                 goto out;
3729         p = reply_buf;
3730         end = reply_buf + ret;
3731
3732         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3733         if (IS_ERR(image_name))
3734                 image_name = NULL;
3735         else
3736                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3737 out:
3738         kfree(reply_buf);
3739         kfree(image_id);
3740
3741         return image_name;
3742 }
3743
3744 /*
3745  * When an rbd image has a parent image, it is identified by the
3746  * pool, image, and snapshot ids (not names).  This function fills
3747  * in the names for those ids.  (It's OK if we can't figure out the
3748  * name for an image id, but the pool and snapshot ids should always
3749  * exist and have names.)  All names in an rbd spec are dynamically
3750  * allocated.
3751  *
3752  * When an image being mapped (not a parent) is probed, we have the
3753  * pool name and pool id, image name and image id, and the snapshot
3754  * name.  The only thing we're missing is the snapshot id.
3755  *
3756  * The set of snapshots for an image is not known until they have
3757  * been read by rbd_dev_snaps_update(), so we can't completely fill
3758  * in this information until after that has been called.
3759  */
3760 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3761 {
3762         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3763         struct rbd_spec *spec = rbd_dev->spec;
3764         const char *pool_name;
3765         const char *image_name;
3766         const char *snap_name;
3767         int ret;
3768
3769         /*
3770          * An image being mapped will have the pool name (etc.), but
3771          * we need to look up the snapshot id.
3772          */
3773         if (spec->pool_name) {
3774                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3775                         struct rbd_snap *snap;
3776
3777                         snap = snap_by_name(rbd_dev, spec->snap_name);
3778                         if (!snap)
3779                                 return -ENOENT;
3780                         spec->snap_id = snap->id;
3781                 } else {
3782                         spec->snap_id = CEPH_NOSNAP;
3783                 }
3784
3785                 return 0;
3786         }
3787
3788         /* Get the pool name; we have to make our own copy of this */
3789
3790         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3791         if (!pool_name) {
3792                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3793                 return -EIO;
3794         }
3795         pool_name = kstrdup(pool_name, GFP_KERNEL);
3796         if (!pool_name)
3797                 return -ENOMEM;
3798
3799         /* Fetch the image name; tolerate failure here */
3800
3801         image_name = rbd_dev_image_name(rbd_dev);
3802         if (!image_name)
3803                 rbd_warn(rbd_dev, "unable to get image name");
3804
3805         /* Look up the snapshot name, and make a copy */
3806
3807         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3808         if (!snap_name) {
3809                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3810                 ret = -EIO;
3811                 goto out_err;
3812         }
3813         snap_name = kstrdup(snap_name, GFP_KERNEL);
3814         if (!snap_name) {
3815                 ret = -ENOMEM;
3816                 goto out_err;
3817         }
3818
3819         spec->pool_name = pool_name;
3820         spec->image_name = image_name;
3821         spec->snap_name = snap_name;
3822
3823         return 0;
3824 out_err:
3825         kfree(image_name);
3826         kfree(pool_name);
3827
3828         return ret;
3829 }
3830
3831 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3832 {
3833         size_t size;
3834         int ret;
3835         void *reply_buf;
3836         void *p;
3837         void *end;
3838         u64 seq;
3839         u32 snap_count;
3840         struct ceph_snap_context *snapc;
3841         u32 i;
3842
3843         /*
3844          * We'll need room for the seq value (maximum snapshot id),
3845          * snapshot count, and array of that many snapshot ids.
3846          * For now we have a fixed upper limit on the number we're
3847          * prepared to receive.
3848          */
3849         size = sizeof (__le64) + sizeof (__le32) +
3850                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3851         reply_buf = kzalloc(size, GFP_KERNEL);
3852         if (!reply_buf)
3853                 return -ENOMEM;
3854
3855         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3856                                 "rbd", "get_snapcontext", NULL, 0,
3857                                 reply_buf, size, ver);
3858         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3859         if (ret < 0)
3860                 goto out;
3861
3862         p = reply_buf;
3863         end = reply_buf + ret;
3864         ret = -ERANGE;
3865         ceph_decode_64_safe(&p, end, seq, out);
3866         ceph_decode_32_safe(&p, end, snap_count, out);
3867
3868         /*
3869          * Make sure the reported number of snapshot ids wouldn't go
3870          * beyond the end of our buffer.  But before checking that,
3871          * make sure the computed size of the snapshot context we
3872          * allocate is representable in a size_t.
3873          */
3874         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3875                                  / sizeof (u64)) {
3876                 ret = -EINVAL;
3877                 goto out;
3878         }
3879         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3880                 goto out;
3881         ret = 0;
3882
3883         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3884         if (!snapc) {
3885                 ret = -ENOMEM;
3886                 goto out;
3887         }
3888         snapc->seq = seq;
3889         for (i = 0; i < snap_count; i++)
3890                 snapc->snaps[i] = ceph_decode_64(&p);
3891
3892         rbd_dev->header.snapc = snapc;
3893
3894         dout("  snap context seq = %llu, snap_count = %u\n",
3895                 (unsigned long long)seq, (unsigned int)snap_count);
3896 out:
3897         kfree(reply_buf);
3898
3899         return ret;
3900 }
3901
3902 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3903 {
3904         size_t size;
3905         void *reply_buf;
3906         __le64 snap_id;
3907         int ret;
3908         void *p;
3909         void *end;
3910         char *snap_name;
3911
3912         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3913         reply_buf = kmalloc(size, GFP_KERNEL);
3914         if (!reply_buf)
3915                 return ERR_PTR(-ENOMEM);
3916
3917         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3918         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3919         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3920                                 "rbd", "get_snapshot_name",
3921                                 &snap_id, sizeof (snap_id),
3922                                 reply_buf, size, NULL);
3923         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3924         if (ret < 0) {
3925                 snap_name = ERR_PTR(ret);
3926                 goto out;
3927         }
3928
3929         p = reply_buf;
3930         end = reply_buf + ret;
3931         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3932         if (IS_ERR(snap_name))
3933                 goto out;
3934
3935         dout("  snap_id 0x%016llx snap_name = %s\n",
3936                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3937 out:
3938         kfree(reply_buf);
3939
3940         return snap_name;
3941 }
3942
3943 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3944                 u64 *snap_size, u64 *snap_features)
3945 {
3946         u64 snap_id;
3947         u64 size;
3948         u64 features;
3949         const char *snap_name;
3950         int ret;
3951
3952         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3953         snap_id = rbd_dev->header.snapc->snaps[which];
3954         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3955         if (ret)
3956                 goto out_err;
3957
3958         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3959         if (ret)
3960                 goto out_err;
3961
3962         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3963         if (!IS_ERR(snap_name)) {
3964                 *snap_size = size;
3965                 *snap_features = features;
3966         }
3967
3968         return snap_name;
3969 out_err:
3970         return ERR_PTR(ret);
3971 }
3972
3973 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3974                 u64 *snap_size, u64 *snap_features)
3975 {
3976         if (rbd_dev->image_format == 1)
3977                 return rbd_dev_v1_snap_info(rbd_dev, which,
3978                                         snap_size, snap_features);
3979         if (rbd_dev->image_format == 2)
3980                 return rbd_dev_v2_snap_info(rbd_dev, which,
3981                                         snap_size, snap_features);
3982         return ERR_PTR(-EINVAL);
3983 }
3984
3985 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3986 {
3987         int ret;
3988
3989         down_write(&rbd_dev->header_rwsem);
3990
3991         ret = rbd_dev_v2_image_size(rbd_dev);
3992         if (ret)
3993                 goto out;
3994         rbd_update_mapping_size(rbd_dev);
3995
3996         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3997         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3998         if (ret)
3999                 goto out;
4000         ret = rbd_dev_snaps_update(rbd_dev);
4001         dout("rbd_dev_snaps_update returned %d\n", ret);
4002         if (ret)
4003                 goto out;
4004 out:
4005         up_write(&rbd_dev->header_rwsem);
4006
4007         return ret;
4008 }
4009
4010 /*
4011  * Scan the rbd device's current snapshot list and compare it to the
4012  * newly-received snapshot context.  Remove any existing snapshots
4013  * not present in the new snapshot context.  Add a new snapshot for
4014  * any snaphots in the snapshot context not in the current list.
4015  * And verify there are no changes to snapshots we already know
4016  * about.
4017  *
4018  * Assumes the snapshots in the snapshot context are sorted by
4019  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4020  * are also maintained in that order.)
4021  *
4022  * Note that any error occurs while updating the snapshot list
4023  * aborts the update, and the entire list is cleared.  The snapshot
4024  * list becomes inconsistent at that point anyway, so it might as
4025  * well be empty.
4026  */
4027 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4028 {
4029         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4030         const u32 snap_count = snapc->num_snaps;
4031         struct list_head *head = &rbd_dev->snaps;
4032         struct list_head *links = head->next;
4033         u32 index = 0;
4034         int ret = 0;
4035
4036         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4037         while (index < snap_count || links != head) {
4038                 u64 snap_id;
4039                 struct rbd_snap *snap;
4040                 const char *snap_name;
4041                 u64 snap_size = 0;
4042                 u64 snap_features = 0;
4043
4044                 snap_id = index < snap_count ? snapc->snaps[index]
4045                                              : CEPH_NOSNAP;
4046                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4047                                      : NULL;
4048                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4049
4050                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4051                         struct list_head *next = links->next;
4052
4053                         /*
4054                          * A previously-existing snapshot is not in
4055                          * the new snap context.
4056                          *
4057                          * If the now-missing snapshot is the one
4058                          * the image represents, clear its existence
4059                          * flag so we can avoid sending any more
4060                          * requests to it.
4061                          */
4062                         if (rbd_dev->spec->snap_id == snap->id)
4063                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4064                         dout("removing %ssnap id %llu\n",
4065                                 rbd_dev->spec->snap_id == snap->id ?
4066                                                         "mapped " : "",
4067                                 (unsigned long long)snap->id);
4068
4069                         list_del(&snap->node);
4070                         rbd_snap_destroy(snap);
4071
4072                         /* Done with this list entry; advance */
4073
4074                         links = next;
4075                         continue;
4076                 }
4077
4078                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4079                                         &snap_size, &snap_features);
4080                 if (IS_ERR(snap_name)) {
4081                         ret = PTR_ERR(snap_name);
4082                         dout("failed to get snap info, error %d\n", ret);
4083                         goto out_err;
4084                 }
4085
4086                 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4087                         (unsigned long long)snap_id);
4088                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4089                         struct rbd_snap *new_snap;
4090
4091                         /* We haven't seen this snapshot before */
4092
4093                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4094                                         snap_id, snap_size, snap_features);
4095                         if (IS_ERR(new_snap)) {
4096                                 ret = PTR_ERR(new_snap);
4097                                 dout("  failed to add dev, error %d\n", ret);
4098                                 goto out_err;
4099                         }
4100
4101                         /* New goes before existing, or at end of list */
4102
4103                         dout("  added dev%s\n", snap ? "" : " at end\n");
4104                         if (snap)
4105                                 list_add_tail(&new_snap->node, &snap->node);
4106                         else
4107                                 list_add_tail(&new_snap->node, head);
4108                 } else {
4109                         /* Already have this one */
4110
4111                         dout("  already present\n");
4112
4113                         rbd_assert(snap->size == snap_size);
4114                         rbd_assert(!strcmp(snap->name, snap_name));
4115                         rbd_assert(snap->features == snap_features);
4116
4117                         /* Done with this list entry; advance */
4118
4119                         links = links->next;
4120                 }
4121
4122                 /* Advance to the next entry in the snapshot context */
4123
4124                 index++;
4125         }
4126         dout("%s: done\n", __func__);
4127
4128         return 0;
4129 out_err:
4130         rbd_remove_all_snaps(rbd_dev);
4131
4132         return ret;
4133 }
4134
4135 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4136 {
4137         struct device *dev;
4138         int ret;
4139
4140         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4141
4142         dev = &rbd_dev->dev;
4143         dev->bus = &rbd_bus_type;
4144         dev->type = &rbd_device_type;
4145         dev->parent = &rbd_root_dev;
4146         dev->release = rbd_dev_device_release;
4147         dev_set_name(dev, "%d", rbd_dev->dev_id);
4148         ret = device_register(dev);
4149
4150         mutex_unlock(&ctl_mutex);
4151
4152         return ret;
4153 }
4154
4155 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4156 {
4157         device_unregister(&rbd_dev->dev);
4158 }
4159
4160 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4161
4162 /*
4163  * Get a unique rbd identifier for the given new rbd_dev, and add
4164  * the rbd_dev to the global list.  The minimum rbd id is 1.
4165  */
4166 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4167 {
4168         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4169
4170         spin_lock(&rbd_dev_list_lock);
4171         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4172         spin_unlock(&rbd_dev_list_lock);
4173         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4174                 (unsigned long long) rbd_dev->dev_id);
4175 }
4176
4177 /*
4178  * Remove an rbd_dev from the global list, and record that its
4179  * identifier is no longer in use.
4180  */
4181 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4182 {
4183         struct list_head *tmp;
4184         int rbd_id = rbd_dev->dev_id;
4185         int max_id;
4186
4187         rbd_assert(rbd_id > 0);
4188
4189         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4190                 (unsigned long long) rbd_dev->dev_id);
4191         spin_lock(&rbd_dev_list_lock);
4192         list_del_init(&rbd_dev->node);
4193
4194         /*
4195          * If the id being "put" is not the current maximum, there
4196          * is nothing special we need to do.
4197          */
4198         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4199                 spin_unlock(&rbd_dev_list_lock);
4200                 return;
4201         }
4202
4203         /*
4204          * We need to update the current maximum id.  Search the
4205          * list to find out what it is.  We're more likely to find
4206          * the maximum at the end, so search the list backward.
4207          */
4208         max_id = 0;
4209         list_for_each_prev(tmp, &rbd_dev_list) {
4210                 struct rbd_device *rbd_dev;
4211
4212                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4213                 if (rbd_dev->dev_id > max_id)
4214                         max_id = rbd_dev->dev_id;
4215         }
4216         spin_unlock(&rbd_dev_list_lock);
4217
4218         /*
4219          * The max id could have been updated by rbd_dev_id_get(), in
4220          * which case it now accurately reflects the new maximum.
4221          * Be careful not to overwrite the maximum value in that
4222          * case.
4223          */
4224         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4225         dout("  max dev id has been reset\n");
4226 }
4227
4228 /*
4229  * Skips over white space at *buf, and updates *buf to point to the
4230  * first found non-space character (if any). Returns the length of
4231  * the token (string of non-white space characters) found.  Note
4232  * that *buf must be terminated with '\0'.
4233  */
4234 static inline size_t next_token(const char **buf)
4235 {
4236         /*
4237         * These are the characters that produce nonzero for
4238         * isspace() in the "C" and "POSIX" locales.
4239         */
4240         const char *spaces = " \f\n\r\t\v";
4241
4242         *buf += strspn(*buf, spaces);   /* Find start of token */
4243
4244         return strcspn(*buf, spaces);   /* Return token length */
4245 }
4246
4247 /*
4248  * Finds the next token in *buf, and if the provided token buffer is
4249  * big enough, copies the found token into it.  The result, if
4250  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4251  * must be terminated with '\0' on entry.
4252  *
4253  * Returns the length of the token found (not including the '\0').
4254  * Return value will be 0 if no token is found, and it will be >=
4255  * token_size if the token would not fit.
4256  *
4257  * The *buf pointer will be updated to point beyond the end of the
4258  * found token.  Note that this occurs even if the token buffer is
4259  * too small to hold it.
4260  */
4261 static inline size_t copy_token(const char **buf,
4262                                 char *token,
4263                                 size_t token_size)
4264 {
4265         size_t len;
4266
4267         len = next_token(buf);
4268         if (len < token_size) {
4269                 memcpy(token, *buf, len);
4270                 *(token + len) = '\0';
4271         }
4272         *buf += len;
4273
4274         return len;
4275 }
4276
4277 /*
4278  * Finds the next token in *buf, dynamically allocates a buffer big
4279  * enough to hold a copy of it, and copies the token into the new
4280  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4281  * that a duplicate buffer is created even for a zero-length token.
4282  *
4283  * Returns a pointer to the newly-allocated duplicate, or a null
4284  * pointer if memory for the duplicate was not available.  If
4285  * the lenp argument is a non-null pointer, the length of the token
4286  * (not including the '\0') is returned in *lenp.
4287  *
4288  * If successful, the *buf pointer will be updated to point beyond
4289  * the end of the found token.
4290  *
4291  * Note: uses GFP_KERNEL for allocation.
4292  */
4293 static inline char *dup_token(const char **buf, size_t *lenp)
4294 {
4295         char *dup;
4296         size_t len;
4297
4298         len = next_token(buf);
4299         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4300         if (!dup)
4301                 return NULL;
4302         *(dup + len) = '\0';
4303         *buf += len;
4304
4305         if (lenp)
4306                 *lenp = len;
4307
4308         return dup;
4309 }
4310
4311 /*
4312  * Parse the options provided for an "rbd add" (i.e., rbd image
4313  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4314  * and the data written is passed here via a NUL-terminated buffer.
4315  * Returns 0 if successful or an error code otherwise.
4316  *
4317  * The information extracted from these options is recorded in
4318  * the other parameters which return dynamically-allocated
4319  * structures:
4320  *  ceph_opts
4321  *      The address of a pointer that will refer to a ceph options
4322  *      structure.  Caller must release the returned pointer using
4323  *      ceph_destroy_options() when it is no longer needed.
4324  *  rbd_opts
4325  *      Address of an rbd options pointer.  Fully initialized by
4326  *      this function; caller must release with kfree().
4327  *  spec
4328  *      Address of an rbd image specification pointer.  Fully
4329  *      initialized by this function based on parsed options.
4330  *      Caller must release with rbd_spec_put().
4331  *
4332  * The options passed take this form:
4333  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4334  * where:
4335  *  <mon_addrs>
4336  *      A comma-separated list of one or more monitor addresses.
4337  *      A monitor address is an ip address, optionally followed
4338  *      by a port number (separated by a colon).
4339  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4340  *  <options>
4341  *      A comma-separated list of ceph and/or rbd options.
4342  *  <pool_name>
4343  *      The name of the rados pool containing the rbd image.
4344  *  <image_name>
4345  *      The name of the image in that pool to map.
4346  *  <snap_id>
4347  *      An optional snapshot id.  If provided, the mapping will
4348  *      present data from the image at the time that snapshot was
4349  *      created.  The image head is used if no snapshot id is
4350  *      provided.  Snapshot mappings are always read-only.
4351  */
4352 static int rbd_add_parse_args(const char *buf,
4353                                 struct ceph_options **ceph_opts,
4354                                 struct rbd_options **opts,
4355                                 struct rbd_spec **rbd_spec)
4356 {
4357         size_t len;
4358         char *options;
4359         const char *mon_addrs;
4360         char *snap_name;
4361         size_t mon_addrs_size;
4362         struct rbd_spec *spec = NULL;
4363         struct rbd_options *rbd_opts = NULL;
4364         struct ceph_options *copts;
4365         int ret;
4366
4367         /* The first four tokens are required */
4368
4369         len = next_token(&buf);
4370         if (!len) {
4371                 rbd_warn(NULL, "no monitor address(es) provided");
4372                 return -EINVAL;
4373         }
4374         mon_addrs = buf;
4375         mon_addrs_size = len + 1;
4376         buf += len;
4377
4378         ret = -EINVAL;
4379         options = dup_token(&buf, NULL);
4380         if (!options)
4381                 return -ENOMEM;
4382         if (!*options) {
4383                 rbd_warn(NULL, "no options provided");
4384                 goto out_err;
4385         }
4386
4387         spec = rbd_spec_alloc();
4388         if (!spec)
4389                 goto out_mem;
4390
4391         spec->pool_name = dup_token(&buf, NULL);
4392         if (!spec->pool_name)
4393                 goto out_mem;
4394         if (!*spec->pool_name) {
4395                 rbd_warn(NULL, "no pool name provided");
4396                 goto out_err;
4397         }
4398
4399         spec->image_name = dup_token(&buf, NULL);
4400         if (!spec->image_name)
4401                 goto out_mem;
4402         if (!*spec->image_name) {
4403                 rbd_warn(NULL, "no image name provided");
4404                 goto out_err;
4405         }
4406
4407         /*
4408          * Snapshot name is optional; default is to use "-"
4409          * (indicating the head/no snapshot).
4410          */
4411         len = next_token(&buf);
4412         if (!len) {
4413                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4414                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4415         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4416                 ret = -ENAMETOOLONG;
4417                 goto out_err;
4418         }
4419         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4420         if (!snap_name)
4421                 goto out_mem;
4422         *(snap_name + len) = '\0';
4423         spec->snap_name = snap_name;
4424
4425         /* Initialize all rbd options to the defaults */
4426
4427         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4428         if (!rbd_opts)
4429                 goto out_mem;
4430
4431         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4432
4433         copts = ceph_parse_options(options, mon_addrs,
4434                                         mon_addrs + mon_addrs_size - 1,
4435                                         parse_rbd_opts_token, rbd_opts);
4436         if (IS_ERR(copts)) {
4437                 ret = PTR_ERR(copts);
4438                 goto out_err;
4439         }
4440         kfree(options);
4441
4442         *ceph_opts = copts;
4443         *opts = rbd_opts;
4444         *rbd_spec = spec;
4445
4446         return 0;
4447 out_mem:
4448         ret = -ENOMEM;
4449 out_err:
4450         kfree(rbd_opts);
4451         rbd_spec_put(spec);
4452         kfree(options);
4453
4454         return ret;
4455 }
4456
4457 /*
4458  * An rbd format 2 image has a unique identifier, distinct from the
4459  * name given to it by the user.  Internally, that identifier is
4460  * what's used to specify the names of objects related to the image.
4461  *
4462  * A special "rbd id" object is used to map an rbd image name to its
4463  * id.  If that object doesn't exist, then there is no v2 rbd image
4464  * with the supplied name.
4465  *
4466  * This function will record the given rbd_dev's image_id field if
4467  * it can be determined, and in that case will return 0.  If any
4468  * errors occur a negative errno will be returned and the rbd_dev's
4469  * image_id field will be unchanged (and should be NULL).
4470  */
4471 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4472 {
4473         int ret;
4474         size_t size;
4475         char *object_name;
4476         void *response;
4477         char *image_id;
4478
4479         /*
4480          * When probing a parent image, the image id is already
4481          * known (and the image name likely is not).  There's no
4482          * need to fetch the image id again in this case.  We
4483          * do still need to set the image format though.
4484          */
4485         if (rbd_dev->spec->image_id) {
4486                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4487
4488                 return 0;
4489         }
4490
4491         /*
4492          * First, see if the format 2 image id file exists, and if
4493          * so, get the image's persistent id from it.
4494          */
4495         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4496         object_name = kmalloc(size, GFP_NOIO);
4497         if (!object_name)
4498                 return -ENOMEM;
4499         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4500         dout("rbd id object name is %s\n", object_name);
4501
4502         /* Response will be an encoded string, which includes a length */
4503
4504         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4505         response = kzalloc(size, GFP_NOIO);
4506         if (!response) {
4507                 ret = -ENOMEM;
4508                 goto out;
4509         }
4510
4511         /* If it doesn't exist we'll assume it's a format 1 image */
4512
4513         ret = rbd_obj_method_sync(rbd_dev, object_name,
4514                                 "rbd", "get_id", NULL, 0,
4515                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4516         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4517         if (ret == -ENOENT) {
4518                 image_id = kstrdup("", GFP_KERNEL);
4519                 ret = image_id ? 0 : -ENOMEM;
4520                 if (!ret)
4521                         rbd_dev->image_format = 1;
4522         } else if (ret > sizeof (__le32)) {
4523                 void *p = response;
4524
4525                 image_id = ceph_extract_encoded_string(&p, p + ret,
4526                                                 NULL, GFP_NOIO);
4527                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4528                 if (!ret)
4529                         rbd_dev->image_format = 2;
4530         } else {
4531                 ret = -EINVAL;
4532         }
4533
4534         if (!ret) {
4535                 rbd_dev->spec->image_id = image_id;
4536                 dout("image_id is %s\n", image_id);
4537         }
4538 out:
4539         kfree(response);
4540         kfree(object_name);
4541
4542         return ret;
4543 }
4544
4545 /* Undo whatever state changes are made by v1 or v2 image probe */
4546
4547 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4548 {
4549         struct rbd_image_header *header;
4550
4551         rbd_dev_remove_parent(rbd_dev);
4552         rbd_spec_put(rbd_dev->parent_spec);
4553         rbd_dev->parent_spec = NULL;
4554         rbd_dev->parent_overlap = 0;
4555
4556         /* Free dynamic fields from the header, then zero it out */
4557
4558         header = &rbd_dev->header;
4559         ceph_put_snap_context(header->snapc);
4560         kfree(header->snap_sizes);
4561         kfree(header->snap_names);
4562         kfree(header->object_prefix);
4563         memset(header, 0, sizeof (*header));
4564 }
4565
4566 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4567 {
4568         int ret;
4569
4570         /* Populate rbd image metadata */
4571
4572         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4573         if (ret < 0)
4574                 goto out_err;
4575
4576         /* Version 1 images have no parent (no layering) */
4577
4578         rbd_dev->parent_spec = NULL;
4579         rbd_dev->parent_overlap = 0;
4580
4581         dout("discovered version 1 image, header name is %s\n",
4582                 rbd_dev->header_name);
4583
4584         return 0;
4585
4586 out_err:
4587         kfree(rbd_dev->header_name);
4588         rbd_dev->header_name = NULL;
4589         kfree(rbd_dev->spec->image_id);
4590         rbd_dev->spec->image_id = NULL;
4591
4592         return ret;
4593 }
4594
4595 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4596 {
4597         int ret;
4598         u64 ver = 0;
4599
4600         ret = rbd_dev_v2_image_size(rbd_dev);
4601         if (ret)
4602                 goto out_err;
4603
4604         /* Get the object prefix (a.k.a. block_name) for the image */
4605
4606         ret = rbd_dev_v2_object_prefix(rbd_dev);
4607         if (ret)
4608                 goto out_err;
4609
4610         /* Get the and check features for the image */
4611
4612         ret = rbd_dev_v2_features(rbd_dev);
4613         if (ret)
4614                 goto out_err;
4615
4616         /* If the image supports layering, get the parent info */
4617
4618         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4619                 ret = rbd_dev_v2_parent_info(rbd_dev);
4620                 if (ret)
4621                         goto out_err;
4622
4623                 /*
4624                  * Don't print a warning for parent images.  We can
4625                  * tell this point because we won't know its pool
4626                  * name yet (just its pool id).
4627                  */
4628                 if (rbd_dev->spec->pool_name)
4629                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4630                                         "is EXPERIMENTAL!");
4631         }
4632
4633         /* If the image supports fancy striping, get its parameters */
4634
4635         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4636                 ret = rbd_dev_v2_striping_info(rbd_dev);
4637                 if (ret < 0)
4638                         goto out_err;
4639         }
4640
4641         /* crypto and compression type aren't (yet) supported for v2 images */
4642
4643         rbd_dev->header.crypt_type = 0;
4644         rbd_dev->header.comp_type = 0;
4645
4646         /* Get the snapshot context, plus the header version */
4647
4648         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4649         if (ret)
4650                 goto out_err;
4651
4652         dout("discovered version 2 image, header name is %s\n",
4653                 rbd_dev->header_name);
4654
4655         return 0;
4656 out_err:
4657         rbd_dev->parent_overlap = 0;
4658         rbd_spec_put(rbd_dev->parent_spec);
4659         rbd_dev->parent_spec = NULL;
4660         kfree(rbd_dev->header_name);
4661         rbd_dev->header_name = NULL;
4662         kfree(rbd_dev->header.object_prefix);
4663         rbd_dev->header.object_prefix = NULL;
4664
4665         return ret;
4666 }
4667
4668 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4669 {
4670         struct rbd_device *parent = NULL;
4671         struct rbd_spec *parent_spec;
4672         struct rbd_client *rbdc;
4673         int ret;
4674
4675         if (!rbd_dev->parent_spec)
4676                 return 0;
4677         /*
4678          * We need to pass a reference to the client and the parent
4679          * spec when creating the parent rbd_dev.  Images related by
4680          * parent/child relationships always share both.
4681          */
4682         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4683         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4684
4685         ret = -ENOMEM;
4686         parent = rbd_dev_create(rbdc, parent_spec);
4687         if (!parent)
4688                 goto out_err;
4689
4690         ret = rbd_dev_image_probe(parent);
4691         if (ret < 0)
4692                 goto out_err;
4693         rbd_dev->parent = parent;
4694
4695         return 0;
4696 out_err:
4697         if (parent) {
4698                 rbd_spec_put(rbd_dev->parent_spec);
4699                 kfree(rbd_dev->header_name);
4700                 rbd_dev_destroy(parent);
4701         } else {
4702                 rbd_put_client(rbdc);
4703                 rbd_spec_put(parent_spec);
4704         }
4705
4706         return ret;
4707 }
4708
4709 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4710 {
4711         int ret;
4712
4713         ret = rbd_dev_mapping_set(rbd_dev);
4714         if (ret)
4715                 return ret;
4716
4717         /* generate unique id: find highest unique id, add one */
4718         rbd_dev_id_get(rbd_dev);
4719
4720         /* Fill in the device name, now that we have its id. */
4721         BUILD_BUG_ON(DEV_NAME_LEN
4722                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4723         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4724
4725         /* Get our block major device number. */
4726
4727         ret = register_blkdev(0, rbd_dev->name);
4728         if (ret < 0)
4729                 goto err_out_id;
4730         rbd_dev->major = ret;
4731
4732         /* Set up the blkdev mapping. */
4733
4734         ret = rbd_init_disk(rbd_dev);
4735         if (ret)
4736                 goto err_out_blkdev;
4737
4738         ret = rbd_bus_add_dev(rbd_dev);
4739         if (ret)
4740                 goto err_out_disk;
4741
4742         /* Everything's ready.  Announce the disk to the world. */
4743
4744         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4745         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4746         add_disk(rbd_dev->disk);
4747
4748         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4749                 (unsigned long long) rbd_dev->mapping.size);
4750
4751         return ret;
4752
4753 err_out_disk:
4754         rbd_free_disk(rbd_dev);
4755 err_out_blkdev:
4756         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4757 err_out_id:
4758         rbd_dev_id_put(rbd_dev);
4759         rbd_dev_mapping_clear(rbd_dev);
4760
4761         return ret;
4762 }
4763
4764 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4765 {
4766         struct rbd_spec *spec = rbd_dev->spec;
4767         size_t size;
4768
4769         /* Record the header object name for this rbd image. */
4770
4771         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4772
4773         if (rbd_dev->image_format == 1)
4774                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4775         else
4776                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4777
4778         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4779         if (!rbd_dev->header_name)
4780                 return -ENOMEM;
4781
4782         if (rbd_dev->image_format == 1)
4783                 sprintf(rbd_dev->header_name, "%s%s",
4784                         spec->image_name, RBD_SUFFIX);
4785         else
4786                 sprintf(rbd_dev->header_name, "%s%s",
4787                         RBD_HEADER_PREFIX, spec->image_id);
4788         return 0;
4789 }
4790
4791 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4792 {
4793         int ret;
4794
4795         rbd_remove_all_snaps(rbd_dev);
4796         rbd_dev_unprobe(rbd_dev);
4797         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4798         if (ret)
4799                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4800         kfree(rbd_dev->header_name);
4801         rbd_dev->header_name = NULL;
4802         rbd_dev->image_format = 0;
4803         kfree(rbd_dev->spec->image_id);
4804         rbd_dev->spec->image_id = NULL;
4805
4806         rbd_dev_destroy(rbd_dev);
4807 }
4808
4809 /*
4810  * Probe for the existence of the header object for the given rbd
4811  * device.  For format 2 images this includes determining the image
4812  * id.
4813  */
4814 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4815 {
4816         int ret;
4817         int tmp;
4818
4819         /*
4820          * Get the id from the image id object.  If it's not a
4821          * format 2 image, we'll get ENOENT back, and we'll assume
4822          * it's a format 1 image.
4823          */
4824         ret = rbd_dev_image_id(rbd_dev);
4825         if (ret)
4826                 return ret;
4827         rbd_assert(rbd_dev->spec->image_id);
4828         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4829
4830         ret = rbd_dev_header_name(rbd_dev);
4831         if (ret)
4832                 goto err_out_format;
4833
4834         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4835         if (ret)
4836                 goto out_header_name;
4837
4838         if (rbd_dev->image_format == 1)
4839                 ret = rbd_dev_v1_probe(rbd_dev);
4840         else
4841                 ret = rbd_dev_v2_probe(rbd_dev);
4842         if (ret)
4843                 goto err_out_watch;
4844
4845         ret = rbd_dev_snaps_update(rbd_dev);
4846         if (ret)
4847                 goto err_out_probe;
4848
4849         ret = rbd_dev_spec_update(rbd_dev);
4850         if (ret)
4851                 goto err_out_snaps;
4852
4853         ret = rbd_dev_probe_parent(rbd_dev);
4854         if (!ret)
4855                 return 0;
4856
4857 err_out_snaps:
4858         rbd_remove_all_snaps(rbd_dev);
4859 err_out_probe:
4860         rbd_dev_unprobe(rbd_dev);
4861 err_out_watch:
4862         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4863         if (tmp)
4864                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4865 out_header_name:
4866         kfree(rbd_dev->header_name);
4867         rbd_dev->header_name = NULL;
4868 err_out_format:
4869         rbd_dev->image_format = 0;
4870         kfree(rbd_dev->spec->image_id);
4871         rbd_dev->spec->image_id = NULL;
4872
4873         dout("probe failed, returning %d\n", ret);
4874
4875         return ret;
4876 }
4877
4878 static ssize_t rbd_add(struct bus_type *bus,
4879                        const char *buf,
4880                        size_t count)
4881 {
4882         struct rbd_device *rbd_dev = NULL;
4883         struct ceph_options *ceph_opts = NULL;
4884         struct rbd_options *rbd_opts = NULL;
4885         struct rbd_spec *spec = NULL;
4886         struct rbd_client *rbdc;
4887         struct ceph_osd_client *osdc;
4888         int rc = -ENOMEM;
4889
4890         if (!try_module_get(THIS_MODULE))
4891                 return -ENODEV;
4892
4893         /* parse add command */
4894         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4895         if (rc < 0)
4896                 goto err_out_module;
4897
4898         rbdc = rbd_get_client(ceph_opts);
4899         if (IS_ERR(rbdc)) {
4900                 rc = PTR_ERR(rbdc);
4901                 goto err_out_args;
4902         }
4903         ceph_opts = NULL;       /* rbd_dev client now owns this */
4904
4905         /* pick the pool */
4906         osdc = &rbdc->client->osdc;
4907         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4908         if (rc < 0)
4909                 goto err_out_client;
4910         spec->pool_id = (u64)rc;
4911
4912         /* The ceph file layout needs to fit pool id in 32 bits */
4913
4914         if (spec->pool_id > (u64)U32_MAX) {
4915                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4916                                 (unsigned long long)spec->pool_id, U32_MAX);
4917                 rc = -EIO;
4918                 goto err_out_client;
4919         }
4920
4921         rbd_dev = rbd_dev_create(rbdc, spec);
4922         if (!rbd_dev)
4923                 goto err_out_client;
4924         rbdc = NULL;            /* rbd_dev now owns this */
4925         spec = NULL;            /* rbd_dev now owns this */
4926
4927         rbd_dev->mapping.read_only = rbd_opts->read_only;
4928         kfree(rbd_opts);
4929         rbd_opts = NULL;        /* done with this */
4930
4931         rc = rbd_dev_image_probe(rbd_dev);
4932         if (rc < 0)
4933                 goto err_out_rbd_dev;
4934
4935         rc = rbd_dev_device_setup(rbd_dev);
4936         if (!rc)
4937                 return count;
4938
4939         rbd_dev_image_release(rbd_dev);
4940 err_out_rbd_dev:
4941         rbd_dev_destroy(rbd_dev);
4942 err_out_client:
4943         rbd_put_client(rbdc);
4944 err_out_args:
4945         if (ceph_opts)
4946                 ceph_destroy_options(ceph_opts);
4947         kfree(rbd_opts);
4948         rbd_spec_put(spec);
4949 err_out_module:
4950         module_put(THIS_MODULE);
4951
4952         dout("Error adding device %s\n", buf);
4953
4954         return (ssize_t)rc;
4955 }
4956
4957 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4958 {
4959         struct list_head *tmp;
4960         struct rbd_device *rbd_dev;
4961
4962         spin_lock(&rbd_dev_list_lock);
4963         list_for_each(tmp, &rbd_dev_list) {
4964                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4965                 if (rbd_dev->dev_id == dev_id) {
4966                         spin_unlock(&rbd_dev_list_lock);
4967                         return rbd_dev;
4968                 }
4969         }
4970         spin_unlock(&rbd_dev_list_lock);
4971         return NULL;
4972 }
4973
4974 static void rbd_dev_device_release(struct device *dev)
4975 {
4976         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4977
4978         rbd_free_disk(rbd_dev);
4979         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4980         rbd_dev_clear_mapping(rbd_dev);
4981         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4982         rbd_dev->major = 0;
4983         rbd_dev_id_put(rbd_dev);
4984         rbd_dev_mapping_clear(rbd_dev);
4985 }
4986
4987 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4988 {
4989         while (rbd_dev->parent) {
4990                 struct rbd_device *first = rbd_dev;
4991                 struct rbd_device *second = first->parent;
4992                 struct rbd_device *third;
4993
4994                 /*
4995                  * Follow to the parent with no grandparent and
4996                  * remove it.
4997                  */
4998                 while (second && (third = second->parent)) {
4999                         first = second;
5000                         second = third;
5001                 }
5002                 rbd_assert(second);
5003                 rbd_dev_image_release(second);
5004                 first->parent = NULL;
5005                 first->parent_overlap = 0;
5006
5007                 rbd_assert(first->parent_spec);
5008                 rbd_spec_put(first->parent_spec);
5009                 first->parent_spec = NULL;
5010         }
5011 }
5012
5013 static ssize_t rbd_remove(struct bus_type *bus,
5014                           const char *buf,
5015                           size_t count)
5016 {
5017         struct rbd_device *rbd_dev = NULL;
5018         int target_id;
5019         unsigned long ul;
5020         int ret;
5021
5022         ret = strict_strtoul(buf, 10, &ul);
5023         if (ret)
5024                 return ret;
5025
5026         /* convert to int; abort if we lost anything in the conversion */
5027         target_id = (int) ul;
5028         if (target_id != ul)
5029                 return -EINVAL;
5030
5031         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5032
5033         rbd_dev = __rbd_get_dev(target_id);
5034         if (!rbd_dev) {
5035                 ret = -ENOENT;
5036                 goto done;
5037         }
5038
5039         spin_lock_irq(&rbd_dev->lock);
5040         if (rbd_dev->open_count)
5041                 ret = -EBUSY;
5042         else
5043                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5044         spin_unlock_irq(&rbd_dev->lock);
5045         if (ret < 0)
5046                 goto done;
5047         ret = count;
5048         rbd_bus_del_dev(rbd_dev);
5049         rbd_dev_image_release(rbd_dev);
5050         module_put(THIS_MODULE);
5051 done:
5052         mutex_unlock(&ctl_mutex);
5053
5054         return ret;
5055 }
5056
5057 /*
5058  * create control files in sysfs
5059  * /sys/bus/rbd/...
5060  */
5061 static int rbd_sysfs_init(void)
5062 {
5063         int ret;
5064
5065         ret = device_register(&rbd_root_dev);
5066         if (ret < 0)
5067                 return ret;
5068
5069         ret = bus_register(&rbd_bus_type);
5070         if (ret < 0)
5071                 device_unregister(&rbd_root_dev);
5072
5073         return ret;
5074 }
5075
5076 static void rbd_sysfs_cleanup(void)
5077 {
5078         bus_unregister(&rbd_bus_type);
5079         device_unregister(&rbd_root_dev);
5080 }
5081
5082 static int __init rbd_init(void)
5083 {
5084         int rc;
5085
5086         if (!libceph_compatible(NULL)) {
5087                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5088
5089                 return -EINVAL;
5090         }
5091         rc = rbd_sysfs_init();
5092         if (rc)
5093                 return rc;
5094         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5095         return 0;
5096 }
5097
5098 static void __exit rbd_exit(void)
5099 {
5100         rbd_sysfs_cleanup();
5101 }
5102
5103 module_init(rbd_init);
5104 module_exit(rbd_exit);
5105
5106 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5107 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5108 MODULE_DESCRIPTION("rados block device");
5109
5110 /* following authorship retained from original osdblk.c */
5111 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5112
5113 MODULE_LICENSE("GPL");