/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
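/*
 * For illustration only (the sysfs-bus-rbd document above is the
 * authoritative description of the interface): an image is typically
 * mapped by writing a one-line spec to the bus "add" attribute, e.g.
 *
 *      $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * where the monitor address and options come first, followed by the
 * pool name ("rbd"), the image name ("foo"), and optionally a
 * snapshot name.  Writing a device id to "remove" unmaps it.
 */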

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
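/*
 * The 5/2 = 2.5 characters per byte over-approximates log10(256)
 * (about 2.41 decimal digits per byte), and the +1 leaves room for a
 * sign.  For a 4-byte int that gives (5 * 4) / 2 + 1 = 11 characters,
 * enough for "-2147483648" (11 characters).
 */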

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

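/*
 * Emit a warning, identifying the originating device as specifically
 * as possible: by disk name if the device has a gendisk, otherwise by
 * image name or image id if known.
 */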
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);

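/*
 * Open callback for the block device.  Disallow writes to a read-only
 * mapping, and refuse to open a device whose removal is in progress;
 * otherwise count the opener and take a reference on the device.
 */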
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

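/*
 * Release callback: drop the open count taken in rbd_open() along
 * with the corresponding device reference.
 */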
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

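/*
 * Parse a single map option.  The token ranges for integer and string
 * arguments are handled generically, but at present only the Boolean
 * read_only/read_write options are defined.
 */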
static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with a specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy a ceph client.  Called via kref_put() when the last
 * reference is dropped; takes rbd_client_list_lock itself to unlink
 * the client, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translating the header format from
 * the on-disk header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

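/*
 * Return the index of the given snapshot id in the device's snapshot
 * context, or BAD_SNAP_INDEX if the id is not present.
 */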
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u32 which;

        for (which = 0; which < snapc->num_snaps; which++)
                if (snapc->snaps[which] == snap_id)
                        return which;

        return BAD_SNAP_INDEX;
}

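/*
 * Map a snapshot id to its name.  CEPH_NOSNAP names the head
 * (writable) version of the image; an id not found in the device's
 * snapshot list yields a null pointer.
 */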
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

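/*
 * Record the size and features of the mapped image (or of the mapped
 * snapshot) in the device's mapping structure.  A snapshot mapping is
 * always read-only.
 */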
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

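/*
 * Build the name of the object backing the segment that contains the
 * given image byte offset: "<object_prefix>.<segment number>", with
 * the segment number printed as 12 hex digits.  The caller is
 * responsible for freeing the returned name.
 */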
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

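/*
 * Clip a byte count so that an I/O starting at the given image offset
 * does not cross a segment boundary.  For example, with the default
 * object order of 22 (4 MB segments), a 1 MB request starting 3.5 MB
 * into a segment is trimmed to the 512 KB remaining in that segment.
 */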
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses to two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

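/*
 * Add an object request to an image request.  The image request takes
 * over the caller's reference to the object request; "which" records
 * the object's position in the image request's list.
 */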
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

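/*
 * Completion callback for all rbd osd requests.  Record the result
 * and transfer count, then dispatch on the opcode of the request's
 * first op.
 */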
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

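/*
 * Fill in the parts of an osd request that depend on the I/O
 * direction: a read carries the snapshot id to read from, while a
 * write carries the snapshot context and a modification time.
 */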
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}

1579 static struct ceph_osd_request *rbd_osd_req_create(
1580                                         struct rbd_device *rbd_dev,
1581                                         bool write_request,
1582                                         struct rbd_obj_request *obj_request)
1583 {
1584         struct ceph_snap_context *snapc = NULL;
1585         struct ceph_osd_client *osdc;
1586         struct ceph_osd_request *osd_req;
1587
1588         if (obj_request_img_data_test(obj_request)) {
1589                 struct rbd_img_request *img_request = obj_request->img_request;
1590
1591                 rbd_assert(write_request ==
1592                                 img_request_write_test(img_request));
1593                 if (write_request)
1594                         snapc = img_request->snapc;
1595         }
1596
1597         /* Allocate and initialize the request, for the single op */
1598
1599         osdc = &rbd_dev->rbd_client->client->osdc;
1600         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1601         if (!osd_req)
1602                 return NULL;    /* ENOMEM */
1603
1604         if (write_request)
1605                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1606         else
1607                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1608
1609         osd_req->r_callback = rbd_osd_req_callback;
1610         osd_req->r_priv = obj_request;
1611
1612         osd_req->r_oid_len = strlen(obj_request->object_name);
1613         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1614         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1615
1616         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1617
1618         return osd_req;
1619 }
1620
1621 /*
1622  * Create a copyup osd request based on the information in the
1623  * object request supplied.  A copyup request has two osd ops:
1624  * a copyup method call and a "normal" write request.
1625  */
1626 static struct ceph_osd_request *
1627 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1628 {
1629         struct rbd_img_request *img_request;
1630         struct ceph_snap_context *snapc;
1631         struct rbd_device *rbd_dev;
1632         struct ceph_osd_client *osdc;
1633         struct ceph_osd_request *osd_req;
1634
1635         rbd_assert(obj_request_img_data_test(obj_request));
1636         img_request = obj_request->img_request;
1637         rbd_assert(img_request);
1638         rbd_assert(img_request_write_test(img_request));
1639
1640         /* Allocate and initialize the request, for the two ops */
1641
1642         snapc = img_request->snapc;
1643         rbd_dev = img_request->rbd_dev;
1644         osdc = &rbd_dev->rbd_client->client->osdc;
1645         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1646         if (!osd_req)
1647                 return NULL;    /* ENOMEM */
1648
1649         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1650         osd_req->r_callback = rbd_osd_req_callback;
1651         osd_req->r_priv = obj_request;
1652
1653         osd_req->r_oid_len = strlen(obj_request->object_name);
1654         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1655         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1656
1657         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1658
1659         return osd_req;
1660 }
1661
1662
1663 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1664 {
1665         ceph_osdc_put_request(osd_req);
1666 }
1667
1668 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1669
1670 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1671                                                 u64 offset, u64 length,
1672                                                 enum obj_request_type type)
1673 {
1674         struct rbd_obj_request *obj_request;
1675         size_t size;
1676         char *name;
1677
1678         rbd_assert(obj_request_type_valid(type));
1679
1680         size = strlen(object_name) + 1;
1681         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1682         if (!obj_request)
1683                 return NULL;
1684
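        /*
         * The object name lives in the same allocation, immediately
         * after the request structure itself, so the single kfree()
         * in rbd_obj_request_destroy() releases both.
         */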
1685         name = (char *)(obj_request + 1);
1686         obj_request->object_name = memcpy(name, object_name, size);
1687         obj_request->offset = offset;
1688         obj_request->length = length;
1689         obj_request->flags = 0;
1690         obj_request->which = BAD_WHICH;
1691         obj_request->type = type;
1692         INIT_LIST_HEAD(&obj_request->links);
1693         init_completion(&obj_request->completion);
1694         kref_init(&obj_request->kref);
1695
1696         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1697                 offset, length, (int)type, obj_request);
1698
1699         return obj_request;
1700 }
1701
1702 static void rbd_obj_request_destroy(struct kref *kref)
1703 {
1704         struct rbd_obj_request *obj_request;
1705
1706         obj_request = container_of(kref, struct rbd_obj_request, kref);
1707
1708         dout("%s: obj %p\n", __func__, obj_request);
1709
1710         rbd_assert(obj_request->img_request == NULL);
1711         rbd_assert(obj_request->which == BAD_WHICH);
1712
1713         if (obj_request->osd_req)
1714                 rbd_osd_req_destroy(obj_request->osd_req);
1715
1716         rbd_assert(obj_request_type_valid(obj_request->type));
1717         switch (obj_request->type) {
1718         case OBJ_REQUEST_NODATA:
1719                 break;          /* Nothing to do */
1720         case OBJ_REQUEST_BIO:
1721                 if (obj_request->bio_list)
1722                         bio_chain_put(obj_request->bio_list);
1723                 break;
1724         case OBJ_REQUEST_PAGES:
1725                 if (obj_request->pages)
1726                         ceph_release_page_vector(obj_request->pages,
1727                                                 obj_request->page_count);
1728                 break;
1729         }
1730
1731         kfree(obj_request);
1732 }
1733
1734 /*
1735  * Caller is responsible for filling in the list of object requests
1736  * that comprises the image request, and the Linux request pointer
1737  * (if there is one).
1738  */
1739 static struct rbd_img_request *rbd_img_request_create(
1740                                         struct rbd_device *rbd_dev,
1741                                         u64 offset, u64 length,
1742                                         bool write_request,
1743                                         bool child_request)
1744 {
1745         struct rbd_img_request *img_request;
1746
1747         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1748         if (!img_request)
1749                 return NULL;
1750
1751         if (write_request) {
1752                 down_read(&rbd_dev->header_rwsem);
                /*
                 * Pin and record the snapshot context while the lock
                 * protects it; fetching the pointer again after the
                 * lock is dropped would race with a header refresh.
                 */
1753                 img_request->snapc =
                                ceph_get_snap_context(rbd_dev->header.snapc);
1754                 up_read(&rbd_dev->header_rwsem);
1755         }
1756
1757         img_request->rq = NULL;
1758         img_request->rbd_dev = rbd_dev;
1759         img_request->offset = offset;
1760         img_request->length = length;
1761         img_request->flags = 0;
1762         if (write_request)
1763                 img_request_write_set(img_request);
1764         else
1765                 img_request->snap_id = rbd_dev->spec->snap_id;
1768         if (child_request)
1769                 img_request_child_set(img_request);
1770         if (rbd_dev->parent_spec)
1771                 img_request_layered_set(img_request);
1772         spin_lock_init(&img_request->completion_lock);
1773         img_request->next_completion = 0;
1774         img_request->callback = NULL;
1775         img_request->result = 0;
1776         img_request->obj_request_count = 0;
1777         INIT_LIST_HEAD(&img_request->obj_requests);
1778         kref_init(&img_request->kref);
1779
1780         rbd_img_request_get(img_request);       /* Avoid a warning */
1781         rbd_img_request_put(img_request);       /* TEMPORARY */
1782
1783         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1784                 write_request ? "write" : "read", offset, length,
1785                 img_request);
1786
1787         return img_request;
1788 }
1789
1790 static void rbd_img_request_destroy(struct kref *kref)
1791 {
1792         struct rbd_img_request *img_request;
1793         struct rbd_obj_request *obj_request;
1794         struct rbd_obj_request *next_obj_request;
1795
1796         img_request = container_of(kref, struct rbd_img_request, kref);
1797
1798         dout("%s: img %p\n", __func__, img_request);
1799
1800         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1801                 rbd_img_obj_request_del(img_request, obj_request);
1802         rbd_assert(img_request->obj_request_count == 0);
1803
1804         if (img_request_write_test(img_request))
1805                 ceph_put_snap_context(img_request->snapc);
1806
1807         if (img_request_child_test(img_request))
1808                 rbd_obj_request_put(img_request->obj_request);
1809
1810         kfree(img_request);
1811 }
1812
1813 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1814 {
1815         struct rbd_img_request *img_request;
1816         unsigned int xferred;
1817         int result;
1818         bool more;
1819
1820         rbd_assert(obj_request_img_data_test(obj_request));
1821         img_request = obj_request->img_request;
1822
1823         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1824         xferred = (unsigned int)obj_request->xferred;
1825         result = obj_request->result;
1826         if (result) {
1827                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1828
1829                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1830                         img_request_write_test(img_request) ? "write" : "read",
1831                         obj_request->length, obj_request->img_offset,
1832                         obj_request->offset);
1833                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1834                         result, xferred);
1835                 if (!img_request->result)
1836                         img_request->result = result;
1837         }
1838
1839         /* Image object requests don't own their page array */
1840
1841         if (obj_request->type == OBJ_REQUEST_PAGES) {
1842                 obj_request->pages = NULL;
1843                 obj_request->page_count = 0;
1844         }
1845
1846         if (img_request_child_test(img_request)) {
1847                 rbd_assert(img_request->obj_request != NULL);
1848                 more = obj_request->which < img_request->obj_request_count - 1;
1849         } else {
1850                 rbd_assert(img_request->rq != NULL);
1851                 more = blk_end_request(img_request->rq, result, xferred);
1852         }
1853
1854         return more;
1855 }
1856
1857 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1858 {
1859         struct rbd_img_request *img_request;
1860         u32 which = obj_request->which;
1861         bool more = true;
1862
1863         rbd_assert(obj_request_img_data_test(obj_request));
1864         img_request = obj_request->img_request;
1865
1866         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1867         rbd_assert(img_request != NULL);
1868         rbd_assert(img_request->obj_request_count > 0);
1869         rbd_assert(which != BAD_WHICH);
1870         rbd_assert(which < img_request->obj_request_count);
1871         rbd_assert(which >= img_request->next_completion);
1872
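        /*
         * Object requests can finish in any order, but the block
         * layer must see completions in order.  Only the request
         * matching next_completion sweeps forward below, ending
         * every consecutive completed request; requests completing
         * out of order simply return, leaving the sweep to the one
         * that fills the gap.
         */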
1873         spin_lock_irq(&img_request->completion_lock);
1874         if (which != img_request->next_completion)
1875                 goto out;
1876
1877         for_each_obj_request_from(img_request, obj_request) {
1878                 rbd_assert(more);
1879                 rbd_assert(which < img_request->obj_request_count);
1880
1881                 if (!obj_request_done_test(obj_request))
1882                         break;
1883                 more = rbd_img_obj_end_request(obj_request);
1884                 which++;
1885         }
1886
1887         rbd_assert(more ^ (which == img_request->obj_request_count));
1888         img_request->next_completion = which;
1889 out:
1890         spin_unlock_irq(&img_request->completion_lock);
1891
1892         if (!more)
1893                 rbd_img_request_complete(img_request);
1894 }
1895
1896 /*
1897  * Split up an image request into one or more object requests, each
1898  * to a different object.  The "type" parameter indicates whether
1899  * "data_desc" is the pointer to the head of a list of bio
1900  * structures, or the base of a page array.  In either case this
1901  * function assumes data_desc describes memory sufficient to hold
1902  * all data described by the image request.
1903  */
1904 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1905                                         enum obj_request_type type,
1906                                         void *data_desc)
1907 {
1908         struct rbd_device *rbd_dev = img_request->rbd_dev;
1909         struct rbd_obj_request *obj_request = NULL;
1910         struct rbd_obj_request *next_obj_request;
1911         bool write_request = img_request_write_test(img_request);
1912         struct bio *bio_list;
1913         unsigned int bio_offset = 0;
1914         struct page **pages;
1915         u64 img_offset;
1916         u64 resid;
1917         u16 opcode;
1918
1919         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1920                 (int)type, data_desc);
1921
1922         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1923         img_offset = img_request->offset;
1924         resid = img_request->length;
1925         rbd_assert(resid > 0);
1926
1927         if (type == OBJ_REQUEST_BIO) {
1928                 bio_list = data_desc;
1929                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1930         } else {
1931                 rbd_assert(type == OBJ_REQUEST_PAGES);
1932                 pages = data_desc;
1933         }
1934
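        /*
         * Walk the image extent one object at a time.  As a worked
         * example, assuming the default 4 MiB objects (obj_order 22):
         * a 6 MiB request at image offset 3 MiB becomes three object
         * requests, covering 1 MiB at the tail of the first object,
         * all 4 MiB of the second, and 1 MiB at the head of the third.
         */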
1935         while (resid) {
1936                 struct ceph_osd_request *osd_req;
1937                 const char *object_name;
1938                 u64 offset;
1939                 u64 length;
1940
1941                 object_name = rbd_segment_name(rbd_dev, img_offset);
1942                 if (!object_name)
1943                         goto out_unwind;
1944                 offset = rbd_segment_offset(rbd_dev, img_offset);
1945                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1946                 obj_request = rbd_obj_request_create(object_name,
1947                                                 offset, length, type);
1948                 kfree(object_name);     /* object request has its own copy */
1949                 if (!obj_request)
1950                         goto out_unwind;
1951
1952                 if (type == OBJ_REQUEST_BIO) {
1953                         unsigned int clone_size;
1954
1955                         rbd_assert(length <= (u64)UINT_MAX);
1956                         clone_size = (unsigned int)length;
1957                         obj_request->bio_list =
1958                                         bio_chain_clone_range(&bio_list,
1959                                                                 &bio_offset,
1960                                                                 clone_size,
1961                                                                 GFP_ATOMIC);
1962                         if (!obj_request->bio_list)
1963                                 goto out_partial;
1964                 } else {
1965                         unsigned int page_count;
1966
1967                         obj_request->pages = pages;
1968                         page_count = (u32)calc_pages_for(offset, length);
1969                         obj_request->page_count = page_count;
1970                         if ((offset + length) & ~PAGE_MASK)
1971                                 page_count--;   /* more on last page */
1972                         pages += page_count;
1973                 }
1974
1975                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1976                                                 obj_request);
1977                 if (!osd_req)
1978                         goto out_partial;
1979                 obj_request->osd_req = osd_req;
1980                 obj_request->callback = rbd_img_obj_callback;
1981
1982                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1983                                                 0, 0);
1984                 if (type == OBJ_REQUEST_BIO)
1985                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1986                                         obj_request->bio_list, length);
1987                 else
1988                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1989                                         obj_request->pages, length,
1990                                         offset & ~PAGE_MASK, false, false);
1991
1992                 if (write_request)
1993                         rbd_osd_req_format_write(obj_request);
1994                 else
1995                         rbd_osd_req_format_read(obj_request);
1996
1997                 obj_request->img_offset = img_offset;
1998                 rbd_img_obj_request_add(img_request, obj_request);
1999
2000                 img_offset += length;
2001                 resid -= length;
2002         }
2003
2004         return 0;
2005
2006 out_partial:
2007         rbd_obj_request_put(obj_request);
2008 out_unwind:
2009         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2010                 rbd_img_obj_request_del(img_request, obj_request);
2011
2012         return -ENOMEM;
2013 }
2014
2015 static void
2016 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2017 {
2018         struct rbd_img_request *img_request;
2019         struct rbd_device *rbd_dev;
2020         u64 length;
2021         u32 page_count;
2022
2023         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2024         rbd_assert(obj_request_img_data_test(obj_request));
2025         img_request = obj_request->img_request;
2026         rbd_assert(img_request);
2027
2028         rbd_dev = img_request->rbd_dev;
2029         rbd_assert(rbd_dev);
2030         length = (u64)1 << rbd_dev->header.obj_order;
2031         page_count = (u32)calc_pages_for(0, length);
2032
2033         rbd_assert(obj_request->copyup_pages);
2034         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2035         obj_request->copyup_pages = NULL;
2036
2037         /*
2038          * We want the transfer count to reflect the size of the
2039          * original write request.  There is no such thing as a
2040          * successful short write, so if the request was successful
2041          * we can just set it to the originally-requested length.
2042          */
2043         if (!obj_request->result)
2044                 obj_request->xferred = obj_request->length;
2045
2046         /* Finish up with the normal image object callback */
2047
2048         rbd_img_obj_callback(obj_request);
2049 }
2050
2051 static void
2052 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2053 {
2054         struct rbd_obj_request *orig_request;
2055         struct ceph_osd_request *osd_req;
2056         struct ceph_osd_client *osdc;
2057         struct rbd_device *rbd_dev;
2058         struct page **pages;
2059         int result;
2060         u64 obj_size;
2061         u64 xferred;
2062
2063         rbd_assert(img_request_child_test(img_request));
2064
2065         /* First get what we need from the image request */
2066
2067         pages = img_request->copyup_pages;
2068         rbd_assert(pages != NULL);
2069         img_request->copyup_pages = NULL;
2070
2071         orig_request = img_request->obj_request;
2072         rbd_assert(orig_request != NULL);
2073         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2074         result = img_request->result;
2075         obj_size = img_request->length;
2076         xferred = img_request->xferred;
2077
2078         rbd_dev = img_request->rbd_dev;
2079         rbd_assert(rbd_dev);
2080         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2081
2082         rbd_img_request_put(img_request);
2083
2084         if (result)
2085                 goto out_err;
2086
2087         /* Allocate the new copyup osd request for the original request */
2088
2089         result = -ENOMEM;
2090         rbd_assert(!orig_request->osd_req);
2091         osd_req = rbd_osd_req_create_copyup(orig_request);
2092         if (!osd_req)
2093                 goto out_err;
2094         orig_request->osd_req = osd_req;
2095         orig_request->copyup_pages = pages;
2096
2097         /* Initialize the copyup op */
2098
2099         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2100         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2101                                                 false, false);
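        /*
         * The parent object's data rides along as the payload of the
         * class method call; the OSD-side copyup method writes that
         * data only if the target object still does not exist, so a
         * write that races ahead of us is not clobbered.
         */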
2102
2103         /* Then the original write request op */
2104
2105         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2106                                         orig_request->offset,
2107                                         orig_request->length, 0, 0);
2108         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2109                                         orig_request->length);
2110
2111         rbd_osd_req_format_write(orig_request);
2112
2113         /* All set, send it off. */
2114
2115         orig_request->callback = rbd_img_obj_copyup_callback;
2116         osdc = &rbd_dev->rbd_client->client->osdc;
2117         result = rbd_obj_request_submit(osdc, orig_request);
2118         if (!result)
2119                 return;
2120 out_err:
2121         /* Record the error code and complete the request */
2122
2123         orig_request->result = result;
2124         orig_request->xferred = 0;
2125         obj_request_done_set(orig_request);
2126         rbd_obj_request_complete(orig_request);
2127 }
2128
2129 /*
2130  * Read from the parent image the range of data that covers the
2131  * entire target of the given object request.  This is used for
2132  * satisfying a layered image write request when the target of an
2133  * object request from the image request does not exist.
2134  *
2135  * A page array big enough to hold the returned data is allocated
2136  * and supplied to rbd_img_request_fill() as the "data descriptor."
2137  * When the read completes, this page array will be transferred to
2138  * the original object request for the copyup operation.
2139  *
2140  * If an error occurs, record it as the result of the original
2141  * object request and mark it done so it gets completed.
2142  */
2143 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2144 {
2145         struct rbd_img_request *img_request = NULL;
2146         struct rbd_img_request *parent_request = NULL;
2147         struct rbd_device *rbd_dev;
2148         u64 img_offset;
2149         u64 length;
2150         struct page **pages = NULL;
2151         u32 page_count;
2152         int result;
2153
2154         rbd_assert(obj_request_img_data_test(obj_request));
2155         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2156
2157         img_request = obj_request->img_request;
2158         rbd_assert(img_request != NULL);
2159         rbd_dev = img_request->rbd_dev;
2160         rbd_assert(rbd_dev->parent != NULL);
2161
2162         /*
2163          * First things first.  The original osd request is of no
2164          * use to us any more; we'll need a new one that can hold
2165          * the two ops in a copyup request.  We'll get that later,
2166          * but for now we can release the old one.
2167          */
2168         rbd_osd_req_destroy(obj_request->osd_req);
2169         obj_request->osd_req = NULL;
2170
2171         /*
2172          * Determine the byte range covered by the object in the
2173          * child image to which the original request was to be sent.
2174          */
2175         img_offset = obj_request->img_offset - obj_request->offset;
2176         length = (u64)1 << rbd_dev->header.obj_order;
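        /*
         * Worked example, assuming the default 4 MiB objects: a write
         * at image offset 5 MiB lies 1 MiB into its object, so
         * img_offset - offset puts the object's start at 4 MiB, and
         * we read the parent's full 4 MiB from there (less whatever
         * the overlap check below trims off).
         */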
2177
2178         /*
2179          * There is no defined parent data beyond the parent
2180          * overlap, so limit what we read at that boundary if
2181          * necessary.
2182          */
2183         if (img_offset + length > rbd_dev->parent_overlap) {
2184                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2185                 length = rbd_dev->parent_overlap - img_offset;
2186         }
2187
2188         /*
2189          * Allocate a page array big enough to receive the data read
2190          * from the parent.
2191          */
2192         page_count = (u32)calc_pages_for(0, length);
2193         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2194         if (IS_ERR(pages)) {
2195                 result = PTR_ERR(pages);
2196                 pages = NULL;
2197                 goto out_err;
2198         }
2199
2200         result = -ENOMEM;
2201         parent_request = rbd_img_request_create(rbd_dev->parent,
2202                                                 img_offset, length,
2203                                                 false, true);
2204         if (!parent_request)
2205                 goto out_err;
2206         rbd_obj_request_get(obj_request);
2207         parent_request->obj_request = obj_request;
2208
2209         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2210         if (result)
2211                 goto out_err;
2212         parent_request->copyup_pages = pages;
2213
2214         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2215         result = rbd_img_request_submit(parent_request);
2216         if (!result)
2217                 return 0;
2218
2219         parent_request->copyup_pages = NULL;
2220         parent_request->obj_request = NULL;
2221         rbd_obj_request_put(obj_request);
2222 out_err:
2223         if (pages)
2224                 ceph_release_page_vector(pages, page_count);
2225         if (parent_request)
2226                 rbd_img_request_put(parent_request);
2227         obj_request->result = result;
2228         obj_request->xferred = 0;
2229         obj_request_done_set(obj_request);
2230
2231         return result;
2232 }
2233
2234 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2235 {
2236         struct rbd_obj_request *orig_request;
2237         int result;
2238
2239         rbd_assert(!obj_request_img_data_test(obj_request));
2240
2241         /*
2242          * All we need from the object request is the original
2243          * request and the result of the STAT op.  Grab those, then
2244          * we're done with the request.
2245          */
2246         orig_request = obj_request->obj_request;
2247         obj_request->obj_request = NULL;
2248         rbd_assert(orig_request);
2249         rbd_assert(orig_request->img_request);
2250
2251         result = obj_request->result;
2252         obj_request->result = 0;
2253
2254         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2255                 obj_request, orig_request, result,
2256                 obj_request->xferred, obj_request->length);
2257         rbd_obj_request_put(obj_request);
2258
2259         rbd_assert(orig_request);
2260         rbd_assert(orig_request->img_request);
2261
2262         /*
2263          * Our only purpose here is to determine whether the object
2264          * exists, and we don't want to treat the non-existence as
2265          * an error.  If something else comes back, transfer the
2266          * error to the original request and complete it now.
2267          */
2268         if (!result) {
2269                 obj_request_existence_set(orig_request, true);
2270         } else if (result == -ENOENT) {
2271                 obj_request_existence_set(orig_request, false);
2272         } else if (result) {
2273                 orig_request->result = result;
2274                 goto out;
2275         }
2276
2277         /*
2278          * Resubmit the original request now that we have recorded
2279          * whether the target object exists.
2280          */
2281         orig_request->result = rbd_img_obj_request_submit(orig_request);
2282 out:
2283         if (orig_request->result)
2284                 rbd_obj_request_complete(orig_request);
2285         rbd_obj_request_put(orig_request);
2286 }
2287
2288 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2289 {
2290         struct rbd_obj_request *stat_request;
2291         struct rbd_device *rbd_dev;
2292         struct ceph_osd_client *osdc;
2293         struct page **pages = NULL;
2294         u32 page_count;
2295         size_t size;
2296         int ret;
2297
2298         /*
2299          * The response data for a STAT call consists of:
2300          *     le64 length;
2301          *     struct {
2302          *         le32 tv_sec;
2303          *         le32 tv_nsec;
2304          *     } mtime;
2305          */
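        /* That is 8 + 4 + 4 = 16 bytes, so a single page suffices. */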
2306         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2307         page_count = (u32)calc_pages_for(0, size);
2308         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2309         if (IS_ERR(pages))
2310                 return PTR_ERR(pages);
2311
2312         ret = -ENOMEM;
2313         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2314                                                         OBJ_REQUEST_PAGES);
2315         if (!stat_request)
2316                 goto out;
2317
2318         rbd_obj_request_get(obj_request);
2319         stat_request->obj_request = obj_request;
2320         stat_request->pages = pages;
2321         stat_request->page_count = page_count;
2322
2323         rbd_assert(obj_request->img_request);
2324         rbd_dev = obj_request->img_request->rbd_dev;
2325         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2326                                                 stat_request);
2327         if (!stat_request->osd_req)
2328                 goto out;
2329         stat_request->callback = rbd_img_obj_exists_callback;
2330
2331         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2332         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2333                                         false, false);
2334         rbd_osd_req_format_read(stat_request);
2335
2336         osdc = &rbd_dev->rbd_client->client->osdc;
2337         ret = rbd_obj_request_submit(osdc, stat_request);
2338 out:
2339         if (ret)
2340                 rbd_obj_request_put(obj_request);
2341
2342         return ret;
2343 }
2344
2345 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2346 {
2347         struct rbd_img_request *img_request;
2348         struct rbd_device *rbd_dev;
2349         bool known;
2350
2351         rbd_assert(obj_request_img_data_test(obj_request));
2352
2353         img_request = obj_request->img_request;
2354         rbd_assert(img_request);
2355         rbd_dev = img_request->rbd_dev;
2356
2357         /*
2358          * Only writes to layered images need special handling.
2359          * Reads and non-layered writes are simple object requests.
2360          * Layered writes that start beyond the end of the overlap
2361          * with the parent have no parent data, so they too are
2362          * simple object requests.  Finally, if the target object is
2363          * known to already exist, its parent data has already been
2364          * copied, so a write to the object can also be handled as a
2365          * simple object request.
2366          */
2367         if (!img_request_write_test(img_request) ||
2368                 !img_request_layered_test(img_request) ||
2369                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2370                 ((known = obj_request_known_test(obj_request)) &&
2371                         obj_request_exists_test(obj_request))) {
2372
2373                 struct ceph_osd_client *osdc;
2374
2375                 osdc = &rbd_dev->rbd_client->client->osdc;
2378
2379                 return rbd_obj_request_submit(osdc, obj_request);
2380         }
2381
2382         /*
2383          * It's a layered write.  The target object might exist but
2384          * we may not know that yet.  If we know it doesn't exist,
2385          * start by reading the data for the full target object from
2386          * the parent so we can use it for a copyup to the target.
2387          */
2388         if (known)
2389                 return rbd_img_obj_parent_read_full(obj_request);
2390
2391         /* We don't know whether the target exists.  Go find out. */
2392
2393         return rbd_img_obj_exists_submit(obj_request);
2394 }
2395
2396 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2397 {
2398         struct rbd_obj_request *obj_request;
2399         struct rbd_obj_request *next_obj_request;
2400
2401         dout("%s: img %p\n", __func__, img_request);
2402         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2403                 int ret;
2404
2405                 ret = rbd_img_obj_request_submit(obj_request);
2406                 if (ret)
2407                         return ret;
2408         }
2409
2410         return 0;
2411 }
2412
2413 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2414 {
2415         struct rbd_obj_request *obj_request;
2416         struct rbd_device *rbd_dev;
2417         u64 obj_end;
2418
2419         rbd_assert(img_request_child_test(img_request));
2420
2421         obj_request = img_request->obj_request;
2422         rbd_assert(obj_request);
2423         rbd_assert(obj_request->img_request);
2424
2425         obj_request->result = img_request->result;
2426         if (obj_request->result)
2427                 goto out;
2428
2429         /*
2430          * We need to zero anything beyond the parent overlap
2431          * boundary.  Since rbd_img_obj_request_read_callback()
2432          * will zero anything beyond the end of a short read, an
2433          * easy way to do this is to pretend the data from the
2434          * parent came up short--ending at the overlap boundary.
2435          */
2436         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2437         obj_end = obj_request->img_offset + obj_request->length;
2438         rbd_dev = obj_request->img_request->rbd_dev;
2439         if (obj_end > rbd_dev->parent_overlap) {
2440                 u64 xferred = 0;
2441
2442                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2443                         xferred = rbd_dev->parent_overlap -
2444                                         obj_request->img_offset;
2445
2446                 obj_request->xferred = min(img_request->xferred, xferred);
2447         } else {
2448                 obj_request->xferred = img_request->xferred;
2449         }
2450 out:
2451         rbd_img_obj_request_read_callback(obj_request);
2452         rbd_obj_request_complete(obj_request);
2453 }
2454
2455 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2456 {
2457         struct rbd_device *rbd_dev;
2458         struct rbd_img_request *img_request;
2459         int result;
2460
2461         rbd_assert(obj_request_img_data_test(obj_request));
2462         rbd_assert(obj_request->img_request != NULL);
2463         rbd_assert(obj_request->result == (s32) -ENOENT);
2464         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2465
2466         rbd_dev = obj_request->img_request->rbd_dev;
2467         rbd_assert(rbd_dev->parent != NULL);
2469         img_request = rbd_img_request_create(rbd_dev->parent,
2470                                                 obj_request->img_offset,
2471                                                 obj_request->length,
2472                                                 false, true);
2473         result = -ENOMEM;
2474         if (!img_request)
2475                 goto out_err;
2476
2477         rbd_obj_request_get(obj_request);
2478         img_request->obj_request = obj_request;
2479
2480         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2481                                         obj_request->bio_list);
2482         if (result)
2483                 goto out_err;
2484
2485         img_request->callback = rbd_img_parent_read_callback;
2486         result = rbd_img_request_submit(img_request);
2487         if (result)
2488                 goto out_err;
2489
2490         return;
2491 out_err:
2492         if (img_request)
2493                 rbd_img_request_put(img_request);
2494         obj_request->result = result;
2495         obj_request->xferred = 0;
2496         obj_request_done_set(obj_request);
2497 }
2498
2499 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2500 {
2501         struct rbd_obj_request *obj_request;
2502         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2503         int ret;
2504
2505         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2506                                                         OBJ_REQUEST_NODATA);
2507         if (!obj_request)
2508                 return -ENOMEM;
2509
2510         ret = -ENOMEM;
2511         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2512         if (!obj_request->osd_req)
2513                 goto out;
2514         obj_request->callback = rbd_obj_request_put;
2515
2516         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2517                                         notify_id, 0, 0);
2518         rbd_osd_req_format_read(obj_request);
2519
2520         ret = rbd_obj_request_submit(osdc, obj_request);
2521 out:
2522         if (ret)
2523                 rbd_obj_request_put(obj_request);
2524
2525         return ret;
2526 }
2527
2528 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2529 {
2530         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2531
2532         if (!rbd_dev)
2533                 return;
2534
2535         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2536                 rbd_dev->header_name, (unsigned long long)notify_id,
2537                 (unsigned int)opcode);
2538         (void)rbd_dev_refresh(rbd_dev);
2539
2540         rbd_obj_notify_ack(rbd_dev, notify_id);
2541 }
2542
2543 /*
2544  * Request sync osd watch/unwatch.  The value of "start" determines
2545  * whether a watch request is being initiated or torn down.
2546  */
2547 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2548 {
2549         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2550         struct rbd_obj_request *obj_request;
2551         int ret;
2552
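        /* Starting requires no existing watch; tearing down requires one */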
2553         rbd_assert(start ^ !!rbd_dev->watch_event);
2554         rbd_assert(start ^ !!rbd_dev->watch_request);
2555
2556         if (start) {
2557                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2558                                                 &rbd_dev->watch_event);
2559                 if (ret < 0)
2560                         return ret;
2561                 rbd_assert(rbd_dev->watch_event != NULL);
2562         }
2563
2564         ret = -ENOMEM;
2565         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2566                                                         OBJ_REQUEST_NODATA);
2567         if (!obj_request)
2568                 goto out_cancel;
2569
2570         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2571         if (!obj_request->osd_req)
2572                 goto out_cancel;
2573
2574         if (start)
2575                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2576         else
2577                 ceph_osdc_unregister_linger_request(osdc,
2578                                         rbd_dev->watch_request->osd_req);
2579
2580         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2581                                 rbd_dev->watch_event->cookie, 0, start);
2582         rbd_osd_req_format_write(obj_request);
2583
2584         ret = rbd_obj_request_submit(osdc, obj_request);
2585         if (ret)
2586                 goto out_cancel;
2587         ret = rbd_obj_request_wait(obj_request);
2588         if (ret)
2589                 goto out_cancel;
2590         ret = obj_request->result;
2591         if (ret)
2592                 goto out_cancel;
2593
2594         /*
2595          * A watch request is set to linger, so the underlying osd
2596          * request won't go away until we unregister it.  We retain
2597          * a pointer to the object request during that time (in
2598          * rbd_dev->watch_request), so we'll keep a reference to
2599          * it.  We'll drop that reference (below) after we've
2600          * unregistered it.
2601          */
2602         if (start) {
2603                 rbd_dev->watch_request = obj_request;
2604
2605                 return 0;
2606         }
2607
2608         /* We have successfully torn down the watch request */
2609
2610         rbd_obj_request_put(rbd_dev->watch_request);
2611         rbd_dev->watch_request = NULL;
2612 out_cancel:
2613         /* Cancel the event if we're tearing down, or on error */
2614         ceph_osdc_cancel_event(rbd_dev->watch_event);
2615         rbd_dev->watch_event = NULL;
2616         if (obj_request)
2617                 rbd_obj_request_put(obj_request);
2618
2619         return ret;
2620 }
2621
2622 /*
2623  * Synchronous osd object method call.  Returns the number of bytes
2624  * returned in the inbound buffer, or a negative error code.
2625  */
2626 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2627                              const char *object_name,
2628                              const char *class_name,
2629                              const char *method_name,
2630                              const void *outbound,
2631                              size_t outbound_size,
2632                              void *inbound,
2633                              size_t inbound_size)
2634 {
2635         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2636         struct rbd_obj_request *obj_request;
2637         struct page **pages;
2638         u32 page_count;
2639         int ret;
2640
2641         /*
2642          * Method calls are ultimately read operations.  The result
2643          * should be placed into the inbound buffer provided.  They
2644          * may also supply outbound data--parameters for the object
2645          * method.  Currently if this is present it will be a
2646          * snapshot id.
2647          */
2648         page_count = (u32)calc_pages_for(0, inbound_size);
2649         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2650         if (IS_ERR(pages))
2651                 return PTR_ERR(pages);
2652
2653         ret = -ENOMEM;
2654         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2655                                                         OBJ_REQUEST_PAGES);
2656         if (!obj_request)
2657                 goto out;
2658
2659         obj_request->pages = pages;
2660         obj_request->page_count = page_count;
2661
2662         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2663         if (!obj_request->osd_req)
2664                 goto out;
2665
2666         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2667                                         class_name, method_name);
2668         if (outbound_size) {
2669                 struct ceph_pagelist *pagelist;
2670
2671                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2672                 if (!pagelist)
2673                         goto out;
2674
2675                 ceph_pagelist_init(pagelist);
2676                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2677                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2678                                                 pagelist);
2679         }
2680         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2681                                         obj_request->pages, inbound_size,
2682                                         0, false, false);
2683         rbd_osd_req_format_read(obj_request);
2684
2685         ret = rbd_obj_request_submit(osdc, obj_request);
2686         if (ret)
2687                 goto out;
2688         ret = rbd_obj_request_wait(obj_request);
2689         if (ret)
2690                 goto out;
2691
2692         ret = obj_request->result;
2693         if (ret < 0)
2694                 goto out;
2695
2696         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2697         ret = (int)obj_request->xferred;
2698         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2699 out:
2700         if (obj_request)
2701                 rbd_obj_request_put(obj_request);
2702         else
2703                 ceph_release_page_vector(pages, page_count);
2704
2705         return ret;
2706 }
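/*
 * A typical call, sketched after the v2 image-id lookup done elsewhere
 * in this driver (the buffer names here are illustrative only):
 *
 *	ret = rbd_obj_method_sync(rbd_dev, object_name,
 *				"rbd", "get_id", NULL, 0,
 *				response, response_size);
 *
 * A NULL/0 outbound pair means the method takes no parameters; on
 * success ret is the number of bytes placed into the response buffer.
 */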
2707
2708 static void rbd_request_fn(struct request_queue *q)
2709                 __releases(q->queue_lock) __acquires(q->queue_lock)
2710 {
2711         struct rbd_device *rbd_dev = q->queuedata;
2712         bool read_only = rbd_dev->mapping.read_only;
2713         struct request *rq;
2714         int result;
2715
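        /*
         * blk_fetch_request() must be called with queue_lock held.
         * The lock is dropped while each image request is built and
         * submitted, then retaken before ending the request on error
         * or fetching the next one.
         */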
2716         while ((rq = blk_fetch_request(q))) {
2717                 bool write_request = rq_data_dir(rq) == WRITE;
2718                 struct rbd_img_request *img_request;
2719                 u64 offset;
2720                 u64 length;
2721
2722                 /* Ignore any non-FS requests that filter through. */
2723
2724                 if (rq->cmd_type != REQ_TYPE_FS) {
2725                         dout("%s: non-fs request type %d\n", __func__,
2726                                 (int) rq->cmd_type);
2727                         __blk_end_request_all(rq, 0);
2728                         continue;
2729                 }
2730
2731                 /* Ignore/skip any zero-length requests */
2732
2733                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2734                 length = (u64) blk_rq_bytes(rq);
2735
2736                 if (!length) {
2737                         dout("%s: zero-length request\n", __func__);
2738                         __blk_end_request_all(rq, 0);
2739                         continue;
2740                 }
2741
2742                 spin_unlock_irq(q->queue_lock);
2743
2744                 /* Disallow writes to a read-only device */
2745
2746                 if (write_request) {
2747                         result = -EROFS;
2748                         if (read_only)
2749                                 goto end_request;
2750                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2751                 }
2752
2753                 /*
2754                  * Quit early if the mapped snapshot no longer
2755                  * exists.  It's still possible the snapshot will
2756                  * have disappeared by the time our request arrives
2757                  * at the osd, but there's no sense in sending it if
2758                  * we already know.
2759                  */
2760                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2761                         dout("request for non-existent snapshot");
2762                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2763                         result = -ENXIO;
2764                         goto end_request;
2765                 }
2766
2767                 result = -EINVAL;
2768                 if (offset && length > U64_MAX - offset + 1) {
2769                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2770                                 offset, length);
2771                         goto end_request;       /* Shouldn't happen */
2772                 }
2773
2774                 result = -ENOMEM;
2775                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2776                                                         write_request, false);
2777                 if (!img_request)
2778                         goto end_request;
2779
2780                 img_request->rq = rq;
2781
2782                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2783                                                 rq->bio);
2784                 if (!result)
2785                         result = rbd_img_request_submit(img_request);
2786                 if (result)
2787                         rbd_img_request_put(img_request);
2788 end_request:
2789                 spin_lock_irq(q->queue_lock);
2790                 if (result < 0) {
2791                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2792                                 write_request ? "write" : "read",
2793                                 length, offset, result);
2794
2795                         __blk_end_request_all(rq, result);
2796                 }
2797         }
2798 }
2799
2800 /*
2801  * A queue callback.  Makes sure that we don't create a bio that spans
2802  * multiple osd objects.  One exception would be single-page bios,
2803  * which we handle later in bio_chain_clone_range().
2804  */
2805 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2806                           struct bio_vec *bvec)
2807 {
2808         struct rbd_device *rbd_dev = q->queuedata;
2809         sector_t sector_offset;
2810         sector_t sectors_per_obj;
2811         sector_t obj_sector_offset;
2812         int ret;
2813
2814         /*
2815          * Convert the partition-relative bio start sector to an
2816          * offset relative to the enclosing device, then find how
2817          * far into its rbd object that sector falls.
2818          */
2819         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2820         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2821         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
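        /*
         * Worked example, assuming 4 MiB objects (obj_order 22): an
         * object spans 8192 512-byte sectors, so a bio starting at
         * device sector 10000 sits 1808 sectors into its object,
         * leaving 6384 sectors (3268608 bytes) to the boundary.
         */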
2822
2823         /*
2824          * Compute the number of bytes from that offset to the end
2825          * of the object.  Account for what's already used by the bio.
2826          */
2827         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2828         if (ret > bmd->bi_size)
2829                 ret -= bmd->bi_size;
2830         else
2831                 ret = 0;
2832
2833         /*
2834          * Don't send back more than was asked for.  And if the bio
2835          * was empty, let the whole thing through because:  "Note
2836          * that a block device *must* allow a single page to be
2837          * added to an empty bio."
2838          */
2839         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2840         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2841                 ret = (int) bvec->bv_len;
2842
2843         return ret;
2844 }
2845
2846 static void rbd_free_disk(struct rbd_device *rbd_dev)
2847 {
2848         struct gendisk *disk = rbd_dev->disk;
2849
2850         if (!disk)
2851                 return;
2852
2853         rbd_dev->disk = NULL;
2854         if (disk->flags & GENHD_FL_UP) {
2855                 del_gendisk(disk);
2856                 if (disk->queue)
2857                         blk_cleanup_queue(disk->queue);
2858         }
2859         put_disk(disk);
2860 }
2861
2862 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2863                                 const char *object_name,
2864                                 u64 offset, u64 length, void *buf)
2866 {
2867         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2868         struct rbd_obj_request *obj_request;
2869         struct page **pages = NULL;
2870         u32 page_count;
2871         size_t size;
2872         int ret;
2873
2874         page_count = (u32) calc_pages_for(offset, length);
2875         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2876         if (IS_ERR(pages))
2877                 return PTR_ERR(pages);
2878
2879         ret = -ENOMEM;
2880         obj_request = rbd_obj_request_create(object_name, offset, length,
2881                                                         OBJ_REQUEST_PAGES);
2882         if (!obj_request)
2883                 goto out;
2884
2885         obj_request->pages = pages;
2886         obj_request->page_count = page_count;
2887
2888         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2889         if (!obj_request->osd_req)
2890                 goto out;
2891
2892         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2893                                         offset, length, 0, 0);
2894         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2895                                         obj_request->pages,
2896                                         obj_request->length,
2897                                         obj_request->offset & ~PAGE_MASK,
2898                                         false, false);
2899         rbd_osd_req_format_read(obj_request);
2900
2901         ret = rbd_obj_request_submit(osdc, obj_request);
2902         if (ret)
2903                 goto out;
2904         ret = rbd_obj_request_wait(obj_request);
2905         if (ret)
2906                 goto out;
2907
2908         ret = obj_request->result;
2909         if (ret < 0)
2910                 goto out;
2911
2912         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2913         size = (size_t) obj_request->xferred;
2914         ceph_copy_from_page_vector(pages, buf, 0, size);
2915         rbd_assert(size <= (size_t)INT_MAX);
2916         ret = (int)size;
2917 out:
2918         if (obj_request)
2919                 rbd_obj_request_put(obj_request);
2920         else
2921                 ceph_release_page_vector(pages, page_count);
2922
2923         return ret;
2924 }
2925
2926 /*
2927  * Read the complete header for the given rbd device.
2928  *
2929  * Returns a pointer to a dynamically-allocated buffer containing
2930  * the complete and validated header.  The caller is responsible
2931  * for freeing the returned buffer with kfree() when it is no
2932  * longer needed.
2933  *
2934  * Returns a pointer-coded errno if a failure occurs.
2935  */
2936 static struct rbd_image_header_ondisk *
2937 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2938 {
2939         struct rbd_image_header_ondisk *ondisk = NULL;
2940         u32 snap_count = 0;
2941         u64 names_size = 0;
2942         u32 want_count;
2943         int ret;
2944
2945         /*
2946          * The complete header will include an array of its 64-bit
2947          * snapshot ids, followed by the names of those snapshots as
2948          * a contiguous block of NUL-terminated strings.  Note that
2949          * the number of snapshots could change by the time we read
2950          * it in, in which case we re-read it.
2951          */
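/*
 * Concretely: the first pass uses snap_count == 0 and names_size == 0,
 * so only the fixed-size portion is read; that tells us the actual
 * snapshot count and name-block size for the next pass.  The loop
 * exits only when a pass reads the same snapshot count it allocated
 * room for.
 */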
2952         do {
2953                 size_t size;
2954
2955                 kfree(ondisk);
2956
2957                 size = sizeof (*ondisk);
2958                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2959                 size += names_size;
2960                 ondisk = kmalloc(size, GFP_KERNEL);
2961                 if (!ondisk)
2962                         return ERR_PTR(-ENOMEM);
2963
2964                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2965                                        0, size, ondisk);
2966                 if (ret < 0)
2967                         goto out_err;
2968                 if ((size_t)ret < size) {
2969                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2970                                 size, ret);
2971                         ret = -ENXIO;
2972                         goto out_err;
2973                 }
2974                 if (!rbd_dev_ondisk_valid(ondisk)) {
2975                         ret = -ENXIO;
2976                         rbd_warn(rbd_dev, "invalid header");
2977                         goto out_err;
2978                 }
2979
2980                 names_size = le64_to_cpu(ondisk->snap_names_len);
2981                 want_count = snap_count;
2982                 snap_count = le32_to_cpu(ondisk->snap_count);
2983         } while (snap_count != want_count);
2984
2985         return ondisk;
2986
2987 out_err:
2988         kfree(ondisk);
2989
2990         return ERR_PTR(ret);
2991 }
2992
2993 /*
2994  * reload the on-disk header
2995  */
2996 static int rbd_read_header(struct rbd_device *rbd_dev,
2997                            struct rbd_image_header *header)
2998 {
2999         struct rbd_image_header_ondisk *ondisk;
3000         int ret;
3001
3002         ondisk = rbd_dev_v1_header_read(rbd_dev);
3003         if (IS_ERR(ondisk))
3004                 return PTR_ERR(ondisk);
3005         ret = rbd_header_from_disk(header, ondisk);
3006         kfree(ondisk);
3007
3008         return ret;
3009 }
3010
3011 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3012 {
3013         struct rbd_snap *snap;
3014         struct rbd_snap *next;
3015
3016         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3017                 list_del(&snap->node);
3018                 rbd_snap_destroy(snap);
3019         }
3020 }
3021
3022 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3023 {
3024         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3025                 return;
3026
3027         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3028                 sector_t size;
3029
3030                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3031                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3032                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3033                 set_capacity(rbd_dev->disk, size);
3034         }
3035 }
3036
3037 /*
3038  * only read the first part of the ondisk header, without the snaps info
3039  */
3040 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3041 {
3042         int ret;
3043         struct rbd_image_header h;
3044
3045         ret = rbd_read_header(rbd_dev, &h);
3046         if (ret < 0)
3047                 return ret;
3048
3049         down_write(&rbd_dev->header_rwsem);
3050
3051         /* Update image size, and check for resize of mapped image */
3052         rbd_dev->header.image_size = h.image_size;
3053         rbd_update_mapping_size(rbd_dev);
3054
3055         /* rbd_dev->header.object_prefix shouldn't change */
3056         kfree(rbd_dev->header.snap_sizes);
3057         kfree(rbd_dev->header.snap_names);
3058         /* osd requests may still refer to snapc */
3059         ceph_put_snap_context(rbd_dev->header.snapc);
3060
3062         rbd_dev->header.snapc = h.snapc;
3063         rbd_dev->header.snap_names = h.snap_names;
3064         rbd_dev->header.snap_sizes = h.snap_sizes;
3065         /* Free the extra copy of the object prefix */
3066         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3067                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3068         kfree(h.object_prefix);
3069
3070         ret = rbd_dev_snaps_update(rbd_dev);
3071
3072         up_write(&rbd_dev->header_rwsem);
3073
3074         return ret;
3075 }
3076
3077 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3078 {
3079         u64 image_size;
3080         int ret;
3081
3082         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3083         image_size = rbd_dev->header.image_size;
3084         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3085         if (rbd_dev->image_format == 1)
3086                 ret = rbd_dev_v1_refresh(rbd_dev);
3087         else
3088                 ret = rbd_dev_v2_refresh(rbd_dev);
3089         mutex_unlock(&ctl_mutex);
3090         if (ret)
3091                 rbd_warn(rbd_dev, "got notification but failed to "
3092                                 "update snaps: %d\n", ret);
3093         if (image_size != rbd_dev->header.image_size)
3094                 revalidate_disk(rbd_dev->disk);
3095
3096         return ret;
3097 }
3098
3099 static int rbd_init_disk(struct rbd_device *rbd_dev)
3100 {
3101         struct gendisk *disk;
3102         struct request_queue *q;
3103         u64 segment_size;
3104
3105         /* create gendisk info */
3106         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3107         if (!disk)
3108                 return -ENOMEM;
3109
3110         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3111                  rbd_dev->dev_id);
3112         disk->major = rbd_dev->major;
3113         disk->first_minor = 0;
3114         disk->fops = &rbd_bd_ops;
3115         disk->private_data = rbd_dev;
3116
3117         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3118         if (!q)
3119                 goto out_disk;
3120
3121         /* We use the default size, but let's be explicit about it. */
3122         blk_queue_physical_block_size(q, SECTOR_SIZE);
3123
3124         /* set io sizes to object size */
3125         segment_size = rbd_obj_bytes(&rbd_dev->header);
3126         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3127         blk_queue_max_segment_size(q, segment_size);
3128         blk_queue_io_min(q, segment_size);
3129         blk_queue_io_opt(q, segment_size);
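        /*
         * For example, with the common 4 MiB object size (order 22;
         * the actual order comes from the image header), the limits
         * above cap requests at 4 MiB / 512 = 8192 sectors and align
         * the minimum and optimal I/O sizes to one object.
         */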
3130
3131         blk_queue_merge_bvec(q, rbd_merge_bvec);
3132         disk->queue = q;
3133
3134         q->queuedata = rbd_dev;
3135
3136         rbd_dev->disk = disk;
3137
3138         return 0;
3139 out_disk:
3140         put_disk(disk);
3141
3142         return -ENOMEM;
3143 }
3144
3145 /*
3146   sysfs
3147 */
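/*
 * The device attributes defined below appear under
 * /sys/bus/rbd/devices/<id>/ and are described in
 * Documentation/ABI/testing/sysfs-bus-rbd.  An illustrative session
 * (hypothetical values):
 *
 *      $ cat /sys/bus/rbd/devices/0/pool
 *      rbd
 *      $ cat /sys/bus/rbd/devices/0/size
 *      1073741824
 */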
3148
3149 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3150 {
3151         return container_of(dev, struct rbd_device, dev);
3152 }
3153
3154 static ssize_t rbd_size_show(struct device *dev,
3155                              struct device_attribute *attr, char *buf)
3156 {
3157         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3158
3159         return sprintf(buf, "%llu\n",
3160                 (unsigned long long)rbd_dev->mapping.size);
3161 }
3162
3163 /*
3164  * Note this shows the features for whatever's mapped, which is not
3165  * necessarily the base image.
3166  */
3167 static ssize_t rbd_features_show(struct device *dev,
3168                              struct device_attribute *attr, char *buf)
3169 {
3170         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3171
3172         return sprintf(buf, "0x%016llx\n",
3173                         (unsigned long long)rbd_dev->mapping.features);
3174 }
3175
3176 static ssize_t rbd_major_show(struct device *dev,
3177                               struct device_attribute *attr, char *buf)
3178 {
3179         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3180
3181         if (rbd_dev->major)
3182                 return sprintf(buf, "%d\n", rbd_dev->major);
3183
3184         return sprintf(buf, "(none)\n");
3186 }
3187
3188 static ssize_t rbd_client_id_show(struct device *dev,
3189                                   struct device_attribute *attr, char *buf)
3190 {
3191         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3192
3193         return sprintf(buf, "client%lld\n",
3194                         ceph_client_id(rbd_dev->rbd_client->client));
3195 }
3196
3197 static ssize_t rbd_pool_show(struct device *dev,
3198                              struct device_attribute *attr, char *buf)
3199 {
3200         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3201
3202         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3203 }
3204
3205 static ssize_t rbd_pool_id_show(struct device *dev,
3206                              struct device_attribute *attr, char *buf)
3207 {
3208         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3209
3210         return sprintf(buf, "%llu\n",
3211                         (unsigned long long) rbd_dev->spec->pool_id);
3212 }
3213
3214 static ssize_t rbd_name_show(struct device *dev,
3215                              struct device_attribute *attr, char *buf)
3216 {
3217         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3218
3219         if (rbd_dev->spec->image_name)
3220                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3221
3222         return sprintf(buf, "(unknown)\n");
3223 }
3224
3225 static ssize_t rbd_image_id_show(struct device *dev,
3226                              struct device_attribute *attr, char *buf)
3227 {
3228         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3229
3230         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3231 }
3232
3233 /*
3234  * Shows the name of the currently-mapped snapshot (or
3235  * RBD_SNAP_HEAD_NAME for the base image).
3236  */
3237 static ssize_t rbd_snap_show(struct device *dev,
3238                              struct device_attribute *attr,
3239                              char *buf)
3240 {
3241         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3242
3243         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3244 }
3245
3246 /*
3247  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3248  * for the parent image.  If there is no parent, simply shows
3249  * "(no parent image)".
3250  */
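/*
 * Sample output (hypothetical values):
 *
 *      pool_id 2
 *      pool_name rbd
 *      image_id 1014b2ae8944a
 *      image_name parent-image
 *      snap_id 4
 *      snap_name base
 *      overlap 10737418240
 */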
3251 static ssize_t rbd_parent_show(struct device *dev,
3252                              struct device_attribute *attr,
3253                              char *buf)
3254 {
3255         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3256         struct rbd_spec *spec = rbd_dev->parent_spec;
3257         int count;
3258         char *bufp = buf;
3259
3260         if (!spec)
3261                 return sprintf(buf, "(no parent image)\n");
3262
3263         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3264                         (unsigned long long) spec->pool_id, spec->pool_name);
3265         if (count < 0)
3266                 return count;
3267         bufp += count;
3268
3269         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3270                         spec->image_name ? spec->image_name : "(unknown)");
3271         if (count < 0)
3272                 return count;
3273         bufp += count;
3274
3275         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3276                         (unsigned long long) spec->snap_id, spec->snap_name);
3277         if (count < 0)
3278                 return count;
3279         bufp += count;
3280
3281         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3282         if (count < 0)
3283                 return count;
3284         bufp += count;
3285
3286         return (ssize_t) (bufp - buf);
3287 }
3288
3289 static ssize_t rbd_image_refresh(struct device *dev,
3290                                  struct device_attribute *attr,
3291                                  const char *buf,
3292                                  size_t size)
3293 {
3294         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3295         int ret;
3296
3297         ret = rbd_dev_refresh(rbd_dev);
3298
3299         return ret < 0 ? ret : size;
3300 }
3301
3302 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3303 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3304 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3305 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3306 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3307 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3308 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3309 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3310 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3311 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3312 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3313
3314 static struct attribute *rbd_attrs[] = {
3315         &dev_attr_size.attr,
3316         &dev_attr_features.attr,
3317         &dev_attr_major.attr,
3318         &dev_attr_client_id.attr,
3319         &dev_attr_pool.attr,
3320         &dev_attr_pool_id.attr,
3321         &dev_attr_name.attr,
3322         &dev_attr_image_id.attr,
3323         &dev_attr_current_snap.attr,
3324         &dev_attr_parent.attr,
3325         &dev_attr_refresh.attr,
3326         NULL
3327 };
3328
3329 static struct attribute_group rbd_attr_group = {
3330         .attrs = rbd_attrs,
3331 };
3332
3333 static const struct attribute_group *rbd_attr_groups[] = {
3334         &rbd_attr_group,
3335         NULL
3336 };
3337
3338 static void rbd_sysfs_dev_release(struct device *dev)
3339 {
3340 }
3341
3342 static struct device_type rbd_device_type = {
3343         .name           = "rbd",
3344         .groups         = rbd_attr_groups,
3345         .release        = rbd_sysfs_dev_release,
3346 };
3347
3348 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3349 {
3350         kref_get(&spec->kref);
3351
3352         return spec;
3353 }
3354
3355 static void rbd_spec_free(struct kref *kref);
3356 static void rbd_spec_put(struct rbd_spec *spec)
3357 {
3358         if (spec)
3359                 kref_put(&spec->kref, rbd_spec_free);
3360 }
3361
3362 static struct rbd_spec *rbd_spec_alloc(void)
3363 {
3364         struct rbd_spec *spec;
3365
3366         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3367         if (!spec)
3368                 return NULL;
3369         kref_init(&spec->kref);
3370
3371         return spec;
3372 }
3373
3374 static void rbd_spec_free(struct kref *kref)
3375 {
3376         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3377
3378         kfree(spec->pool_name);
3379         kfree(spec->image_id);
3380         kfree(spec->image_name);
3381         kfree(spec->snap_name);
3382         kfree(spec);
3383 }
3384
3385 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3386                                 struct rbd_spec *spec)
3387 {
3388         struct rbd_device *rbd_dev;
3389
3390         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3391         if (!rbd_dev)
3392                 return NULL;
3393
3394         spin_lock_init(&rbd_dev->lock);
3395         rbd_dev->flags = 0;
3396         INIT_LIST_HEAD(&rbd_dev->node);
3397         INIT_LIST_HEAD(&rbd_dev->snaps);
3398         init_rwsem(&rbd_dev->header_rwsem);
3399
3400         rbd_dev->spec = spec;
3401         rbd_dev->rbd_client = rbdc;
3402
3403         /* Initialize the layout used for all rbd requests */
3404
3405         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3406         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3407         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3408         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3409
3410         return rbd_dev;
3411 }
3412
3413 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3414 {
3415         rbd_put_client(rbd_dev->rbd_client);
3416         rbd_spec_put(rbd_dev->spec);
3417         kfree(rbd_dev);
3418 }
3419
3420 static void rbd_snap_destroy(struct rbd_snap *snap)
3421 {
3422         kfree(snap->name);
3423         kfree(snap);
3424 }
3425
3426 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3427                                                 const char *snap_name,
3428                                                 u64 snap_id, u64 snap_size,
3429                                                 u64 snap_features)
3430 {
3431         struct rbd_snap *snap;
3432
3433         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3434         if (!snap)
3435                 return ERR_PTR(-ENOMEM);
3436
3437         snap->name = snap_name;
3438         snap->id = snap_id;
3439         snap->size = snap_size;
3440         snap->features = snap_features;
3441
3442         return snap;
3443 }
3444
3445 /*
3446  * Returns a dynamically-allocated snapshot name if successful, or a
3447  * pointer-coded error otherwise.
3448  */
3449 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3450                 u64 *snap_size, u64 *snap_features)
3451 {
3452         const char *snap_name;
3453
3454         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
3455         if (!snap_name)
3456                 return ERR_PTR(-ENOMEM);
3457
3458         *snap_size = rbd_dev->header.snap_sizes[which];
3459         *snap_features = 0;     /* No features for v1 */
3460
3461         return snap_name;
3462 }
3463
3464 /*
3465  * Get the size and object order for an image snapshot, or if
3466  * snap_id is CEPH_NOSNAP, gets this information for the base
3467  * image.
3468  */
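/*
 * The reply payload of the "rbd" class method "get_size" is the packed
 * pair decoded below: a one-byte object order followed by a
 * little-endian 64-bit size.  For example, order 22 with size
 * 0x40000000 describes a 1 GiB image stored in 4 MiB objects.
 */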
3469 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3470                                 u8 *order, u64 *snap_size)
3471 {
3472         __le64 snapid = cpu_to_le64(snap_id);
3473         int ret;
3474         struct {
3475                 u8 order;
3476                 __le64 size;
3477         } __attribute__ ((packed)) size_buf = { 0 };
3478
3479         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3480                                 "rbd", "get_size",
3481                                 &snapid, sizeof (snapid),
3482                                 &size_buf, sizeof (size_buf));
3483         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3484         if (ret < 0)
3485                 return ret;
3486         if (ret < sizeof (size_buf))
3487                 return -ERANGE;
3488
3489         if (order)
3490                 *order = size_buf.order;
3491         *snap_size = le64_to_cpu(size_buf.size);
3492
3493         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3494                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3495                 (unsigned long long)*snap_size);
3496
3497         return 0;
3498 }
3499
3500 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3501 {
3502         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3503                                         &rbd_dev->header.obj_order,
3504                                         &rbd_dev->header.image_size);
3505 }
3506
3507 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3508 {
3509         void *reply_buf;
3510         int ret;
3511         void *p;
3512
3513         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3514         if (!reply_buf)
3515                 return -ENOMEM;
3516
3517         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3518                                 "rbd", "get_object_prefix", NULL, 0,
3519                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3520         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3521         if (ret < 0)
3522                 goto out;
3523
3524         p = reply_buf;
3525         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3526                                                 p + ret, NULL, GFP_NOIO);
3527         ret = 0;
3528
3529         if (IS_ERR(rbd_dev->header.object_prefix)) {
3530                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3531                 rbd_dev->header.object_prefix = NULL;
3532         } else {
3533                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3534         }
3535 out:
3536         kfree(reply_buf);
3537
3538         return ret;
3539 }
3540
3541 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3542                 u64 *snap_features)
3543 {
3544         __le64 snapid = cpu_to_le64(snap_id);
3545         struct {
3546                 __le64 features;
3547                 __le64 incompat;
3548         } __attribute__ ((packed)) features_buf = { 0 };
3549         u64 incompat;
3550         int ret;
3551
3552         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3553                                 "rbd", "get_features",
3554                                 &snapid, sizeof (snapid),
3555                                 &features_buf, sizeof (features_buf));
3556         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3557         if (ret < 0)
3558                 return ret;
3559         if (ret < sizeof (features_buf))
3560                 return -ERANGE;
3561
3562         incompat = le64_to_cpu(features_buf.incompat);
3563         if (incompat & ~RBD_FEATURES_SUPPORTED)
3564                 return -ENXIO;
3565
3566         *snap_features = le64_to_cpu(features_buf.features);
3567
3568         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3569                 (unsigned long long)snap_id,
3570                 (unsigned long long)*snap_features,
3571                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3572
3573         return 0;
3574 }
3575
3576 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3577 {
3578         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3579                                                 &rbd_dev->header.features);
3580 }
3581
3582 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3583 {
3584         struct rbd_spec *parent_spec;
3585         size_t size;
3586         void *reply_buf = NULL;
3587         __le64 snapid;
3588         void *p;
3589         void *end;
3590         char *image_id;
3591         u64 overlap;
3592         int ret;
3593
3594         parent_spec = rbd_spec_alloc();
3595         if (!parent_spec)
3596                 return -ENOMEM;
3597
3598         size = sizeof (__le64) +                                /* pool_id */
3599                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3600                 sizeof (__le64) +                               /* snap_id */
3601                 sizeof (__le64);                                /* overlap */
3602         reply_buf = kmalloc(size, GFP_KERNEL);
3603         if (!reply_buf) {
3604                 ret = -ENOMEM;
3605                 goto out_err;
3606         }
3607
3608         snapid = cpu_to_le64(CEPH_NOSNAP);
3609         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3610                                 "rbd", "get_parent",
3611                                 &snapid, sizeof (snapid),
3612                                 reply_buf, size);
3613         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3614         if (ret < 0)
3615                 goto out_err;
3616
3617         p = reply_buf;
3618         end = reply_buf + ret;
3619         ret = -ERANGE;
3620         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3621         if (parent_spec->pool_id == CEPH_NOPOOL)
3622                 goto out;       /* No parent?  No problem. */
3623
3624         /* The ceph file layout needs to fit pool id in 32 bits */
3625
3626         ret = -EIO;
3627         if (parent_spec->pool_id > (u64)U32_MAX) {
3628                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3629                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3630                 goto out_err;
3631         }
3632
3633         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3634         if (IS_ERR(image_id)) {
3635                 ret = PTR_ERR(image_id);
3636                 goto out_err;
3637         }
3638         parent_spec->image_id = image_id;
3639         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3640         ceph_decode_64_safe(&p, end, overlap, out_err);
3641
3642         rbd_dev->parent_overlap = overlap;
3643         rbd_dev->parent_spec = parent_spec;
3644         parent_spec = NULL;     /* rbd_dev now owns this */
3645 out:
3646         ret = 0;
3647 out_err:
3648         kfree(reply_buf);
3649         rbd_spec_put(parent_spec);
3650
3651         return ret;
3652 }
3653
3654 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3655 {
3656         struct {
3657                 __le64 stripe_unit;
3658                 __le64 stripe_count;
3659         } __attribute__ ((packed)) striping_info_buf = { 0 };
3660         size_t size = sizeof (striping_info_buf);
3661         void *p;
3662         u64 obj_size;
3663         u64 stripe_unit;
3664         u64 stripe_count;
3665         int ret;
3666
3667         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3668                                 "rbd", "get_stripe_unit_count", NULL, 0,
3669                                 (char *)&striping_info_buf, size);
3670         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3671         if (ret < 0)
3672                 return ret;
3673         if (ret < size)
3674                 return -ERANGE;
3675
3676         /*
3677          * We don't actually support the "fancy striping" feature
3678          * (STRIPINGV2) yet, but if the striping sizes are the
3679          * defaults the behavior is the same as before.  So find
3680          * out, and only fail if the image has non-default values.
3681          */
3683         obj_size = (u64)1 << rbd_dev->header.obj_order;
3684         p = &striping_info_buf;
3685         stripe_unit = ceph_decode_64(&p);
3686         if (stripe_unit != obj_size) {
3687                 rbd_warn(rbd_dev, "unsupported stripe unit "
3688                                 "(got %llu want %llu)",
3689                                 stripe_unit, obj_size);
3690                 return -EINVAL;
3691         }
3692         stripe_count = ceph_decode_64(&p);
3693         if (stripe_count != 1) {
3694                 rbd_warn(rbd_dev, "unsupported stripe count "
3695                                 "(got %llu want 1)", stripe_count);
3696                 return -EINVAL;
3697         }
3698         rbd_dev->header.stripe_unit = stripe_unit;
3699         rbd_dev->header.stripe_count = stripe_count;
3700
3701         return 0;
3702 }
3703
3704 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3705 {
3706         size_t image_id_size;
3707         char *image_id;
3708         void *p;
3709         void *end;
3710         size_t size;
3711         void *reply_buf = NULL;
3712         size_t len = 0;
3713         char *image_name = NULL;
3714         int ret;
3715
3716         rbd_assert(!rbd_dev->spec->image_name);
3717
3718         len = strlen(rbd_dev->spec->image_id);
3719         image_id_size = sizeof (__le32) + len;
3720         image_id = kmalloc(image_id_size, GFP_KERNEL);
3721         if (!image_id)
3722                 return NULL;
3723
3724         p = image_id;
3725         end = image_id + image_id_size;
3726         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3727
3728         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3729         reply_buf = kmalloc(size, GFP_KERNEL);
3730         if (!reply_buf)
3731                 goto out;
3732
3733         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3734                                 "rbd", "dir_get_name",
3735                                 image_id, image_id_size,
3736                                 reply_buf, size);
3737         if (ret < 0)
3738                 goto out;
3739         p = reply_buf;
3740         end = reply_buf + ret;
3741
3742         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3743         if (IS_ERR(image_name))
3744                 image_name = NULL;
3745         else
3746                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3747 out:
3748         kfree(reply_buf);
3749         kfree(image_id);
3750
3751         return image_name;
3752 }
3753
3754 /*
3755  * When an rbd image has a parent image, it is identified by the
3756  * pool, image, and snapshot ids (not names).  This function fills
3757  * in the names for those ids.  (It's OK if we can't figure out the
3758  * name for an image id, but the pool and snapshot ids should always
3759  * exist and have names.)  All names in an rbd spec are dynamically
3760  * allocated.
3761  *
3762  * When an image being mapped (not a parent) is probed, we have the
3763  * pool name and pool id, image name and image id, and the snapshot
3764  * name.  The only thing we're missing is the snapshot id.
3765  *
3766  * The set of snapshots for an image is not known until they have
3767  * been read by rbd_dev_snaps_update(), so we can't completely fill
3768  * in this information until after that has been called.
3769  */
3770 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3771 {
3772         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3773         struct rbd_spec *spec = rbd_dev->spec;
3774         const char *pool_name;
3775         const char *image_name;
3776         const char *snap_name;
3777         int ret;
3778
3779         /*
3780          * An image being mapped will have the pool name (etc.), but
3781          * we need to look up the snapshot id.
3782          */
3783         if (spec->pool_name) {
3784                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3785                         struct rbd_snap *snap;
3786
3787                         snap = snap_by_name(rbd_dev, spec->snap_name);
3788                         if (!snap)
3789                                 return -ENOENT;
3790                         spec->snap_id = snap->id;
3791                 } else {
3792                         spec->snap_id = CEPH_NOSNAP;
3793                 }
3794
3795                 return 0;
3796         }
3797
3798         /* Get the pool name; we have to make our own copy of this */
3799
3800         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3801         if (!pool_name) {
3802                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3803                 return -EIO;
3804         }
3805         pool_name = kstrdup(pool_name, GFP_KERNEL);
3806         if (!pool_name)
3807                 return -ENOMEM;
3808
3809         /* Fetch the image name; tolerate failure here */
3810
3811         image_name = rbd_dev_image_name(rbd_dev);
3812         if (!image_name)
3813                 rbd_warn(rbd_dev, "unable to get image name");
3814
3815         /* Look up the snapshot name, and make a copy */
3816
3817         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3818         if (!snap_name) {
3819                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3820                 ret = -EIO;
3821                 goto out_err;
3822         }
3823         snap_name = kstrdup(snap_name, GFP_KERNEL);
3824         if (!snap_name) {
3825                 ret = -ENOMEM;
3826                 goto out_err;
3827         }
3828
3829         spec->pool_name = pool_name;
3830         spec->image_name = image_name;
3831         spec->snap_name = snap_name;
3832
3833         return 0;
3834 out_err:
3835         kfree(image_name);
3836         kfree(pool_name);
3837
3838         return ret;
3839 }
3840
3841 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3842 {
3843         size_t size;
3844         int ret;
3845         void *reply_buf;
3846         void *p;
3847         void *end;
3848         u64 seq;
3849         u32 snap_count;
3850         struct ceph_snap_context *snapc;
3851         u32 i;
3852
3853         /*
3854          * We'll need room for the seq value (maximum snapshot id),
3855          * snapshot count, and array of that many snapshot ids.
3856          * For now we have a fixed upper limit on the number we're
3857          * prepared to receive.
3858          */
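        /*
         * With RBD_MAX_SNAP_COUNT = 510 this works out to
         * 8 + 4 + 510 * 8 = 4092 bytes, which is why the largest
         * snapshot context fits in a single 4 KiB page.
         */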
3859         size = sizeof (__le64) + sizeof (__le32) +
3860                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3861         reply_buf = kzalloc(size, GFP_KERNEL);
3862         if (!reply_buf)
3863                 return -ENOMEM;
3864
3865         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3866                                 "rbd", "get_snapcontext", NULL, 0,
3867                                 reply_buf, size);
3868         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3869         if (ret < 0)
3870                 goto out;
3871
3872         p = reply_buf;
3873         end = reply_buf + ret;
3874         ret = -ERANGE;
3875         ceph_decode_64_safe(&p, end, seq, out);
3876         ceph_decode_32_safe(&p, end, snap_count, out);
3877
3878         /*
3879          * Make sure the reported number of snapshot ids wouldn't go
3880          * beyond the end of our buffer.  But before checking that,
3881          * make sure the computed size of the snapshot context we
3882          * allocate is representable in a size_t.
3883          */
3884         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3885                                  / sizeof (u64)) {
3886                 ret = -EINVAL;
3887                 goto out;
3888         }
3889         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3890                 goto out;
3891         ret = 0;
3892
3893         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3894         if (!snapc) {
3895                 ret = -ENOMEM;
3896                 goto out;
3897         }
3898         snapc->seq = seq;
3899         for (i = 0; i < snap_count; i++)
3900                 snapc->snaps[i] = ceph_decode_64(&p);
3901
3902         rbd_dev->header.snapc = snapc;
3903
3904         dout("  snap context seq = %llu, snap_count = %u\n",
3905                 (unsigned long long)seq, (unsigned int)snap_count);
3906 out:
3907         kfree(reply_buf);
3908
3909         return ret;
3910 }
3911
3912 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3913 {
3914         size_t size;
3915         void *reply_buf;
3916         __le64 snap_id;
3917         int ret;
3918         void *p;
3919         void *end;
3920         char *snap_name;
3921
3922         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3923         reply_buf = kmalloc(size, GFP_KERNEL);
3924         if (!reply_buf)
3925                 return ERR_PTR(-ENOMEM);
3926
3927         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3928         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3929         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3930                                 "rbd", "get_snapshot_name",
3931                                 &snap_id, sizeof (snap_id),
3932                                 reply_buf, size);
3933         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3934         if (ret < 0) {
3935                 snap_name = ERR_PTR(ret);
3936                 goto out;
3937         }
3938
3939         p = reply_buf;
3940         end = reply_buf + ret;
3941         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3942         if (IS_ERR(snap_name))
3943                 goto out;
3944
3945         dout("  snap_id 0x%016llx snap_name = %s\n",
3946                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3947 out:
3948         kfree(reply_buf);
3949
3950         return snap_name;
3951 }
3952
3953 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3954                 u64 *snap_size, u64 *snap_features)
3955 {
3956         u64 snap_id;
3957         u64 size;
3958         u64 features;
3959         const char *snap_name;
3960         int ret;
3961
3962         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3963         snap_id = rbd_dev->header.snapc->snaps[which];
3964         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3965         if (ret)
3966                 goto out_err;
3967
3968         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3969         if (ret)
3970                 goto out_err;
3971
3972         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3973         if (!IS_ERR(snap_name)) {
3974                 *snap_size = size;
3975                 *snap_features = features;
3976         }
3977
3978         return snap_name;
3979 out_err:
3980         return ERR_PTR(ret);
3981 }
3982
3983 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3984                 u64 *snap_size, u64 *snap_features)
3985 {
3986         if (rbd_dev->image_format == 1)
3987                 return rbd_dev_v1_snap_info(rbd_dev, which,
3988                                         snap_size, snap_features);
3989         if (rbd_dev->image_format == 2)
3990                 return rbd_dev_v2_snap_info(rbd_dev, which,
3991                                         snap_size, snap_features);
3992         return ERR_PTR(-EINVAL);
3993 }
3994
3995 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
3996 {
3997         int ret;
3998
3999         down_write(&rbd_dev->header_rwsem);
4000
4001         ret = rbd_dev_v2_image_size(rbd_dev);
4002         if (ret)
4003                 goto out;
4004         rbd_update_mapping_size(rbd_dev);
4005
4006         ret = rbd_dev_v2_snap_context(rbd_dev);
4007         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4008         if (ret)
4009                 goto out;
4010         ret = rbd_dev_snaps_update(rbd_dev);
4011         dout("rbd_dev_snaps_update returned %d\n", ret);
4014 out:
4015         up_write(&rbd_dev->header_rwsem);
4016
4017         return ret;
4018 }
4019
4020 /*
4021  * Scan the rbd device's current snapshot list and compare it to the
4022  * newly-received snapshot context.  Remove any existing snapshots
4023  * not present in the new snapshot context.  Add a new snapshot for
4024  * any snaphots in the snapshot context not in the current list.
4025  * any snapshots in the snapshot context not in the current list.
4026  * about.
4027  *
4028  * Assumes the snapshots in the snapshot context are sorted by
4029  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4030  * are also maintained in that order.)
4031  *
4032  * Note that any error that occurs while updating the snapshot list
4033  * aborts the update, and the entire list is cleared.  The snapshot
4034  * list becomes inconsistent at that point anyway, so it might as
4035  * well be empty.
4036  */
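/*
 * A worked example (hypothetical ids): if the current list holds
 * snapshots 12, 5, 3 (highest id first) and the new context holds
 * 12, 8, 3, the walk below keeps 12, inserts a new snapshot 8 ahead
 * of 5, then removes 5 once it is found to be missing, and finally
 * keeps 3.
 */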
4037 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4038 {
4039         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4040         const u32 snap_count = snapc->num_snaps;
4041         struct list_head *head = &rbd_dev->snaps;
4042         struct list_head *links = head->next;
4043         u32 index = 0;
4044         int ret = 0;
4045
4046         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4047         while (index < snap_count || links != head) {
4048                 u64 snap_id;
4049                 struct rbd_snap *snap;
4050                 const char *snap_name;
4051                 u64 snap_size = 0;
4052                 u64 snap_features = 0;
4053
4054                 snap_id = index < snap_count ? snapc->snaps[index]
4055                                              : CEPH_NOSNAP;
4056                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4057                                      : NULL;
4058                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4059
4060                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4061                         struct list_head *next = links->next;
4062
4063                         /*
4064                          * A previously-existing snapshot is not in
4065                          * the new snap context.
4066                          *
4067                          * If the now-missing snapshot is the one
4068                          * the image represents, clear its existence
4069                          * flag so we can avoid sending any more
4070                          * requests to it.
4071                          */
4072                         if (rbd_dev->spec->snap_id == snap->id)
4073                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4074                         dout("removing %ssnap id %llu\n",
4075                                 rbd_dev->spec->snap_id == snap->id ?
4076                                                         "mapped " : "",
4077                                 (unsigned long long)snap->id);
4078
4079                         list_del(&snap->node);
4080                         rbd_snap_destroy(snap);
4081
4082                         /* Done with this list entry; advance */
4083
4084                         links = next;
4085                         continue;
4086                 }
4087
4088                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4089                                         &snap_size, &snap_features);
4090                 if (IS_ERR(snap_name)) {
4091                         ret = PTR_ERR(snap_name);
4092                         dout("failed to get snap info, error %d\n", ret);
4093                         goto out_err;
4094                 }
4095
4096                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4097                         (unsigned long long)snap_id);
4098                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4099                         struct rbd_snap *new_snap;
4100
4101                         /* We haven't seen this snapshot before */
4102
4103                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4104                                         snap_id, snap_size, snap_features);
4105                         if (IS_ERR(new_snap)) {
4106                                 ret = PTR_ERR(new_snap);
4107                                 dout("  failed to add dev, error %d\n", ret);
4108                                 goto out_err;
4109                         }
4110
4111                         /* New goes before existing, or at end of list */
4112
4113                         dout("  added dev%s\n", snap ? "" : " at end");
4114                         if (snap)
4115                                 list_add_tail(&new_snap->node, &snap->node);
4116                         else
4117                                 list_add_tail(&new_snap->node, head);
4118                 } else {
4119                         /* Already have this one */
4120
4121                         dout("  already present\n");
4122
4123                         rbd_assert(snap->size == snap_size);
4124                         rbd_assert(!strcmp(snap->name, snap_name));
4125                         rbd_assert(snap->features == snap_features);
                        /* The freshly fetched name is a duplicate; free it */
                        kfree(snap_name);
4126
4127                         /* Done with this list entry; advance */
4128
4129                         links = links->next;
4130                 }
4131
4132                 /* Advance to the next entry in the snapshot context */
4133
4134                 index++;
4135         }
4136         dout("%s: done\n", __func__);
4137
4138         return 0;
4139 out_err:
4140         rbd_remove_all_snaps(rbd_dev);
4141
4142         return ret;
4143 }
4144
4145 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4146 {
4147         struct device *dev;
4148         int ret;
4149
4150         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4151
4152         dev = &rbd_dev->dev;
4153         dev->bus = &rbd_bus_type;
4154         dev->type = &rbd_device_type;
4155         dev->parent = &rbd_root_dev;
4156         dev->release = rbd_dev_device_release;
4157         dev_set_name(dev, "%d", rbd_dev->dev_id);
4158         ret = device_register(dev);
4159
4160         mutex_unlock(&ctl_mutex);
4161
4162         return ret;
4163 }
4164
4165 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4166 {
4167         device_unregister(&rbd_dev->dev);
4168 }
4169
4170 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4171
4172 /*
4173  * Get a unique rbd identifier for the given new rbd_dev, and add
4174  * the rbd_dev to the global list.  The minimum rbd id is 1.
4175  */
4176 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4177 {
4178         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4179
4180         spin_lock(&rbd_dev_list_lock);
4181         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4182         spin_unlock(&rbd_dev_list_lock);
4183         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4184                 (unsigned long long) rbd_dev->dev_id);
4185 }
4186
4187 /*
4188  * Remove an rbd_dev from the global list, and record that its
4189  * identifier is no longer in use.
4190  */
4191 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4192 {
4193         struct list_head *tmp;
4194         int rbd_id = rbd_dev->dev_id;
4195         int max_id;
4196
4197         rbd_assert(rbd_id > 0);
4198
4199         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4200                 (unsigned long long) rbd_dev->dev_id);
4201         spin_lock(&rbd_dev_list_lock);
4202         list_del_init(&rbd_dev->node);
4203
4204         /*
4205          * If the id being "put" is not the current maximum, there
4206          * is nothing special we need to do.
4207          */
4208         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4209                 spin_unlock(&rbd_dev_list_lock);
4210                 return;
4211         }
4212
4213         /*
4214          * We need to update the current maximum id.  Search the
4215          * list to find out what it is.  We're more likely to find
4216          * the maximum at the end, so search the list backward.
4217          */
4218         max_id = 0;
4219         list_for_each_prev(tmp, &rbd_dev_list) {
4220                 struct rbd_device *rbd_dev;
4221
4222                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4223                 if (rbd_dev->dev_id > max_id)
4224                         max_id = rbd_dev->dev_id;
4225         }
4226         spin_unlock(&rbd_dev_list_lock);
4227
4228         /*
4229          * The max id could have been updated by rbd_dev_id_get(), in
4230          * which case it now accurately reflects the new maximum.
4231          * Be careful not to overwrite the maximum value in that
4232          * case.
4233          */
4234         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4235         dout("  max dev id has been reset\n");
4236 }
4237
4238 /*
4239  * Skips over white space at *buf, and updates *buf to point to the
4240  * first found non-space character (if any). Returns the length of
4241  * the token (string of non-white space characters) found.  Note
4242  * that *buf must be terminated with '\0'.
4243  */
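/*
 * For example, if *buf points at "  pool image", next_token() leaves
 * *buf pointing at "pool image" and returns 4, the length of "pool".
 */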
4244 static inline size_t next_token(const char **buf)
4245 {
4246         /*
4247          * These are the characters that produce nonzero for
4248          * isspace() in the "C" and "POSIX" locales.
4249          */
4250         const char *spaces = " \f\n\r\t\v";
4251
4252         *buf += strspn(*buf, spaces);   /* Find start of token */
4253
4254         return strcspn(*buf, spaces);   /* Return token length */
4255 }
4256
4257 /*
4258  * Finds the next token in *buf, and if the provided token buffer is
4259  * big enough, copies the found token into it.  The result, if
4260  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4261  * must be terminated with '\0' on entry.
4262  *
4263  * Returns the length of the token found (not including the '\0').
4264  * Return value will be 0 if no token is found, and it will be >=
4265  * token_size if the token would not fit.
4266  *
4267  * The *buf pointer will be updated to point beyond the end of the
4268  * found token.  Note that this occurs even if the token buffer is
4269  * too small to hold it.
4270  */
4271 static inline size_t copy_token(const char **buf,
4272                                 char *token,
4273                                 size_t token_size)
4274 {
4275         size_t len;
4276
4277         len = next_token(buf);
4278         if (len < token_size) {
4279                 memcpy(token, *buf, len);
4280                 *(token + len) = '\0';
4281         }
4282         *buf += len;
4283
4284         return len;
4285 }
4286
4287 /*
4288  * Finds the next token in *buf, dynamically allocates a buffer big
4289  * enough to hold a copy of it, and copies the token into the new
4290  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4291  * that a duplicate buffer is created even for a zero-length token.
4292  *
4293  * Returns a pointer to the newly-allocated duplicate, or a null
4294  * pointer if memory for the duplicate was not available.  If
4295  * the lenp argument is a non-null pointer, the length of the token
4296  * (not including the '\0') is returned in *lenp.
4297  *
4298  * If successful, the *buf pointer will be updated to point beyond
4299  * the end of the found token.
4300  *
4301  * Note: uses GFP_KERNEL for allocation.
4302  */
4303 static inline char *dup_token(const char **buf, size_t *lenp)
4304 {
4305         char *dup;
4306         size_t len;
4307
4308         len = next_token(buf);
4309         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4310         if (!dup)
4311                 return NULL;
4312         *(dup + len) = '\0';
4313         *buf += len;
4314
4315         if (lenp)
4316                 *lenp = len;
4317
4318         return dup;
4319 }
4320
4321 /*
4322  * Parse the options provided for an "rbd add" (i.e., rbd image
4323  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4324  * and the data written is passed here via a NUL-terminated buffer.
4325  * Returns 0 if successful or an error code otherwise.
4326  *
4327  * The information extracted from these options is recorded in
4328  * the other parameters which return dynamically-allocated
4329  * structures:
4330  *  ceph_opts
4331  *      The address of a pointer that will refer to a ceph options
4332  *      structure.  Caller must release the returned pointer using
4333  *      ceph_destroy_options() when it is no longer needed.
4334  *  rbd_opts
4335  *      Address of an rbd options pointer.  Fully initialized by
4336  *      this function; caller must release with kfree().
4337  *  spec
4338  *      Address of an rbd image specification pointer.  Fully
4339  *      initialized by this function based on parsed options.
4340  *      Caller must release with rbd_spec_put().
4341  *
4342  * The options passed take this form:
4343  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4344  * where:
4345  *  <mon_addrs>
4346  *      A comma-separated list of one or more monitor addresses.
4347  *      A monitor address is an ip address, optionally followed
4348  *      by a port number (separated by a colon).
4349  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4350  *  <options>
4351  *      A comma-separated list of ceph and/or rbd options.
4352  *  <pool_name>
4353  *      The name of the rados pool containing the rbd image.
4354  *  <image_name>
4355  *      The name of the image in that pool to map.
4356  *  <snap_name>
4357  *      An optional snapshot name.  If provided, the mapping will
4358  *      present data from the image at the time that snapshot was
4359  *      created.  The image head is used if no snapshot name is
4360  *      provided.  Snapshot mappings are always read-only.
4361  */
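/*
 * A hypothetical example of such a write (monitor address, key, and
 * names are made up):
 *
 *      $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *              > /sys/bus/rbd/add
 *
 * maps the head of image "myimage" from pool "rbd".
 */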
4362 static int rbd_add_parse_args(const char *buf,
4363                                 struct ceph_options **ceph_opts,
4364                                 struct rbd_options **opts,
4365                                 struct rbd_spec **rbd_spec)
4366 {
4367         size_t len;
4368         char *options;
4369         const char *mon_addrs;
4370         char *snap_name;
4371         size_t mon_addrs_size;
4372         struct rbd_spec *spec = NULL;
4373         struct rbd_options *rbd_opts = NULL;
4374         struct ceph_options *copts;
4375         int ret;
4376
4377         /* The first four tokens are required */
4378
4379         len = next_token(&buf);
4380         if (!len) {
4381                 rbd_warn(NULL, "no monitor address(es) provided");
4382                 return -EINVAL;
4383         }
4384         mon_addrs = buf;
4385         mon_addrs_size = len + 1;
4386         buf += len;
4387
4388         ret = -EINVAL;
4389         options = dup_token(&buf, NULL);
4390         if (!options)
4391                 return -ENOMEM;
4392         if (!*options) {
4393                 rbd_warn(NULL, "no options provided");
4394                 goto out_err;
4395         }
4396
4397         spec = rbd_spec_alloc();
4398         if (!spec)
4399                 goto out_mem;
4400
4401         spec->pool_name = dup_token(&buf, NULL);
4402         if (!spec->pool_name)
4403                 goto out_mem;
4404         if (!*spec->pool_name) {
4405                 rbd_warn(NULL, "no pool name provided");
4406                 goto out_err;
4407         }
4408
4409         spec->image_name = dup_token(&buf, NULL);
4410         if (!spec->image_name)
4411                 goto out_mem;
4412         if (!*spec->image_name) {
4413                 rbd_warn(NULL, "no image name provided");
4414                 goto out_err;
4415         }
4416
4417         /*
4418          * Snapshot name is optional; default is to use "-"
4419          * (indicating the head/no snapshot).
4420          */
4421         len = next_token(&buf);
4422         if (!len) {
4423                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4424                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4425         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4426                 ret = -ENAMETOOLONG;
4427                 goto out_err;
4428         }
4429         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4430         if (!snap_name)
4431                 goto out_mem;
4432         *(snap_name + len) = '\0';
4433         spec->snap_name = snap_name;
4434
4435         /* Initialize all rbd options to the defaults */
4436
4437         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4438         if (!rbd_opts)
4439                 goto out_mem;
4440
4441         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4442
4443         copts = ceph_parse_options(options, mon_addrs,
4444                                         mon_addrs + mon_addrs_size - 1,
4445                                         parse_rbd_opts_token, rbd_opts);
4446         if (IS_ERR(copts)) {
4447                 ret = PTR_ERR(copts);
4448                 goto out_err;
4449         }
4450         kfree(options);
4451
4452         *ceph_opts = copts;
4453         *opts = rbd_opts;
4454         *rbd_spec = spec;
4455
4456         return 0;
4457 out_mem:
4458         ret = -ENOMEM;
4459 out_err:
4460         kfree(rbd_opts);
4461         rbd_spec_put(spec);
4462         kfree(options);
4463
4464         return ret;
4465 }
4466
4467 /*
4468  * An rbd format 2 image has a unique identifier, distinct from the
4469  * name given to it by the user.  Internally, that identifier is
4470  * what's used to specify the names of objects related to the image.
4471  *
4472  * A special "rbd id" object is used to map an rbd image name to its
4473  * id.  If that object doesn't exist, then there is no v2 rbd image
4474  * with the supplied name.
4475  *
4476  * This function will record the given rbd_dev's image_id field if
4477  * it can be determined, and in that case will return 0.  If any
4478  * errors occur a negative errno will be returned and the rbd_dev's
4479  * image_id field will be unchanged (and should be NULL).
4480  */
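/*
 * For example, an image named "foo" has its id recorded in an object
 * named RBD_ID_PREFIX "foo" (i.e. "rbd_id.foo", with the prefix as
 * defined in rbd_types.h); its "rbd" class method "get_id" returns
 * the encoded id string.
 */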
4481 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4482 {
4483         int ret;
4484         size_t size;
4485         char *object_name;
4486         void *response;
4487         char *image_id;
4488
4489         /*
4490          * When probing a parent image, the image id is already
4491          * known (and the image name likely is not).  There's no
4492          * need to fetch the image id again in this case.  We
4493          * do still need to set the image format though.
4494          */
4495         if (rbd_dev->spec->image_id) {
4496                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4497
4498                 return 0;
4499         }
4500
4501         /*
4502          * First, see if the format 2 image id file exists, and if
4503          * so, get the image's persistent id from it.
4504          */
4505         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4506         object_name = kmalloc(size, GFP_NOIO);
4507         if (!object_name)
4508                 return -ENOMEM;
4509         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4510         dout("rbd id object name is %s\n", object_name);
4511
4512         /* Response will be an encoded string, which includes a length */
4513
4514         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4515         response = kzalloc(size, GFP_NOIO);
4516         if (!response) {
4517                 ret = -ENOMEM;
4518                 goto out;
4519         }
4520
4521         /* If it doesn't exist we'll assume it's a format 1 image */
4522
4523         ret = rbd_obj_method_sync(rbd_dev, object_name,
4524                                 "rbd", "get_id", NULL, 0,
4525                                 response, RBD_IMAGE_ID_LEN_MAX);
4526         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4527         if (ret == -ENOENT) {
4528                 image_id = kstrdup("", GFP_KERNEL);
4529                 ret = image_id ? 0 : -ENOMEM;
4530                 if (!ret)
4531                         rbd_dev->image_format = 1;
4532         } else if (ret > (int) sizeof (__le32)) {
4533                 void *p = response;
4534
4535                 image_id = ceph_extract_encoded_string(&p, p + ret,
4536                                                 NULL, GFP_NOIO);
4537                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4538                 if (!ret)
4539                         rbd_dev->image_format = 2;
4540         } else if (ret >= 0) {
4541                 ret = -EINVAL;  /* reply too short for an encoded string */
4542         }
4543
4544         if (!ret) {
4545                 rbd_dev->spec->image_id = image_id;
4546                 dout("image_id is %s\n", image_id);
4547         }
4548 out:
4549         kfree(response);
4550         kfree(object_name);
4551
4552         return ret;
4553 }
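
/*
 * A rough sketch of the exchange above.  For an image named "foo"
 * the id object is "rbd_id.foo" (RBD_ID_PREFIX followed by the image
 * name), and a successful "get_id" class method call fills the
 * response buffer with a ceph-encoded string:
 *
 *      __le32 length | <length> bytes of image id
 *
 * This is why a valid reply must exceed sizeof (__le32) bytes, and
 * why ceph_extract_encoded_string() is used to pull out the id.
 */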
4554
4555 /* Undo whatever state changes are made by v1 or v2 image probe */
4556
4557 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4558 {
4559         struct rbd_image_header *header;
4560
4561         rbd_dev_remove_parent(rbd_dev);
4562         rbd_spec_put(rbd_dev->parent_spec);
4563         rbd_dev->parent_spec = NULL;
4564         rbd_dev->parent_overlap = 0;
4565
4566         /* Free dynamic fields from the header, then zero it out */
4567
4568         header = &rbd_dev->header;
4569         ceph_put_snap_context(header->snapc);
4570         kfree(header->snap_sizes);
4571         kfree(header->snap_names);
4572         kfree(header->object_prefix);
4573         memset(header, 0, sizeof (*header));
4574 }
4575
4576 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4577 {
4578         int ret;
4579
4580         /* Populate rbd image metadata */
4581
4582         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4583         if (ret < 0)
4584                 goto out_err;
4585
4586         /* Version 1 images have no parent (no layering) */
4587
4588         rbd_dev->parent_spec = NULL;
4589         rbd_dev->parent_overlap = 0;
4590
4591         dout("discovered version 1 image, header name is %s\n",
4592                 rbd_dev->header_name);
4593
4594         return 0;
4595
4596 out_err:
4597         kfree(rbd_dev->header_name);
4598         rbd_dev->header_name = NULL;
4599         kfree(rbd_dev->spec->image_id);
4600         rbd_dev->spec->image_id = NULL;
4601
4602         return ret;
4603 }
4604
4605 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4606 {
4607         int ret;
4608
4609         ret = rbd_dev_v2_image_size(rbd_dev);
4610         if (ret)
4611                 goto out_err;
4612
4613         /* Get the object prefix (a.k.a. block_name) for the image */
4614
4615         ret = rbd_dev_v2_object_prefix(rbd_dev);
4616         if (ret)
4617                 goto out_err;
4618
4619         /* Get and check the features for the image */
4620
4621         ret = rbd_dev_v2_features(rbd_dev);
4622         if (ret)
4623                 goto out_err;
4624
4625         /* If the image supports layering, get the parent info */
4626
4627         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4628                 ret = rbd_dev_v2_parent_info(rbd_dev);
4629                 if (ret)
4630                         goto out_err;
4631
4632                 /*
4633          * Don't print a warning for parent images.  We can
4634          * tell we have a parent at this point because we won't
4635          * know its pool name yet (just its pool id).
4636                  */
4637                 if (rbd_dev->spec->pool_name)
4638                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4639                                         "is EXPERIMENTAL!");
4640         }
4641
4642         /* If the image supports fancy striping, get its parameters */
4643
4644         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4645                 ret = rbd_dev_v2_striping_info(rbd_dev);
4646                 if (ret < 0)
4647                         goto out_err;
4648         }
4649
4650         /* crypto and compression types aren't (yet) supported for v2 images */
4651
4652         rbd_dev->header.crypt_type = 0;
4653         rbd_dev->header.comp_type = 0;
4654
4655         /* Get the snapshot context for the image */
4656
4657         ret = rbd_dev_v2_snap_context(rbd_dev);
4658         if (ret)
4659                 goto out_err;
4660
4661         dout("discovered version 2 image, header name is %s\n",
4662                 rbd_dev->header_name);
4663
4664         return 0;
4665 out_err:
4666         rbd_dev->parent_overlap = 0;
4667         rbd_spec_put(rbd_dev->parent_spec);
4668         rbd_dev->parent_spec = NULL;
4669         kfree(rbd_dev->header_name);
4670         rbd_dev->header_name = NULL;
4671         kfree(rbd_dev->header.object_prefix);
4672         rbd_dev->header.object_prefix = NULL;
4673
4674         return ret;
4675 }
4676
4677 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4678 {
4679         struct rbd_device *parent = NULL;
4680         struct rbd_spec *parent_spec;
4681         struct rbd_client *rbdc;
4682         int ret;
4683
4684         if (!rbd_dev->parent_spec)
4685                 return 0;
4686         /*
4687          * We need to pass a reference to the client and the parent
4688          * spec when creating the parent rbd_dev.  Images related by
4689          * parent/child relationships always share both.
4690          */
4691         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4692         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4693
4694         ret = -ENOMEM;
4695         parent = rbd_dev_create(rbdc, parent_spec);
4696         if (!parent)
4697                 goto out_err;
4698
4699         ret = rbd_dev_image_probe(parent);
4700         if (ret < 0)
4701                 goto out_err;
4702         rbd_dev->parent = parent;
4703
4704         return 0;
4705 out_err:
4706         if (parent) {
4707                 rbd_spec_put(rbd_dev->parent_spec);
4708                 kfree(rbd_dev->header_name);
4709                 rbd_dev_destroy(parent);
4710         } else {
4711                 rbd_put_client(rbdc);
4712                 rbd_spec_put(parent_spec);
4713         }
4714
4715         return ret;
4716 }
4717
4718 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4719 {
4720         int ret;
4721
4722         ret = rbd_dev_mapping_set(rbd_dev);
4723         if (ret)
4724                 return ret;
4725
4726         /* generate unique id: find highest unique id, add one */
4727         rbd_dev_id_get(rbd_dev);
4728
4729         /* Fill in the device name, now that we have its id. */
4730         BUILD_BUG_ON(DEV_NAME_LEN
4731                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4732         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4733
4734         /* Get our block major device number. */
4735
4736         ret = register_blkdev(0, rbd_dev->name);
4737         if (ret < 0)
4738                 goto err_out_id;
4739         rbd_dev->major = ret;
4740
4741         /* Set up the blkdev mapping. */
4742
4743         ret = rbd_init_disk(rbd_dev);
4744         if (ret)
4745                 goto err_out_blkdev;
4746
4747         ret = rbd_bus_add_dev(rbd_dev);
4748         if (ret)
4749                 goto err_out_disk;
4750
4751         /* Everything's ready.  Announce the disk to the world. */
4752
4753         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4754         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4755         add_disk(rbd_dev->disk);
4756
4757         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4758                 (unsigned long long) rbd_dev->mapping.size);
4759
4760         return ret;
4761
4762 err_out_disk:
4763         rbd_free_disk(rbd_dev);
4764 err_out_blkdev:
4765         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4766 err_out_id:
4767         rbd_dev_id_put(rbd_dev);
4768         rbd_dev_mapping_clear(rbd_dev);
4769
4770         return ret;
4771 }
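
/*
 * For illustration: after a successful setup the mapping appears as
 * a block device named RBD_DRV_NAME plus the device id ("rbd0",
 * "rbd1", ...), typically /dev/rbd<id>, whose capacity is
 * mapping.size expressed in 512-byte sectors, as set above.
 */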
4772
4773 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4774 {
4775         struct rbd_spec *spec = rbd_dev->spec;
4776         size_t size;
4777
4778         /* Record the header object name for this rbd image. */
4779
4780         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4781
4782         if (rbd_dev->image_format == 1)
4783                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4784         else
4785                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4786
4787         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4788         if (!rbd_dev->header_name)
4789                 return -ENOMEM;
4790
4791         if (rbd_dev->image_format == 1)
4792                 sprintf(rbd_dev->header_name, "%s%s",
4793                         spec->image_name, RBD_SUFFIX);
4794         else
4795                 sprintf(rbd_dev->header_name, "%s%s",
4796                         RBD_HEADER_PREFIX, spec->image_id);
4797         return 0;
4798 }
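
/*
 * Examples of the header object names built above, using the macros
 * from rbd_types.h: a format 1 image named "foo" has header object
 * "foo.rbd" (RBD_SUFFIX is ".rbd"), while a format 2 image with id
 * "abc123" has header object "rbd_header.abc123" (RBD_HEADER_PREFIX
 * is "rbd_header.").
 */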
4799
4800 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4801 {
4802         int ret;
4803
4804         rbd_remove_all_snaps(rbd_dev);
4805         rbd_dev_unprobe(rbd_dev);
4806         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4807         if (ret)
4808                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4809         kfree(rbd_dev->header_name);
4810         rbd_dev->header_name = NULL;
4811         rbd_dev->image_format = 0;
4812         kfree(rbd_dev->spec->image_id);
4813         rbd_dev->spec->image_id = NULL;
4814
4815         rbd_dev_destroy(rbd_dev);
4816 }
4817
4818 /*
4819  * Probe for the existence of the header object for the given rbd
4820  * device.  For format 2 images this includes determining the image
4821  * id.
4822  */
4823 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4824 {
4825         int ret;
4826         int tmp;
4827
4828         /*
4829          * Get the id from the image id object.  If it's not a
4830          * format 2 image, we'll get ENOENT back, and we'll assume
4831          * it's a format 1 image.
4832          */
4833         ret = rbd_dev_image_id(rbd_dev);
4834         if (ret)
4835                 return ret;
4836         rbd_assert(rbd_dev->spec->image_id);
4837         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4838
4839         ret = rbd_dev_header_name(rbd_dev);
4840         if (ret)
4841                 goto err_out_format;
4842
4843         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4844         if (ret)
4845                 goto out_header_name;
4846
4847         if (rbd_dev->image_format == 1)
4848                 ret = rbd_dev_v1_probe(rbd_dev);
4849         else
4850                 ret = rbd_dev_v2_probe(rbd_dev);
4851         if (ret)
4852                 goto err_out_watch;
4853
4854         ret = rbd_dev_snaps_update(rbd_dev);
4855         if (ret)
4856                 goto err_out_probe;
4857
4858         ret = rbd_dev_spec_update(rbd_dev);
4859         if (ret)
4860                 goto err_out_snaps;
4861
4862         ret = rbd_dev_probe_parent(rbd_dev);
4863         if (!ret)
4864                 return 0;
4865
4866 err_out_snaps:
4867         rbd_remove_all_snaps(rbd_dev);
4868 err_out_probe:
4869         rbd_dev_unprobe(rbd_dev);
4870 err_out_watch:
4871         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4872         if (tmp)
4873                 rbd_warn(rbd_dev, "unable to tear down watch request");
4874 out_header_name:
4875         kfree(rbd_dev->header_name);
4876         rbd_dev->header_name = NULL;
4877 err_out_format:
4878         rbd_dev->image_format = 0;
4879         kfree(rbd_dev->spec->image_id);
4880         rbd_dev->spec->image_id = NULL;
4881
4882         dout("probe failed, returning %d\n", ret);
4883
4884         return ret;
4885 }
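
/*
 * Probe sequence summary: determine the image id (which also fixes
 * the image format), build the header object name, register a watch
 * on the header object, read the format-specific metadata, update
 * the snapshot list and the spec, then recursively probe any parent
 * image.  The error labels above unwind those steps in reverse.
 */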
4886
4887 static ssize_t rbd_add(struct bus_type *bus,
4888                        const char *buf,
4889                        size_t count)
4890 {
4891         struct rbd_device *rbd_dev = NULL;
4892         struct ceph_options *ceph_opts = NULL;
4893         struct rbd_options *rbd_opts = NULL;
4894         struct rbd_spec *spec = NULL;
4895         struct rbd_client *rbdc;
4896         struct ceph_osd_client *osdc;
4897         int rc = -ENOMEM;
4898
4899         if (!try_module_get(THIS_MODULE))
4900                 return -ENODEV;
4901
4902         /* parse add command */
4903         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4904         if (rc < 0)
4905                 goto err_out_module;
4906
4907         rbdc = rbd_get_client(ceph_opts);
4908         if (IS_ERR(rbdc)) {
4909                 rc = PTR_ERR(rbdc);
4910                 goto err_out_args;
4911         }
4912         ceph_opts = NULL;       /* rbd_dev client now owns this */
4913
4914         /* pick the pool */
4915         osdc = &rbdc->client->osdc;
4916         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4917         if (rc < 0)
4918                 goto err_out_client;
4919         spec->pool_id = (u64)rc;
4920
4921         /* The ceph file layout needs to fit pool id in 32 bits */
4922
4923         if (spec->pool_id > (u64)U32_MAX) {
4924                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4925                                 (unsigned long long)spec->pool_id, U32_MAX);
4926                 rc = -EIO;
4927                 goto err_out_client;
4928         }
4929
4930         rbd_dev = rbd_dev_create(rbdc, spec);
4931         if (!rbd_dev)
4932                 goto err_out_client;
4933         rbdc = NULL;            /* rbd_dev now owns this */
4934         spec = NULL;            /* rbd_dev now owns this */
4935
4936         rbd_dev->mapping.read_only = rbd_opts->read_only;
4937         kfree(rbd_opts);
4938         rbd_opts = NULL;        /* done with this */
4939
4940         rc = rbd_dev_image_probe(rbd_dev);
4941         if (rc < 0)
4942                 goto err_out_rbd_dev;
4943
4944         rc = rbd_dev_device_setup(rbd_dev);
4945         if (!rc)
4946                 return count;
4947
4948         rbd_dev_image_release(rbd_dev);
4949 err_out_rbd_dev:
4950         rbd_dev_destroy(rbd_dev);
4951 err_out_client:
4952         rbd_put_client(rbdc);
4953 err_out_args:
4954         if (ceph_opts)
4955                 ceph_destroy_options(ceph_opts);
4956         kfree(rbd_opts);
4957         rbd_spec_put(spec);
4958 err_out_module:
4959         module_put(THIS_MODULE);
4960
4961         dout("Error adding device %s\n", buf);
4962
4963         return (ssize_t)rc;
4964 }
4965
4966 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4967 {
4968         struct list_head *tmp;
4969         struct rbd_device *rbd_dev;
4970
4971         spin_lock(&rbd_dev_list_lock);
4972         list_for_each(tmp, &rbd_dev_list) {
4973                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4974                 if (rbd_dev->dev_id == dev_id) {
4975                         spin_unlock(&rbd_dev_list_lock);
4976                         return rbd_dev;
4977                 }
4978         }
4979         spin_unlock(&rbd_dev_list_lock);
4980         return NULL;
4981 }
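
/*
 * Note that __rbd_get_dev() returns its result without taking a
 * reference on the device; its caller (rbd_remove() below) relies on
 * holding ctl_mutex to keep the rbd_dev from going away while it is
 * being used.
 */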
4982
4983 static void rbd_dev_device_release(struct device *dev)
4984 {
4985         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4986
4987         rbd_free_disk(rbd_dev);
4988         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4989         rbd_dev_clear_mapping(rbd_dev);
4990         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4991         rbd_dev->major = 0;
4992         rbd_dev_id_put(rbd_dev);
4993         rbd_dev_mapping_clear(rbd_dev);
4994 }
4995
4996 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4997 {
4998         while (rbd_dev->parent) {
4999                 struct rbd_device *first = rbd_dev;
5000                 struct rbd_device *second = first->parent;
5001                 struct rbd_device *third;
5002
5003                 /*
5004                  * Follow to the parent with no grandparent and
5005                  * remove it.
5006                  */
5007                 while (second && (third = second->parent)) {
5008                         first = second;
5009                         second = third;
5010                 }
5011                 rbd_assert(second);
5012                 rbd_dev_image_release(second);
5013                 first->parent = NULL;
5014                 first->parent_overlap = 0;
5015
5016                 rbd_assert(first->parent_spec);
5017                 rbd_spec_put(first->parent_spec);
5018                 first->parent_spec = NULL;
5019         }
5020 }
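
/*
 * Illustration: for a chain  mapped -> a -> b, the inner loop walks
 * out to "b" (the ancestor with no parent of its own), releases it,
 * and clears "a"'s parent pointers; the outer loop then repeats
 * until the mapped device itself has no parent left.
 */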
5021
5022 static ssize_t rbd_remove(struct bus_type *bus,
5023                           const char *buf,
5024                           size_t count)
5025 {
5026         struct rbd_device *rbd_dev = NULL;
5027         int target_id;
5028         unsigned long ul;
5029         int ret;
5030
5031         ret = strict_strtoul(buf, 10, &ul);
5032         if (ret)
5033                 return ret;
5034
5035         /* convert to int; abort if we lost anything in the conversion */
5036         target_id = (int) ul;
5037         if (target_id != ul)
5038                 return -EINVAL;
5039
5040         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5041
5042         rbd_dev = __rbd_get_dev(target_id);
5043         if (!rbd_dev) {
5044                 ret = -ENOENT;
5045                 goto done;
5046         }
5047
5048         spin_lock_irq(&rbd_dev->lock);
5049         if (rbd_dev->open_count)
5050                 ret = -EBUSY;
5051         else
5052                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5053         spin_unlock_irq(&rbd_dev->lock);
5054         if (ret < 0)
5055                 goto done;
5056         ret = count;
5057         rbd_bus_del_dev(rbd_dev);
5058         rbd_dev_image_release(rbd_dev);
5059         module_put(THIS_MODULE);
5060 done:
5061         mutex_unlock(&ctl_mutex);
5062
5063         return ret;
5064 }
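
/*
 * For illustration (again, see Documentation/ABI/testing/sysfs-bus-rbd):
 * a mapped device is removed by writing its numeric id (the <id> in
 * /dev/rbd<id>) to the bus control file, e.g.:
 *
 *      $ echo 0 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the block device is held open.
 */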
5065
5066 /*
5067  * create control files in sysfs
5068  * /sys/bus/rbd/...
5069  */
5070 static int rbd_sysfs_init(void)
5071 {
5072         int ret;
5073
5074         ret = device_register(&rbd_root_dev);
5075         if (ret < 0)
5076                 return ret;
5077
5078         ret = bus_register(&rbd_bus_type);
5079         if (ret < 0)
5080                 device_unregister(&rbd_root_dev);
5081
5082         return ret;
5083 }
5084
5085 static void rbd_sysfs_cleanup(void)
5086 {
5087         bus_unregister(&rbd_bus_type);
5088         device_unregister(&rbd_root_dev);
5089 }
5090
5091 static int __init rbd_init(void)
5092 {
5093         int rc;
5094
5095         if (!libceph_compatible(NULL)) {
5096                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5097
5098                 return -EINVAL;
5099         }
5100         rc = rbd_sysfs_init();
5101         if (rc)
5102                 return rc;
5103         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5104         return 0;
5105 }
5106
5107 static void __exit rbd_exit(void)
5108 {
5109         rbd_sysfs_cleanup();
5110 }
5111
5112 module_init(rbd_init);
5113 module_exit(rbd_exit);
5114
5115 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5116 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5117 MODULE_DESCRIPTION("rados block device");
5118
5119 /* following authorship retained from original osdblk.c */
5120 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5121
5122 MODULE_LICENSE("GPL");