/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
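
/*
 * A quick check of the arithmetic above (editorial note): with
 * sizeof (int) == 4 an int holds 32 bits, and 2.5 decimal digits per
 * byte over-approximates the ~2.41 digits each byte can contribute.
 * (5 * 4) / 2 + 1 = 11 characters, exactly enough for the widest
 * value, "-2147483648".
 */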

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
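
/*
 * Usage sketch (editorial example): the iterators above walk an image
 * request's list of object requests; rbd_img_request_complete() below
 * uses the first form to sum the per-object transfer counts:
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 xferred = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 */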

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
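
/*
 * Access sketch (editorial example): the "removing" flag and
 * open_count are updated together under rbd_dev->lock, as rbd_open()
 * below does:
 *
 *      spin_lock_irq(&rbd_dev->lock);
 *      if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 *              removing = true;
 *      else
 *              rbd_dev->open_count++;
 *      spin_unlock_irq(&rbd_dev->lock);
 */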

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
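
/*
 * Hook-up sketch (editorial example): parse_rbd_opts_token() is handed
 * to libceph as the "extra token" callback when the add string is
 * parsed; this happens in rbd_add_parse_args(), outside this excerpt,
 * and the ceph_parse_options() signature is assumed from this kernel
 * era:
 *
 *      rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
 *      ceph_opts = ceph_parse_options(options, mon_addrs,
 *                                      mon_addrs + mon_addrs_size - 1,
 *                                      parse_rbd_opts_token, rbd_opts);
 */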

/*
 * Get a ceph client with specific addr and configuration; if one
 * does not already exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Removes the client from rbd_client_list, taking and releasing
 * rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */

static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
{
        struct ceph_snap_context *snapc;
        size_t size;

        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (snapc->snaps[0]);
        snapc = kzalloc(size, GFP_KERNEL);
        if (!snapc)
                return NULL;

        atomic_set(&snapc->nref, 1);
        snapc->num_snaps = snap_count;

        return snapc;
}
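
/*
 * Fill-in sketch (editorial example): per the comment above, the
 * caller supplies seq and the snapshot ids after allocation, as the
 * v1 header code later in this file does:
 *
 *      snapc = rbd_snap_context_create(snap_count);
 *      if (!snapc)
 *              goto out_err;
 *      snapc->seq = le64_to_cpu(ondisk->snap_seq);
 *      for (i = 0; i < snap_count; i++)
 *              snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
 */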

static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
{
        (void)ceph_get_snap_context(snapc);
}

static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
{
        ceph_put_snap_context(snapc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}
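
/*
 * Pairing sketch (editorial example): a client obtained with
 * rbd_get_client() (which may return an ERR_PTR from the create path)
 * is balanced with rbd_put_client() when the mapping goes away:
 *
 *      rbdc = rbd_get_client(ceph_opts);
 *      if (IS_ERR(rbdc))
 *              return PTR_ERR(rbdc);
 *      ...
 *      rbd_put_client(rbdc);
 */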

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = rbd_snap_context_create(snap_count);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

        return 0;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        rbd_snap_context_put(header->snapc);
        header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}
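
/*
 * Worked example (editorial note): with a hypothetical v1 object
 * prefix "rb.0.1" and obj_order 22 (4 MB objects), image offset
 * 0x1800000 falls in segment 0x1800000 >> 22 = 6, so the name
 * produced by the "%s.%012llx" format is "rb.0.1.000000000006".
 */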

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
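
/*
 * Worked example (editorial note): with obj_order 22 (segment_size
 * 0x400000), a request at offset 0x3ff000 for 0x2000 bytes crosses a
 * segment boundary: offset & (segment_size - 1) is 0x3ff000, so
 * rbd_segment_length() clips the length to 0x400000 - 0x3ff000 =
 * 0x1000 bytes, and the remainder is issued against the next object.
 */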

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
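
/*
 * Usage sketch (editorial example): callers peel successive
 * object-sized pieces off one source chain; because bio_src and
 * offset are in-out, each call resumes where the previous one
 * stopped:
 *
 *      struct bio *clone;
 *      unsigned int offset = 0;
 *
 *      clone = bio_chain_clone_range(&bio_list, &offset,
 *                                      (unsigned int) length, GFP_ATOMIC);
 *      if (!clone)
 *              return -ENOMEM;
 */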

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
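
/*
 * Usage sketch (editorial example): before a write to a layered image
 * object, the submission path (later in this file) consults these
 * helpers along the lines of:
 *
 *      if (!obj_request_known_test(obj_request))
 *              issue a STAT, then record the result via
 *                  obj_request_existence_set(obj_request, exists);
 *      else if (obj_request_exists_test(obj_request))
 *              write the object in place;
 *      else
 *              read the parent's data and copy it up first.
 */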

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}
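
/*
 * Worked example (editorial note): for an 8192-byte object read that
 * completes with result 0 but xferred == 4096, the tail 4096 bytes
 * are zeroed and xferred is reported as 8192; a -ENOENT result zeroes
 * all 8192 bytes and clears the error.  Either way the initiator sees
 * a fully-satisfied read.
 */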

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (obj_request_img_data_test(obj_request)) {
                struct rbd_img_request *img_request = obj_request->img_request;

                rbd_assert(write_request ==
                                img_request_write_test(img_request));
                if (write_request)
                        snapc = img_request->snapc;
        }

        /* Allocate and initialize the request, for the single op */

        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;

        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}

1622 /*
1623  * Create a copyup osd request based on the information in the
1624  * object request supplied.  A copyup request has two osd ops:
1625  * a copyup method call, and a "normal" write request.
1626  */
1627 static struct ceph_osd_request *
1628 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1629 {
1630         struct rbd_img_request *img_request;
1631         struct ceph_snap_context *snapc;
1632         struct rbd_device *rbd_dev;
1633         struct ceph_osd_client *osdc;
1634         struct ceph_osd_request *osd_req;
1635
1636         rbd_assert(obj_request_img_data_test(obj_request));
1637         img_request = obj_request->img_request;
1638         rbd_assert(img_request);
1639         rbd_assert(img_request_write_test(img_request));
1640
1641         /* Allocate and initialize the request, for the two ops */
1642
1643         snapc = img_request->snapc;
1644         rbd_dev = img_request->rbd_dev;
1645         osdc = &rbd_dev->rbd_client->client->osdc;
1646         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1647         if (!osd_req)
1648                 return NULL;    /* ENOMEM */
1649
1650         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1651         osd_req->r_callback = rbd_osd_req_callback;
1652         osd_req->r_priv = obj_request;
1653
1654         osd_req->r_oid_len = strlen(obj_request->object_name);
1655         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1656         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1657
1658         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1659
1660         return osd_req;
1661 }
1662
1664 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1665 {
1666         ceph_osdc_put_request(osd_req);
1667 }
1668
1669 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1670
1671 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1672                                                 u64 offset, u64 length,
1673                                                 enum obj_request_type type)
1674 {
1675         struct rbd_obj_request *obj_request;
1676         size_t size;
1677         char *name;
1678
1679         rbd_assert(obj_request_type_valid(type));
1680
1681         size = strlen(object_name) + 1;
1682         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1683         if (!obj_request)
1684                 return NULL;
1685
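        /* The object name lives in the same allocation, just past the struct */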
1686         name = (char *)(obj_request + 1);
1687         obj_request->object_name = memcpy(name, object_name, size);
1688         obj_request->offset = offset;
1689         obj_request->length = length;
1690         obj_request->flags = 0;
1691         obj_request->which = BAD_WHICH;
1692         obj_request->type = type;
1693         INIT_LIST_HEAD(&obj_request->links);
1694         init_completion(&obj_request->completion);
1695         kref_init(&obj_request->kref);
1696
1697         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1698                 offset, length, (int)type, obj_request);
1699
1700         return obj_request;
1701 }
1702
1703 static void rbd_obj_request_destroy(struct kref *kref)
1704 {
1705         struct rbd_obj_request *obj_request;
1706
1707         obj_request = container_of(kref, struct rbd_obj_request, kref);
1708
1709         dout("%s: obj %p\n", __func__, obj_request);
1710
1711         rbd_assert(obj_request->img_request == NULL);
1712         rbd_assert(obj_request->which == BAD_WHICH);
1713
1714         if (obj_request->osd_req)
1715                 rbd_osd_req_destroy(obj_request->osd_req);
1716
1717         rbd_assert(obj_request_type_valid(obj_request->type));
1718         switch (obj_request->type) {
1719         case OBJ_REQUEST_NODATA:
1720                 break;          /* Nothing to do */
1721         case OBJ_REQUEST_BIO:
1722                 if (obj_request->bio_list)
1723                         bio_chain_put(obj_request->bio_list);
1724                 break;
1725         case OBJ_REQUEST_PAGES:
1726                 if (obj_request->pages)
1727                         ceph_release_page_vector(obj_request->pages,
1728                                                 obj_request->page_count);
1729                 break;
1730         }
1731
1732         kfree(obj_request);
1733 }
1734
1735 /*
1736  * Caller is responsible for filling in the list of object requests
1737  * that comprises the image request, and the Linux request pointer
1738  * (if there is one).
1739  */
1740 static struct rbd_img_request *rbd_img_request_create(
1741                                         struct rbd_device *rbd_dev,
1742                                         u64 offset, u64 length,
1743                                         bool write_request,
1744                                         bool child_request)
1745 {
1746         struct rbd_img_request *img_request;
1747
1748         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1749         if (!img_request)
1750                 return NULL;
1751
1752         if (write_request) {
1753                 down_read(&rbd_dev->header_rwsem);
1754                 rbd_snap_context_get(rbd_dev->header.snapc);
1755                 up_read(&rbd_dev->header_rwsem);
1756         }
1757
1758         img_request->rq = NULL;
1759         img_request->rbd_dev = rbd_dev;
1760         img_request->offset = offset;
1761         img_request->length = length;
1762         img_request->flags = 0;
1763         if (write_request) {
1764                 img_request_write_set(img_request);
1765                 img_request->snapc = rbd_dev->header.snapc;
1766         } else {
1767                 img_request->snap_id = rbd_dev->spec->snap_id;
1768         }
1769         if (child_request)
1770                 img_request_child_set(img_request);
1771         if (rbd_dev->parent_spec)
1772                 img_request_layered_set(img_request);
1773         spin_lock_init(&img_request->completion_lock);
1774         img_request->next_completion = 0;
1775         img_request->callback = NULL;
1776         img_request->result = 0;
1777         img_request->obj_request_count = 0;
1778         INIT_LIST_HEAD(&img_request->obj_requests);
1779         kref_init(&img_request->kref);
1780
1781         rbd_img_request_get(img_request);       /* Avoid a warning */
1782         rbd_img_request_put(img_request);       /* TEMPORARY */
1783
1784         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1785                 write_request ? "write" : "read", offset, length,
1786                 img_request);
1787
1788         return img_request;
1789 }
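
/*
 * A sketch of the caller's side of the contract described above
 * (rbd_request_fn() below does the real thing):
 *
 *     img_request = rbd_img_request_create(rbd_dev, offset, length,
 *                                             write_request, false);
 *     if (!img_request)
 *             return -ENOMEM;
 *     img_request->rq = rq;
 *     ret = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
 *     if (!ret)
 *             ret = rbd_img_request_submit(img_request);
 */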
1790
1791 static void rbd_img_request_destroy(struct kref *kref)
1792 {
1793         struct rbd_img_request *img_request;
1794         struct rbd_obj_request *obj_request;
1795         struct rbd_obj_request *next_obj_request;
1796
1797         img_request = container_of(kref, struct rbd_img_request, kref);
1798
1799         dout("%s: img %p\n", __func__, img_request);
1800
1801         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1802                 rbd_img_obj_request_del(img_request, obj_request);
1803         rbd_assert(img_request->obj_request_count == 0);
1804
1805         if (img_request_write_test(img_request))
1806                 rbd_snap_context_put(img_request->snapc);
1807
1808         if (img_request_child_test(img_request))
1809                 rbd_obj_request_put(img_request->obj_request);
1810
1811         kfree(img_request);
1812 }
1813
1814 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1815 {
1816         struct rbd_img_request *img_request;
1817         unsigned int xferred;
1818         int result;
1819         bool more;
1820
1821         rbd_assert(obj_request_img_data_test(obj_request));
1822         img_request = obj_request->img_request;
1823
1824         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1825         xferred = (unsigned int)obj_request->xferred;
1826         result = obj_request->result;
1827         if (result) {
1828                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1829
1830                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1831                         img_request_write_test(img_request) ? "write" : "read",
1832                         obj_request->length, obj_request->img_offset,
1833                         obj_request->offset);
1834                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1835                         result, xferred);
1836                 if (!img_request->result)
1837                         img_request->result = result;
1838         }
1839
1840         /* Image object requests don't own their page array */
1841
1842         if (obj_request->type == OBJ_REQUEST_PAGES) {
1843                 obj_request->pages = NULL;
1844                 obj_request->page_count = 0;
1845         }
1846
1847         if (img_request_child_test(img_request)) {
1848                 rbd_assert(img_request->obj_request != NULL);
1849                 more = obj_request->which < img_request->obj_request_count - 1;
1850         } else {
1851                 rbd_assert(img_request->rq != NULL);
1852                 more = blk_end_request(img_request->rq, result, xferred);
1853         }
1854
1855         return more;
1856 }
1857
1858 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1859 {
1860         struct rbd_img_request *img_request;
1861         u32 which = obj_request->which;
1862         bool more = true;
1863
1864         rbd_assert(obj_request_img_data_test(obj_request));
1865         img_request = obj_request->img_request;
1866
1867         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1868         rbd_assert(img_request != NULL);
1869         rbd_assert(img_request->obj_request_count > 0);
1870         rbd_assert(which != BAD_WHICH);
1871         rbd_assert(which < img_request->obj_request_count);
1872         rbd_assert(which >= img_request->next_completion);
1873
1874         spin_lock_irq(&img_request->completion_lock);
1875         if (which != img_request->next_completion)
1876                 goto out;
1877
1878         for_each_obj_request_from(img_request, obj_request) {
1879                 rbd_assert(more);
1880                 rbd_assert(which < img_request->obj_request_count);
1881
1882                 if (!obj_request_done_test(obj_request))
1883                         break;
1884                 more = rbd_img_obj_end_request(obj_request);
1885                 which++;
1886         }
1887
1888         rbd_assert(more ^ (which == img_request->obj_request_count));
1889         img_request->next_completion = which;
1890 out:
1891         spin_unlock_irq(&img_request->completion_lock);
1892
1893         if (!more)
1894                 rbd_img_request_complete(img_request);
1895 }
1896
1897 /*
1898  * Split up an image request into one or more object requests, each
1899  * to a different object.  The "type" parameter indicates whether
1900  * "data_desc" is the pointer to the head of a list of bio
1901  * structures, or the base of a page array.  In either case this
1902  * function assumes data_desc describes memory sufficient to hold
1903  * all data described by the image request.
1904  */
1905 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1906                                         enum obj_request_type type,
1907                                         void *data_desc)
1908 {
1909         struct rbd_device *rbd_dev = img_request->rbd_dev;
1910         struct rbd_obj_request *obj_request = NULL;
1911         struct rbd_obj_request *next_obj_request;
1912         bool write_request = img_request_write_test(img_request);
1913         struct bio *bio_list;
1914         unsigned int bio_offset = 0;
1915         struct page **pages;
1916         u64 img_offset;
1917         u64 resid;
1918         u16 opcode;
1919
1920         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1921                 (int)type, data_desc);
1922
1923         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1924         img_offset = img_request->offset;
1925         resid = img_request->length;
1926         rbd_assert(resid > 0);
1927
1928         if (type == OBJ_REQUEST_BIO) {
1929                 bio_list = data_desc;
1930                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1931         } else {
1932                 rbd_assert(type == OBJ_REQUEST_PAGES);
1933                 pages = data_desc;
1934         }
1935
1936         while (resid) {
1937                 struct ceph_osd_request *osd_req;
1938                 const char *object_name;
1939                 u64 offset;
1940                 u64 length;
1941
1942                 object_name = rbd_segment_name(rbd_dev, img_offset);
1943                 if (!object_name)
1944                         goto out_unwind;
1945                 offset = rbd_segment_offset(rbd_dev, img_offset);
1946                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1947                 obj_request = rbd_obj_request_create(object_name,
1948                                                 offset, length, type);
1949                 kfree(object_name);     /* object request has its own copy */
1950                 if (!obj_request)
1951                         goto out_unwind;
1952
1953                 if (type == OBJ_REQUEST_BIO) {
1954                         unsigned int clone_size;
1955
1956                         rbd_assert(length <= (u64)UINT_MAX);
1957                         clone_size = (unsigned int)length;
1958                         obj_request->bio_list =
1959                                         bio_chain_clone_range(&bio_list,
1960                                                                 &bio_offset,
1961                                                                 clone_size,
1962                                                                 GFP_ATOMIC);
1963                         if (!obj_request->bio_list)
1964                                 goto out_partial;
1965                 } else {
1966                         unsigned int page_count;
1967
1968                         obj_request->pages = pages;
1969                         page_count = (u32)calc_pages_for(offset, length);
1970                         obj_request->page_count = page_count;
1971                         if ((offset + length) & ~PAGE_MASK)
1972                                 page_count--;   /* more on last page */
1973                         pages += page_count;
1974                 }
1975
1976                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1977                                                 obj_request);
1978                 if (!osd_req)
1979                         goto out_partial;
1980                 obj_request->osd_req = osd_req;
1981                 obj_request->callback = rbd_img_obj_callback;
1982
1983                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1984                                                 0, 0);
1985                 if (type == OBJ_REQUEST_BIO)
1986                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1987                                         obj_request->bio_list, length);
1988                 else
1989                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1990                                         obj_request->pages, length,
1991                                         offset & ~PAGE_MASK, false, false);
1992
1993                 if (write_request)
1994                         rbd_osd_req_format_write(obj_request);
1995                 else
1996                         rbd_osd_req_format_read(obj_request);
1997
1998                 obj_request->img_offset = img_offset;
1999                 rbd_img_obj_request_add(img_request, obj_request);
2000
2001                 img_offset += length;
2002                 resid -= length;
2003         }
2004
2005         return 0;
2006
2007 out_partial:
2008         rbd_obj_request_put(obj_request);
2009 out_unwind:
2010         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2011                 rbd_obj_request_put(obj_request);
2012
2013         return -ENOMEM;
2014 }
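
/*
 * A worked example of the segmenting above (a sketch, assuming the
 * default object order of 22, i.e. 4 MiB objects): a 6 MiB request
 * at image offset 3 MiB is split into three object requests,
 *
 *     segment  object offset  length
 *     -------  -------------  ------
 *        0         3 MiB       1 MiB   (up to the object boundary)
 *        1         0           4 MiB   (a whole object)
 *        2         0           1 MiB   (the remainder)
 *
 * which is the arithmetic rbd_segment_offset() and
 * rbd_segment_length() perform in the loop, roughly:
 *
 *     offset = img_offset & ((1ULL << obj_order) - 1);
 *     length = min(resid, (1ULL << obj_order) - offset);
 */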
2015
2016 static void
2017 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2018 {
2019         struct rbd_img_request *img_request;
2020         struct rbd_device *rbd_dev;
2021         u64 length;
2022         u32 page_count;
2023
2024         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2025         rbd_assert(obj_request_img_data_test(obj_request));
2026         img_request = obj_request->img_request;
2027         rbd_assert(img_request);
2028
2029         rbd_dev = img_request->rbd_dev;
2030         rbd_assert(rbd_dev);
2031         length = (u64)1 << rbd_dev->header.obj_order;
2032         page_count = (u32)calc_pages_for(0, length);
2033
2034         rbd_assert(obj_request->copyup_pages);
2035         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2036         obj_request->copyup_pages = NULL;
2037
2038         /*
2039          * We want the transfer count to reflect the size of the
2040          * original write request.  There is no such thing as a
2041          * successful short write, so if the request was successful
2042          * we can just set it to the originally-requested length.
2043          */
2044         if (!obj_request->result)
2045                 obj_request->xferred = obj_request->length;
2046
2047         /* Finish up with the normal image object callback */
2048
2049         rbd_img_obj_callback(obj_request);
2050 }
2051
2052 static void
2053 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2054 {
2055         struct rbd_obj_request *orig_request;
2056         struct ceph_osd_request *osd_req;
2057         struct ceph_osd_client *osdc;
2058         struct rbd_device *rbd_dev;
2059         struct page **pages;
2060         int result;
2061         u64 obj_size;
2062         u64 xferred;
2063
2064         rbd_assert(img_request_child_test(img_request));
2065
2066         /* First get what we need from the image request */
2067
2068         pages = img_request->copyup_pages;
2069         rbd_assert(pages != NULL);
2070         img_request->copyup_pages = NULL;
2071
2072         orig_request = img_request->obj_request;
2073         rbd_assert(orig_request != NULL);
2074         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2075         result = img_request->result;
2076         obj_size = img_request->length;
2077         xferred = img_request->xferred;
2078
2079         rbd_dev = img_request->rbd_dev;
2080         rbd_assert(rbd_dev);
2081         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2082
2083         rbd_img_request_put(img_request);
2084
2085         if (result)
2086                 goto out_err;
2087
2088         /* Allocate the new copyup osd request for the original request */
2089
2090         result = -ENOMEM;
2091         rbd_assert(!orig_request->osd_req);
2092         osd_req = rbd_osd_req_create_copyup(orig_request);
2093         if (!osd_req)
2094                 goto out_err;
2095         orig_request->osd_req = osd_req;
2096         orig_request->copyup_pages = pages;
2097
2098         /* Initialize the copyup op */
2099
2100         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2101         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2102                                                 false, false);
2103
2104         /* Then the original write request op */
2105
2106         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2107                                         orig_request->offset,
2108                                         orig_request->length, 0, 0);
2109         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2110                                         orig_request->length);
2111
2112         rbd_osd_req_format_write(orig_request);
2113
2114         /* All set, send it off. */
2115
2116         orig_request->callback = rbd_img_obj_copyup_callback;
2117         osdc = &rbd_dev->rbd_client->client->osdc;
2118         result = rbd_obj_request_submit(osdc, orig_request);
2119         if (!result)
2120                 return;
2121 out_err:
2122         /* Record the error code and complete the request */
2123
2124         orig_request->result = result;
2125         orig_request->xferred = 0;
2126         obj_request_done_set(orig_request);
2127         rbd_obj_request_complete(orig_request);
2128 }
2129
2130 /*
2131  * Read from the parent image the range of data that covers the
2132  * entire target of the given object request.  This is used for
2133  * satisfying a layered image write request when the target of an
2134  * object request from the image request does not exist.
2135  *
2136  * A page array big enough to hold the returned data is allocated
2137  * and supplied to rbd_img_request_fill() as the "data descriptor."
2138  * When the read completes, this page array will be transferred to
2139  * the original object request for the copyup operation.
2140  *
2141  * If an error occurs, record it as the result of the original
2142  * object request and mark it done so it gets completed.
2143  */
2144 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2145 {
2146         struct rbd_img_request *img_request = NULL;
2147         struct rbd_img_request *parent_request = NULL;
2148         struct rbd_device *rbd_dev;
2149         u64 img_offset;
2150         u64 length;
2151         struct page **pages = NULL;
2152         u32 page_count;
2153         int result;
2154
2155         rbd_assert(obj_request_img_data_test(obj_request));
2156         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2157
2158         img_request = obj_request->img_request;
2159         rbd_assert(img_request != NULL);
2160         rbd_dev = img_request->rbd_dev;
2161         rbd_assert(rbd_dev->parent != NULL);
2162
2163         /*
2164          * First things first.  The original osd request is of no
2165          * use to us any more; we'll need a new one that can hold
2166          * the two ops in a copyup request.  We'll get that later,
2167          * but for now we can release the old one.
2168          */
2169         rbd_osd_req_destroy(obj_request->osd_req);
2170         obj_request->osd_req = NULL;
2171
2172         /*
2173          * Determine the byte range covered by the object in the
2174          * child image to which the original request was to be sent.
2175          */
2176         img_offset = obj_request->img_offset - obj_request->offset;
2177         length = (u64)1 << rbd_dev->header.obj_order;
2178
2179         /*
2180          * There is no defined parent data beyond the parent
2181          * overlap, so limit what we read at that boundary if
2182          * necessary.
2183          */
2184         if (img_offset + length > rbd_dev->parent_overlap) {
2185                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2186                 length = rbd_dev->parent_overlap - img_offset;
2187         }
2188
2189         /*
2190          * Allocate a page array big enough to receive the data read
2191          * from the parent.
2192          */
2193         page_count = (u32)calc_pages_for(0, length);
2194         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2195         if (IS_ERR(pages)) {
2196                 result = PTR_ERR(pages);
2197                 pages = NULL;
2198                 goto out_err;
2199         }
2200
2201         result = -ENOMEM;
2202         parent_request = rbd_img_request_create(rbd_dev->parent,
2203                                                 img_offset, length,
2204                                                 false, true);
2205         if (!parent_request)
2206                 goto out_err;
2207         rbd_obj_request_get(obj_request);
2208         parent_request->obj_request = obj_request;
2209
2210         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2211         if (result)
2212                 goto out_err;
2213         parent_request->copyup_pages = pages;
2214
2215         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2216         result = rbd_img_request_submit(parent_request);
2217         if (!result)
2218                 return 0;
2219
2220         parent_request->copyup_pages = NULL;
2221         parent_request->obj_request = NULL;
2222         rbd_obj_request_put(obj_request);
2223 out_err:
2224         if (pages)
2225                 ceph_release_page_vector(pages, page_count);
2226         if (parent_request)
2227                 rbd_img_request_put(parent_request);
2228         obj_request->result = result;
2229         obj_request->xferred = 0;
2230         obj_request_done_set(obj_request);
2231
2232         return result;
2233 }
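
/*
 * For reference, the whole copyup path sketched end to end (all of
 * these functions are defined nearby in this file):
 *
 *     rbd_img_obj_request_submit()              layered write, target
 *       rbd_img_obj_parent_read_full()          known not to exist
 *         rbd_img_obj_parent_read_full_callback()
 *           rbd_osd_req_create_copyup()         two ops: copyup + write
 *           rbd_obj_request_submit()
 *             rbd_img_obj_copyup_callback()     release pages, finish
 */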
2234
2235 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2236 {
2237         struct rbd_obj_request *orig_request;
2238         int result;
2239
2240         rbd_assert(!obj_request_img_data_test(obj_request));
2241
2242         /*
2243          * All we need from the object request is the original
2244          * request and the result of the STAT op.  Grab those, then
2245          * we're done with the request.
2246          */
2247         orig_request = obj_request->obj_request;
2248         obj_request->obj_request = NULL;
2249         rbd_assert(orig_request);
2250         rbd_assert(orig_request->img_request);
2251
2252         result = obj_request->result;
2253         obj_request->result = 0;
2254
2255         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2256                 obj_request, orig_request, result,
2257                 obj_request->xferred, obj_request->length);
2258         rbd_obj_request_put(obj_request);
2259
2263         /*
2264          * Our only purpose here is to determine whether the object
2265          * exists, and we don't want to treat the non-existence as
2266          * an error.  If something else comes back, transfer the
2267          * error to the original request and complete it now.
2268          */
2269         if (!result) {
2270                 obj_request_existence_set(orig_request, true);
2271         } else if (result == -ENOENT) {
2272                 obj_request_existence_set(orig_request, false);
2273         } else {
2274                 orig_request->result = result;
2275                 goto out;
2276         }
2277
2278         /*
2279          * Resubmit the original request now that we have recorded
2280          * whether the target object exists.
2281          */
2282         orig_request->result = rbd_img_obj_request_submit(orig_request);
2283 out:
2284         if (orig_request->result)
2285                 rbd_obj_request_complete(orig_request);
2286         rbd_obj_request_put(orig_request);
2287 }
2288
2289 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2290 {
2291         struct rbd_obj_request *stat_request;
2292         struct rbd_device *rbd_dev;
2293         struct ceph_osd_client *osdc;
2294         struct page **pages = NULL;
2295         u32 page_count;
2296         size_t size;
2297         int ret;
2298
2299         /*
2300          * The response data for a STAT call consists of:
2301          *     le64 length;
2302          *     struct {
2303          *         le32 tv_sec;
2304          *         le32 tv_nsec;
2305          *     } mtime;
2306          */
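        /*
         * Equivalently (a sketch only; the driver never declares such
         * a struct), the reply buffer could be viewed as:
         *
         *     struct {
         *             __le64 length;
         *             struct {
         *                     __le32 tv_sec;
         *                     __le32 tv_nsec;
         *             } mtime;
         *     } __attribute__ ((packed));     16 bytes, well under a page
         */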
2307         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2308         page_count = (u32)calc_pages_for(0, size);
2309         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2310         if (IS_ERR(pages))
2311                 return PTR_ERR(pages);
2312
2313         ret = -ENOMEM;
2314         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2315                                                         OBJ_REQUEST_PAGES);
2316         if (!stat_request)
2317                 goto out;
2318
2319         rbd_obj_request_get(obj_request);
2320         stat_request->obj_request = obj_request;
2321         stat_request->pages = pages;
2322         stat_request->page_count = page_count;
2323
2324         rbd_assert(obj_request->img_request);
2325         rbd_dev = obj_request->img_request->rbd_dev;
2326         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2327                                                 stat_request);
2328         if (!stat_request->osd_req)
2329                 goto out;
2330         stat_request->callback = rbd_img_obj_exists_callback;
2331
2332         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2333         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2334                                         false, false);
2335         rbd_osd_req_format_read(stat_request);
2336
2337         osdc = &rbd_dev->rbd_client->client->osdc;
2338         ret = rbd_obj_request_submit(osdc, stat_request);
2339 out:
2340         if (ret)
2341                 rbd_obj_request_put(obj_request);
2342
2343         return ret;
2344 }
2345
2346 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2347 {
2348         struct rbd_img_request *img_request;
2349         struct rbd_device *rbd_dev;
2350         bool known;
2351
2352         rbd_assert(obj_request_img_data_test(obj_request));
2353
2354         img_request = obj_request->img_request;
2355         rbd_assert(img_request);
2356         rbd_dev = img_request->rbd_dev;
2357
2358         /*
2359          * Only writes to layered images need special handling.
2360          * Reads and non-layered writes are simple object requests.
2361          * Layered writes that start beyond the end of the overlap
2362          * with the parent have no parent data, so they too are
2363          * simple object requests.  Finally, if the target object is
2364          * known to already exist, its parent data has already been
2365          * copied, so a write to the object can also be handled as a
2366          * simple object request.
2367          */
2368         if (!img_request_write_test(img_request) ||
2369                 !img_request_layered_test(img_request) ||
2370                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2371                 ((known = obj_request_known_test(obj_request)) &&
2372                         obj_request_exists_test(obj_request))) {
2373
2374                 struct rbd_device *rbd_dev;
2375                 struct ceph_osd_client *osdc;
2376
2377                 rbd_dev = obj_request->img_request->rbd_dev;
2378                 osdc = &rbd_dev->rbd_client->client->osdc;
2379
2380                 return rbd_obj_request_submit(osdc, obj_request);
2381         }
2382
2383         /*
2384          * It's a layered write.  The target object might exist but
2385          * we may not know that yet.  If we know it doesn't exist,
2386          * start by reading the data for the full target object from
2387          * the parent so we can use it for a copyup to the target.
2388          */
2389         if (known)
2390                 return rbd_img_obj_parent_read_full(obj_request);
2391
2392         /* We don't know whether the target exists.  Go find out. */
2393
2394         return rbd_img_obj_exists_submit(obj_request);
2395 }
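
/*
 * A summary of the dispatch above, as a sketch:
 *
 *     read                                  -> submit directly
 *     write, image not layered              -> submit directly
 *     write at/beyond the parent overlap    -> submit directly
 *     write, target known to exist          -> submit directly
 *     write, target known not to exist      -> rbd_img_obj_parent_read_full()
 *     write, target existence unknown       -> rbd_img_obj_exists_submit()
 */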
2396
2397 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2398 {
2399         struct rbd_obj_request *obj_request;
2400         struct rbd_obj_request *next_obj_request;
2401
2402         dout("%s: img %p\n", __func__, img_request);
2403         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2404                 int ret;
2405
2406                 ret = rbd_img_obj_request_submit(obj_request);
2407                 if (ret)
2408                         return ret;
2409         }
2410
2411         return 0;
2412 }
2413
2414 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2415 {
2416         struct rbd_obj_request *obj_request;
2417         struct rbd_device *rbd_dev;
2418         u64 obj_end;
2419
2420         rbd_assert(img_request_child_test(img_request));
2421
2422         obj_request = img_request->obj_request;
2423         rbd_assert(obj_request);
2424         rbd_assert(obj_request->img_request);
2425
2426         obj_request->result = img_request->result;
2427         if (obj_request->result)
2428                 goto out;
2429
2430         /*
2431          * We need to zero anything beyond the parent overlap
2432          * boundary.  Since rbd_img_obj_request_read_callback()
2433          * will zero anything beyond the end of a short read, an
2434          * easy way to do this is to pretend the data from the
2435          * parent came up short--ending at the overlap boundary.
2436          */
2437         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2438         obj_end = obj_request->img_offset + obj_request->length;
2439         rbd_dev = obj_request->img_request->rbd_dev;
2440         if (obj_end > rbd_dev->parent_overlap) {
2441                 u64 xferred = 0;
2442
2443                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2444                         xferred = rbd_dev->parent_overlap -
2445                                         obj_request->img_offset;
2446
2447                 obj_request->xferred = min(img_request->xferred, xferred);
2448         } else {
2449                 obj_request->xferred = img_request->xferred;
2450         }
2451 out:
2452         rbd_img_obj_request_read_callback(obj_request);
2453         rbd_obj_request_complete(obj_request);
2454 }
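
/*
 * An example of the clamping above (a sketch): with a 4 MiB parent
 * overlap, a 1 MiB child read at img_offset 3.5 MiB has obj_end at
 * 4.5 MiB.  That is past the overlap, so xferred is limited to
 * 4 MiB - 3.5 MiB = 0.5 MiB, and the read callback zero-fills the
 * other half of the buffer.
 */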
2455
2456 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2457 {
2458         struct rbd_device *rbd_dev;
2459         struct rbd_img_request *img_request;
2460         int result;
2461
2462         rbd_assert(obj_request_img_data_test(obj_request));
2463         rbd_assert(obj_request->img_request != NULL);
2464         rbd_assert(obj_request->result == (s32) -ENOENT);
2465         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2466
2467         rbd_dev = obj_request->img_request->rbd_dev;
2468         rbd_assert(rbd_dev->parent != NULL);
2470         img_request = rbd_img_request_create(rbd_dev->parent,
2471                                                 obj_request->img_offset,
2472                                                 obj_request->length,
2473                                                 false, true);
2474         result = -ENOMEM;
2475         if (!img_request)
2476                 goto out_err;
2477
2478         rbd_obj_request_get(obj_request);
2479         img_request->obj_request = obj_request;
2480
2481         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2482                                         obj_request->bio_list);
2483         if (result)
2484                 goto out_err;
2485
2486         img_request->callback = rbd_img_parent_read_callback;
2487         result = rbd_img_request_submit(img_request);
2488         if (result)
2489                 goto out_err;
2490
2491         return;
2492 out_err:
2493         if (img_request)
2494                 rbd_img_request_put(img_request);
2495         obj_request->result = result;
2496         obj_request->xferred = 0;
2497         obj_request_done_set(obj_request);
2498 }
2499
2500 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2501                                    u64 ver, u64 notify_id)
2502 {
2503         struct rbd_obj_request *obj_request;
2504         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2505         int ret;
2506
2507         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2508                                                         OBJ_REQUEST_NODATA);
2509         if (!obj_request)
2510                 return -ENOMEM;
2511
2512         ret = -ENOMEM;
2513         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2514         if (!obj_request->osd_req)
2515                 goto out;
2516         obj_request->callback = rbd_obj_request_put;
2517
2518         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2519                                         notify_id, ver, 0);
2520         rbd_osd_req_format_read(obj_request);
2521
2522         ret = rbd_obj_request_submit(osdc, obj_request);
2523 out:
2524         if (ret)
2525                 rbd_obj_request_put(obj_request);
2526
2527         return ret;
2528 }
2529
2530 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2531 {
2532         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2533         u64 hver;
2534
2535         if (!rbd_dev)
2536                 return;
2537
2538         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2539                 rbd_dev->header_name, (unsigned long long) notify_id,
2540                 (unsigned int) opcode);
2541         (void)rbd_dev_refresh(rbd_dev, &hver);
2542
2543         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2544 }
2545
2546 /*
2547  * Request sync osd watch/unwatch.  The value of "start" determines
2548  * whether a watch request is being initiated or torn down.
2549  */
2550 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2551 {
2552         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2553         struct rbd_obj_request *obj_request;
2554         int ret;
2555
2556         rbd_assert(start ^ !!rbd_dev->watch_event);
2557         rbd_assert(start ^ !!rbd_dev->watch_request);
2558
2559         if (start) {
2560                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2561                                                 &rbd_dev->watch_event);
2562                 if (ret < 0)
2563                         return ret;
2564                 rbd_assert(rbd_dev->watch_event != NULL);
2565         }
2566
2567         ret = -ENOMEM;
2568         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2569                                                         OBJ_REQUEST_NODATA);
2570         if (!obj_request)
2571                 goto out_cancel;
2572
2573         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2574         if (!obj_request->osd_req)
2575                 goto out_cancel;
2576
2577         if (start)
2578                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2579         else
2580                 ceph_osdc_unregister_linger_request(osdc,
2581                                         rbd_dev->watch_request->osd_req);
2582
2583         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2584                                 rbd_dev->watch_event->cookie,
2585                                 rbd_dev->header.obj_version, start);
2586         rbd_osd_req_format_write(obj_request);
2587
2588         ret = rbd_obj_request_submit(osdc, obj_request);
2589         if (ret)
2590                 goto out_cancel;
2591         ret = rbd_obj_request_wait(obj_request);
2592         if (ret)
2593                 goto out_cancel;
2594         ret = obj_request->result;
2595         if (ret)
2596                 goto out_cancel;
2597
2598         /*
2599          * A watch request is set to linger, so the underlying osd
2600          * request won't go away until we unregister it.  We retain
2601          * a pointer to the object request during that time (in
2602          * rbd_dev->watch_request), so we'll keep a reference to
2603          * it.  We'll drop that reference (below) after we've
2604          * unregistered it.
2605          */
2606         if (start) {
2607                 rbd_dev->watch_request = obj_request;
2608
2609                 return 0;
2610         }
2611
2612         /* We have successfully torn down the watch request */
2613
2614         rbd_obj_request_put(rbd_dev->watch_request);
2615         rbd_dev->watch_request = NULL;
2616 out_cancel:
2617         /* Cancel the event if we're tearing down, or on error */
2618         ceph_osdc_cancel_event(rbd_dev->watch_event);
2619         rbd_dev->watch_event = NULL;
2620         if (obj_request)
2621                 rbd_obj_request_put(obj_request);
2622
2623         return ret;
2624 }
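
/*
 * Typical usage, as a sketch (the real call sites are elsewhere in
 * this file): start the watch once the header object name is known,
 * and tear it down again before the device goes away:
 *
 *     ret = rbd_dev_header_watch_sync(rbd_dev, 1);    set up
 *     ...
 *     ret = rbd_dev_header_watch_sync(rbd_dev, 0);    tear down
 */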
2625
2626 /*
2627  * Synchronous osd object method call.  Returns the number of bytes
2628  * returned in the inbound buffer, or a negative error code.
2629  */
2630 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2631                              const char *object_name,
2632                              const char *class_name,
2633                              const char *method_name,
2634                              const void *outbound,
2635                              size_t outbound_size,
2636                              void *inbound,
2637                              size_t inbound_size,
2638                              u64 *version)
2639 {
2640         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2641         struct rbd_obj_request *obj_request;
2642         struct page **pages;
2643         u32 page_count;
2644         int ret;
2645
2646         /*
2647          * Method calls are ultimately read operations.  The result
2648          * should be placed into the inbound buffer provided.  They
2649          * also supply outbound data--parameters for the object
2650          * method.  Currently, if present, this will be a
2651          * snapshot id.
2652          */
2653         page_count = (u32)calc_pages_for(0, inbound_size);
2654         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2655         if (IS_ERR(pages))
2656                 return PTR_ERR(pages);
2657
2658         ret = -ENOMEM;
2659         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2660                                                         OBJ_REQUEST_PAGES);
2661         if (!obj_request)
2662                 goto out;
2663
2664         obj_request->pages = pages;
2665         obj_request->page_count = page_count;
2666
2667         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2668         if (!obj_request->osd_req)
2669                 goto out;
2670
2671         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2672                                         class_name, method_name);
2673         if (outbound_size) {
2674                 struct ceph_pagelist *pagelist;
2675
2676                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2677                 if (!pagelist)
2678                         goto out;
2679
2680                 ceph_pagelist_init(pagelist);
2681                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2682                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2683                                                 pagelist);
2684         }
2685         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2686                                         obj_request->pages, inbound_size,
2687                                         0, false, false);
2688         rbd_osd_req_format_read(obj_request);
2689
2690         ret = rbd_obj_request_submit(osdc, obj_request);
2691         if (ret)
2692                 goto out;
2693         ret = rbd_obj_request_wait(obj_request);
2694         if (ret)
2695                 goto out;
2696
2697         ret = obj_request->result;
2698         if (ret < 0)
2699                 goto out;
2700
2701         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2702         ret = (int)obj_request->xferred;
2703         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2704         if (version)
2705                 *version = obj_request->version;
2706 out:
2707         if (obj_request)
2708                 rbd_obj_request_put(obj_request);
2709         else
2710                 ceph_release_page_vector(pages, page_count);
2711
2712         return ret;
2713 }
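
/*
 * An example call, as a sketch (modeled on the image id lookup done
 * elsewhere in this file; response stands for the caller's inbound
 * buffer): invoke the "get_id" method of the "rbd" class on an
 * object, passing no outbound parameters:
 *
 *     ret = rbd_obj_method_sync(rbd_dev, object_name,
 *                             "rbd", "get_id",
 *                             NULL, 0,
 *                             response, RBD_IMAGE_ID_LEN_MAX, NULL);
 *
 * On success, ret is the number of bytes placed into response.
 */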
2714
2715 static void rbd_request_fn(struct request_queue *q)
2716                 __releases(q->queue_lock) __acquires(q->queue_lock)
2717 {
2718         struct rbd_device *rbd_dev = q->queuedata;
2719         bool read_only = rbd_dev->mapping.read_only;
2720         struct request *rq;
2721         int result;
2722
2723         while ((rq = blk_fetch_request(q))) {
2724                 bool write_request = rq_data_dir(rq) == WRITE;
2725                 struct rbd_img_request *img_request;
2726                 u64 offset;
2727                 u64 length;
2728
2729                 /* Ignore any non-FS requests that filter through. */
2730
2731                 if (rq->cmd_type != REQ_TYPE_FS) {
2732                         dout("%s: non-fs request type %d\n", __func__,
2733                                 (int) rq->cmd_type);
2734                         __blk_end_request_all(rq, 0);
2735                         continue;
2736                 }
2737
2738                 /* Ignore/skip any zero-length requests */
2739
2740                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2741                 length = (u64) blk_rq_bytes(rq);
2742
2743                 if (!length) {
2744                         dout("%s: zero-length request\n", __func__);
2745                         __blk_end_request_all(rq, 0);
2746                         continue;
2747                 }
2748
2749                 spin_unlock_irq(q->queue_lock);
2750
2751                 /* Disallow writes to a read-only device */
2752
2753                 if (write_request) {
2754                         result = -EROFS;
2755                         if (read_only)
2756                                 goto end_request;
2757                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2758                 }
2759
2760                 /*
2761                  * Quit early if the mapped snapshot no longer
2762                  * exists.  It's still possible the snapshot will
2763                  * have disappeared by the time our request arrives
2764                  * at the osd, but there's no sense in sending it if
2765                  * we already know.
2766                  */
2767                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2768                         dout("request for non-existent snapshot\n");
2769                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2770                         result = -ENXIO;
2771                         goto end_request;
2772                 }
2773
2774                 result = -EINVAL;
2775                 if (offset && length > U64_MAX - offset + 1) {
2776                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2777                                 offset, length);
2778                         goto end_request;       /* Shouldn't happen */
2779                 }
2780
2781                 result = -ENOMEM;
2782                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2783                                                         write_request, false);
2784                 if (!img_request)
2785                         goto end_request;
2786
2787                 img_request->rq = rq;
2788
2789                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2790                                                 rq->bio);
2791                 if (!result)
2792                         result = rbd_img_request_submit(img_request);
2793                 if (result)
2794                         rbd_img_request_put(img_request);
2795 end_request:
2796                 spin_lock_irq(q->queue_lock);
2797                 if (result < 0) {
2798                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2799                                 write_request ? "write" : "read",
2800                                 length, offset, result);
2801
2802                         __blk_end_request_all(rq, result);
2803                 }
2804         }
2805 }
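
/*
 * Unit sketch for the arithmetic above: blk_rq_pos() counts 512-byte
 * sectors, so a request starting at sector 2048 with blk_rq_bytes()
 * of 4096 maps to image offset 1 MiB and length 4 KiB.
 */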
2806
2807 /*
2808  * A queue callback.  Makes sure that we don't create a bio that spans
2809  * multiple osd objects.  One exception would be single-page bios,
2810  * which we handle later in bio_chain_clone_range().
2811  */
2812 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2813                           struct bio_vec *bvec)
2814 {
2815         struct rbd_device *rbd_dev = q->queuedata;
2816         sector_t sector_offset;
2817         sector_t sectors_per_obj;
2818         sector_t obj_sector_offset;
2819         int ret;
2820
2821         /*
2822          * Convert the partition-relative bio start sector to one
2823          * relative to the enclosing device, then find how far into
2824          * its rbd object that sector falls.
2825          */
2826         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2827         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2828         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2829
2830         /*
2831          * Compute the number of bytes from that offset to the end
2832          * of the object.  Account for what's already used by the bio.
2833          */
2834         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2835         if (ret > bmd->bi_size)
2836                 ret -= bmd->bi_size;
2837         else
2838                 ret = 0;
2839
2840         /*
2841          * Don't send back more than was asked for.  And if the bio
2842          * was empty, let the whole thing through because:  "Note
2843          * that a block device *must* allow a single page to be
2844          * added to an empty bio."
2845          */
2846         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2847         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2848                 ret = (int) bvec->bv_len;
2849
2850         return ret;
2851 }
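
/*
 * A worked example (a sketch, assuming obj_order 22, i.e. 4 MiB
 * objects and thus 8192 sectors per object): a bio starting at
 * device sector 8188 that already holds 1024 bytes has 2048 bytes
 * to the object boundary, of which 1024 remain, so a full-page
 * bio_vec is limited to 1024 bytes here.  An empty bio
 * (bi_size == 0) is always granted its first page regardless.
 */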
2852
2853 static void rbd_free_disk(struct rbd_device *rbd_dev)
2854 {
2855         struct gendisk *disk = rbd_dev->disk;
2856
2857         if (!disk)
2858                 return;
2859
2860         rbd_dev->disk = NULL;
2861         if (disk->flags & GENHD_FL_UP) {
2862                 del_gendisk(disk);
2863                 if (disk->queue)
2864                         blk_cleanup_queue(disk->queue);
2865         }
2866         put_disk(disk);
2867 }
2868
2869 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2870                                 const char *object_name,
2871                                 u64 offset, u64 length,
2872                                 void *buf, u64 *version)
2874 {
2875         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2876         struct rbd_obj_request *obj_request;
2877         struct page **pages = NULL;
2878         u32 page_count;
2879         size_t size;
2880         int ret;
2881
2882         page_count = (u32) calc_pages_for(offset, length);
2883         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2884         if (IS_ERR(pages))
2885                 return PTR_ERR(pages);
2886
2887         ret = -ENOMEM;
2888         obj_request = rbd_obj_request_create(object_name, offset, length,
2889                                                         OBJ_REQUEST_PAGES);
2890         if (!obj_request)
2891                 goto out;
2892
2893         obj_request->pages = pages;
2894         obj_request->page_count = page_count;
2895
2896         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2897         if (!obj_request->osd_req)
2898                 goto out;
2899
2900         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2901                                         offset, length, 0, 0);
2902         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2903                                         obj_request->pages,
2904                                         obj_request->length,
2905                                         obj_request->offset & ~PAGE_MASK,
2906                                         false, false);
2907         rbd_osd_req_format_read(obj_request);
2908
2909         ret = rbd_obj_request_submit(osdc, obj_request);
2910         if (ret)
2911                 goto out;
2912         ret = rbd_obj_request_wait(obj_request);
2913         if (ret)
2914                 goto out;
2915
2916         ret = obj_request->result;
2917         if (ret < 0)
2918                 goto out;
2919
2920         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2921         size = (size_t) obj_request->xferred;
2922         ceph_copy_from_page_vector(pages, buf, 0, size);
2923         rbd_assert(size <= (size_t) INT_MAX);
2924         ret = (int) size;
2925         if (version)
2926                 *version = obj_request->version;
2927 out:
2928         if (obj_request)
2929                 rbd_obj_request_put(obj_request);
2930         else
2931                 ceph_release_page_vector(pages, page_count);
2932
2933         return ret;
2934 }
2935
2936 /*
2937  * Read the complete header for the given rbd device.
2938  *
2939  * Returns a pointer to a dynamically-allocated buffer containing
2940  * the complete and validated header.  Caller can pass the address
2941  * of a variable that will be filled in with the version of the
2942  * header object at the time it was read.
2943  *
2944  * Returns a pointer-coded errno if a failure occurs.
2945  */
2946 static struct rbd_image_header_ondisk *
2947 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2948 {
2949         struct rbd_image_header_ondisk *ondisk = NULL;
2950         u32 snap_count = 0;
2951         u64 names_size = 0;
2952         u32 want_count;
2953         int ret;
2954
2955         /*
2956          * The complete header will include an array of its 64-bit
2957          * snapshot ids, followed by the names of those snapshots as
2958          * a contiguous block of NUL-terminated strings.  Note that
2959          * the number of snapshots could change by the time we read
2960          * it in, in which case we re-read it.
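         *
         * Roughly, the on-disk layout is:
         *   struct rbd_image_header_ondisk      (fixed-size fields)
         *   struct rbd_image_snap_ondisk[snap_count]
         *   char snap_names[names_size]         (NUL-separated)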
2961          */
2962         do {
2963                 size_t size;
2964
2965                 kfree(ondisk);
2966
2967                 size = sizeof (*ondisk);
2968                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2969                 size += names_size;
2970                 ondisk = kmalloc(size, GFP_KERNEL);
2971                 if (!ondisk)
2972                         return ERR_PTR(-ENOMEM);
2973
2974                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2975                                        0, size, ondisk, version);
2976                 if (ret < 0)
2977                         goto out_err;
2978                 if ((size_t)ret < size) {
2979                         ret = -ENXIO;
2980                 rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2981                                 size, ret);
2982                         goto out_err;
2983                 }
2984                 if (!rbd_dev_ondisk_valid(ondisk)) {
2985                         ret = -ENXIO;
2986                         rbd_warn(rbd_dev, "invalid header");
2987                         goto out_err;
2988                 }
2989
2990                 names_size = le64_to_cpu(ondisk->snap_names_len);
2991                 want_count = snap_count;
2992                 snap_count = le32_to_cpu(ondisk->snap_count);
2993         } while (snap_count != want_count);
2994
2995         return ondisk;
2996
2997 out_err:
2998         kfree(ondisk);
2999
3000         return ERR_PTR(ret);
3001 }
3002
3003 /*
3004  * re-read the on-disk header and fill in the in-memory version
3005  */
3006 static int rbd_read_header(struct rbd_device *rbd_dev,
3007                            struct rbd_image_header *header)
3008 {
3009         struct rbd_image_header_ondisk *ondisk;
3010         u64 ver = 0;
3011         int ret;
3012
3013         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3014         if (IS_ERR(ondisk))
3015                 return PTR_ERR(ondisk);
3016         ret = rbd_header_from_disk(header, ondisk);
3017         if (ret >= 0)
3018                 header->obj_version = ver;
3019         kfree(ondisk);
3020
3021         return ret;
3022 }
3023
3024 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3025 {
3026         struct rbd_snap *snap;
3027         struct rbd_snap *next;
3028
3029         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3030                 list_del(&snap->node);
3031                 rbd_snap_destroy(snap);
3032         }
3033 }
3034
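/*
 * If the base image (not a snapshot) is mapped and the image has
 * been resized, record the new size and update the block device
 * capacity to match.
 */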
3035 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3036 {
3037         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3038                 return;
3039
3040         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3041                 sector_t size;
3042
3043                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3044                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3045                 dout("setting size to %llu sectors", (unsigned long long)size);
3046                 set_capacity(rbd_dev->disk, size);
3047         }
3048 }
3049
3050 /*
3051  * re-read the format 1 header and update the device to match it
3052  */
3053 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3054 {
3055         int ret;
3056         struct rbd_image_header h;
3057
3058         ret = rbd_read_header(rbd_dev, &h);
3059         if (ret < 0)
3060                 return ret;
3061
3062         down_write(&rbd_dev->header_rwsem);
3063
3064         /* Update image size, and check for resize of mapped image */
3065         rbd_dev->header.image_size = h.image_size;
3066         rbd_update_mapping_size(rbd_dev);
3067
3068         /* rbd_dev->header.object_prefix shouldn't change */
3069         kfree(rbd_dev->header.snap_sizes);
3070         kfree(rbd_dev->header.snap_names);
3071         /* osd requests may still refer to snapc */
3072         rbd_snap_context_put(rbd_dev->header.snapc);
3073
3074         if (hver)
3075                 *hver = h.obj_version;
3076         rbd_dev->header.obj_version = h.obj_version;
3077         rbd_dev->header.image_size = h.image_size;
3078         rbd_dev->header.snapc = h.snapc;
3079         rbd_dev->header.snap_names = h.snap_names;
3080         rbd_dev->header.snap_sizes = h.snap_sizes;
3081         /* Free the extra copy of the object prefix */
3082         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3083                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3084         kfree(h.object_prefix);
3085
3086         ret = rbd_dev_snaps_update(rbd_dev);
3087
3088         up_write(&rbd_dev->header_rwsem);
3089
3090         return ret;
3091 }
3092
3093 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3094 {
3095         int ret;
3096
3097         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3098         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3099         if (rbd_dev->image_format == 1)
3100                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3101         else
3102                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3103         mutex_unlock(&ctl_mutex);
3104         revalidate_disk(rbd_dev->disk);
3105         if (ret)
3106                 rbd_warn(rbd_dev, "got notification but failed to "
3107                            "update snaps: %d\n", ret);
3108
3109         return ret;
3110 }
3111
3112 static int rbd_init_disk(struct rbd_device *rbd_dev)
3113 {
3114         struct gendisk *disk;
3115         struct request_queue *q;
3116         u64 segment_size;
3117
3118         /* create gendisk info */
3119         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3120         if (!disk)
3121                 return -ENOMEM;
3122
3123         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3124                  rbd_dev->dev_id);
3125         disk->major = rbd_dev->major;
3126         disk->first_minor = 0;
3127         disk->fops = &rbd_bd_ops;
3128         disk->private_data = rbd_dev;
3129
3130         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3131         if (!q)
3132                 goto out_disk;
3133
3134         /* We use the default size, but let's be explicit about it. */
3135         blk_queue_physical_block_size(q, SECTOR_SIZE);
3136
3137         /* set io sizes to object size */
3138         segment_size = rbd_obj_bytes(&rbd_dev->header);
3139         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3140         blk_queue_max_segment_size(q, segment_size);
3141         blk_queue_io_min(q, segment_size);
3142         blk_queue_io_opt(q, segment_size);
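        /*
         * For example, assuming the common default object order of
         * 22 (4 MiB objects), requests are capped at 4 MiB, which is
         * 8192 512-byte sectors.
         */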
3143
3144         blk_queue_merge_bvec(q, rbd_merge_bvec);
3145         disk->queue = q;
3146
3147         q->queuedata = rbd_dev;
3148
3149         rbd_dev->disk = disk;
3150
3151         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3152
3153         return 0;
3154 out_disk:
3155         put_disk(disk);
3156
3157         return -ENOMEM;
3158 }
3159
3160 /*
3161   sysfs
3162 */
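
/*
 * Each mapped device gets a directory under /sys/bus/rbd/devices/.
 * The paths and values below are illustrative only:
 *
 *   $ cat /sys/bus/rbd/devices/0/size
 *   1073741824
 *   $ cat /sys/bus/rbd/devices/0/current_snap
 *   -
 */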
3163
3164 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3165 {
3166         return container_of(dev, struct rbd_device, dev);
3167 }
3168
3169 static ssize_t rbd_size_show(struct device *dev,
3170                              struct device_attribute *attr, char *buf)
3171 {
3172         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3173
3174         return sprintf(buf, "%llu\n",
3175                 (unsigned long long)rbd_dev->mapping.size);
3176 }
3177
3178 /*
3179  * Note this shows the features for whatever's mapped, which is not
3180  * necessarily the base image.
3181  */
3182 static ssize_t rbd_features_show(struct device *dev,
3183                              struct device_attribute *attr, char *buf)
3184 {
3185         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3186
3187         return sprintf(buf, "0x%016llx\n",
3188                         (unsigned long long)rbd_dev->mapping.features);
3189 }
3190
3191 static ssize_t rbd_major_show(struct device *dev,
3192                               struct device_attribute *attr, char *buf)
3193 {
3194         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3195
3196         if (rbd_dev->major)
3197                 return sprintf(buf, "%d\n", rbd_dev->major);
3198
3199         return sprintf(buf, "(none)\n");
3201 }
3202
3203 static ssize_t rbd_client_id_show(struct device *dev,
3204                                   struct device_attribute *attr, char *buf)
3205 {
3206         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3207
3208         return sprintf(buf, "client%lld\n",
3209                         ceph_client_id(rbd_dev->rbd_client->client));
3210 }
3211
3212 static ssize_t rbd_pool_show(struct device *dev,
3213                              struct device_attribute *attr, char *buf)
3214 {
3215         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3216
3217         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3218 }
3219
3220 static ssize_t rbd_pool_id_show(struct device *dev,
3221                              struct device_attribute *attr, char *buf)
3222 {
3223         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3224
3225         return sprintf(buf, "%llu\n",
3226                         (unsigned long long) rbd_dev->spec->pool_id);
3227 }
3228
3229 static ssize_t rbd_name_show(struct device *dev,
3230                              struct device_attribute *attr, char *buf)
3231 {
3232         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3233
3234         if (rbd_dev->spec->image_name)
3235                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3236
3237         return sprintf(buf, "(unknown)\n");
3238 }
3239
3240 static ssize_t rbd_image_id_show(struct device *dev,
3241                              struct device_attribute *attr, char *buf)
3242 {
3243         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3244
3245         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3246 }
3247
3248 /*
3249  * Shows the name of the currently-mapped snapshot (or
3250  * RBD_SNAP_HEAD_NAME for the base image).
3251  */
3252 static ssize_t rbd_snap_show(struct device *dev,
3253                              struct device_attribute *attr,
3254                              char *buf)
3255 {
3256         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3257
3258         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3259 }
3260
3261 /*
3262  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3263  * for the parent image.  If there is no parent, simply shows
3264  * "(no parent image)".
3265  */
3266 static ssize_t rbd_parent_show(struct device *dev,
3267                              struct device_attribute *attr,
3268                              char *buf)
3269 {
3270         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3271         struct rbd_spec *spec = rbd_dev->parent_spec;
3272         int count;
3273         char *bufp = buf;
3274
3275         if (!spec)
3276                 return sprintf(buf, "(no parent image)\n");
3277
3278         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3279                         (unsigned long long) spec->pool_id, spec->pool_name);
3280         if (count < 0)
3281                 return count;
3282         bufp += count;
3283
3284         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3285                         spec->image_name ? spec->image_name : "(unknown)");
3286         if (count < 0)
3287                 return count;
3288         bufp += count;
3289
3290         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3291                         (unsigned long long) spec->snap_id, spec->snap_name);
3292         if (count < 0)
3293                 return count;
3294         bufp += count;
3295
3296         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3297         if (count < 0)
3298                 return count;
3299         bufp += count;
3300
3301         return (ssize_t) (bufp - buf);
3302 }
3303
3304 static ssize_t rbd_image_refresh(struct device *dev,
3305                                  struct device_attribute *attr,
3306                                  const char *buf,
3307                                  size_t size)
3308 {
3309         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3310         int ret;
3311
3312         ret = rbd_dev_refresh(rbd_dev, NULL);
3313
3314         return ret < 0 ? ret : size;
3315 }
3316
3317 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3318 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3319 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3320 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3321 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3322 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3323 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3324 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3325 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3326 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3327 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3328
3329 static struct attribute *rbd_attrs[] = {
3330         &dev_attr_size.attr,
3331         &dev_attr_features.attr,
3332         &dev_attr_major.attr,
3333         &dev_attr_client_id.attr,
3334         &dev_attr_pool.attr,
3335         &dev_attr_pool_id.attr,
3336         &dev_attr_name.attr,
3337         &dev_attr_image_id.attr,
3338         &dev_attr_current_snap.attr,
3339         &dev_attr_parent.attr,
3340         &dev_attr_refresh.attr,
3341         NULL
3342 };
3343
3344 static struct attribute_group rbd_attr_group = {
3345         .attrs = rbd_attrs,
3346 };
3347
3348 static const struct attribute_group *rbd_attr_groups[] = {
3349         &rbd_attr_group,
3350         NULL
3351 };
3352
3353 static void rbd_sysfs_dev_release(struct device *dev)
3354 {
3355 }
3356
3357 static struct device_type rbd_device_type = {
3358         .name           = "rbd",
3359         .groups         = rbd_attr_groups,
3360         .release        = rbd_sysfs_dev_release,
3361 };
3362
3363 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3364 {
3365         kref_get(&spec->kref);
3366
3367         return spec;
3368 }
3369
3370 static void rbd_spec_free(struct kref *kref);
3371 static void rbd_spec_put(struct rbd_spec *spec)
3372 {
3373         if (spec)
3374                 kref_put(&spec->kref, rbd_spec_free);
3375 }
3376
3377 static struct rbd_spec *rbd_spec_alloc(void)
3378 {
3379         struct rbd_spec *spec;
3380
3381         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3382         if (!spec)
3383                 return NULL;
3384         kref_init(&spec->kref);
3385
3386         return spec;
3387 }
3388
3389 static void rbd_spec_free(struct kref *kref)
3390 {
3391         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3392
3393         kfree(spec->pool_name);
3394         kfree(spec->image_id);
3395         kfree(spec->image_name);
3396         kfree(spec->snap_name);
3397         kfree(spec);
3398 }
3399
3400 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3401                                 struct rbd_spec *spec)
3402 {
3403         struct rbd_device *rbd_dev;
3404
3405         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3406         if (!rbd_dev)
3407                 return NULL;
3408
3409         spin_lock_init(&rbd_dev->lock);
3410         rbd_dev->flags = 0;
3411         INIT_LIST_HEAD(&rbd_dev->node);
3412         INIT_LIST_HEAD(&rbd_dev->snaps);
3413         init_rwsem(&rbd_dev->header_rwsem);
3414
3415         rbd_dev->spec = spec;
3416         rbd_dev->rbd_client = rbdc;
3417
3418         /* Initialize the layout used for all rbd requests */
3419
3420         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3421         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3422         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3423         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3424
3425         return rbd_dev;
3426 }
3427
3428 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3429 {
3430         rbd_put_client(rbd_dev->rbd_client);
3431         rbd_spec_put(rbd_dev->spec);
3432         kfree(rbd_dev);
3433 }
3434
3435 static void rbd_snap_destroy(struct rbd_snap *snap)
3436 {
3437         kfree(snap->name);
3438         kfree(snap);
3439 }
3440
3441 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3442                                                 const char *snap_name,
3443                                                 u64 snap_id, u64 snap_size,
3444                                                 u64 snap_features)
3445 {
3446         struct rbd_snap *snap;
3447
3448         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3449         if (!snap)
3450                 return ERR_PTR(-ENOMEM);
3451
3452         snap->name = snap_name;
3453         snap->id = snap_id;
3454         snap->size = snap_size;
3455         snap->features = snap_features;
3456
3457         return snap;
3458 }
3459
3460 /*
3461  * Returns a dynamically-allocated snapshot name if successful, or a
3462  * pointer-coded error otherwise.
3463  */
3464 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3465                 u64 *snap_size, u64 *snap_features)
3466 {
3467         char *snap_name;
3468         int i;
3469
3470         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3471
3472         /* Skip over names until we find the one we are looking for */
3473
3474         snap_name = rbd_dev->header.snap_names;
3475         for (i = 0; i < which; i++)
3476                 snap_name += strlen(snap_name) + 1;
3477
3478         snap_name = kstrdup(snap_name, GFP_KERNEL);
3479         if (!snap_name)
3480                 return ERR_PTR(-ENOMEM);
3481
3482         *snap_size = rbd_dev->header.snap_sizes[which];
3483         *snap_features = 0;     /* No features for v1 */
3484
3485         return snap_name;
3486 }
3487
3488 /*
3489  * Get the size and object order for an image snapshot, or, if
3490  * snap_id is CEPH_NOSNAP, get this information for the base
3491  * image.
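 *
 * The reply to the "get_size" class method is a one-byte object
 * order followed by a little-endian 64-bit size, mirroring the
 * packed size_buf structure below.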
3492  */
3493 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3494                                 u8 *order, u64 *snap_size)
3495 {
3496         __le64 snapid = cpu_to_le64(snap_id);
3497         int ret;
3498         struct {
3499                 u8 order;
3500                 __le64 size;
3501         } __attribute__ ((packed)) size_buf = { 0 };
3502
3503         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3504                                 "rbd", "get_size",
3505                                 &snapid, sizeof (snapid),
3506                                 &size_buf, sizeof (size_buf), NULL);
3507         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3508         if (ret < 0)
3509                 return ret;
3510         if (ret < sizeof (size_buf))
3511                 return -ERANGE;
3512
3513         if (order)
3514                 *order = size_buf.order;
3515         *snap_size = le64_to_cpu(size_buf.size);
3516
3517         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3518                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3519                 (unsigned long long)*snap_size);
3520
3521         return 0;
3522 }
3523
3524 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3525 {
3526         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3527                                         &rbd_dev->header.obj_order,
3528                                         &rbd_dev->header.image_size);
3529 }
3530
3531 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3532 {
3533         void *reply_buf;
3534         int ret;
3535         void *p;
3536
3537         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3538         if (!reply_buf)
3539                 return -ENOMEM;
3540
3541         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3542                                 "rbd", "get_object_prefix", NULL, 0,
3543                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3544         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3545         if (ret < 0)
3546                 goto out;
3547
3548         p = reply_buf;
3549         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3550                                                 p + ret, NULL, GFP_NOIO);
3551         ret = 0;
3552
3553         if (IS_ERR(rbd_dev->header.object_prefix)) {
3554                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3555                 rbd_dev->header.object_prefix = NULL;
3556         } else {
3557                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3558         }
3559 out:
3560         kfree(reply_buf);
3561
3562         return ret;
3563 }
3564
3565 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3566                 u64 *snap_features)
3567 {
3568         __le64 snapid = cpu_to_le64(snap_id);
3569         struct {
3570                 __le64 features;
3571                 __le64 incompat;
3572         } __attribute__ ((packed)) features_buf = { 0 };
3573         u64 incompat;
3574         int ret;
3575
3576         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3577                                 "rbd", "get_features",
3578                                 &snapid, sizeof (snapid),
3579                                 &features_buf, sizeof (features_buf), NULL);
3580         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3581         if (ret < 0)
3582                 return ret;
3583         if (ret < sizeof (features_buf))
3584                 return -ERANGE;
3585
3586         incompat = le64_to_cpu(features_buf.incompat);
3587         if (incompat & ~RBD_FEATURES_SUPPORTED)
3588                 return -ENXIO;
3589
3590         *snap_features = le64_to_cpu(features_buf.features);
3591
3592         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3593                 (unsigned long long)snap_id,
3594                 (unsigned long long)*snap_features,
3595                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3596
3597         return 0;
3598 }
3599
3600 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3601 {
3602         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3603                                                 &rbd_dev->header.features);
3604 }
3605
3606 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3607 {
3608         struct rbd_spec *parent_spec;
3609         size_t size;
3610         void *reply_buf = NULL;
3611         __le64 snapid;
3612         void *p;
3613         void *end;
3614         char *image_id;
3615         u64 overlap;
3616         int ret;
3617
3618         parent_spec = rbd_spec_alloc();
3619         if (!parent_spec)
3620                 return -ENOMEM;
3621
3622         size = sizeof (__le64) +                                /* pool_id */
3623                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3624                 sizeof (__le64) +                               /* snap_id */
3625                 sizeof (__le64);                                /* overlap */
3626         reply_buf = kmalloc(size, GFP_KERNEL);
3627         if (!reply_buf) {
3628                 ret = -ENOMEM;
3629                 goto out_err;
3630         }
3631
3632         snapid = cpu_to_le64(CEPH_NOSNAP);
3633         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3634                                 "rbd", "get_parent",
3635                                 &snapid, sizeof (snapid),
3636                                 reply_buf, size, NULL);
3637         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3638         if (ret < 0)
3639                 goto out_err;
3640
3641         p = reply_buf;
3642         end = reply_buf + ret;
3643         ret = -ERANGE;
3644         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3645         if (parent_spec->pool_id == CEPH_NOPOOL)
3646                 goto out;       /* No parent?  No problem. */
3647
3648         /* The ceph file layout needs to fit pool id in 32 bits */
3649
3650         ret = -EIO;
3651         if (parent_spec->pool_id > (u64)U32_MAX) {
3652                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3653                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3654                 goto out_err;
3655         }
3656
3657         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3658         if (IS_ERR(image_id)) {
3659                 ret = PTR_ERR(image_id);
3660                 goto out_err;
3661         }
3662         parent_spec->image_id = image_id;
3663         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3664         ceph_decode_64_safe(&p, end, overlap, out_err);
3665
3666         rbd_dev->parent_overlap = overlap;
3667         rbd_dev->parent_spec = parent_spec;
3668         parent_spec = NULL;     /* rbd_dev now owns this */
3669 out:
3670         ret = 0;
3671 out_err:
3672         kfree(reply_buf);
3673         rbd_spec_put(parent_spec);
3674
3675         return ret;
3676 }
3677
3678 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3679 {
3680         struct {
3681                 __le64 stripe_unit;
3682                 __le64 stripe_count;
3683         } __attribute__ ((packed)) striping_info_buf = { 0 };
3684         size_t size = sizeof (striping_info_buf);
3685         void *p;
3686         u64 obj_size;
3687         u64 stripe_unit;
3688         u64 stripe_count;
3689         int ret;
3690
3691         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3692                                 "rbd", "get_stripe_unit_count", NULL, 0,
3693                                 (char *)&striping_info_buf, size, NULL);
3694         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3695         if (ret < 0)
3696                 return ret;
3697         if (ret < size)
3698                 return -ERANGE;
3699
3700         /*
3701          * We don't actually support the "fancy striping" feature
3702          * (STRIPINGV2) yet, but if the striping sizes are the
3703          * defaults the behavior is the same as before.  So find
3704          * out, and only fail if the image has non-default values.
3705          */
3706         ret = -EINVAL;
3707         obj_size = (u64)1 << rbd_dev->header.obj_order;
3708         p = &striping_info_buf;
3709         stripe_unit = ceph_decode_64(&p);
3710         if (stripe_unit != obj_size) {
3711                 rbd_warn(rbd_dev, "unsupported stripe unit "
3712                                 "(got %llu want %llu)",
3713                                 stripe_unit, obj_size);
3714                 return -EINVAL;
3715         }
3716         stripe_count = ceph_decode_64(&p);
3717         if (stripe_count != 1) {
3718                 rbd_warn(rbd_dev, "unsupported stripe count "
3719                                 "(got %llu want 1)", stripe_count);
3720                 return -EINVAL;
3721         }
3722         rbd_dev->header.stripe_unit = stripe_unit;
3723         rbd_dev->header.stripe_count = stripe_count;
3724
3725         return 0;
3726 }
3727
3728 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3729 {
3730         size_t image_id_size;
3731         char *image_id;
3732         void *p;
3733         void *end;
3734         size_t size;
3735         void *reply_buf = NULL;
3736         size_t len = 0;
3737         char *image_name = NULL;
3738         int ret;
3739
3740         rbd_assert(!rbd_dev->spec->image_name);
3741
3742         len = strlen(rbd_dev->spec->image_id);
3743         image_id_size = sizeof (__le32) + len;
3744         image_id = kmalloc(image_id_size, GFP_KERNEL);
3745         if (!image_id)
3746                 return NULL;
3747
3748         p = image_id;
3749         end = image_id + image_id_size;
3750         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3751
3752         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3753         reply_buf = kmalloc(size, GFP_KERNEL);
3754         if (!reply_buf)
3755                 goto out;
3756
3757         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3758                                 "rbd", "dir_get_name",
3759                                 image_id, image_id_size,
3760                                 reply_buf, size, NULL);
3761         if (ret < 0)
3762                 goto out;
3763         p = reply_buf;
3764         end = reply_buf + ret;
3765
3766         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3767         if (IS_ERR(image_name))
3768                 image_name = NULL;
3769         else
3770                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3771 out:
3772         kfree(reply_buf);
3773         kfree(image_id);
3774
3775         return image_name;
3776 }
3777
3778 /*
3779  * When an rbd image has a parent image, it is identified by the
3780  * pool, image, and snapshot ids (not names).  This function fills
3781  * in the names for those ids.  (It's OK if we can't figure out the
3782  * name for an image id, but the pool and snapshot ids should always
3783  * exist and have names.)  All names in an rbd spec are dynamically
3784  * allocated.
3785  *
3786  * When an image being mapped (not a parent) is probed, we have the
3787  * pool name and pool id, image name and image id, and the snapshot
3788  * name.  The only thing we're missing is the snapshot id.
3789  *
3790  * The set of snapshots for an image is not known until they have
3791  * been read by rbd_dev_snaps_update(), so we can't completely fill
3792  * in this information until after that has been called.
3793  */
3794 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3795 {
3796         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3797         struct rbd_spec *spec = rbd_dev->spec;
3798         const char *pool_name;
3799         const char *image_name;
3800         const char *snap_name;
3801         int ret;
3802
3803         /*
3804          * An image being mapped will have the pool name (etc.), but
3805          * we need to look up the snapshot id.
3806          */
3807         if (spec->pool_name) {
3808                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3809                         struct rbd_snap *snap;
3810
3811                         snap = snap_by_name(rbd_dev, spec->snap_name);
3812                         if (!snap)
3813                                 return -ENOENT;
3814                         spec->snap_id = snap->id;
3815                 } else {
3816                         spec->snap_id = CEPH_NOSNAP;
3817                 }
3818
3819                 return 0;
3820         }
3821
3822         /* Get the pool name; we have to make our own copy of this */
3823
3824         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3825         if (!pool_name) {
3826                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3827                 return -EIO;
3828         }
3829         pool_name = kstrdup(pool_name, GFP_KERNEL);
3830         if (!pool_name)
3831                 return -ENOMEM;
3832
3833         /* Fetch the image name; tolerate failure here */
3834
3835         image_name = rbd_dev_image_name(rbd_dev);
3836         if (!image_name)
3837                 rbd_warn(rbd_dev, "unable to get image name");
3838
3839         /* Look up the snapshot name, and make a copy */
3840
3841         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3842         if (!snap_name) {
3843                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3844                 ret = -EIO;
3845                 goto out_err;
3846         }
3847         snap_name = kstrdup(snap_name, GFP_KERNEL);
3848         if (!snap_name) {
3849                 ret = -ENOMEM;
3850                 goto out_err;
3851         }
3852
3853         spec->pool_name = pool_name;
3854         spec->image_name = image_name;
3855         spec->snap_name = snap_name;
3856
3857         return 0;
3858 out_err:
3859         kfree(image_name);
3860         kfree(pool_name);
3861
3862         return ret;
3863 }
3864
3865 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3866 {
3867         size_t size;
3868         int ret;
3869         void *reply_buf;
3870         void *p;
3871         void *end;
3872         u64 seq;
3873         u32 snap_count;
3874         struct ceph_snap_context *snapc;
3875         u32 i;
3876
3877         /*
3878          * We'll need room for the seq value (maximum snapshot id),
3879          * snapshot count, and array of that many snapshot ids.
3880          * For now we have a fixed upper limit on the number we're
3881          * prepared to receive.
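         * With RBD_MAX_SNAP_COUNT of 510 this works out to
         * 8 + 4 + 510 * 8 = 4092 bytes, just under a 4 KiB page.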
3882          */
3883         size = sizeof (__le64) + sizeof (__le32) +
3884                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3885         reply_buf = kzalloc(size, GFP_KERNEL);
3886         if (!reply_buf)
3887                 return -ENOMEM;
3888
3889         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3890                                 "rbd", "get_snapcontext", NULL, 0,
3891                                 reply_buf, size, ver);
3892         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3893         if (ret < 0)
3894                 goto out;
3895
3896         p = reply_buf;
3897         end = reply_buf + ret;
3898         ret = -ERANGE;
3899         ceph_decode_64_safe(&p, end, seq, out);
3900         ceph_decode_32_safe(&p, end, snap_count, out);
3901
3902         /*
3903          * Make sure the reported number of snapshot ids wouldn't go
3904          * beyond the end of our buffer.  But before checking that,
3905          * make sure the computed size of the snapshot context we
3906          * allocate is representable in a size_t.
3907          */
3908         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3909                                  / sizeof (u64)) {
3910                 ret = -EINVAL;
3911                 goto out;
3912         }
3913         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3914                 goto out;
3915         ret = 0;
3916
3917         snapc = rbd_snap_context_create(snap_count);
3918         if (!snapc) {
3919                 ret = -ENOMEM;
3920                 goto out;
3921         }
3922         snapc->seq = seq;
3923         for (i = 0; i < snap_count; i++)
3924                 snapc->snaps[i] = ceph_decode_64(&p);
3925
3926         rbd_dev->header.snapc = snapc;
3927
3928         dout("  snap context seq = %llu, snap_count = %u\n",
3929                 (unsigned long long)seq, (unsigned int)snap_count);
3930 out:
3931         kfree(reply_buf);
3932
3933         return ret;
3934 }
3935
3936 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3937 {
3938         size_t size;
3939         void *reply_buf;
3940         __le64 snap_id;
3941         int ret;
3942         void *p;
3943         void *end;
3944         char *snap_name;
3945
3946         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3947         reply_buf = kmalloc(size, GFP_KERNEL);
3948         if (!reply_buf)
3949                 return ERR_PTR(-ENOMEM);
3950
3951         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3952         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3953         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3954                                 "rbd", "get_snapshot_name",
3955                                 &snap_id, sizeof (snap_id),
3956                                 reply_buf, size, NULL);
3957         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3958         if (ret < 0) {
3959                 snap_name = ERR_PTR(ret);
3960                 goto out;
3961         }
3962
3963         p = reply_buf;
3964         end = reply_buf + ret;
3965         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3966         if (IS_ERR(snap_name))
3967                 goto out;
3968
3969         dout("  snap_id 0x%016llx snap_name = %s\n",
3970                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3971 out:
3972         kfree(reply_buf);
3973
3974         return snap_name;
3975 }
3976
3977 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3978                 u64 *snap_size, u64 *snap_features)
3979 {
3980         u64 snap_id;
3981         u64 size;
3982         u64 features;
3983         char *snap_name;
3984         int ret;
3985
3986         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3987         snap_id = rbd_dev->header.snapc->snaps[which];
3988         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3989         if (ret)
3990                 goto out_err;
3991
3992         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3993         if (ret)
3994                 goto out_err;
3995
3996         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3997         if (!IS_ERR(snap_name)) {
3998                 *snap_size = size;
3999                 *snap_features = features;
4000         }
4001
4002         return snap_name;
4003 out_err:
4004         return ERR_PTR(ret);
4005 }
4006
4007 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4008                 u64 *snap_size, u64 *snap_features)
4009 {
4010         if (rbd_dev->image_format == 1)
4011                 return rbd_dev_v1_snap_info(rbd_dev, which,
4012                                         snap_size, snap_features);
4013         if (rbd_dev->image_format == 2)
4014                 return rbd_dev_v2_snap_info(rbd_dev, which,
4015                                         snap_size, snap_features);
4016         return ERR_PTR(-EINVAL);
4017 }
4018
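/*
 * Re-read the format 2 header: refresh the image size (the object
 * order is not allowed to change), then the snapshot context, and
 * finally bring the snapshot list up to date.
 */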
4019 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4020 {
4021         int ret;
4022         __u8 obj_order;
4023
4024         down_write(&rbd_dev->header_rwsem);
4025
4026         /* Grab old order first, to see if it changes */
4027
4028         obj_order = rbd_dev->header.obj_order;
4029         ret = rbd_dev_v2_image_size(rbd_dev);
4030         if (ret)
4031                 goto out;
4032         if (rbd_dev->header.obj_order != obj_order) {
4033                 ret = -EIO;
4034                 goto out;
4035         }
4036         rbd_update_mapping_size(rbd_dev);
4037
4038         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4039         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4040         if (ret)
4041                 goto out;
4042         ret = rbd_dev_snaps_update(rbd_dev);
4043         dout("rbd_dev_snaps_update returned %d\n", ret);
4044         if (ret)
4045                 goto out;
4046 out:
4047         up_write(&rbd_dev->header_rwsem);
4048
4049         return ret;
4050 }
4051
4052 /*
4053  * Scan the rbd device's current snapshot list and compare it to the
4054  * newly-received snapshot context.  Remove any existing snapshots
4055  * not present in the new snapshot context.  Add a new snapshot for
4056  * any snapshots in the snapshot context not in the current list.
4057  * And verify there are no changes to snapshots we already know
4058  * about.
4059  *
4060  * Assumes the snapshots in the snapshot context are sorted by
4061  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4062  * are also maintained in that order.)
4063  *
4064  * Note that any error that occurs while updating the snapshot list
4065  * aborts the update, and the entire list is cleared.  The snapshot
4066  * list becomes inconsistent at that point anyway, so it might as
4067  * well be empty.
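 *
 * For example (ids illustrative): if the current list holds ids
 * {12, 7, 3} and the new context holds {12, 9, 3}, then snapshot 7
 * is removed and a new snapshot 9 is inserted between 12 and 3.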
4068  */
4069 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4070 {
4071         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4072         const u32 snap_count = snapc->num_snaps;
4073         struct list_head *head = &rbd_dev->snaps;
4074         struct list_head *links = head->next;
4075         u32 index = 0;
4076         int ret = 0;
4077
4078         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4079         while (index < snap_count || links != head) {
4080                 u64 snap_id;
4081                 struct rbd_snap *snap;
4082                 char *snap_name;
4083                 u64 snap_size = 0;
4084                 u64 snap_features = 0;
4085
4086                 snap_id = index < snap_count ? snapc->snaps[index]
4087                                              : CEPH_NOSNAP;
4088                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4089                                      : NULL;
4090                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4091
4092                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4093                         struct list_head *next = links->next;
4094
4095                         /*
4096                          * A previously-existing snapshot is not in
4097                          * the new snap context.
4098                          *
4099                          * If the now-missing snapshot is the one
4100                          * the image represents, clear its existence
4101                          * flag so we can avoid sending any more
4102                          * requests to it.
4103                          */
4104                         if (rbd_dev->spec->snap_id == snap->id)
4105                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4106                         dout("removing %ssnap id %llu\n",
4107                                 rbd_dev->spec->snap_id == snap->id ?
4108                                                         "mapped " : "",
4109                                 (unsigned long long)snap->id);
4110
4111                         list_del(&snap->node);
4112                         rbd_snap_destroy(snap);
4113
4114                         /* Done with this list entry; advance */
4115
4116                         links = next;
4117                         continue;
4118                 }
4119
4120                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4121                                         &snap_size, &snap_features);
4122                 if (IS_ERR(snap_name)) {
4123                         ret = PTR_ERR(snap_name);
4124                         dout("failed to get snap info, error %d\n", ret);
4125                         goto out_err;
4126                 }
4127
4128                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4129                         (unsigned long long)snap_id);
4130                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4131                         struct rbd_snap *new_snap;
4132
4133                         /* We haven't seen this snapshot before */
4134
4135                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4136                                         snap_id, snap_size, snap_features);
4137                         if (IS_ERR(new_snap)) {
4138                                 ret = PTR_ERR(new_snap);
4139                                 dout("  failed to add dev, error %d\n", ret);
4140                                 goto out_err;
4141                         }
4142
4143                         /* New goes before existing, or at end of list */
4144
4145                         dout("  added dev%s\n", snap ? "" : " at end");
4146                         if (snap)
4147                                 list_add_tail(&new_snap->node, &snap->node);
4148                         else
4149                                 list_add_tail(&new_snap->node, head);
4150                 } else {
4151                         /* Already have this one */
4152
4153                         dout("  already present\n");
4154
4155                         rbd_assert(snap->size == snap_size);
4156                         rbd_assert(!strcmp(snap->name, snap_name));
4157                         rbd_assert(snap->features == snap_features);
4158
4159                         /* Done with this list entry; advance */
4160
4161                         links = links->next;
4162                 }
4163
4164                 /* Advance to the next entry in the snapshot context */
4165
4166                 index++;
4167         }
4168         dout("%s: done\n", __func__);
4169
4170         return 0;
4171 out_err:
4172         rbd_remove_all_snaps(rbd_dev);
4173
4174         return ret;
4175 }
4176
4177 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4178 {
4179         struct device *dev;
4180         int ret;
4181
4182         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4183
4184         dev = &rbd_dev->dev;
4185         dev->bus = &rbd_bus_type;
4186         dev->type = &rbd_device_type;
4187         dev->parent = &rbd_root_dev;
4188         dev->release = rbd_dev_release;
4189         dev_set_name(dev, "%d", rbd_dev->dev_id);
4190         ret = device_register(dev);
4191
4192         mutex_unlock(&ctl_mutex);
4193
4194         return ret;
4195 }
4196
4197 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4198 {
4199         device_unregister(&rbd_dev->dev);
4200 }
4201
4202 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4203
4204 /*
4205  * Get a unique rbd identifier for the given new rbd_dev, and add
4206  * the rbd_dev to the global list.  The minimum rbd id is 1.
4207  */
4208 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4209 {
4210         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4211
4212         spin_lock(&rbd_dev_list_lock);
4213         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4214         spin_unlock(&rbd_dev_list_lock);
4215         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4216                 (unsigned long long) rbd_dev->dev_id);
4217 }
4218
4219 /*
4220  * Remove an rbd_dev from the global list, and record that its
4221  * identifier is no longer in use.
4222  */
4223 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4224 {
4225         struct list_head *tmp;
4226         int rbd_id = rbd_dev->dev_id;
4227         int max_id;
4228
4229         rbd_assert(rbd_id > 0);
4230
4231         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4232                 (unsigned long long) rbd_dev->dev_id);
4233         spin_lock(&rbd_dev_list_lock);
4234         list_del_init(&rbd_dev->node);
4235
4236         /*
4237          * If the id being "put" is not the current maximum, there
4238          * is nothing special we need to do.
4239          */
4240         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4241                 spin_unlock(&rbd_dev_list_lock);
4242                 return;
4243         }
4244
4245         /*
4246          * We need to update the current maximum id.  Search the
4247          * list to find out what it is.  We're more likely to find
4248          * the maximum at the end, so search the list backward.
4249          */
4250         max_id = 0;
4251         list_for_each_prev(tmp, &rbd_dev_list) {
4252                 struct rbd_device *rbd_dev;
4253
4254                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4255                 if (rbd_dev->dev_id > max_id)
4256                         max_id = rbd_dev->dev_id;
4257         }
4258         spin_unlock(&rbd_dev_list_lock);
4259
4260         /*
4261          * The max id could have been updated by rbd_dev_id_get(), in
4262          * which case it now accurately reflects the new maximum.
4263          * Be careful not to overwrite the maximum value in that
4264          * case.
4265          */
4266         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4267         dout("  max dev id has been reset\n");
4268 }
4269
4270 /*
4271  * Skips over white space at *buf, and updates *buf to point to the
4272  * first found non-space character (if any). Returns the length of
4273  * the token (string of non-white space characters) found.  Note
4274  * that *buf must be terminated with '\0'.
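 *
 * For example, given *buf pointing at "  pool image", this advances
 * *buf to "pool image" and returns 4 (the length of "pool").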
4275  */
4276 static inline size_t next_token(const char **buf)
4277 {
4278         /*
4279          * These are the characters that produce nonzero for
4280          * isspace() in the "C" and "POSIX" locales.
4281          */
4282         const char *spaces = " \f\n\r\t\v";
4283
4284         *buf += strspn(*buf, spaces);   /* Find start of token */
4285
4286         return strcspn(*buf, spaces);   /* Return token length */
4287 }
4288
4289 /*
4290  * Finds the next token in *buf, and if the provided token buffer is
4291  * big enough, copies the found token into it.  The result, if
4292  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4293  * must be terminated with '\0' on entry.
4294  *
4295  * Returns the length of the token found (not including the '\0').
4296  * Return value will be 0 if no token is found, and it will be >=
4297  * token_size if the token would not fit.
4298  *
4299  * The *buf pointer will be updated to point beyond the end of the
4300  * found token.  Note that this occurs even if the token buffer is
4301  * too small to hold it.
4302  */
4303 static inline size_t copy_token(const char **buf,
4304                                 char *token,
4305                                 size_t token_size)
4306 {
4307         size_t len;
4308
4309         len = next_token(buf);
4310         if (len < token_size) {
4311                 memcpy(token, *buf, len);
4312                 *(token + len) = '\0';
4313         }
4314         *buf += len;
4315
4316         return len;
4317 }
4318
4319 /*
4320  * Finds the next token in *buf, dynamically allocates a buffer big
4321  * enough to hold a copy of it, and copies the token into the new
4322  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4323  * that a duplicate buffer is created even for a zero-length token.
4324  *
4325  * Returns a pointer to the newly-allocated duplicate, or a null
4326  * pointer if memory for the duplicate was not available.  If
4327  * the lenp argument is a non-null pointer, the length of the token
4328  * (not including the '\0') is returned in *lenp.
4329  *
4330  * If successful, the *buf pointer will be updated to point beyond
4331  * the end of the found token.
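 *
 * For example, with *buf pointing at "rbd foo", the returned copy
 * holds "rbd" and *buf is left pointing at " foo".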
4332  *
4333  * Note: uses GFP_KERNEL for allocation.
4334  */
4335 static inline char *dup_token(const char **buf, size_t *lenp)
4336 {
4337         char *dup;
4338         size_t len;
4339
4340         len = next_token(buf);
4341         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4342         if (!dup)
4343                 return NULL;
4344         *(dup + len) = '\0';
4345         *buf += len;
4346
4347         if (lenp)
4348                 *lenp = len;
4349
4350         return dup;
4351 }
4352
4353 /*
4354  * Parse the options provided for an "rbd add" (i.e., rbd image
4355  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4356  * and the data written is passed here via a NUL-terminated buffer.
4357  * Returns 0 if successful or an error code otherwise.
4358  *
4359  * The information extracted from these options is recorded in
4360  * the other parameters which return dynamically-allocated
4361  * structures:
4362  *  ceph_opts
4363  *      The address of a pointer that will refer to a ceph options
4364  *      structure.  Caller must release the returned pointer using
4365  *      ceph_destroy_options() when it is no longer needed.
4366  *  rbd_opts
4367  *      Address of an rbd options pointer.  Fully initialized by
4368  *      this function; caller must release with kfree().
4369  *  spec
4370  *      Address of an rbd image specification pointer.  Fully
4371  *      initialized by this function based on parsed options.
4372  *      Caller must release with rbd_spec_put().
4373  *
4374  * The options passed take this form:
4375  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4376  * where:
4377  *  <mon_addrs>
4378  *      A comma-separated list of one or more monitor addresses.
4379  *      A monitor address is an ip address, optionally followed
4380  *      by a port number (separated by a colon).
4381  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4382  *  <options>
4383  *      A comma-separated list of ceph and/or rbd options.
4384  *  <pool_name>
4385  *      The name of the rados pool containing the rbd image.
4386  *  <image_name>
4387  *      The name of the image in that pool to map.
4388  *  <snap_id>
4389  *      An optional snapshot id.  If provided, the mapping will
4390  *      present data from the image at the time that snapshot was
4391  *      created.  The image head is used if no snapshot id is
4392  *      provided.  Snapshot mappings are always read-only.
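 *
 * For example (monitor address, credentials, and names are
 * illustrative):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo mysnap" \
 *         > /sys/bus/rbd/add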
4393  */
4394 static int rbd_add_parse_args(const char *buf,
4395                                 struct ceph_options **ceph_opts,
4396                                 struct rbd_options **opts,
4397                                 struct rbd_spec **rbd_spec)
4398 {
4399         size_t len;
4400         char *options;
4401         const char *mon_addrs;
4402         char *snap_name;
4403         size_t mon_addrs_size;
4404         struct rbd_spec *spec = NULL;
4405         struct rbd_options *rbd_opts = NULL;
4406         struct ceph_options *copts;
4407         int ret;
4408
4409         /* The first four tokens are required */
4410
4411         len = next_token(&buf);
4412         if (!len) {
4413                 rbd_warn(NULL, "no monitor address(es) provided");
4414                 return -EINVAL;
4415         }
4416         mon_addrs = buf;
4417         mon_addrs_size = len + 1;
4418         buf += len;
4419
4420         ret = -EINVAL;
4421         options = dup_token(&buf, NULL);
4422         if (!options)
4423                 return -ENOMEM;
4424         if (!*options) {
4425                 rbd_warn(NULL, "no options provided");
4426                 goto out_err;
4427         }
4428
4429         spec = rbd_spec_alloc();
4430         if (!spec)
4431                 goto out_mem;
4432
4433         spec->pool_name = dup_token(&buf, NULL);
4434         if (!spec->pool_name)
4435                 goto out_mem;
4436         if (!*spec->pool_name) {
4437                 rbd_warn(NULL, "no pool name provided");
4438                 goto out_err;
4439         }
4440
4441         spec->image_name = dup_token(&buf, NULL);
4442         if (!spec->image_name)
4443                 goto out_mem;
4444         if (!*spec->image_name) {
4445                 rbd_warn(NULL, "no image name provided");
4446                 goto out_err;
4447         }
4448
4449         /*
4450          * Snapshot name is optional; default is to use "-"
4451          * (indicating the head/no snapshot).
4452          */
4453         len = next_token(&buf);
4454         if (!len) {
4455                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4456                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4457         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4458                 ret = -ENAMETOOLONG;
4459                 goto out_err;
4460         }
4461         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4462         if (!snap_name)
4463                 goto out_mem;
4464         *(snap_name + len) = '\0';
4465         spec->snap_name = snap_name;
4466
4467         /* Initialize all rbd options to the defaults */
4468
4469         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4470         if (!rbd_opts)
4471                 goto out_mem;
4472
4473         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4474
4475         copts = ceph_parse_options(options, mon_addrs,
4476                                         mon_addrs + mon_addrs_size - 1,
4477                                         parse_rbd_opts_token, rbd_opts);
4478         if (IS_ERR(copts)) {
4479                 ret = PTR_ERR(copts);
4480                 goto out_err;
4481         }
4482         kfree(options);
4483
4484         *ceph_opts = copts;
4485         *opts = rbd_opts;
4486         *rbd_spec = spec;
4487
4488         return 0;
4489 out_mem:
4490         ret = -ENOMEM;
4491 out_err:
4492         kfree(rbd_opts);
4493         rbd_spec_put(spec);
4494         kfree(options);
4495
4496         return ret;
4497 }
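/*
 * For the illustrative add string shown above, rbd_add_parse_args()
 * leaves spec->pool_name = "rbd", spec->image_name = "foo" and
 * spec->snap_name = "snap1"; had the snapshot token been omitted,
 * spec->snap_name would be RBD_SNAP_HEAD_NAME ("-") instead.
 */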
4498
4499 /*
4500  * An rbd format 2 image has a unique identifier, distinct from the
4501  * name given to it by the user.  Internally, that identifier is
4502  * what's used to specify the names of objects related to the image.
4503  *
4504  * A special "rbd id" object is used to map an rbd image name to its
4505  * id.  If that object doesn't exist, then there is no v2 rbd image
4506  * with the supplied name.
4507  *
4508  * If the image id can be determined, this function records it in
4509  * the given rbd_dev's image_id field and returns 0.  If any error
4510  * occurs, a negative errno is returned and the rbd_dev's image_id
4511  * field is left unchanged (and should be NULL).
4512  */
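/*
 * For example, with RBD_ID_PREFIX being "rbd_id." (rbd_types.h),
 * the id of an image named "foo" is looked up in the object
 * "rbd_id.foo".
 */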
4513 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4514 {
4515         int ret;
4516         size_t size;
4517         char *object_name;
4518         void *response;
4519         char *image_id;
4520
4521         /*
4522          * When probing a parent image, the image id is already
4523          * known (and the image name likely is not).  There's no
4524          * need to fetch the image id again in this case.  We
4525          * do still need to set the image format though.
4526          */
4527         if (rbd_dev->spec->image_id) {
4528                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4529
4530                 return 0;
4531         }
4532
4533         /*
4534          * First, see if the format 2 image id object exists, and
4535          * if so, get the image's persistent id from it.
4536          */
4537         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4538         object_name = kmalloc(size, GFP_NOIO);
4539         if (!object_name)
4540                 return -ENOMEM;
4541         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4542         dout("rbd id object name is %s\n", object_name);
4543
4544         /* Response will be an encoded string, which includes a length */
4545
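        /*
         * A ceph-encoded string is laid out, roughly, as:
         *
         *      __le32  len;            string length, little-endian
         *      char    data[len];      string bytes, not NUL-terminated
         *
         * hence the extra sizeof (__le32) reserved on top of
         * RBD_IMAGE_ID_LEN_MAX here, and the use of
         * ceph_extract_encoded_string() below to produce a
         * NUL-terminated copy of the id.
         */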
4546         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4547         response = kzalloc(size, GFP_NOIO);
4548         if (!response) {
4549                 ret = -ENOMEM;
4550                 goto out;
4551         }
4552
4553         /* If it doesn't exist we'll assume it's a format 1 image */
4554
4555         ret = rbd_obj_method_sync(rbd_dev, object_name,
4556                                 "rbd", "get_id", NULL, 0,
4557                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4558         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4559         if (ret == -ENOENT) {
4560                 image_id = kstrdup("", GFP_KERNEL);
4561                 ret = image_id ? 0 : -ENOMEM;
4562                 if (!ret)
4563                         rbd_dev->image_format = 1;
4564         } else if (ret > sizeof (__le32)) {
4565                 void *p = response;
4566
4567                 image_id = ceph_extract_encoded_string(&p, p + ret,
4568                                                 NULL, GFP_NOIO);
4569                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4570                 if (!ret)
4571                         rbd_dev->image_format = 2;
4572         } else {
4573                 ret = -EINVAL;
4574         }
4575
4576         if (!ret) {
4577                 rbd_dev->spec->image_id = image_id;
4578                 dout("image_id is %s\n", image_id);
4579         }
4580 out:
4581         kfree(response);
4582         kfree(object_name);
4583
4584         return ret;
4585 }
4586
4587 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4588 {
4589         int ret;
4590         size_t size;
4591
4592         /* Record the header object name for this rbd image. */
4593
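        /*
         * Format 1 header objects are named after the image itself:
         * with RBD_SUFFIX being ".rbd" (rbd_types.h), image "foo"
         * gets the header object "foo.rbd".
         */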
4594         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4595         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4596         if (!rbd_dev->header_name) {
4597                 ret = -ENOMEM;
4598                 goto out_err;
4599         }
4600         sprintf(rbd_dev->header_name, "%s%s",
4601                 rbd_dev->spec->image_name, RBD_SUFFIX);
4602
4603         /* Populate rbd image metadata */
4604
4605         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4606         if (ret < 0)
4607                 goto out_err;
4608
4609         /* Version 1 images have no parent (no layering) */
4610
4611         rbd_dev->parent_spec = NULL;
4612         rbd_dev->parent_overlap = 0;
4613
4614         dout("discovered version 1 image, header name is %s\n",
4615                 rbd_dev->header_name);
4616
4617         return 0;
4618
4619 out_err:
4620         kfree(rbd_dev->header_name);
4621         rbd_dev->header_name = NULL;
4622         kfree(rbd_dev->spec->image_id);
4623         rbd_dev->spec->image_id = NULL;
4624
4625         return ret;
4626 }
4627
4628 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4629 {
4630         size_t size;
4631         int ret;
4632         u64 ver = 0;
4633
4634         /*
4635          * Image id was filled in by the caller.  Record the header
4636          * object name for this rbd image.
4637          */
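        /*
         * Format 2 header objects are named by image id rather than
         * by image name: with RBD_HEADER_PREFIX being "rbd_header."
         * (rbd_types.h), a purely illustrative id "1014b2ae8944a"
         * yields the header object "rbd_header.1014b2ae8944a".
         */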
4638         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4639         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4640         if (!rbd_dev->header_name)
4641                 return -ENOMEM;
4642         sprintf(rbd_dev->header_name, "%s%s",
4643                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4644
4645         /* Get the size and object order for the image */
4646         ret = rbd_dev_v2_image_size(rbd_dev);
4647         if (ret)
4648                 goto out_err;
4649
4650         /* Get the object prefix (a.k.a. block_name) for the image */
4651
4652         ret = rbd_dev_v2_object_prefix(rbd_dev);
4653         if (ret)
4654                 goto out_err;
4655
4656         /* Get and check the features for the image */
4657
4658         ret = rbd_dev_v2_features(rbd_dev);
4659         if (ret)
4660                 goto out_err;
4661
4662         /* If the image supports layering, get the parent info */
4663
4664         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4665                 ret = rbd_dev_v2_parent_info(rbd_dev);
4666                 if (ret)
4667                         goto out_err;
4668                 rbd_warn(rbd_dev, "WARNING: kernel support for "
4669                                         "layered rbd images is EXPERIMENTAL!");
4670         }
4671
4672         /* If the image supports fancy striping, get its parameters */
4673
4674         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4675                 ret = rbd_dev_v2_striping_info(rbd_dev);
4676                 if (ret < 0)
4677                         goto out_err;
4678         }
4679
4680         /* crypto and compression type aren't (yet) supported for v2 images */
4681
4682         rbd_dev->header.crypt_type = 0;
4683         rbd_dev->header.comp_type = 0;
4684
4685         /* Get the snapshot context, plus the header version */
4686
4687         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4688         if (ret)
4689                 goto out_err;
4690         rbd_dev->header.obj_version = ver;
4691
4692         dout("discovered version 2 image, header name is %s\n",
4693                 rbd_dev->header_name);
4694
4695         return 0;
4696 out_err:
4697         rbd_dev->parent_overlap = 0;
4698         rbd_spec_put(rbd_dev->parent_spec);
4699         rbd_dev->parent_spec = NULL;
4700         kfree(rbd_dev->header_name);
4701         rbd_dev->header_name = NULL;
4702         kfree(rbd_dev->header.object_prefix);
4703         rbd_dev->header.object_prefix = NULL;
4704
4705         return ret;
4706 }
4707
4708 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4709 {
4710         struct rbd_device *parent = NULL;
4711         struct rbd_spec *parent_spec = NULL;
4712         struct rbd_client *rbdc = NULL;
4713         int ret;
4714
4715         /* no need to lock here, as rbd_dev is not registered yet */
4716         ret = rbd_dev_snaps_update(rbd_dev);
4717         if (ret)
4718                 return ret;
4719
4720         ret = rbd_dev_spec_update(rbd_dev);
4721         if (ret)
4722                 goto err_out_snaps;
4723
4724         ret = rbd_dev_set_mapping(rbd_dev);
4725         if (ret)
4726                 goto err_out_snaps;
4727
4728         /* generate unique id: find highest unique id, add one */
4729         rbd_dev_id_get(rbd_dev);
4730
4731         /* Fill in the device name, now that we have its id. */
4732         BUILD_BUG_ON(DEV_NAME_LEN
4733                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4734         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
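        /* e.g. the device that gets dev_id 2 is named "rbd2" */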
4735
4736         /* Get our block major device number. */
4737
4738         ret = register_blkdev(0, rbd_dev->name);
4739         if (ret < 0)
4740                 goto err_out_id;
4741         rbd_dev->major = ret;
4742
4743         /* Set up the blkdev mapping. */
4744
4745         ret = rbd_init_disk(rbd_dev);
4746         if (ret)
4747                 goto err_out_blkdev;
4748
4749         ret = rbd_bus_add_dev(rbd_dev);
4750         if (ret)
4751                 goto err_out_disk;
4752
4753         /*
4754          * At this point cleanup in the event of an error is the job
4755          * of the sysfs code (initiated by rbd_bus_del_dev()).
4756          */
4757         /* Probe the parent if there is one */
4758
4759         if (rbd_dev->parent_spec) {
4760                 /*
4761                  * We need to pass a reference to the client and the
4762                  * parent spec when creating the parent rbd_dev.
4763                  * Images related by parent/child relationships
4764                  * always share both.
4765                  */
4766                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4767                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4768
4769                 parent = rbd_dev_create(rbdc, parent_spec);
4770                 if (!parent) {
4771                         ret = -ENOMEM;
4772                         goto err_out_spec;
4773                 }
4774                 rbdc = NULL;            /* parent now owns reference */
4775                 parent_spec = NULL;     /* parent now owns reference */
4776                 ret = rbd_dev_image_probe(parent);
4777                 if (ret < 0)
4778                         goto err_out_parent;
4779                 rbd_dev->parent = parent;
4780         }
4781
4782         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4783         if (ret)
4784                 goto err_out_bus;
4785
4786         /* Everything's ready.  Announce the disk to the world. */
4787
4788         add_disk(rbd_dev->disk);
4789
4790         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4791                 (unsigned long long) rbd_dev->mapping.size);
4792
4793         return ret;
4794
4795 err_out_parent:
4796         rbd_spec_put(rbd_dev->parent_spec);
4797         kfree(rbd_dev->header_name);
4798         rbd_dev_destroy(parent);
4799 err_out_spec:
4800         rbd_spec_put(parent_spec);
4801         rbd_put_client(rbdc);
4802 err_out_bus:
4803         /* this will also clean up the rest of the rbd_dev state */
4804
4805         rbd_bus_del_dev(rbd_dev);
4806
4807         return ret;
4808 err_out_disk:
4809         rbd_free_disk(rbd_dev);
4810 err_out_blkdev:
4811         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4812 err_out_id:
4813         rbd_dev_id_put(rbd_dev);
4814 err_out_snaps:
4815         rbd_remove_all_snaps(rbd_dev);
4816
4817         return ret;
4818 }
4819
4820 /*
4821  * Probe for the existence of the header object for the given rbd
4822  * device.  For format 2 images this includes determining the image
4823  * id.
4824  */
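/*
 * Roughly: rbd_dev_image_id() below decides between format 1 and
 * format 2, rbd_dev_v1_probe()/rbd_dev_v2_probe() fill in the image
 * header, and rbd_dev_probe_finish() sets up the block device itself.
 */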
4825 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4826 {
4827         int ret;
4828
4829         /*
4830          * Get the id from the image id object.  If it's not a
4831          * format 2 image, we'll get ENOENT back, and we'll assume
4832          * it's a format 1 image.
4833          */
4834         ret = rbd_dev_image_id(rbd_dev);
4835         if (ret)
4836                 return ret;
4837         rbd_assert(rbd_dev->spec->image_id);
4838         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4839
4840         if (rbd_dev->image_format == 1)
4841                 ret = rbd_dev_v1_probe(rbd_dev);
4842         else
4843                 ret = rbd_dev_v2_probe(rbd_dev);
4844         if (ret)
4845                 goto out_err;
4846
4847         ret = rbd_dev_probe_finish(rbd_dev);
4848         if (ret)
4849                 rbd_header_free(&rbd_dev->header);
4850
4851         return ret;
4852 out_err:
4853         kfree(rbd_dev->spec->image_id);
4854         rbd_dev->spec->image_id = NULL;
4855
4856         dout("probe failed, returning %d\n", ret);
4857
4858         return ret;
4859 }
4860
4861 static ssize_t rbd_add(struct bus_type *bus,
4862                        const char *buf,
4863                        size_t count)
4864 {
4865         struct rbd_device *rbd_dev = NULL;
4866         struct ceph_options *ceph_opts = NULL;
4867         struct rbd_options *rbd_opts = NULL;
4868         struct rbd_spec *spec = NULL;
4869         struct rbd_client *rbdc;
4870         struct ceph_osd_client *osdc;
4871         int rc = -ENOMEM;
4872
4873         if (!try_module_get(THIS_MODULE))
4874                 return -ENODEV;
4875
4876         /* parse add command */
4877         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4878         if (rc < 0)
4879                 goto err_out_module;
4880
4881         rbdc = rbd_get_client(ceph_opts);
4882         if (IS_ERR(rbdc)) {
4883                 rc = PTR_ERR(rbdc);
4884                 goto err_out_args;
4885         }
4886         ceph_opts = NULL;       /* rbd_dev client now owns this */
4887
4888         /* pick the pool */
4889         osdc = &rbdc->client->osdc;
4890         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4891         if (rc < 0)
4892                 goto err_out_client;
4893         spec->pool_id = (u64)rc;
4894
4895         /* The ceph file layout needs to fit pool id in 32 bits */
4896
4897         if (spec->pool_id > (u64)U32_MAX) {
4898                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4899                                 (unsigned long long)spec->pool_id, U32_MAX);
4900                 rc = -EIO;
4901                 goto err_out_client;
4902         }
4903
4904         rbd_dev = rbd_dev_create(rbdc, spec);
4905         if (!rbd_dev)
4906                 goto err_out_client;
4907         rbdc = NULL;            /* rbd_dev now owns this */
4908         spec = NULL;            /* rbd_dev now owns this */
4909
4910         rbd_dev->mapping.read_only = rbd_opts->read_only;
4911         kfree(rbd_opts);
4912         rbd_opts = NULL;        /* done with this */
4913
4914         rc = rbd_dev_image_probe(rbd_dev);
4915         if (rc < 0)
4916                 goto err_out_rbd_dev;
4917
4918         return count;
4919 err_out_rbd_dev:
4920         rbd_spec_put(rbd_dev->parent_spec);
4921         kfree(rbd_dev->header_name);
4922         rbd_dev_destroy(rbd_dev);
4923 err_out_client:
4924         rbd_put_client(rbdc);
4925 err_out_args:
4926         if (ceph_opts)
4927                 ceph_destroy_options(ceph_opts);
4928         kfree(rbd_opts);
4929         rbd_spec_put(spec);
4930 err_out_module:
4931         module_put(THIS_MODULE);
4932
4933         dout("Error adding device %s\n", buf);
4934
4935         return (ssize_t)rc;
4936 }
4937
4938 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4939 {
4940         struct list_head *tmp;
4941         struct rbd_device *rbd_dev;
4942
4943         spin_lock(&rbd_dev_list_lock);
4944         list_for_each(tmp, &rbd_dev_list) {
4945                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4946                 if (rbd_dev->dev_id == dev_id) {
4947                         spin_unlock(&rbd_dev_list_lock);
4948                         return rbd_dev;
4949                 }
4950         }
4951         spin_unlock(&rbd_dev_list_lock);
4952         return NULL;
4953 }
4954
4955 static void rbd_dev_release(struct device *dev)
4956 {
4957         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4958
4959         if (rbd_dev->watch_event)
4960                 rbd_dev_header_watch_sync(rbd_dev, 0);
4961
4962         /* clean up and free blkdev */
4963         rbd_free_disk(rbd_dev);
4964         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4965
4966         /* release allocated disk header fields */
4967         rbd_header_free(&rbd_dev->header);
4968
4969         /* done with the id, and with the rbd_dev */
4970         rbd_dev_id_put(rbd_dev);
4971         rbd_assert(rbd_dev->rbd_client != NULL);
4972         rbd_spec_put(rbd_dev->parent_spec);
4973         kfree(rbd_dev->header_name);
4974         rbd_dev_destroy(rbd_dev);
4975
4976         /* release module ref */
4977         module_put(THIS_MODULE);
4978 }
4979
4980 static void __rbd_remove(struct rbd_device *rbd_dev)
4981 {
4982         rbd_remove_all_snaps(rbd_dev);
4983         rbd_bus_del_dev(rbd_dev);
4984 }
4985
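/*
 * Tear down a mapping in response to a write of its device id to
 * /sys/bus/rbd/remove; for example (the id 1 is illustrative):
 *
 *   $ echo 1 > /sys/bus/rbd/remove
 *
 * The id written must be the <id> from the device's "rbd<id>" name.
 */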
4986 static ssize_t rbd_remove(struct bus_type *bus,
4987                           const char *buf,
4988                           size_t count)
4989 {
4990         struct rbd_device *rbd_dev = NULL;
4991         int target_id, rc;
4992         unsigned long ul;
4993         int ret = count;
4994
4995         rc = strict_strtoul(buf, 10, &ul);
4996         if (rc)
4997                 return rc;
4998
4999         /* convert to int; abort if we lost anything in the conversion */
5000         target_id = (int) ul;
5001         if (target_id != ul)
5002                 return -EINVAL;
5003
5004         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5005
5006         rbd_dev = __rbd_get_dev(target_id);
5007         if (!rbd_dev) {
5008                 ret = -ENOENT;
5009                 goto done;
5010         }
5011
5012         spin_lock_irq(&rbd_dev->lock);
5013         if (rbd_dev->open_count)
5014                 ret = -EBUSY;
5015         else
5016                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5017         spin_unlock_irq(&rbd_dev->lock);
5018         if (ret < 0)
5019                 goto done;
5020
5021         while (rbd_dev->parent_spec) {
5022                 struct rbd_device *first = rbd_dev;
5023                 struct rbd_device *second = first->parent;
5024                 struct rbd_device *third;
5025
5026                 /*
5027                  * Follow to the parent with no grandparent and
5028                  * remove it.
5029                  */
5030                 while (second && (third = second->parent)) {
5031                         first = second;
5032                         second = third;
5033                 }
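                /*
                 * "second" now has no parent of its own; remove that
                 * deepest ancestor first, then detach it from its
                 * child so the next pass peels the chain one level
                 * higher up.
                 */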
5034                 __rbd_remove(second);
5035                 rbd_spec_put(first->parent_spec);
5036                 first->parent_spec = NULL;
5037                 first->parent_overlap = 0;
5038                 first->parent = NULL;
5039         }
5040         __rbd_remove(rbd_dev);
5041
5042 done:
5043         mutex_unlock(&ctl_mutex);
5044
5045         return ret;
5046 }
5047
5048 /*
5049  * create control files in sysfs
5050  * /sys/bus/rbd/...
5051  */
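/*
 * Registering rbd_bus_type is what exposes the bus attributes backed
 * by rbd_add() and rbd_remove() above as /sys/bus/rbd/add and
 * /sys/bus/rbd/remove; rbd_root_dev serves as the parent device of
 * every mapped rbd device.
 */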
5052 static int rbd_sysfs_init(void)
5053 {
5054         int ret;
5055
5056         ret = device_register(&rbd_root_dev);
5057         if (ret < 0)
5058                 return ret;
5059
5060         ret = bus_register(&rbd_bus_type);
5061         if (ret < 0)
5062                 device_unregister(&rbd_root_dev);
5063
5064         return ret;
5065 }
5066
5067 static void rbd_sysfs_cleanup(void)
5068 {
5069         bus_unregister(&rbd_bus_type);
5070         device_unregister(&rbd_root_dev);
5071 }
5072
5073 static int __init rbd_init(void)
5074 {
5075         int rc;
5076
5077         if (!libceph_compatible(NULL)) {
5078                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5079
5080                 return -EINVAL;
5081         }
5082         rc = rbd_sysfs_init();
5083         if (rc)
5084                 return rc;
5085         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5086         return 0;
5087 }
5088
5089 static void __exit rbd_exit(void)
5090 {
5091         rbd_sysfs_cleanup();
5092 }
5093
5094 module_init(rbd_init);
5095 module_exit(rbd_exit);
5096
5097 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5098 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5099 MODULE_DESCRIPTION("rados block device");
5100
5101 /* following authorship retained from original osdblk.c */
5102 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5103
5104 MODULE_LICENSE("GPL");