/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
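
/*
 * Note (editorial example): sizeof (RBD_SNAP_DEV_NAME_PREFIX) counts
 * the trailing NUL, so the "- 1" above yields just the prefix length
 * (5 for "snap_"); with NAME_MAX of 255 that leaves room for snapshot
 * names of up to 250 bytes.
 */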

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

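/*
 * (Worked arithmetic, for reference.)  Two bytes can hold 65536
 * values, so at most 5 decimal digits are needed per two bytes;
 * (5 * sizeof (int)) / 2 digits therefore suffice for any int.  The
 * "+ 1" covers a minus sign: for a 4-byte int this gives 11, enough
 * for "-2147483648".
 */
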
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
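
/*
 * For example (illustrative values only), mapping image "foo" in
 * pool "rbd" at its head might be described by something like:
 *      pool_id 2, pool_name "rbd", image_id "10074b0dc51d",
 *      image_name "foo", snap_id CEPH_NOSNAP, and snap_name "-"
 *      (RBD_SNAP_HEAD_NAME).
 */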

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
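
/*
 * Typical use of the iterators above (sketch only; process() stands
 * in for whatever the caller does with each object request):
 *
 *      struct rbd_obj_request *obj_request;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              process(obj_request);
 */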

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
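
/*
 * For example, a map request whose option string contains "read_only"
 * (or its alternate spelling "ro") ends up with rbd_opts->read_only
 * set to true; "read_write"/"rw" force it back to the default (false).
 */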

/*
 * Get a ceph client with specific addr and configuration; create
 * one if it does not exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */

static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
{
        struct ceph_snap_context *snapc;
        size_t size;

        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (snapc->snaps[0]);
        snapc = kzalloc(size, GFP_KERNEL);
        if (!snapc)
                return NULL;

        atomic_set(&snapc->nref, 1);
        snapc->num_snaps = snap_count;

        return snapc;
}

static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
{
        (void)ceph_get_snap_context(snapc);
}

static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
{
        ceph_put_snap_context(snapc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = rbd_snap_context_create(snap_count);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        rbd_snap_context_put(header->snapc);
        header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
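
/*
 * Worked example (assuming the common object order of 22, i.e.
 * 4 MiB objects): for image offset 0x500000 and length 0x400000,
 * rbd_segment_name() yields "<object_prefix>.000000000001",
 * rbd_segment_offset() yields 0x100000, and rbd_segment_length()
 * trims the length to 0x300000 so the I/O stops at the segment
 * boundary; the remaining 0x100000 bytes belong to segment 2.
 */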

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
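
/*
 * For instance, with 4 KiB pages a call with offset 5000 starts at
 * pages[1] (5000 >> PAGE_SHIFT), page_offset 904 (5000 & ~PAGE_MASK),
 * and zeroes at most PAGE_SIZE - 904 == 3192 bytes from that page
 * before advancing to the next one.
 */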

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
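
/*
 * Sketch of the intended calling pattern (hypothetical caller; the
 * real users peel off per-segment clone chains from an image
 * request's bio chain):
 *
 *      while (resid) {
 *              len = rbd_segment_length(rbd_dev, img_offset, resid);
 *              chain = bio_chain_clone_range(&bio, &offset, len, GFP_NOIO);
 *              if (!chain)
 *                      goto out_err;
 *              ...
 *              img_offset += len;
 *              resid -= len;
 *      }
 */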

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
1375 static void img_request_write_set(struct rbd_img_request *img_request)
1376 {
1377         set_bit(IMG_REQ_WRITE, &img_request->flags);
1378         smp_mb();
1379 }
1380
1381 static bool img_request_write_test(struct rbd_img_request *img_request)
1382 {
1383         smp_mb();
1384         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1385 }
1386
1387 static void img_request_child_set(struct rbd_img_request *img_request)
1388 {
1389         set_bit(IMG_REQ_CHILD, &img_request->flags);
1390         smp_mb();
1391 }
1392
1393 static bool img_request_child_test(struct rbd_img_request *img_request)
1394 {
1395         smp_mb();
1396         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1397 }
1398
1399 static void img_request_layered_set(struct rbd_img_request *img_request)
1400 {
1401         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1402         smp_mb();
1403 }
1404
1405 static bool img_request_layered_test(struct rbd_img_request *img_request)
1406 {
1407         smp_mb();
1408         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1409 }
1410
1411 static void
1412 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1413 {
1414         u64 xferred = obj_request->xferred;
1415         u64 length = obj_request->length;
1416
1417         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1418                 obj_request, obj_request->img_request, obj_request->result,
1419                 xferred, length);
1420         /*
1421          * ENOENT means a hole in the image.  We zero-fill the
1422          * entire length of the request.  A short read also implies
1423          * zero-fill to the end of the request.  Either way we
1424          * update the xferred count to indicate the whole request
1425          * was satisfied.
1426          */
1427         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1428         if (obj_request->result == -ENOENT) {
1429                 if (obj_request->type == OBJ_REQUEST_BIO)
1430                         zero_bio_chain(obj_request->bio_list, 0);
1431                 else
1432                         zero_pages(obj_request->pages, 0, length);
1433                 obj_request->result = 0;
1434                 obj_request->xferred = length;
1435         } else if (xferred < length && !obj_request->result) {
1436                 if (obj_request->type == OBJ_REQUEST_BIO)
1437                         zero_bio_chain(obj_request->bio_list, xferred);
1438                 else
1439                         zero_pages(obj_request->pages, xferred, length);
1440                 obj_request->xferred = length;
1441         }
1442         obj_request_done_set(obj_request);
1443 }
1444
1445 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1446 {
1447         dout("%s: obj %p cb %p\n", __func__, obj_request,
1448                 obj_request->callback);
1449         if (obj_request->callback)
1450                 obj_request->callback(obj_request);
1451         else
1452                 complete_all(&obj_request->completion);
1453 }
1454
1455 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1456 {
1457         dout("%s: obj %p\n", __func__, obj_request);
1458         obj_request_done_set(obj_request);
1459 }
1460
1461 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1462 {
1463         struct rbd_img_request *img_request = NULL;
1464         struct rbd_device *rbd_dev = NULL;
1465         bool layered = false;
1466
1467         if (obj_request_img_data_test(obj_request)) {
1468                 img_request = obj_request->img_request;
1469                 layered = img_request && img_request_layered_test(img_request);
1470                 rbd_dev = img_request->rbd_dev;
1471         }
1472
1473         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1474                 obj_request, img_request, obj_request->result,
1475                 obj_request->xferred, obj_request->length);
1476         if (layered && obj_request->result == -ENOENT &&
1477                         obj_request->img_offset < rbd_dev->parent_overlap)
1478                 rbd_img_parent_read(obj_request);
1479         else if (img_request)
1480                 rbd_img_obj_request_read_callback(obj_request);
1481         else
1482                 obj_request_done_set(obj_request);
1483 }
1484
1485 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1486 {
1487         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1488                 obj_request->result, obj_request->length);
1489         /*
1490          * There is no such thing as a successful short write.  Set
1491          * it to our originally-requested length.
1492          */
1493         obj_request->xferred = obj_request->length;
1494         obj_request_done_set(obj_request);
1495 }
1496
1497 /*
1498  * For a simple stat call there's nothing to do.  We'll do more if
1499  * this is part of a write sequence for a layered image.
1500  */
1501 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1502 {
1503         dout("%s: obj %p\n", __func__, obj_request);
1504         obj_request_done_set(obj_request);
1505 }
1506
1507 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1508                                 struct ceph_msg *msg)
1509 {
1510         struct rbd_obj_request *obj_request = osd_req->r_priv;
1511         u16 opcode;
1512
1513         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1514         rbd_assert(osd_req == obj_request->osd_req);
1515         if (obj_request_img_data_test(obj_request)) {
1516                 rbd_assert(obj_request->img_request);
1517                 rbd_assert(obj_request->which != BAD_WHICH);
1518         } else {
1519                 rbd_assert(obj_request->which == BAD_WHICH);
1520         }
1521
1522         if (osd_req->r_result < 0)
1523                 obj_request->result = osd_req->r_result;
1524         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1525
1526         BUG_ON(osd_req->r_num_ops > 2);
1527
1528         /*
1529          * We support a 64-bit length, but ultimately it has to be
1530          * passed to blk_end_request(), which takes an unsigned int.
1531          */
1532         obj_request->xferred = osd_req->r_reply_op_len[0];
1533         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1534         opcode = osd_req->r_ops[0].op;
1535         switch (opcode) {
1536         case CEPH_OSD_OP_READ:
1537                 rbd_osd_read_callback(obj_request);
1538                 break;
1539         case CEPH_OSD_OP_WRITE:
1540                 rbd_osd_write_callback(obj_request);
1541                 break;
1542         case CEPH_OSD_OP_STAT:
1543                 rbd_osd_stat_callback(obj_request);
1544                 break;
1545         case CEPH_OSD_OP_CALL:
1546         case CEPH_OSD_OP_NOTIFY_ACK:
1547         case CEPH_OSD_OP_WATCH:
1548                 rbd_osd_trivial_callback(obj_request);
1549                 break;
1550         default:
1551                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1552                         obj_request->object_name, (unsigned short) opcode);
1553                 break;
1554         }
1555
1556         if (obj_request_done_test(obj_request))
1557                 rbd_obj_request_complete(obj_request);
1558 }
1559
1560 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1561 {
1562         struct rbd_img_request *img_request = obj_request->img_request;
1563         struct ceph_osd_request *osd_req = obj_request->osd_req;
1564         u64 snap_id;
1565
1566         rbd_assert(osd_req != NULL);
1567
1568         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1569         ceph_osdc_build_request(osd_req, obj_request->offset,
1570                         NULL, snap_id, NULL);
1571 }
1572
1573 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1574 {
1575         struct rbd_img_request *img_request = obj_request->img_request;
1576         struct ceph_osd_request *osd_req = obj_request->osd_req;
1577         struct ceph_snap_context *snapc;
1578         struct timespec mtime = CURRENT_TIME;
1579
1580         rbd_assert(osd_req != NULL);
1581
1582         snapc = img_request ? img_request->snapc : NULL;
1583         ceph_osdc_build_request(osd_req, obj_request->offset,
1584                         snapc, CEPH_NOSNAP, &mtime);
1585 }
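
/*
 * Note the asymmetry between the two helpers above: a read targets a
 * single point in time, so it is built with one snapshot id
 * (CEPH_NOSNAP when reading the head).  A write only ever goes to the
 * head, so it instead carries the full snapshot context--which the
 * OSD needs in order to preserve snapshotted data--plus an mtime.
 */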
1586
1587 static struct ceph_osd_request *rbd_osd_req_create(
1588                                         struct rbd_device *rbd_dev,
1589                                         bool write_request,
1590                                         struct rbd_obj_request *obj_request)
1591 {
1592         struct ceph_snap_context *snapc = NULL;
1593         struct ceph_osd_client *osdc;
1594         struct ceph_osd_request *osd_req;
1595
1596         if (obj_request_img_data_test(obj_request)) {
1597                 struct rbd_img_request *img_request = obj_request->img_request;
1598
1599                 rbd_assert(write_request ==
1600                                 img_request_write_test(img_request));
1601                 if (write_request)
1602                         snapc = img_request->snapc;
1603         }
1604
1605         /* Allocate and initialize the request, for the single op */
1606
1607         osdc = &rbd_dev->rbd_client->client->osdc;
1608         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1609         if (!osd_req)
1610                 return NULL;    /* ENOMEM */
1611
1612         if (write_request)
1613                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1614         else
1615                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1616
1617         osd_req->r_callback = rbd_osd_req_callback;
1618         osd_req->r_priv = obj_request;
1619
1620         osd_req->r_oid_len = strlen(obj_request->object_name);
1621         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1622         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1623
1624         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1625
1626         return osd_req;
1627 }
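
/*
 * Usage sketch (error handling elided): callers pair this with an
 * op-init and a format call, as rbd_img_request_fill() and
 * rbd_obj_read_sync() do below:
 *
 *     osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
 *     obj_request->osd_req = osd_req;
 *     osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
 *                             offset, length, 0, 0);
 *     rbd_osd_req_format_read(obj_request);
 */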
1628
1629 /*
1630  * Create a copyup osd request based on the information in the
1631  * object request supplied.  A copyup request has two osd ops:
1632  * a copyup method call and a "normal" write request.
1633  */
1634 static struct ceph_osd_request *
1635 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1636 {
1637         struct rbd_img_request *img_request;
1638         struct ceph_snap_context *snapc;
1639         struct rbd_device *rbd_dev;
1640         struct ceph_osd_client *osdc;
1641         struct ceph_osd_request *osd_req;
1642
1643         rbd_assert(obj_request_img_data_test(obj_request));
1644         img_request = obj_request->img_request;
1645         rbd_assert(img_request);
1646         rbd_assert(img_request_write_test(img_request));
1647
1648         /* Allocate and initialize the request, for the two ops */
1649
1650         snapc = img_request->snapc;
1651         rbd_dev = img_request->rbd_dev;
1652         osdc = &rbd_dev->rbd_client->client->osdc;
1653         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1654         if (!osd_req)
1655                 return NULL;    /* ENOMEM */
1656
1657         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1658         osd_req->r_callback = rbd_osd_req_callback;
1659         osd_req->r_priv = obj_request;
1660
1661         osd_req->r_oid_len = strlen(obj_request->object_name);
1662         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1663         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1664
1665         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1666
1667         return osd_req;
1668 }
1669
1670
1671 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1672 {
1673         ceph_osdc_put_request(osd_req);
1674 }
1675
1676 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1677
1678 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1679                                                 u64 offset, u64 length,
1680                                                 enum obj_request_type type)
1681 {
1682         struct rbd_obj_request *obj_request;
1683         size_t size;
1684         char *name;
1685
1686         rbd_assert(obj_request_type_valid(type));
1687
1688         size = strlen(object_name) + 1;
1689         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1690         if (!obj_request)
1691                 return NULL;
1692
1693         name = (char *)(obj_request + 1);
1694         obj_request->object_name = memcpy(name, object_name, size);
1695         obj_request->offset = offset;
1696         obj_request->length = length;
1697         obj_request->flags = 0;
1698         obj_request->which = BAD_WHICH;
1699         obj_request->type = type;
1700         INIT_LIST_HEAD(&obj_request->links);
1701         init_completion(&obj_request->completion);
1702         kref_init(&obj_request->kref);
1703
1704         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1705                 offset, length, (int)type, obj_request);
1706
1707         return obj_request;
1708 }
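
/*
 * Layout note: the object name lives in the same allocation as the
 * request itself, immediately following the structure (name =
 * (char *)(obj_request + 1) above), so the single kfree() in
 * rbd_obj_request_destroy() releases both.
 */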
1709
1710 static void rbd_obj_request_destroy(struct kref *kref)
1711 {
1712         struct rbd_obj_request *obj_request;
1713
1714         obj_request = container_of(kref, struct rbd_obj_request, kref);
1715
1716         dout("%s: obj %p\n", __func__, obj_request);
1717
1718         rbd_assert(obj_request->img_request == NULL);
1719         rbd_assert(obj_request->which == BAD_WHICH);
1720
1721         if (obj_request->osd_req)
1722                 rbd_osd_req_destroy(obj_request->osd_req);
1723
1724         rbd_assert(obj_request_type_valid(obj_request->type));
1725         switch (obj_request->type) {
1726         case OBJ_REQUEST_NODATA:
1727                 break;          /* Nothing to do */
1728         case OBJ_REQUEST_BIO:
1729                 if (obj_request->bio_list)
1730                         bio_chain_put(obj_request->bio_list);
1731                 break;
1732         case OBJ_REQUEST_PAGES:
1733                 if (obj_request->pages)
1734                         ceph_release_page_vector(obj_request->pages,
1735                                                 obj_request->page_count);
1736                 break;
1737         }
1738
1739         kfree(obj_request);
1740 }
1741
1742 /*
1743  * Caller is responsible for filling in the list of object requests
1744  * that comprises the image request, and the Linux request pointer
1745  * (if there is one).
1746  */
1747 static struct rbd_img_request *rbd_img_request_create(
1748                                         struct rbd_device *rbd_dev,
1749                                         u64 offset, u64 length,
1750                                         bool write_request,
1751                                         bool child_request)
1752 {
1753         struct rbd_img_request *img_request;
1754
1755         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1756         if (!img_request)
1757                 return NULL;
1758
1759         if (write_request) {
1760                 down_read(&rbd_dev->header_rwsem);
1761                 rbd_snap_context_get(rbd_dev->header.snapc);
1762                 up_read(&rbd_dev->header_rwsem);
1763         }
1764
1765         img_request->rq = NULL;
1766         img_request->rbd_dev = rbd_dev;
1767         img_request->offset = offset;
1768         img_request->length = length;
1769         img_request->flags = 0;
1770         if (write_request) {
1771                 img_request_write_set(img_request);
1772                 img_request->snapc = rbd_dev->header.snapc;
1773         } else {
1774                 img_request->snap_id = rbd_dev->spec->snap_id;
1775         }
1776         if (child_request)
1777                 img_request_child_set(img_request);
1778         if (rbd_dev->parent_spec)
1779                 img_request_layered_set(img_request);
1780         spin_lock_init(&img_request->completion_lock);
1781         img_request->next_completion = 0;
1782         img_request->callback = NULL;
1783         img_request->result = 0;
1784         img_request->obj_request_count = 0;
1785         INIT_LIST_HEAD(&img_request->obj_requests);
1786         kref_init(&img_request->kref);
1787
1788         rbd_img_request_get(img_request);       /* Avoid a warning */
1789         rbd_img_request_put(img_request);       /* TEMPORARY */
1790
1791         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1792                 write_request ? "write" : "read", offset, length,
1793                 img_request);
1794
1795         return img_request;
1796 }
1797
1798 static void rbd_img_request_destroy(struct kref *kref)
1799 {
1800         struct rbd_img_request *img_request;
1801         struct rbd_obj_request *obj_request;
1802         struct rbd_obj_request *next_obj_request;
1803
1804         img_request = container_of(kref, struct rbd_img_request, kref);
1805
1806         dout("%s: img %p\n", __func__, img_request);
1807
1808         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1809                 rbd_img_obj_request_del(img_request, obj_request);
1810         rbd_assert(img_request->obj_request_count == 0);
1811
1812         if (img_request_write_test(img_request))
1813                 rbd_snap_context_put(img_request->snapc);
1814
1815         if (img_request_child_test(img_request))
1816                 rbd_obj_request_put(img_request->obj_request);
1817
1818         kfree(img_request);
1819 }
1820
1821 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1822 {
1823         struct rbd_img_request *img_request;
1824         unsigned int xferred;
1825         int result;
1826         bool more;
1827
1828         rbd_assert(obj_request_img_data_test(obj_request));
1829         img_request = obj_request->img_request;
1830
1831         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1832         xferred = (unsigned int)obj_request->xferred;
1833         result = obj_request->result;
1834         if (result) {
1835                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1836
1837                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1838                         img_request_write_test(img_request) ? "write" : "read",
1839                         obj_request->length, obj_request->img_offset,
1840                         obj_request->offset);
1841                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1842                         result, xferred);
1843                 if (!img_request->result)
1844                         img_request->result = result;
1845         }
1846
1847         /* Image object requests don't own their page array */
1848
1849         if (obj_request->type == OBJ_REQUEST_PAGES) {
1850                 obj_request->pages = NULL;
1851                 obj_request->page_count = 0;
1852         }
1853
1854         if (img_request_child_test(img_request)) {
1855                 rbd_assert(img_request->obj_request != NULL);
1856                 more = obj_request->which < img_request->obj_request_count - 1;
1857         } else {
1858                 rbd_assert(img_request->rq != NULL);
1859                 more = blk_end_request(img_request->rq, result, xferred);
1860         }
1861
1862         return more;
1863 }
1864
1865 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1866 {
1867         struct rbd_img_request *img_request;
1868         u32 which = obj_request->which;
1869         bool more = true;
1870
1871         rbd_assert(obj_request_img_data_test(obj_request));
1872         img_request = obj_request->img_request;
1873
1874         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1875         rbd_assert(img_request != NULL);
1876         rbd_assert(img_request->obj_request_count > 0);
1877         rbd_assert(which != BAD_WHICH);
1878         rbd_assert(which < img_request->obj_request_count);
1879         rbd_assert(which >= img_request->next_completion);
1880
1881         spin_lock_irq(&img_request->completion_lock);
1882         if (which != img_request->next_completion)
1883                 goto out;
1884
1885         for_each_obj_request_from(img_request, obj_request) {
1886                 rbd_assert(more);
1887                 rbd_assert(which < img_request->obj_request_count);
1888
1889                 if (!obj_request_done_test(obj_request))
1890                         break;
1891                 more = rbd_img_obj_end_request(obj_request);
1892                 which++;
1893         }
1894
1895         rbd_assert(more ^ (which == img_request->obj_request_count));
1896         img_request->next_completion = which;
1897 out:
1898         spin_unlock_irq(&img_request->completion_lock);
1899
1900         if (!more)
1901                 rbd_img_request_complete(img_request);
1902 }
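
/*
 * Ordering example (hypothetical): three object requests (which =
 * 0, 1, 2) whose replies arrive as 2, 0, 1.  The callback for 2
 * finds next_completion == 0 and just returns.  The callback for 0
 * completes 0, stops at the not-yet-done 1, and records
 * next_completion = 1.  The callback for 1 then completes both 1
 * and the already-done 2, "more" goes false, and the whole image
 * request completes.
 */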
1903
1904 /*
1905  * Split up an image request into one or more object requests, each
1906  * to a different object.  The "type" parameter indicates whether
1907  * "data_desc" is the pointer to the head of a list of bio
1908  * structures, or the base of a page array.  In either case this
1909  * function assumes data_desc describes memory sufficient to hold
1910  * all data described by the image request.
1911  */
1912 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1913                                         enum obj_request_type type,
1914                                         void *data_desc)
1915 {
1916         struct rbd_device *rbd_dev = img_request->rbd_dev;
1917         struct rbd_obj_request *obj_request = NULL;
1918         struct rbd_obj_request *next_obj_request;
1919         bool write_request = img_request_write_test(img_request);
1920         struct bio *bio_list;
1921         unsigned int bio_offset = 0;
1922         struct page **pages;
1923         u64 img_offset;
1924         u64 resid;
1925         u16 opcode;
1926
1927         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1928                 (int)type, data_desc);
1929
1930         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1931         img_offset = img_request->offset;
1932         resid = img_request->length;
1933         rbd_assert(resid > 0);
1934
1935         if (type == OBJ_REQUEST_BIO) {
1936                 bio_list = data_desc;
1937                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1938         } else {
1939                 rbd_assert(type == OBJ_REQUEST_PAGES);
1940                 pages = data_desc;
1941         }
1942
1943         while (resid) {
1944                 struct ceph_osd_request *osd_req;
1945                 const char *object_name;
1946                 u64 offset;
1947                 u64 length;
1948
1949                 object_name = rbd_segment_name(rbd_dev, img_offset);
1950                 if (!object_name)
1951                         goto out_unwind;
1952                 offset = rbd_segment_offset(rbd_dev, img_offset);
1953                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1954                 obj_request = rbd_obj_request_create(object_name,
1955                                                 offset, length, type);
1956                 kfree(object_name);     /* object request has its own copy */
1957                 if (!obj_request)
1958                         goto out_unwind;
1959
1960                 if (type == OBJ_REQUEST_BIO) {
1961                         unsigned int clone_size;
1962
1963                         rbd_assert(length <= (u64)UINT_MAX);
1964                         clone_size = (unsigned int)length;
1965                         obj_request->bio_list =
1966                                         bio_chain_clone_range(&bio_list,
1967                                                                 &bio_offset,
1968                                                                 clone_size,
1969                                                                 GFP_ATOMIC);
1970                         if (!obj_request->bio_list)
1971                                 goto out_partial;
1972                 } else {
1973                         unsigned int page_count;
1974
1975                         obj_request->pages = pages;
1976                         page_count = (u32)calc_pages_for(offset, length);
1977                         obj_request->page_count = page_count;
1978                         if ((offset + length) & ~PAGE_MASK)
1979                                 page_count--;   /* more on last page */
1980                         pages += page_count;
1981                 }
1982
1983                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1984                                                 obj_request);
1985                 if (!osd_req)
1986                         goto out_partial;
1987                 obj_request->osd_req = osd_req;
1988                 obj_request->callback = rbd_img_obj_callback;
1989
1990                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1991                                                 0, 0);
1992                 if (type == OBJ_REQUEST_BIO)
1993                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1994                                         obj_request->bio_list, length);
1995                 else
1996                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1997                                         obj_request->pages, length,
1998                                         offset & ~PAGE_MASK, false, false);
1999
2000                 if (write_request)
2001                         rbd_osd_req_format_write(obj_request);
2002                 else
2003                         rbd_osd_req_format_read(obj_request);
2004
2005                 obj_request->img_offset = img_offset;
2006                 rbd_img_obj_request_add(img_request, obj_request);
2007
2008                 img_offset += length;
2009                 resid -= length;
2010         }
2011
2012         return 0;
2013
2014 out_partial:
2015         rbd_obj_request_put(obj_request);
2016 out_unwind:
2017         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2018                 rbd_img_obj_request_del(img_request, obj_request);
2019
2020         return -ENOMEM;
2021 }
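
/*
 * Segmentation example (hypothetical values): with obj_order 22
 * (4 MiB objects), an image request for 6 MiB starting at image
 * offset 6 MiB spans two objects:
 *
 *     object 1: offset 2 MiB, length 2 MiB   (img_offset 6 MiB)
 *     object 2: offset 0,     length 4 MiB   (img_offset 8 MiB)
 *
 * Each iteration of the loop above produces one such object request.
 */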
2022
2023 static void
2024 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2025 {
2026         struct rbd_img_request *img_request;
2027         struct rbd_device *rbd_dev;
2028         u64 length;
2029         u32 page_count;
2030
2031         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2032         rbd_assert(obj_request_img_data_test(obj_request));
2033         img_request = obj_request->img_request;
2034         rbd_assert(img_request);
2035
2036         rbd_dev = img_request->rbd_dev;
2037         rbd_assert(rbd_dev);
2038         length = (u64)1 << rbd_dev->header.obj_order;
2039         page_count = (u32)calc_pages_for(0, length);
2040
2041         rbd_assert(obj_request->copyup_pages);
2042         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2043         obj_request->copyup_pages = NULL;
2044
2045         /*
2046          * We want the transfer count to reflect the size of the
2047          * original write request.  There is no such thing as a
2048          * successful short write, so if the request was successful
2049          * we can just set it to the originally-requested length.
2050          */
2051         if (!obj_request->result)
2052                 obj_request->xferred = obj_request->length;
2053
2054         /* Finish up with the normal image object callback */
2055
2056         rbd_img_obj_callback(obj_request);
2057 }
2058
2059 static void
2060 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2061 {
2062         struct rbd_obj_request *orig_request;
2063         struct ceph_osd_request *osd_req;
2064         struct ceph_osd_client *osdc;
2065         struct rbd_device *rbd_dev;
2066         struct page **pages;
2067         int result;
2068         u64 obj_size;
2069         u64 xferred;
2070
2071         rbd_assert(img_request_child_test(img_request));
2072
2073         /* First get what we need from the image request */
2074
2075         pages = img_request->copyup_pages;
2076         rbd_assert(pages != NULL);
2077         img_request->copyup_pages = NULL;
2078
2079         orig_request = img_request->obj_request;
2080         rbd_assert(orig_request != NULL);
2081         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2082         result = img_request->result;
2083         obj_size = img_request->length;
2084         xferred = img_request->xferred;
2085
2086         rbd_dev = img_request->rbd_dev;
2087         rbd_assert(rbd_dev);
2088         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2089
2090         rbd_img_request_put(img_request);
2091
2092         if (result)
2093                 goto out_err;
2094
2095         /* Allocate the new copyup osd request for the original request */
2096
2097         result = -ENOMEM;
2098         rbd_assert(!orig_request->osd_req);
2099         osd_req = rbd_osd_req_create_copyup(orig_request);
2100         if (!osd_req)
2101                 goto out_err;
2102         orig_request->osd_req = osd_req;
2103         orig_request->copyup_pages = pages;
2104
2105         /* Initialize the copyup op */
2106
2107         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2108         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2109                                                 false, false);
2110
2111         /* Then the original write request op */
2112
2113         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2114                                         orig_request->offset,
2115                                         orig_request->length, 0, 0);
2116         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2117                                         orig_request->length);
2118
2119         rbd_osd_req_format_write(orig_request);
2120
2121         /* All set, send it off. */
2122
2123         orig_request->callback = rbd_img_obj_copyup_callback;
2124         osdc = &rbd_dev->rbd_client->client->osdc;
2125         result = rbd_obj_request_submit(osdc, orig_request);
2126         if (!result)
2127                 return;
2128 out_err:
2129         /* Record the error code and complete the request */
2130
2131         orig_request->result = result;
2132         orig_request->xferred = 0;
2133         obj_request_done_set(orig_request);
2134         rbd_obj_request_complete(orig_request);
2135 }
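
/*
 * The copyup request assembled above has the shape:
 *
 *     op 0: CEPH_OSD_OP_CALL  rbd.copyup    (full parent object data)
 *     op 1: CEPH_OSD_OP_WRITE offset/length (the original write)
 *
 * The ops execute in order, so the target object can be populated
 * from the parent's data before the client's write lands on top.
 */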
2136
2137 /*
2138  * Read from the parent image the range of data that covers the
2139  * entire target of the given object request.  This is used for
2140  * satisfying a layered image write request when the target of an
2141  * object request from the image request does not exist.
2142  *
2143  * A page array big enough to hold the returned data is allocated
2144  * and supplied to rbd_img_request_fill() as the "data descriptor."
2145  * When the read completes, this page array will be transferred to
2146  * the original object request for the copyup operation.
2147  *
2148  * If an error occurs, record it as the result of the original
2149  * object request and mark it done so it gets completed.
2150  */
2151 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2152 {
2153         struct rbd_img_request *img_request = NULL;
2154         struct rbd_img_request *parent_request = NULL;
2155         struct rbd_device *rbd_dev;
2156         u64 img_offset;
2157         u64 length;
2158         struct page **pages = NULL;
2159         u32 page_count;
2160         int result;
2161
2162         rbd_assert(obj_request_img_data_test(obj_request));
2163         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2164
2165         img_request = obj_request->img_request;
2166         rbd_assert(img_request != NULL);
2167         rbd_dev = img_request->rbd_dev;
2168         rbd_assert(rbd_dev->parent != NULL);
2169
2170         /*
2171          * First things first.  The original osd request is of no
2172          * use to us any more; we'll need a new one that can hold
2173          * the two ops in a copyup request.  We'll get that later,
2174          * but for now we can release the old one.
2175          */
2176         rbd_osd_req_destroy(obj_request->osd_req);
2177         obj_request->osd_req = NULL;
2178
2179         /*
2180          * Determine the byte range covered by the object in the
2181          * child image to which the original request was to be sent.
2182          */
2183         img_offset = obj_request->img_offset - obj_request->offset;
2184         length = (u64)1 << rbd_dev->header.obj_order;
2185
2186         /*
2187          * There is no defined parent data beyond the parent
2188          * overlap, so limit what we read at that boundary if
2189          * necessary.
2190          */
2191         if (img_offset + length > rbd_dev->parent_overlap) {
2192                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2193                 length = rbd_dev->parent_overlap - img_offset;
2194         }
2195
2196         /*
2197          * Allocate a page array big enough to receive the data read
2198          * from the parent.
2199          */
2200         page_count = (u32)calc_pages_for(0, length);
2201         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2202         if (IS_ERR(pages)) {
2203                 result = PTR_ERR(pages);
2204                 pages = NULL;
2205                 goto out_err;
2206         }
2207
2208         result = -ENOMEM;
2209         parent_request = rbd_img_request_create(rbd_dev->parent,
2210                                                 img_offset, length,
2211                                                 false, true);
2212         if (!parent_request)
2213                 goto out_err;
2214         rbd_obj_request_get(obj_request);
2215         parent_request->obj_request = obj_request;
2216
2217         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2218         if (result)
2219                 goto out_err;
2220         parent_request->copyup_pages = pages;
2221
2222         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2223         result = rbd_img_request_submit(parent_request);
2224         if (!result)
2225                 return 0;
2226
2227         parent_request->copyup_pages = NULL;
2228         parent_request->obj_request = NULL;
2229         rbd_obj_request_put(obj_request);
2230 out_err:
2231         if (pages)
2232                 ceph_release_page_vector(pages, page_count);
2233         if (parent_request)
2234                 rbd_img_request_put(parent_request);
2235         obj_request->result = result;
2236         obj_request->xferred = 0;
2237         obj_request_done_set(obj_request);
2238
2239         return result;
2240 }
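
/*
 * Overlap-clamping example (hypothetical values): obj_order 22
 * (4 MiB objects), parent_overlap 6 MiB, and an original write to
 * the object covering image offsets 4 MiB..8 MiB.  Here img_offset
 * is 4 MiB and length starts at 4 MiB, but only 6 - 4 = 2 MiB of
 * parent data is defined, so length is clamped to 2 MiB before the
 * parent read is issued.
 */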
2241
2242 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2243 {
2244         struct rbd_obj_request *orig_request;
2245         int result;
2246
2247         rbd_assert(!obj_request_img_data_test(obj_request));
2248
2249         /*
2250          * All we need from the object request is the original
2251          * request and the result of the STAT op.  Grab those, then
2252          * we're done with the request.
2253          */
2254         orig_request = obj_request->obj_request;
2255         obj_request->obj_request = NULL;
2256         rbd_assert(orig_request);
2257         rbd_assert(orig_request->img_request);
2258
2259         result = obj_request->result;
2260         obj_request->result = 0;
2261
2262         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2263                 obj_request, orig_request, result,
2264                 obj_request->xferred, obj_request->length);
2265         rbd_obj_request_put(obj_request);
2266
2270         /*
2271          * Our only purpose here is to determine whether the object
2272          * exists, and we don't want to treat the non-existence as
2273          * an error.  If something else comes back, transfer the
2274          * error to the original request and complete it now.
2275          */
2276         if (!result) {
2277                 obj_request_existence_set(orig_request, true);
2278         } else if (result == -ENOENT) {
2279                 obj_request_existence_set(orig_request, false);
2280         } else {
2281                 orig_request->result = result;
2282                 goto out;
2283         }
2284
2285         /*
2286          * Resubmit the original request now that we have recorded
2287          * whether the target object exists.
2288          */
2289         orig_request->result = rbd_img_obj_request_submit(orig_request);
2290 out:
2291         if (orig_request->result)
2292                 rbd_obj_request_complete(orig_request);
2293         rbd_obj_request_put(orig_request);
2294 }
2295
2296 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2297 {
2298         struct rbd_obj_request *stat_request;
2299         struct rbd_device *rbd_dev;
2300         struct ceph_osd_client *osdc;
2301         struct page **pages = NULL;
2302         u32 page_count;
2303         size_t size;
2304         int ret;
2305
2306         /*
2307          * The response data for a STAT call consists of:
2308          *     le64 length;
2309          *     struct {
2310          *         le32 tv_sec;
2311          *         le32 tv_nsec;
2312          *     } mtime;
2313          */
2314         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2315         page_count = (u32)calc_pages_for(0, size);
2316         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2317         if (IS_ERR(pages))
2318                 return PTR_ERR(pages);
2319
2320         ret = -ENOMEM;
2321         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2322                                                         OBJ_REQUEST_PAGES);
2323         if (!stat_request) {
2324                 ceph_release_page_vector(pages, page_count);
2325                 return -ENOMEM;
2326         }
2325
2326         rbd_obj_request_get(obj_request);
2327         stat_request->obj_request = obj_request;
2328         stat_request->pages = pages;
2329         stat_request->page_count = page_count;
2330
2331         rbd_assert(obj_request->img_request);
2332         rbd_dev = obj_request->img_request->rbd_dev;
2333         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2334                                                 stat_request);
2335         if (!stat_request->osd_req)
2336                 goto out;
2337         stat_request->callback = rbd_img_obj_exists_callback;
2338
2339         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2340         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2341                                         false, false);
2342         rbd_osd_req_format_read(stat_request);
2343
2344         osdc = &rbd_dev->rbd_client->client->osdc;
2345         ret = rbd_obj_request_submit(osdc, stat_request);
2346 out:
2347         if (ret)
2348                 rbd_obj_request_put(obj_request);
2349
2350         return ret;
2351 }
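
/*
 * Size note: the STAT reply layout above totals 16 bytes
 * (8 + 4 + 4), so calc_pages_for(0, 16) always yields a
 * single-page vector for the response data.
 */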
2352
2353 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2354 {
2355         struct rbd_img_request *img_request;
2356         struct rbd_device *rbd_dev;
2357         bool known;
2358
2359         rbd_assert(obj_request_img_data_test(obj_request));
2360
2361         img_request = obj_request->img_request;
2362         rbd_assert(img_request);
2363         rbd_dev = img_request->rbd_dev;
2364
2365         /*
2366          * Only writes to layered images need special handling.
2367          * Reads and non-layered writes are simple object requests.
2368          * Layered writes that start beyond the end of the overlap
2369          * with the parent have no parent data, so they too are
2370          * simple object requests.  Finally, if the target object is
2371          * known to already exist, its parent data has already been
2372          * copied, so a write to the object can also be handled as a
2373          * simple object request.
2374          */
2375         if (!img_request_write_test(img_request) ||
2376                 !img_request_layered_test(img_request) ||
2377                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2378                 ((known = obj_request_known_test(obj_request)) &&
2379                         obj_request_exists_test(obj_request))) {
2380
2381                 struct ceph_osd_client *osdc;
2382
2383                 osdc = &rbd_dev->rbd_client->client->osdc;
2386
2387                 return rbd_obj_request_submit(osdc, obj_request);
2388         }
2389
2390         /*
2391          * It's a layered write.  The target object might exist but
2392          * we may not know that yet.  If we know it doesn't exist,
2393          * start by reading the data for the full target object from
2394          * the parent so we can use it for a copyup to the target.
2395          */
2396         if (known)
2397                 return rbd_img_obj_parent_read_full(obj_request);
2398
2399         /* We don't know whether the target exists.  Go find out. */
2400
2401         return rbd_img_obj_exists_submit(obj_request);
2402 }
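
/*
 * Summary of the dispatch above:
 *
 *     read, non-layered write, write beyond
 *     the parent overlap, or target exists  ->  submit directly
 *     target known not to exist             ->  rbd_img_obj_parent_read_full()
 *     target existence unknown              ->  rbd_img_obj_exists_submit()
 */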
2403
2404 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2405 {
2406         struct rbd_obj_request *obj_request;
2407         struct rbd_obj_request *next_obj_request;
2408
2409         dout("%s: img %p\n", __func__, img_request);
2410         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2411                 int ret;
2412
2413                 ret = rbd_img_obj_request_submit(obj_request);
2414                 if (ret)
2415                         return ret;
2416         }
2417
2418         return 0;
2419 }
2420
2421 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2422 {
2423         struct rbd_obj_request *obj_request;
2424         struct rbd_device *rbd_dev;
2425         u64 obj_end;
2426
2427         rbd_assert(img_request_child_test(img_request));
2428
2429         obj_request = img_request->obj_request;
2430         rbd_assert(obj_request);
2431         rbd_assert(obj_request->img_request);
2432
2433         obj_request->result = img_request->result;
2434         if (obj_request->result)
2435                 goto out;
2436
2437         /*
2438          * We need to zero anything beyond the parent overlap
2439          * boundary.  Since rbd_img_obj_request_read_callback()
2440          * will zero anything beyond the end of a short read, an
2441          * easy way to do this is to pretend the data from the
2442          * parent came up short--ending at the overlap boundary.
2443          */
2444         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2445         obj_end = obj_request->img_offset + obj_request->length;
2446         rbd_dev = obj_request->img_request->rbd_dev;
2447         if (obj_end > rbd_dev->parent_overlap) {
2448                 u64 xferred = 0;
2449
2450                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2451                         xferred = rbd_dev->parent_overlap -
2452                                         obj_request->img_offset;
2453
2454                 obj_request->xferred = min(img_request->xferred, xferred);
2455         } else {
2456                 obj_request->xferred = img_request->xferred;
2457         }
2458 out:
2459         rbd_img_obj_request_read_callback(obj_request);
2460         rbd_obj_request_complete(obj_request);
2461 }
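
/*
 * Zeroing example (hypothetical values): parent_overlap 8 MiB and a
 * child read covering image offsets 7 MiB..9 MiB.  obj_end (9 MiB)
 * exceeds the overlap, so xferred is capped at 8 - 7 = 1 MiB; the
 * read callback then zeroes the remaining 1 MiB as if the parent
 * read had simply come up short.
 */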
2462
2463 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2464 {
2465         struct rbd_device *rbd_dev;
2466         struct rbd_img_request *img_request;
2467         int result;
2468
2469         rbd_assert(obj_request_img_data_test(obj_request));
2470         rbd_assert(obj_request->img_request != NULL);
2471         rbd_assert(obj_request->result == (s32) -ENOENT);
2472         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2473
2474         rbd_dev = obj_request->img_request->rbd_dev;
2475         rbd_assert(rbd_dev->parent != NULL);
2476         /* rbd_read_finish(obj_request, obj_request->length); */
2477         img_request = rbd_img_request_create(rbd_dev->parent,
2478                                                 obj_request->img_offset,
2479                                                 obj_request->length,
2480                                                 false, true);
2481         result = -ENOMEM;
2482         if (!img_request)
2483                 goto out_err;
2484
2485         rbd_obj_request_get(obj_request);
2486         img_request->obj_request = obj_request;
2487
2488         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2489                                         obj_request->bio_list);
2490         if (result)
2491                 goto out_err;
2492
2493         img_request->callback = rbd_img_parent_read_callback;
2494         result = rbd_img_request_submit(img_request);
2495         if (result)
2496                 goto out_err;
2497
2498         return;
2499 out_err:
2500         if (img_request)
2501                 rbd_img_request_put(img_request);
2502         obj_request->result = result;
2503         obj_request->xferred = 0;
2504         obj_request_done_set(obj_request);
2505 }
2506
2507 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2508                                    u64 ver, u64 notify_id)
2509 {
2510         struct rbd_obj_request *obj_request;
2511         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2512         int ret;
2513
2514         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2515                                                         OBJ_REQUEST_NODATA);
2516         if (!obj_request)
2517                 return -ENOMEM;
2518
2519         ret = -ENOMEM;
2520         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2521         if (!obj_request->osd_req)
2522                 goto out;
2523         obj_request->callback = rbd_obj_request_put;
2524
2525         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2526                                         notify_id, ver, 0);
2527         rbd_osd_req_format_read(obj_request);
2528
2529         ret = rbd_obj_request_submit(osdc, obj_request);
2530 out:
2531         if (ret)
2532                 rbd_obj_request_put(obj_request);
2533
2534         return ret;
2535 }
2536
2537 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2538 {
2539         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2540         u64 hver;
2541
2542         if (!rbd_dev)
2543                 return;
2544
2545         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2546                 rbd_dev->header_name, (unsigned long long) notify_id,
2547                 (unsigned int) opcode);
2548         (void)rbd_dev_refresh(rbd_dev, &hver);
2549
2550         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2551 }
2552
2553 /*
2554  * Request sync osd watch/unwatch.  The value of "start" determines
2555  * whether a watch request is being initiated or torn down.
2556  */
2557 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2558 {
2559         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2560         struct rbd_obj_request *obj_request;
2561         int ret;
2562
2563         rbd_assert(start ^ !!rbd_dev->watch_event);
2564         rbd_assert(start ^ !!rbd_dev->watch_request);
2565
2566         if (start) {
2567                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2568                                                 &rbd_dev->watch_event);
2569                 if (ret < 0)
2570                         return ret;
2571                 rbd_assert(rbd_dev->watch_event != NULL);
2572         }
2573
2574         ret = -ENOMEM;
2575         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2576                                                         OBJ_REQUEST_NODATA);
2577         if (!obj_request)
2578                 goto out_cancel;
2579
2580         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2581         if (!obj_request->osd_req)
2582                 goto out_cancel;
2583
2584         if (start)
2585                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2586         else
2587                 ceph_osdc_unregister_linger_request(osdc,
2588                                         rbd_dev->watch_request->osd_req);
2589
2590         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2591                                 rbd_dev->watch_event->cookie,
2592                                 rbd_dev->header.obj_version, start);
2593         rbd_osd_req_format_write(obj_request);
2594
2595         ret = rbd_obj_request_submit(osdc, obj_request);
2596         if (ret)
2597                 goto out_cancel;
2598         ret = rbd_obj_request_wait(obj_request);
2599         if (ret)
2600                 goto out_cancel;
2601         ret = obj_request->result;
2602         if (ret)
2603                 goto out_cancel;
2604
2605         /*
2606          * A watch request is set to linger, so the underlying osd
2607          * request won't go away until we unregister it.  We retain
2608          * a pointer to the object request during that time (in
2609          * rbd_dev->watch_request), so we'll keep a reference to
2610          * it.  We'll drop that reference (below) after we've
2611          * unregistered it.
2612          */
2613         if (start) {
2614                 rbd_dev->watch_request = obj_request;
2615
2616                 return 0;
2617         }
2618
2619         /* We have successfully torn down the watch request */
2620
2621         rbd_obj_request_put(rbd_dev->watch_request);
2622         rbd_dev->watch_request = NULL;
2623 out_cancel:
2624         /* Cancel the event if we're tearing down, or on error */
2625         ceph_osdc_cancel_event(rbd_dev->watch_event);
2626         rbd_dev->watch_event = NULL;
2627         if (obj_request)
2628                 rbd_obj_request_put(obj_request);
2629
2630         return ret;
2631 }
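
/*
 * Usage sketch (hypothetical call sites; error handling elided):
 *
 *     rbd_dev_header_watch_sync(rbd_dev, 1);    register the watch
 *     rbd_dev_header_watch_sync(rbd_dev, 0);    tear it down
 *
 * Registration leaves a lingering osd request behind (tracked in
 * rbd_dev->watch_request) so the OSDs can keep delivering header
 * change notifications to rbd_watch_cb() until teardown.
 */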
2632
2633 /*
2634  * Synchronous osd object method call.  Returns the number of bytes
2635  * returned in the outbound buffer, or a negative error code.
2636  */
2637 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2638                              const char *object_name,
2639                              const char *class_name,
2640                              const char *method_name,
2641                              const void *outbound,
2642                              size_t outbound_size,
2643                              void *inbound,
2644                              size_t inbound_size,
2645                              u64 *version)
2646 {
2647         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2648         struct rbd_obj_request *obj_request;
2649         struct page **pages;
2650         u32 page_count;
2651         int ret;
2652
2653         /*
2654          * Method calls are ultimately read operations.  The result
2655          * should be placed into the inbound buffer provided.  They
2656          * may also supply outbound data--parameters for the object
2657          * method.  Currently, if outbound data is present, it will
2658          * be a snapshot id.
2659          */
2660         page_count = (u32)calc_pages_for(0, inbound_size);
2661         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2662         if (IS_ERR(pages))
2663                 return PTR_ERR(pages);
2664
2665         ret = -ENOMEM;
2666         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2667                                                         OBJ_REQUEST_PAGES);
2668         if (!obj_request)
2669                 goto out;
2670
2671         obj_request->pages = pages;
2672         obj_request->page_count = page_count;
2673
2674         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2675         if (!obj_request->osd_req)
2676                 goto out;
2677
2678         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2679                                         class_name, method_name);
2680         if (outbound_size) {
2681                 struct ceph_pagelist *pagelist;
2682
2683                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2684                 if (!pagelist)
2685                         goto out;
2686
2687                 ceph_pagelist_init(pagelist);
2688                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2689                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2690                                                 pagelist);
2691         }
2692         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2693                                         obj_request->pages, inbound_size,
2694                                         0, false, false);
2695         rbd_osd_req_format_read(obj_request);
2696
2697         ret = rbd_obj_request_submit(osdc, obj_request);
2698         if (ret)
2699                 goto out;
2700         ret = rbd_obj_request_wait(obj_request);
2701         if (ret)
2702                 goto out;
2703
2704         ret = obj_request->result;
2705         if (ret < 0)
2706                 goto out;
2707
2708         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2709         ret = (int)obj_request->xferred;
2710         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2711         if (version)
2712                 *version = obj_request->version;
2713 out:
2714         if (obj_request)
2715                 rbd_obj_request_put(obj_request);
2716         else
2717                 ceph_release_page_vector(pages, page_count);
2718
2719         return ret;
2720 }
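
/*
 * Usage sketch (hypothetical buffer names; error handling elided),
 * in the style of the v2 image probing code:
 *
 *     __le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *     __le64 size_buf;
 *
 *     ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                     "rbd", "get_size",
 *                     &snapid, sizeof (snapid),
 *                     &size_buf, sizeof (size_buf), NULL);
 *
 * A non-negative return is the number of bytes placed in the
 * inbound buffer.
 */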
2721
2722 static void rbd_request_fn(struct request_queue *q)
2723                 __releases(q->queue_lock) __acquires(q->queue_lock)
2724 {
2725         struct rbd_device *rbd_dev = q->queuedata;
2726         bool read_only = rbd_dev->mapping.read_only;
2727         struct request *rq;
2728         int result;
2729
2730         while ((rq = blk_fetch_request(q))) {
2731                 bool write_request = rq_data_dir(rq) == WRITE;
2732                 struct rbd_img_request *img_request;
2733                 u64 offset;
2734                 u64 length;
2735
2736                 /* Ignore any non-FS requests that filter through. */
2737
2738                 if (rq->cmd_type != REQ_TYPE_FS) {
2739                         dout("%s: non-fs request type %d\n", __func__,
2740                                 (int) rq->cmd_type);
2741                         __blk_end_request_all(rq, 0);
2742                         continue;
2743                 }
2744
2745                 /* Ignore/skip any zero-length requests */
2746
2747                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2748                 length = (u64) blk_rq_bytes(rq);
2749
2750                 if (!length) {
2751                         dout("%s: zero-length request\n", __func__);
2752                         __blk_end_request_all(rq, 0);
2753                         continue;
2754                 }
2755
2756                 spin_unlock_irq(q->queue_lock);
2757
2758                 /* Disallow writes to a read-only device */
2759
2760                 if (write_request) {
2761                         result = -EROFS;
2762                         if (read_only)
2763                                 goto end_request;
2764                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2765                 }
2766
2767                 /*
2768                  * Quit early if the mapped snapshot no longer
2769                  * exists.  It's still possible the snapshot will
2770                  * have disappeared by the time our request arrives
2771                  * at the osd, but there's no sense in sending it if
2772                  * we already know.
2773                  */
2774                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2775                         dout("request for non-existent snapshot");
2776                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2777                         result = -ENXIO;
2778                         goto end_request;
2779                 }
2780
2781                 result = -EINVAL;
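                /*
                 * Note: the bound U64_MAX - offset + 1 itself wraps
                 * to 0 when offset == 0, hence the offset check; with
                 * offset == 0 the sum cannot overflow anyway.
                 */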
2782                 if (offset && length > U64_MAX - offset + 1) {
2783                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2784                                 offset, length);
2785                         goto end_request;       /* Shouldn't happen */
2786                 }
2787
2788                 result = -ENOMEM;
2789                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2790                                                         write_request, false);
2791                 if (!img_request)
2792                         goto end_request;
2793
2794                 img_request->rq = rq;
2795
2796                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2797                                                 rq->bio);
2798                 if (!result)
2799                         result = rbd_img_request_submit(img_request);
2800                 if (result)
2801                         rbd_img_request_put(img_request);
2802 end_request:
2803                 spin_lock_irq(q->queue_lock);
2804                 if (result < 0) {
2805                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2806                                 write_request ? "write" : "read",
2807                                 length, offset, result);
2808
2809                         __blk_end_request_all(rq, result);
2810                 }
2811         }
2812 }
2813
2814 /*
2815  * A queue callback.  Makes sure that we don't create a bio that spans
2816  * multiple osd objects.  One exception is single-page bios, which we
2817  * handle later in bio_chain_clone_range().
2818  */
2819 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2820                           struct bio_vec *bvec)
2821 {
2822         struct rbd_device *rbd_dev = q->queuedata;
2823         sector_t sector_offset;
2824         sector_t sectors_per_obj;
2825         sector_t obj_sector_offset;
2826         int ret;
2827
2828         /*
2829          * Find how far into its rbd object the bio's start sector
2830          * falls.  The bio sector is partition-relative, so first
2831          * offset it by the partition's start within the whole device.
2832          */
2833         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2834         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2835         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2836
2837         /*
2838          * Compute the number of bytes from that offset to the end
2839          * of the object.  Account for what's already used by the bio.
2840          */
2841         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2842         if (ret > bmd->bi_size)
2843                 ret -= bmd->bi_size;
2844         else
2845                 ret = 0;
2846
2847         /*
2848          * Don't send back more than was asked for.  And if the bio
2849          * was empty, let the whole thing through because:  "Note
2850          * that a block device *must* allow a single page to be
2851          * added to an empty bio."
2852          */
2853         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2854         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2855                 ret = (int) bvec->bv_len;
2856
2857         return ret;
2858 }
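
/*
 * Worked example (hypothetical values): obj_order 22 gives
 * sectors_per_obj = 1 << (22 - 9) = 8192.  A bio starting at
 * absolute sector 12288 has obj_sector_offset 4096, so at most
 * (8192 - 4096) << 9 = 2 MiB (minus bytes already in the bio, and
 * capped at bvec->bv_len) may be added without crossing an object
 * boundary.
 */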
2859
2860 static void rbd_free_disk(struct rbd_device *rbd_dev)
2861 {
2862         struct gendisk *disk = rbd_dev->disk;
2863
2864         if (!disk)
2865                 return;
2866
2867         rbd_dev->disk = NULL;
2868         if (disk->flags & GENHD_FL_UP) {
2869                 del_gendisk(disk);
2870                 if (disk->queue)
2871                         blk_cleanup_queue(disk->queue);
2872         }
2873         put_disk(disk);
2874 }
2875
2876 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2877                                 const char *object_name,
2878                                 u64 offset, u64 length,
2879                                 void *buf, u64 *version)
2880
2881 {
2882         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2883         struct rbd_obj_request *obj_request;
2884         struct page **pages = NULL;
2885         u32 page_count;
2886         size_t size;
2887         int ret;
2888
2889         page_count = (u32) calc_pages_for(offset, length);
2890         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2891         if (IS_ERR(pages))
2892                 return PTR_ERR(pages);
2893
2894         ret = -ENOMEM;
2895         obj_request = rbd_obj_request_create(object_name, offset, length,
2896                                                         OBJ_REQUEST_PAGES);
2897         if (!obj_request)
2898                 goto out;
2899
2900         obj_request->pages = pages;
2901         obj_request->page_count = page_count;
2902
2903         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2904         if (!obj_request->osd_req)
2905                 goto out;
2906
2907         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2908                                         offset, length, 0, 0);
2909         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2910                                         obj_request->pages,
2911                                         obj_request->length,
2912                                         obj_request->offset & ~PAGE_MASK,
2913                                         false, false);
2914         rbd_osd_req_format_read(obj_request);
2915
2916         ret = rbd_obj_request_submit(osdc, obj_request);
2917         if (ret)
2918                 goto out;
2919         ret = rbd_obj_request_wait(obj_request);
2920         if (ret)
2921                 goto out;
2922
2923         ret = obj_request->result;
2924         if (ret < 0)
2925                 goto out;
2926
2927         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2928         size = (size_t) obj_request->xferred;
2929         ceph_copy_from_page_vector(pages, buf, 0, size);
2930         rbd_assert(size <= (size_t) INT_MAX);
2931         ret = (int) size;
2932         if (version)
2933                 *version = obj_request->version;
2934 out:
2935         if (obj_request)
2936                 rbd_obj_request_put(obj_request);
2937         else
2938                 ceph_release_page_vector(pages, page_count);
2939
2940         return ret;
2941 }
2942
2943 /*
2944  * Read the complete header for the given rbd device.
2945  *
2946  * Returns a pointer to a dynamically-allocated buffer containing
2947  * the complete and validated header.  Caller can pass the address
2948  * of a variable that will be filled in with the version of the
2949  * header object at the time it was read.
2950  *
2951  * Returns a pointer-coded errno if a failure occurs.
2952  */
2953 static struct rbd_image_header_ondisk *
2954 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2955 {
2956         struct rbd_image_header_ondisk *ondisk = NULL;
2957         u32 snap_count = 0;
2958         u64 names_size = 0;
2959         u32 want_count;
2960         int ret;
2961
2962         /*
2963          * The complete header will include an array of its 64-bit
2964          * snapshot ids, followed by the names of those snapshots as
2965          * a contiguous block of NUL-terminated strings.  Note that
2966          * the number of snapshots could change by the time we read
2967          * it in, in which case we re-read it.
2968          */
2969         do {
2970                 size_t size;
2971
2972                 kfree(ondisk);
2973
2974                 size = sizeof (*ondisk);
2975                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2976                 size += names_size;
2977                 ondisk = kmalloc(size, GFP_KERNEL);
2978                 if (!ondisk)
2979                         return ERR_PTR(-ENOMEM);
2980
2981                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2982                                        0, size, ondisk, version);
2983                 if (ret < 0)
2984                         goto out_err;
2985                 if ((size_t)ret < size) {
2986                         ret = -ENXIO;
2987                 rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2988                                 size, ret);
2989                         goto out_err;
2990                 }
2991                 if (!rbd_dev_ondisk_valid(ondisk)) {
2992                         ret = -ENXIO;
2993                         rbd_warn(rbd_dev, "invalid header");
2994                         goto out_err;
2995                 }
2996
2997                 names_size = le64_to_cpu(ondisk->snap_names_len);
2998                 want_count = snap_count;
2999                 snap_count = le32_to_cpu(ondisk->snap_count);
3000         } while (snap_count != want_count);
3001
3002         return ondisk;
3003
3004 out_err:
3005         kfree(ondisk);
3006
3007         return ERR_PTR(ret);
3008 }
3009
3010 /*
3011  * Re-read the on-disk header and fill in the in-memory version
3012  */
3013 static int rbd_read_header(struct rbd_device *rbd_dev,
3014                            struct rbd_image_header *header)
3015 {
3016         struct rbd_image_header_ondisk *ondisk;
3017         u64 ver = 0;
3018         int ret;
3019
3020         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3021         if (IS_ERR(ondisk))
3022                 return PTR_ERR(ondisk);
3023         ret = rbd_header_from_disk(header, ondisk);
3024         if (ret >= 0)
3025                 header->obj_version = ver;
3026         kfree(ondisk);
3027
3028         return ret;
3029 }
3030
3031 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3032 {
3033         struct rbd_snap *snap;
3034         struct rbd_snap *next;
3035
3036         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3037                 list_del(&snap->node);
3038                 rbd_snap_destroy(snap);
3039         }
3040 }
3041
3042 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3043 {
3044         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3045                 return;
3046
3047         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3048                 sector_t size;
3049
3050                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3051                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3052                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3053                 set_capacity(rbd_dev->disk, size);
3054         }
3055 }
3056
3057 /*
3058  * Re-read the on-disk header and update the in-core copy to match
3059  */
3060 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3061 {
3062         int ret;
3063         struct rbd_image_header h;
3064
3065         ret = rbd_read_header(rbd_dev, &h);
3066         if (ret < 0)
3067                 return ret;
3068
3069         down_write(&rbd_dev->header_rwsem);
3070
3071         /* Update image size, and check for resize of mapped image */
3072         rbd_dev->header.image_size = h.image_size;
3073         rbd_update_mapping_size(rbd_dev);
3074
3075         /* rbd_dev->header.object_prefix shouldn't change */
3076         kfree(rbd_dev->header.snap_sizes);
3077         kfree(rbd_dev->header.snap_names);
3078         /* osd requests may still refer to snapc */
3079         rbd_snap_context_put(rbd_dev->header.snapc);
3080
3081         if (hver)
3082                 *hver = h.obj_version;
3083         rbd_dev->header.obj_version = h.obj_version;
3084         rbd_dev->header.image_size = h.image_size;
3085         rbd_dev->header.snapc = h.snapc;
3086         rbd_dev->header.snap_names = h.snap_names;
3087         rbd_dev->header.snap_sizes = h.snap_sizes;
3088         /* Free the extra copy of the object prefix */
3089         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3090                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3091         kfree(h.object_prefix);
3092
3093         ret = rbd_dev_snaps_update(rbd_dev);
3094
3095         up_write(&rbd_dev->header_rwsem);
3096
3097         return ret;
3098 }
3099
3100 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3101 {
3102         int ret;
3103
3104         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3105         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3106         if (rbd_dev->image_format == 1)
3107                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3108         else
3109                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3110         mutex_unlock(&ctl_mutex);
3111         revalidate_disk(rbd_dev->disk);
3112         if (ret)
3113                 rbd_warn(rbd_dev, "got notification but failed to "
3114                            "update snaps: %d\n", ret);
3115
3116         return ret;
3117 }
3118
3119 static int rbd_init_disk(struct rbd_device *rbd_dev)
3120 {
3121         struct gendisk *disk;
3122         struct request_queue *q;
3123         u64 segment_size;
3124
3125         /* create gendisk info */
3126         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3127         if (!disk)
3128                 return -ENOMEM;
3129
3130         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3131                  rbd_dev->dev_id);
3132         disk->major = rbd_dev->major;
3133         disk->first_minor = 0;
3134         disk->fops = &rbd_bd_ops;
3135         disk->private_data = rbd_dev;
3136
3137         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3138         if (!q)
3139                 goto out_disk;
3140
3141         /* We use the default size, but let's be explicit about it. */
3142         blk_queue_physical_block_size(q, SECTOR_SIZE);
3143
3144         /* set io sizes to object size */
3145         segment_size = rbd_obj_bytes(&rbd_dev->header);
3146         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3147         blk_queue_max_segment_size(q, segment_size);
3148         blk_queue_io_min(q, segment_size);
3149         blk_queue_io_opt(q, segment_size);
3150
3151         blk_queue_merge_bvec(q, rbd_merge_bvec);
3152         disk->queue = q;
3153
3154         q->queuedata = rbd_dev;
3155
3156         rbd_dev->disk = disk;
3157
3158         return 0;
3159 out_disk:
3160         put_disk(disk);
3161
3162         return -ENOMEM;
3163 }
3164
3165 /*
3166  * sysfs
3167  */
3168
3169 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3170 {
3171         return container_of(dev, struct rbd_device, dev);
3172 }
3173
3174 static ssize_t rbd_size_show(struct device *dev,
3175                              struct device_attribute *attr, char *buf)
3176 {
3177         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3178
3179         return sprintf(buf, "%llu\n",
3180                 (unsigned long long)rbd_dev->mapping.size);
3181 }
3182
3183 /*
3184  * Note this shows the features for whatever's mapped, which is not
3185  * necessarily the base image.
3186  */
3187 static ssize_t rbd_features_show(struct device *dev,
3188                              struct device_attribute *attr, char *buf)
3189 {
3190         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3191
3192         return sprintf(buf, "0x%016llx\n",
3193                         (unsigned long long)rbd_dev->mapping.features);
3194 }
3195
3196 static ssize_t rbd_major_show(struct device *dev,
3197                               struct device_attribute *attr, char *buf)
3198 {
3199         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3200
3201         if (rbd_dev->major)
3202                 return sprintf(buf, "%d\n", rbd_dev->major);
3203
3204         return sprintf(buf, "(none)\n");
3205
3206 }
3207
3208 static ssize_t rbd_client_id_show(struct device *dev,
3209                                   struct device_attribute *attr, char *buf)
3210 {
3211         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3212
3213         return sprintf(buf, "client%lld\n",
3214                         ceph_client_id(rbd_dev->rbd_client->client));
3215 }
3216
3217 static ssize_t rbd_pool_show(struct device *dev,
3218                              struct device_attribute *attr, char *buf)
3219 {
3220         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3221
3222         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3223 }
3224
3225 static ssize_t rbd_pool_id_show(struct device *dev,
3226                              struct device_attribute *attr, char *buf)
3227 {
3228         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3229
3230         return sprintf(buf, "%llu\n",
3231                         (unsigned long long) rbd_dev->spec->pool_id);
3232 }
3233
3234 static ssize_t rbd_name_show(struct device *dev,
3235                              struct device_attribute *attr, char *buf)
3236 {
3237         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3238
3239         if (rbd_dev->spec->image_name)
3240                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3241
3242         return sprintf(buf, "(unknown)\n");
3243 }
3244
3245 static ssize_t rbd_image_id_show(struct device *dev,
3246                              struct device_attribute *attr, char *buf)
3247 {
3248         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3249
3250         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3251 }
3252
3253 /*
3254  * Shows the name of the currently-mapped snapshot (or
3255  * RBD_SNAP_HEAD_NAME for the base image).
3256  */
3257 static ssize_t rbd_snap_show(struct device *dev,
3258                              struct device_attribute *attr,
3259                              char *buf)
3260 {
3261         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3262
3263         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3264 }
3265
3266 /*
3267  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3268  * for the parent image.  If there is no parent, simply shows
3269  * "(no parent image)".
3270  */
3271 static ssize_t rbd_parent_show(struct device *dev,
3272                              struct device_attribute *attr,
3273                              char *buf)
3274 {
3275         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3276         struct rbd_spec *spec = rbd_dev->parent_spec;
3277         int count;
3278         char *bufp = buf;
3279
3280         if (!spec)
3281                 return sprintf(buf, "(no parent image)\n");
3282
3283         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3284                         (unsigned long long) spec->pool_id, spec->pool_name);
3285         if (count < 0)
3286                 return count;
3287         bufp += count;
3288
3289         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3290                         spec->image_name ? spec->image_name : "(unknown)");
3291         if (count < 0)
3292                 return count;
3293         bufp += count;
3294
3295         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3296                         (unsigned long long) spec->snap_id, spec->snap_name);
3297         if (count < 0)
3298                 return count;
3299         bufp += count;
3300
3301         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3302         if (count < 0)
3303                 return count;
3304         bufp += count;
3305
3306         return (ssize_t) (bufp - buf);
3307 }
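
/*
 * Illustrative only (userspace, not built as part of this driver):
 * once an image is mapped, the attribute above can be read back from
 * sysfs.  The device id 0 below is just an assumption for the example.
 */
#if 0
        char parent[512];
        int fd = open("/sys/bus/rbd/devices/0/parent", O_RDONLY);

        if (fd >= 0) {
                ssize_t n = read(fd, parent, sizeof (parent) - 1);

                if (n > 0) {
                        parent[n] = '\0'; /* "(no parent image)" or id/name pairs */
                        printf("%s", parent);
                }
                close(fd);
        }
#endif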
3308
3309 static ssize_t rbd_image_refresh(struct device *dev,
3310                                  struct device_attribute *attr,
3311                                  const char *buf,
3312                                  size_t size)
3313 {
3314         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3315         int ret;
3316
3317         ret = rbd_dev_refresh(rbd_dev, NULL);
3318
3319         return ret < 0 ? ret : size;
3320 }
3321
3322 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3323 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3324 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3325 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3326 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3327 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3328 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3329 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3330 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3331 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3332 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3333
3334 static struct attribute *rbd_attrs[] = {
3335         &dev_attr_size.attr,
3336         &dev_attr_features.attr,
3337         &dev_attr_major.attr,
3338         &dev_attr_client_id.attr,
3339         &dev_attr_pool.attr,
3340         &dev_attr_pool_id.attr,
3341         &dev_attr_name.attr,
3342         &dev_attr_image_id.attr,
3343         &dev_attr_current_snap.attr,
3344         &dev_attr_parent.attr,
3345         &dev_attr_refresh.attr,
3346         NULL
3347 };
3348
3349 static struct attribute_group rbd_attr_group = {
3350         .attrs = rbd_attrs,
3351 };
3352
3353 static const struct attribute_group *rbd_attr_groups[] = {
3354         &rbd_attr_group,
3355         NULL
3356 };
3357
3358 static void rbd_sysfs_dev_release(struct device *dev)
3359 {
3360 }
3361
3362 static struct device_type rbd_device_type = {
3363         .name           = "rbd",
3364         .groups         = rbd_attr_groups,
3365         .release        = rbd_sysfs_dev_release,
3366 };
3367
3368 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3369 {
3370         kref_get(&spec->kref);
3371
3372         return spec;
3373 }
3374
3375 static void rbd_spec_free(struct kref *kref);
3376 static void rbd_spec_put(struct rbd_spec *spec)
3377 {
3378         if (spec)
3379                 kref_put(&spec->kref, rbd_spec_free);
3380 }
3381
3382 static struct rbd_spec *rbd_spec_alloc(void)
3383 {
3384         struct rbd_spec *spec;
3385
3386         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3387         if (!spec)
3388                 return NULL;
3389         kref_init(&spec->kref);
3390
3391         return spec;
3392 }
3393
3394 static void rbd_spec_free(struct kref *kref)
3395 {
3396         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3397
3398         kfree(spec->pool_name);
3399         kfree(spec->image_id);
3400         kfree(spec->image_name);
3401         kfree(spec->snap_name);
3402         kfree(spec);
3403 }
3404
3405 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3406                                 struct rbd_spec *spec)
3407 {
3408         struct rbd_device *rbd_dev;
3409
3410         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3411         if (!rbd_dev)
3412                 return NULL;
3413
3414         spin_lock_init(&rbd_dev->lock);
3415         rbd_dev->flags = 0;
3416         INIT_LIST_HEAD(&rbd_dev->node);
3417         INIT_LIST_HEAD(&rbd_dev->snaps);
3418         init_rwsem(&rbd_dev->header_rwsem);
3419
3420         rbd_dev->spec = spec;
3421         rbd_dev->rbd_client = rbdc;
3422
3423         /* Initialize the layout used for all rbd requests */
3424
3425         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3426         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3427         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3428         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3429
3430         return rbd_dev;
3431 }
3432
3433 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3434 {
3435         rbd_put_client(rbd_dev->rbd_client);
3436         rbd_spec_put(rbd_dev->spec);
3437         kfree(rbd_dev);
3438 }
3439
3440 static void rbd_snap_destroy(struct rbd_snap *snap)
3441 {
3442         kfree(snap->name);
3443         kfree(snap);
3444 }
3445
3446 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3447                                                 const char *snap_name,
3448                                                 u64 snap_id, u64 snap_size,
3449                                                 u64 snap_features)
3450 {
3451         struct rbd_snap *snap;
3452
3453         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3454         if (!snap)
3455                 return ERR_PTR(-ENOMEM);
3456
3457         snap->name = snap_name;
3458         snap->id = snap_id;
3459         snap->size = snap_size;
3460         snap->features = snap_features;
3461
3462         return snap;
3463 }
3464
3465 /*
3466  * Returns a dynamically-allocated snapshot name if successful, or a
3467  * pointer-coded error otherwise.
3468  */
3469 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3470                 u64 *snap_size, u64 *snap_features)
3471 {
3472         char *snap_name;
3473         int i;
3474
3475         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3476
3477         /* Skip over names until we find the one we are looking for */
3478
3479         snap_name = rbd_dev->header.snap_names;
3480         for (i = 0; i < which; i++)
3481                 snap_name += strlen(snap_name) + 1;
3482
3483         snap_name = kstrdup(snap_name, GFP_KERNEL);
3484         if (!snap_name)
3485                 return ERR_PTR(-ENOMEM);
3486
3487         *snap_size = rbd_dev->header.snap_sizes[which];
3488         *snap_features = 0;     /* No features for v1 */
3489
3490         return snap_name;
3491 }
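
/*
 * A minimal sketch (not built) of the name walk above: v1 snapshot
 * names live back to back in one buffer as NUL-terminated strings,
 * so finding entry N means skipping N earlier strings.
 */
#if 0
        const char names[] = "alpha\0beta\0gamma";      /* snap_names layout */
        const char *name = names;
        u32 i;

        for (i = 0; i < 2; i++)                 /* which == 2 */
                name += strlen(name) + 1;       /* name now points at "gamma" */
#endif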
3492
3493 /*
3494  * Get the size and object order for an image snapshot, or if
3495  * snap_id is CEPH_NOSNAP, gets this information for the base
3496  * image.
3497  */
3498 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3499                                 u8 *order, u64 *snap_size)
3500 {
3501         __le64 snapid = cpu_to_le64(snap_id);
3502         int ret;
3503         struct {
3504                 u8 order;
3505                 __le64 size;
3506         } __attribute__ ((packed)) size_buf = { 0 };
3507
3508         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3509                                 "rbd", "get_size",
3510                                 &snapid, sizeof (snapid),
3511                                 &size_buf, sizeof (size_buf), NULL);
3512         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3513         if (ret < 0)
3514                 return ret;
3515         if (ret < sizeof (size_buf))
3516                 return -ERANGE;
3517
3518         if (order)
3519                 *order = size_buf.order;
3520         *snap_size = le64_to_cpu(size_buf.size);
3521
3522         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3523                 (unsigned long long)snap_id, order ? (unsigned int)*order : 0,
3524                 (unsigned long long)*snap_size);
3525
3526         return 0;
3527 }
3528
3529 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3530 {
3531         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3532                                         &rbd_dev->header.obj_order,
3533                                         &rbd_dev->header.image_size);
3534 }
3535
3536 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3537 {
3538         void *reply_buf;
3539         int ret;
3540         void *p;
3541
3542         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3543         if (!reply_buf)
3544                 return -ENOMEM;
3545
3546         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3547                                 "rbd", "get_object_prefix", NULL, 0,
3548                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3549         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3550         if (ret < 0)
3551                 goto out;
3552
3553         p = reply_buf;
3554         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3555                                                 p + ret, NULL, GFP_NOIO);
3556         ret = 0;
3557
3558         if (IS_ERR(rbd_dev->header.object_prefix)) {
3559                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3560                 rbd_dev->header.object_prefix = NULL;
3561         } else {
3562                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3563         }
3564 out:
3565         kfree(reply_buf);
3566
3567         return ret;
3568 }
3569
3570 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3571                 u64 *snap_features)
3572 {
3573         __le64 snapid = cpu_to_le64(snap_id);
3574         struct {
3575                 __le64 features;
3576                 __le64 incompat;
3577         } __attribute__ ((packed)) features_buf = { 0 };
3578         u64 incompat;
3579         int ret;
3580
3581         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3582                                 "rbd", "get_features",
3583                                 &snapid, sizeof (snapid),
3584                                 &features_buf, sizeof (features_buf), NULL);
3585         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3586         if (ret < 0)
3587                 return ret;
3588         if (ret < sizeof (features_buf))
3589                 return -ERANGE;
3590
3591         incompat = le64_to_cpu(features_buf.incompat);
3592         if (incompat & ~RBD_FEATURES_SUPPORTED)
3593                 return -ENXIO;
3594
3595         *snap_features = le64_to_cpu(features_buf.features);
3596
3597         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3598                 (unsigned long long)snap_id,
3599                 (unsigned long long)*snap_features,
3600                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3601
3602         return 0;
3603 }
3604
3605 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3606 {
3607         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3608                                                 &rbd_dev->header.features);
3609 }
3610
3611 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3612 {
3613         struct rbd_spec *parent_spec;
3614         size_t size;
3615         void *reply_buf = NULL;
3616         __le64 snapid;
3617         void *p;
3618         void *end;
3619         char *image_id;
3620         u64 overlap;
3621         int ret;
3622
3623         parent_spec = rbd_spec_alloc();
3624         if (!parent_spec)
3625                 return -ENOMEM;
3626
3627         size = sizeof (__le64) +                                /* pool_id */
3628                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3629                 sizeof (__le64) +                               /* snap_id */
3630                 sizeof (__le64);                                /* overlap */
3631         reply_buf = kmalloc(size, GFP_KERNEL);
3632         if (!reply_buf) {
3633                 ret = -ENOMEM;
3634                 goto out_err;
3635         }
3636
3637         snapid = cpu_to_le64(CEPH_NOSNAP);
3638         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3639                                 "rbd", "get_parent",
3640                                 &snapid, sizeof (snapid),
3641                                 reply_buf, size, NULL);
3642         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3643         if (ret < 0)
3644                 goto out_err;
3645
3646         p = reply_buf;
3647         end = reply_buf + ret;
3648         ret = -ERANGE;
3649         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3650         if (parent_spec->pool_id == CEPH_NOPOOL)
3651                 goto out;       /* No parent?  No problem. */
3652
3653         /* The ceph file layout needs to fit pool id in 32 bits */
3654
3655         ret = -EIO;
3656         if (parent_spec->pool_id > (u64)U32_MAX) {
3657                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3658                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3659                 goto out_err;
3660         }
3661
3662         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3663         if (IS_ERR(image_id)) {
3664                 ret = PTR_ERR(image_id);
3665                 goto out_err;
3666         }
3667         parent_spec->image_id = image_id;
3668         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3669         ceph_decode_64_safe(&p, end, overlap, out_err);
3670
3671         rbd_dev->parent_overlap = overlap;
3672         rbd_dev->parent_spec = parent_spec;
3673         parent_spec = NULL;     /* rbd_dev now owns this */
3674 out:
3675         ret = 0;
3676 out_err:
3677         kfree(reply_buf);
3678         rbd_spec_put(parent_spec);
3679
3680         return ret;
3681 }
3682
3683 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3684 {
3685         struct {
3686                 __le64 stripe_unit;
3687                 __le64 stripe_count;
3688         } __attribute__ ((packed)) striping_info_buf = { 0 };
3689         size_t size = sizeof (striping_info_buf);
3690         void *p;
3691         u64 obj_size;
3692         u64 stripe_unit;
3693         u64 stripe_count;
3694         int ret;
3695
3696         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3697                                 "rbd", "get_stripe_unit_count", NULL, 0,
3698                                 (char *)&striping_info_buf, size, NULL);
3699         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3700         if (ret < 0)
3701                 return ret;
3702         if (ret < size)
3703                 return -ERANGE;
3704
3705         /*
3706          * We don't actually support the "fancy striping" feature
3707          * (STRIPINGV2) yet, but if the striping sizes are the
3708          * defaults the behavior is the same as before.  So find
3709          * out, and only fail if the image has non-default values.
3710          */
3711         ret = -EINVAL;
3712         obj_size = (u64)1 << rbd_dev->header.obj_order;
3713         p = &striping_info_buf;
3714         stripe_unit = ceph_decode_64(&p);
3715         if (stripe_unit != obj_size) {
3716                 rbd_warn(rbd_dev, "unsupported stripe unit "
3717                                 "(got %llu want %llu)",
3718                                 stripe_unit, obj_size);
3719                 return -EINVAL;
3720         }
3721         stripe_count = ceph_decode_64(&p);
3722         if (stripe_count != 1) {
3723                 rbd_warn(rbd_dev, "unsupported stripe count "
3724                                 "(got %llu want 1)", stripe_count);
3725                 return -EINVAL;
3726         }
3727         rbd_dev->header.stripe_unit = stripe_unit;
3728         rbd_dev->header.stripe_count = stripe_count;
3729
3730         return 0;
3731 }
3732
3733 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3734 {
3735         size_t image_id_size;
3736         char *image_id;
3737         void *p;
3738         void *end;
3739         size_t size;
3740         void *reply_buf = NULL;
3741         size_t len = 0;
3742         char *image_name = NULL;
3743         int ret;
3744
3745         rbd_assert(!rbd_dev->spec->image_name);
3746
3747         len = strlen(rbd_dev->spec->image_id);
3748         image_id_size = sizeof (__le32) + len;
3749         image_id = kmalloc(image_id_size, GFP_KERNEL);
3750         if (!image_id)
3751                 return NULL;
3752
3753         p = image_id;
3754         end = image_id + image_id_size;
3755         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3756
3757         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3758         reply_buf = kmalloc(size, GFP_KERNEL);
3759         if (!reply_buf)
3760                 goto out;
3761
3762         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3763                                 "rbd", "dir_get_name",
3764                                 image_id, image_id_size,
3765                                 reply_buf, size, NULL);
3766         if (ret < 0)
3767                 goto out;
3768         p = reply_buf;
3769         end = reply_buf + ret;
3770
3771         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3772         if (IS_ERR(image_name))
3773                 image_name = NULL;
3774         else
3775                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3776 out:
3777         kfree(reply_buf);
3778         kfree(image_id);
3779
3780         return image_name;
3781 }
3782
3783 /*
3784  * When an rbd image has a parent image, it is identified by the
3785  * pool, image, and snapshot ids (not names).  This function fills
3786  * in the names for those ids.  (It's OK if we can't figure out the
3787  * name for an image id, but the pool and snapshot ids should always
3788  * exist and have names.)  All names in an rbd spec are dynamically
3789  * allocated.
3790  *
3791  * When an image being mapped (not a parent) is probed, we have the
3792  * pool name and pool id, image name and image id, and the snapshot
3793  * name.  The only thing we're missing is the snapshot id.
3794  *
3795  * The set of snapshots for an image is not known until they have
3796  * been read by rbd_dev_snaps_update(), so we can't completely fill
3797  * in this information until after that has been called.
3798  */
3799 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3800 {
3801         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3802         struct rbd_spec *spec = rbd_dev->spec;
3803         const char *pool_name;
3804         const char *image_name;
3805         const char *snap_name;
3806         int ret;
3807
3808         /*
3809          * An image being mapped will have the pool name (etc.), but
3810          * we need to look up the snapshot id.
3811          */
3812         if (spec->pool_name) {
3813                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3814                         struct rbd_snap *snap;
3815
3816                         snap = snap_by_name(rbd_dev, spec->snap_name);
3817                         if (!snap)
3818                                 return -ENOENT;
3819                         spec->snap_id = snap->id;
3820                 } else {
3821                         spec->snap_id = CEPH_NOSNAP;
3822                 }
3823
3824                 return 0;
3825         }
3826
3827         /* Get the pool name; we have to make our own copy of this */
3828
3829         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3830         if (!pool_name) {
3831                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3832                 return -EIO;
3833         }
3834         pool_name = kstrdup(pool_name, GFP_KERNEL);
3835         if (!pool_name)
3836                 return -ENOMEM;
3837
3838         /* Fetch the image name; tolerate failure here */
3839
3840         image_name = rbd_dev_image_name(rbd_dev);
3841         if (!image_name)
3842                 rbd_warn(rbd_dev, "unable to get image name");
3843
3844         /* Look up the snapshot name, and make a copy */
3845
3846         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3847         if (!snap_name) {
3848                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3849                 ret = -EIO;
3850                 goto out_err;
3851         }
3852         snap_name = kstrdup(snap_name, GFP_KERNEL);
3853         if (!snap_name) {
3854                 ret = -ENOMEM;
3855                 goto out_err;
3856         }
3857
3858         spec->pool_name = pool_name;
3859         spec->image_name = image_name;
3860         spec->snap_name = snap_name;
3861
3862         return 0;
3863 out_err:
3864         kfree(image_name);
3865         kfree(pool_name);
3866
3867         return ret;
3868 }
3869
3870 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3871 {
3872         size_t size;
3873         int ret;
3874         void *reply_buf;
3875         void *p;
3876         void *end;
3877         u64 seq;
3878         u32 snap_count;
3879         struct ceph_snap_context *snapc;
3880         u32 i;
3881
3882         /*
3883          * We'll need room for the seq value (maximum snapshot id),
3884          * snapshot count, and array of that many snapshot ids.
3885          * For now we have a fixed upper limit on the number we're
3886          * prepared to receive.
3887          */
3888         size = sizeof (__le64) + sizeof (__le32) +
3889                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3890         reply_buf = kzalloc(size, GFP_KERNEL);
3891         if (!reply_buf)
3892                 return -ENOMEM;
3893
3894         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3895                                 "rbd", "get_snapcontext", NULL, 0,
3896                                 reply_buf, size, ver);
3897         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3898         if (ret < 0)
3899                 goto out;
3900
3901         p = reply_buf;
3902         end = reply_buf + ret;
3903         ret = -ERANGE;
3904         ceph_decode_64_safe(&p, end, seq, out);
3905         ceph_decode_32_safe(&p, end, snap_count, out);
3906
3907         /*
3908          * Make sure the reported number of snapshot ids wouldn't go
3909          * beyond the end of our buffer.  But before checking that,
3910          * make sure the computed size of the snapshot context we
3911          * allocate is representable in a size_t.
3912          */
3913         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3914                                  / sizeof (u64)) {
3915                 ret = -EINVAL;
3916                 goto out;
3917         }
3918         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3919                 goto out;
3920         ret = 0;
3921
3922         snapc = rbd_snap_context_create(snap_count);
3923         if (!snapc) {
3924                 ret = -ENOMEM;
3925                 goto out;
3926         }
3927         snapc->seq = seq;
3928         for (i = 0; i < snap_count; i++)
3929                 snapc->snaps[i] = ceph_decode_64(&p);
3930
3931         rbd_dev->header.snapc = snapc;
3932
3933         dout("  snap context seq = %llu, snap_count = %u\n",
3934                 (unsigned long long)seq, (unsigned int)snap_count);
3935 out:
3936         kfree(reply_buf);
3937
3938         return ret;
3939 }
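
/*
 * The snap_count guard above, shown in isolation (a sketch, not
 * built): it rejects counts for which the snapshot context allocation
 * would wrap around in a size_t, a real concern on 32-bit kernels.
 */
#if 0
static bool snap_count_fits(u32 snap_count)
{
        return snap_count <= (SIZE_MAX - sizeof (struct ceph_snap_context))
                                / sizeof (u64);
}
#endif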
3940
3941 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3942 {
3943         size_t size;
3944         void *reply_buf;
3945         __le64 snap_id;
3946         int ret;
3947         void *p;
3948         void *end;
3949         char *snap_name;
3950
3951         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3952         reply_buf = kmalloc(size, GFP_KERNEL);
3953         if (!reply_buf)
3954                 return ERR_PTR(-ENOMEM);
3955
3956         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3957         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3958         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3959                                 "rbd", "get_snapshot_name",
3960                                 &snap_id, sizeof (snap_id),
3961                                 reply_buf, size, NULL);
3962         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3963         if (ret < 0) {
3964                 snap_name = ERR_PTR(ret);
3965                 goto out;
3966         }
3967
3968         p = reply_buf;
3969         end = reply_buf + ret;
3970         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3971         if (IS_ERR(snap_name))
3972                 goto out;
3973
3974         dout("  snap_id 0x%016llx snap_name = %s\n",
3975                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3976 out:
3977         kfree(reply_buf);
3978
3979         return snap_name;
3980 }
3981
3982 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3983                 u64 *snap_size, u64 *snap_features)
3984 {
3985         u64 snap_id;
3986         u64 size;
3987         u64 features;
3988         char *snap_name;
3989         int ret;
3990
3991         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3992         snap_id = rbd_dev->header.snapc->snaps[which];
3993         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3994         if (ret)
3995                 goto out_err;
3996
3997         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3998         if (ret)
3999                 goto out_err;
4000
4001         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
4002         if (!IS_ERR(snap_name)) {
4003                 *snap_size = size;
4004                 *snap_features = features;
4005         }
4006
4007         return snap_name;
4008 out_err:
4009         return ERR_PTR(ret);
4010 }
4011
4012 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4013                 u64 *snap_size, u64 *snap_features)
4014 {
4015         if (rbd_dev->image_format == 1)
4016                 return rbd_dev_v1_snap_info(rbd_dev, which,
4017                                         snap_size, snap_features);
4018         if (rbd_dev->image_format == 2)
4019                 return rbd_dev_v2_snap_info(rbd_dev, which,
4020                                         snap_size, snap_features);
4021         return ERR_PTR(-EINVAL);
4022 }
4023
4024 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4025 {
4026         int ret;
4027         __u8 obj_order;
4028
4029         down_write(&rbd_dev->header_rwsem);
4030
4031         /* Grab old order first, to see if it changes */
4032
4033         obj_order = rbd_dev->header.obj_order;
4034         ret = rbd_dev_v2_image_size(rbd_dev);
4035         if (ret)
4036                 goto out;
4037         if (rbd_dev->header.obj_order != obj_order) {
4038                 ret = -EIO;
4039                 goto out;
4040         }
4041         rbd_update_mapping_size(rbd_dev);
4042
4043         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4044         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4045         if (ret)
4046                 goto out;
4047         ret = rbd_dev_snaps_update(rbd_dev);
4048         dout("rbd_dev_snaps_update returned %d\n", ret);
4049         if (ret)
4050                 goto out;
4051 out:
4052         up_write(&rbd_dev->header_rwsem);
4053
4054         return ret;
4055 }
4056
4057 /*
4058  * Scan the rbd device's current snapshot list and compare it to the
4059  * newly-received snapshot context.  Remove any existing snapshots
4060  * not present in the new snapshot context.  Add a new snapshot for
4061  * any snapshots in the snapshot context not in the current list.
4062  * And verify there are no changes to snapshots we already know
4063  * about.
4064  *
4065  * Assumes the snapshots in the snapshot context are sorted by
4066  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4067  * are also maintained in that order.)
4068  *
4069  * Note that if any error occurs while updating the snapshot list,
4070  * the update is aborted and the entire list is cleared.  The snapshot
4071  * list becomes inconsistent at that point anyway, so it might as
4072  * well be empty.
4073  */
4074 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4075 {
4076         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4077         const u32 snap_count = snapc->num_snaps;
4078         struct list_head *head = &rbd_dev->snaps;
4079         struct list_head *links = head->next;
4080         u32 index = 0;
4081         int ret = 0;
4082
4083         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4084         while (index < snap_count || links != head) {
4085                 u64 snap_id;
4086                 struct rbd_snap *snap;
4087                 char *snap_name;
4088                 u64 snap_size = 0;
4089                 u64 snap_features = 0;
4090
4091                 snap_id = index < snap_count ? snapc->snaps[index]
4092                                              : CEPH_NOSNAP;
4093                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4094                                      : NULL;
4095                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4096
4097                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4098                         struct list_head *next = links->next;
4099
4100                         /*
4101                          * A previously-existing snapshot is not in
4102                          * the new snap context.
4103                          *
4104                          * If the now-missing snapshot is the one
4105                          * the image represents, clear its existence
4106                          * flag so we can avoid sending any more
4107                          * requests to it.
4108                          */
4109                         if (rbd_dev->spec->snap_id == snap->id)
4110                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4111                         dout("removing %ssnap id %llu\n",
4112                                 rbd_dev->spec->snap_id == snap->id ?
4113                                                         "mapped " : "",
4114                                 (unsigned long long)snap->id);
4115
4116                         list_del(&snap->node);
4117                         rbd_snap_destroy(snap);
4118
4119                         /* Done with this list entry; advance */
4120
4121                         links = next;
4122                         continue;
4123                 }
4124
4125                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4126                                         &snap_size, &snap_features);
4127                 if (IS_ERR(snap_name)) {
4128                         ret = PTR_ERR(snap_name);
4129                         dout("failed to get snap info, error %d\n", ret);
4130                         goto out_err;
4131                 }
4132
4133                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4134                         (unsigned long long)snap_id);
4135                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4136                         struct rbd_snap *new_snap;
4137
4138                         /* We haven't seen this snapshot before */
4139
4140                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4141                                         snap_id, snap_size, snap_features);
4142                         if (IS_ERR(new_snap)) {
4143                                 ret = PTR_ERR(new_snap);
4144                                 dout("  failed to add dev, error %d\n", ret);
4145                                 goto out_err;
4146                         }
4147
4148                         /* New goes before existing, or at end of list */
4149
4150                         dout("  added dev%s\n", snap ? "" : " at end");
4151                         if (snap)
4152                                 list_add_tail(&new_snap->node, &snap->node);
4153                         else
4154                                 list_add_tail(&new_snap->node, head);
4155                 } else {
4156                         /* Already have this one */
4157
4158                         dout("  already present\n");
4159
4160                         rbd_assert(snap->size == snap_size);
4161                         rbd_assert(!strcmp(snap->name, snap_name));
4162                         rbd_assert(snap->features == snap_features);
4163
4164                         /* Done with this list entry; advance */
4165
4166                         links = links->next;
4167                 }
4168
4169                 /* Advance to the next entry in the snapshot context */
4170
4171                 index++;
4172         }
4173         dout("%s: done\n", __func__);
4174
4175         return 0;
4176 out_err:
4177         rbd_remove_all_snaps(rbd_dev);
4178
4179         return ret;
4180 }
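
/*
 * Worked example of the merge above, ids highest first: with an
 * existing list {12, 7, 5} and a new context {12, 9, 5}, one pass
 * keeps 12, inserts 9 before 7, removes 7 (absent from the new
 * context), and keeps 5.
 */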
4181
4182 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4183 {
4184         struct device *dev;
4185         int ret;
4186
4187         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4188
4189         dev = &rbd_dev->dev;
4190         dev->bus = &rbd_bus_type;
4191         dev->type = &rbd_device_type;
4192         dev->parent = &rbd_root_dev;
4193         dev->release = rbd_dev_release;
4194         dev_set_name(dev, "%d", rbd_dev->dev_id);
4195         ret = device_register(dev);
4196
4197         mutex_unlock(&ctl_mutex);
4198
4199         return ret;
4200 }
4201
4202 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4203 {
4204         device_unregister(&rbd_dev->dev);
4205 }
4206
4207 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4208
4209 /*
4210  * Get a unique rbd identifier for the given new rbd_dev, and add
4211  * the rbd_dev to the global list.  The minimum rbd id is 1.
4212  */
4213 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4214 {
4215         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4216
4217         spin_lock(&rbd_dev_list_lock);
4218         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4219         spin_unlock(&rbd_dev_list_lock);
4220         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4221                 (unsigned long long) rbd_dev->dev_id);
4222 }
4223
4224 /*
4225  * Remove an rbd_dev from the global list, and record that its
4226  * identifier is no longer in use.
4227  */
4228 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4229 {
4230         struct list_head *tmp;
4231         int rbd_id = rbd_dev->dev_id;
4232         int max_id;
4233
4234         rbd_assert(rbd_id > 0);
4235
4236         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4237                 (unsigned long long) rbd_dev->dev_id);
4238         spin_lock(&rbd_dev_list_lock);
4239         list_del_init(&rbd_dev->node);
4240
4241         /*
4242          * If the id being "put" is not the current maximum, there
4243          * is nothing special we need to do.
4244          */
4245         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4246                 spin_unlock(&rbd_dev_list_lock);
4247                 return;
4248         }
4249
4250         /*
4251          * We need to update the current maximum id.  Search the
4252          * list to find out what it is.  We're more likely to find
4253          * the maximum at the end, so search the list backward.
4254          */
4255         max_id = 0;
4256         list_for_each_prev(tmp, &rbd_dev_list) {
4257                 struct rbd_device *rbd_dev;
4258
4259                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4260                 if (rbd_dev->dev_id > max_id)
4261                         max_id = rbd_dev->dev_id;
4262         }
4263         spin_unlock(&rbd_dev_list_lock);
4264
4265         /*
4266          * The max id could have been updated by rbd_dev_id_get(), in
4267          * which case it now accurately reflects the new maximum.
4268          * Be careful not to overwrite the maximum value in that
4269          * case.
4270          */
4271         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4272         dout("  max dev id has been reset\n");
4273 }
4274
4275 /*
4276  * Skips over white space at *buf, and updates *buf to point to the
4277  * first found non-space character (if any). Returns the length of
4278  * the token (string of non-white space characters) found.  Note
4279  * that *buf must be terminated with '\0'.
4280  */
4281 static inline size_t next_token(const char **buf)
4282 {
4283         /*
4284          * These are the characters that produce nonzero for
4285          * isspace() in the "C" and "POSIX" locales.
4286          */
4287         const char *spaces = " \f\n\r\t\v";
4288
4289         *buf += strspn(*buf, spaces);   /* Find start of token */
4290
4291         return strcspn(*buf, spaces);   /* Return token length */
4292 }
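
/*
 * A quick sketch (not built) of how callers drive next_token(): it
 * reports the token's length but leaves *buf at the token's first
 * character, so the caller advances past the token itself.
 */
#if 0
        const char *buf = "  rbd myimage";
        size_t len;

        len = next_token(&buf);         /* len == 3, buf -> "rbd myimage" */
        buf += len;                     /* step past "rbd" */
        len = next_token(&buf);         /* len == 7, buf -> "myimage" */
#endif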
4293
4294 /*
4295  * Finds the next token in *buf, and if the provided token buffer is
4296  * big enough, copies the found token into it.  The result, if
4297  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4298  * must be terminated with '\0' on entry.
4299  *
4300  * Returns the length of the token found (not including the '\0').
4301  * Return value will be 0 if no token is found, and it will be >=
4302  * token_size if the token would not fit.
4303  *
4304  * The *buf pointer will be updated to point beyond the end of the
4305  * found token.  Note that this occurs even if the token buffer is
4306  * too small to hold it.
4307  */
4308 static inline size_t copy_token(const char **buf,
4309                                 char *token,
4310                                 size_t token_size)
4311 {
4312         size_t len;
4313
4314         len = next_token(buf);
4315         if (len < token_size) {
4316                 memcpy(token, *buf, len);
4317                 *(token + len) = '\0';
4318         }
4319         *buf += len;
4320
4321         return len;
4322 }
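
/*
 * Sketch (not built) of the too-small-buffer contract above: when the
 * token doesn't fit, the token buffer is left untouched but *buf
 * still advances past the token.
 */
#if 0
        char token[4];
        const char *buf = "images";
        size_t len;

        len = copy_token(&buf, token, sizeof (token));
        /* len == 6 >= sizeof (token): nothing was copied, buf -> "" */
#endif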
4323
4324 /*
4325  * Finds the next token in *buf, dynamically allocates a buffer big
4326  * enough to hold a copy of it, and copies the token into the new
4327  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4328  * that a duplicate buffer is created even for a zero-length token.
4329  *
4330  * Returns a pointer to the newly-allocated duplicate, or a null
4331  * pointer if memory for the duplicate was not available.  If
4332  * the lenp argument is a non-null pointer, the length of the token
4333  * (not including the '\0') is returned in *lenp.
4334  *
4335  * If successful, the *buf pointer will be updated to point beyond
4336  * the end of the found token.
4337  *
4338  * Note: uses GFP_KERNEL for allocation.
4339  */
4340 static inline char *dup_token(const char **buf, size_t *lenp)
4341 {
4342         char *dup;
4343         size_t len;
4344
4345         len = next_token(buf);
4346         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4347         if (!dup)
4348                 return NULL;
4349         *(dup + len) = '\0';
4350         *buf += len;
4351
4352         if (lenp)
4353                 *lenp = len;
4354
4355         return dup;
4356 }
4357
4358 /*
4359  * Parse the options provided for an "rbd add" (i.e., rbd image
4360  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4361  * and the data written is passed here via a NUL-terminated buffer.
4362  * Returns 0 if successful or an error code otherwise.
4363  *
4364  * The information extracted from these options is recorded in
4365  * the other parameters which return dynamically-allocated
4366  * structures:
4367  *  ceph_opts
4368  *      The address of a pointer that will refer to a ceph options
4369  *      structure.  Caller must release the returned pointer using
4370  *      ceph_destroy_options() when it is no longer needed.
4371  *  rbd_opts
4372  *      Address of an rbd options pointer.  Fully initialized by
4373  *      this function; caller must release with kfree().
4374  *  spec
4375  *      Address of an rbd image specification pointer.  Fully
4376  *      initialized by this function based on parsed options.
4377  *      Caller must release with rbd_spec_put().
4378  *
4379  * The options passed take this form:
4380  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4381  * where:
4382  *  <mon_addrs>
4383  *      A comma-separated list of one or more monitor addresses.
4384  *      A monitor address is an ip address, optionally followed
4385  *      by a port number (separated by a colon).
4386  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4387  *  <options>
4388  *      A comma-separated list of ceph and/or rbd options.
4389  *  <pool_name>
4390  *      The name of the rados pool containing the rbd image.
4391  *  <image_name>
4392  *      The name of the image in that pool to map.
4393  *  <snap_name>
4394  *      An optional snapshot name.  If provided, the mapping will
4395  *      present data from the image as it was when that snapshot
4396  *      was created.  The image head is used if no snapshot name
4397  *      is provided.  Snapshot mappings are always read-only.
4398  */
4399 static int rbd_add_parse_args(const char *buf,
4400                                 struct ceph_options **ceph_opts,
4401                                 struct rbd_options **opts,
4402                                 struct rbd_spec **rbd_spec)
4403 {
4404         size_t len;
4405         char *options;
4406         const char *mon_addrs;
4407         char *snap_name;
4408         size_t mon_addrs_size;
4409         struct rbd_spec *spec = NULL;
4410         struct rbd_options *rbd_opts = NULL;
4411         struct ceph_options *copts;
4412         int ret;
4413
4414         /* The first four tokens are required */
4415
4416         len = next_token(&buf);
4417         if (!len) {
4418                 rbd_warn(NULL, "no monitor address(es) provided");
4419                 return -EINVAL;
4420         }
4421         mon_addrs = buf;
4422         mon_addrs_size = len + 1;
4423         buf += len;
4424
4425         ret = -EINVAL;
4426         options = dup_token(&buf, NULL);
4427         if (!options)
4428                 return -ENOMEM;
4429         if (!*options) {
4430                 rbd_warn(NULL, "no options provided");
4431                 goto out_err;
4432         }
4433
4434         spec = rbd_spec_alloc();
4435         if (!spec)
4436                 goto out_mem;
4437
4438         spec->pool_name = dup_token(&buf, NULL);
4439         if (!spec->pool_name)
4440                 goto out_mem;
4441         if (!*spec->pool_name) {
4442                 rbd_warn(NULL, "no pool name provided");
4443                 goto out_err;
4444         }
4445
4446         spec->image_name = dup_token(&buf, NULL);
4447         if (!spec->image_name)
4448                 goto out_mem;
4449         if (!*spec->image_name) {
4450                 rbd_warn(NULL, "no image name provided");
4451                 goto out_err;
4452         }
4453
4454         /*
4455          * Snapshot name is optional; default is to use "-"
4456          * (indicating the head/no snapshot).
4457          */
4458         len = next_token(&buf);
4459         if (!len) {
4460                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4461                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4462         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4463                 ret = -ENAMETOOLONG;
4464                 goto out_err;
4465         }
4466         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4467         if (!snap_name)
4468                 goto out_mem;
4469         *(snap_name + len) = '\0';
4470         spec->snap_name = snap_name;
4471
4472         /* Initialize all rbd options to the defaults */
4473
4474         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4475         if (!rbd_opts)
4476                 goto out_mem;
4477
4478         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4479
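             /*
              * Note that mon_addrs still points into buf and is not
              * NUL-terminated; mon_addrs + mon_addrs_size - 1 is its
              * end pointer (one past its final character).
              */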
4480         copts = ceph_parse_options(options, mon_addrs,
4481                                         mon_addrs + mon_addrs_size - 1,
4482                                         parse_rbd_opts_token, rbd_opts);
4483         if (IS_ERR(copts)) {
4484                 ret = PTR_ERR(copts);
4485                 goto out_err;
4486         }
4487         kfree(options);
4488
4489         *ceph_opts = copts;
4490         *opts = rbd_opts;
4491         *rbd_spec = spec;
4492
4493         return 0;
4494 out_mem:
4495         ret = -ENOMEM;
4496 out_err:
4497         kfree(rbd_opts);
4498         rbd_spec_put(spec);
4499         kfree(options);
4500
4501         return ret;
4502 }
4503
4504 /*
4505  * An rbd format 2 image has a unique identifier, distinct from the
4506  * name given to it by the user.  Internally, that identifier is
4507  * what's used to specify the names of objects related to the image.
4508  *
4509  * A special "rbd id" object is used to map an rbd image name to its
4510  * id.  If that object doesn't exist, then there is no v2 rbd image
4511  * with the supplied name.
4512  *
4513  * This function fills in the given rbd_dev's image_id field if the
4514  * id can be determined, and in that case returns 0.  If any error
4515  * occurs, a negative errno is returned and the rbd_dev's image_id
4516  * field is left unchanged (and should be NULL).
4517  */
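     /*
      * For example, assuming RBD_ID_PREFIX is "rbd_id.", the id of an
      * image named "foo" would be stored in an object named "rbd_id.foo".
      */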
4518 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4519 {
4520         int ret;
4521         size_t size;
4522         char *object_name;
4523         void *response;
4524         char *image_id;
4525
4526         /*
4527          * When probing a parent image, the image id is already
4528          * known (and the image name likely is not).  There's no
4529          * need to fetch the image id again in this case.  We
4530          * do still need to set the image format though.
4531          */
4532         if (rbd_dev->spec->image_id) {
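                     /*
                      * A zero-length image id denotes a format 1 image,
                      * which has no id; anything else means format 2.
                      */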
4533                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4534
4535                 return 0;
4536         }
4537
4538         /*
4539          * First, see if the format 2 image id file exists, and if
4540          * so, get the image's persistent id from it.
4541          */
4542         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4543         object_name = kmalloc(size, GFP_NOIO);
4544         if (!object_name)
4545                 return -ENOMEM;
4546         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4547         dout("rbd id object name is %s\n", object_name);
4548
4549         /* Response will be an encoded string, which includes a length */
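             /* (a little-endian 32-bit byte count followed by that many bytes) */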
4550
4551         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4552         response = kzalloc(size, GFP_NOIO);
4553         if (!response) {
4554                 ret = -ENOMEM;
4555                 goto out;
4556         }
4557
4558         /* If it doesn't exist we'll assume it's a format 1 image */
4559
4560         ret = rbd_obj_method_sync(rbd_dev, object_name,
4561                                 "rbd", "get_id", NULL, 0,
4562                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4563         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4564         if (ret == -ENOENT) {
4565                 image_id = kstrdup("", GFP_KERNEL);
4566                 ret = image_id ? 0 : -ENOMEM;
4567                 if (!ret)
4568                         rbd_dev->image_format = 1;
4569         } else if (ret > (int) sizeof (__le32)) {
4570                 void *p = response;
4571
4572                 image_id = ceph_extract_encoded_string(&p, p + ret,
4573                                                 NULL, GFP_NOIO);
4574                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4575                 if (!ret)
4576                         rbd_dev->image_format = 2;
4577         } else if (ret >= 0) {
4578                 ret = -EINVAL;  /* reply too short to hold an id */
4579         }
4580
4581         if (!ret) {
4582                 rbd_dev->spec->image_id = image_id;
4583                 dout("image_id is %s\n", image_id);
4584         }
4585 out:
4586         kfree(response);
4587         kfree(object_name);
4588
4589         return ret;
4590 }
4591
4592 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4593 {
4594         int ret;
4595         size_t size;
4596
4597         /* Record the header object name for this rbd image. */
4598
4599         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4600         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4601         if (!rbd_dev->header_name) {
4602                 ret = -ENOMEM;
4603                 goto out_err;
4604         }
4605         sprintf(rbd_dev->header_name, "%s%s",
4606                 rbd_dev->spec->image_name, RBD_SUFFIX);
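             /* e.g. "foo.rbd" for image "foo", assuming RBD_SUFFIX is ".rbd" */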
4607
4608         /* Populate rbd image metadata */
4609
4610         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4611         if (ret < 0)
4612                 goto out_err;
4613
4614         /* Version 1 images have no parent (no layering) */
4615
4616         rbd_dev->parent_spec = NULL;
4617         rbd_dev->parent_overlap = 0;
4618
4619         dout("discovered version 1 image, header name is %s\n",
4620                 rbd_dev->header_name);
4621
4622         return 0;
4623
4624 out_err:
4625         kfree(rbd_dev->header_name);
4626         rbd_dev->header_name = NULL;
4627         kfree(rbd_dev->spec->image_id);
4628         rbd_dev->spec->image_id = NULL;
4629
4630         return ret;
4631 }
4632
4633 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4634 {
4635         size_t size;
4636         int ret;
4637         u64 ver = 0;
4638
4639         /*
4640          * Image id was filled in by the caller.  Record the header
4641          * object name for this rbd image.
4642          */
4643         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4644         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4645         if (!rbd_dev->header_name)
4646                 return -ENOMEM;
4647         sprintf(rbd_dev->header_name, "%s%s",
4648                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
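             /* e.g. "rbd_header.<id>", assuming RBD_HEADER_PREFIX is "rbd_header." */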
4649
4650         /* Get the size and object order for the image */
4651         ret = rbd_dev_v2_image_size(rbd_dev);
4652         if (ret)
4653                 goto out_err;
4654
4655         /* Get the object prefix (a.k.a. block_name) for the image */
4656
4657         ret = rbd_dev_v2_object_prefix(rbd_dev);
4658         if (ret)
4659                 goto out_err;
4660
4661         /* Get and check the features for the image */
4662
4663         ret = rbd_dev_v2_features(rbd_dev);
4664         if (ret)
4665                 goto out_err;
4666
4667         /* If the image supports layering, get the parent info */
4668
4669         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4670                 ret = rbd_dev_v2_parent_info(rbd_dev);
4671                 if (ret)
4672                         goto out_err;
4673                 rbd_warn(rbd_dev, "WARNING: kernel support for "
4674                                         "layered rbd images is EXPERIMENTAL!");
4675         }
4676
4677         /* If the image supports fancy striping, get its parameters */
4678
4679         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4680                 ret = rbd_dev_v2_striping_info(rbd_dev);
4681                 if (ret < 0)
4682                         goto out_err;
4683         }
4684
4685         /* crypt and compression types aren't (yet) supported for v2 images */
4686
4687         rbd_dev->header.crypt_type = 0;
4688         rbd_dev->header.comp_type = 0;
4689
4690         /* Get the snapshot context, plus the header version */
4691
4692         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4693         if (ret)
4694                 goto out_err;
4695         rbd_dev->header.obj_version = ver;
4696
4697         dout("discovered version 2 image, header name is %s\n",
4698                 rbd_dev->header_name);
4699
4700         return 0;
4701 out_err:
4702         rbd_dev->parent_overlap = 0;
4703         rbd_spec_put(rbd_dev->parent_spec);
4704         rbd_dev->parent_spec = NULL;
4705         kfree(rbd_dev->header_name);
4706         rbd_dev->header_name = NULL;
4707         kfree(rbd_dev->header.object_prefix);
4708         rbd_dev->header.object_prefix = NULL;
4709
4710         return ret;
4711 }
4712
4713 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4714 {
4715         struct rbd_device *parent = NULL;
4716         struct rbd_spec *parent_spec;
4717         struct rbd_client *rbdc;
4718         int ret;
4719
4720         if (!rbd_dev->parent_spec)
4721                 return 0;
4722         /*
4723          * We need to pass a reference to the client and the parent
4724          * spec when creating the parent rbd_dev.  Images related by
4725          * parent/child relationships always share both.
4726          */
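             /*
              * Note that rbd_dev_image_probe() on the parent will in turn
              * probe *its* parent (if any), so the entire ancestor chain
              * is set up here, one level per recursion.
              */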
4727         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4728         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4729
4730         ret = -ENOMEM;
4731         parent = rbd_dev_create(rbdc, parent_spec);
4732         if (!parent)
4733                 goto out_err;
4734
4735         ret = rbd_dev_image_probe(parent);
4736         if (ret < 0)
4737                 goto out_err;
4738         rbd_dev->parent = parent;
4739
4740         return 0;
4741 out_err:
4742         if (parent) {
4743                 rbd_spec_put(rbd_dev->parent_spec);
4744                 kfree(rbd_dev->header_name);
4745                 rbd_dev_destroy(parent);
4746         } else {
4747                 rbd_put_client(rbdc);
4748                 rbd_spec_put(parent_spec);
4749         }
4750
4751         return ret;
4752 }
4753
4754 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4755 {
4756         int ret;
4757
4758         /* no need to lock here, as rbd_dev is not registered yet */
4759         ret = rbd_dev_snaps_update(rbd_dev);
4760         if (ret)
4761                 return ret;
4762
4763         ret = rbd_dev_spec_update(rbd_dev);
4764         if (ret)
4765                 goto err_out_snaps;
4766
4767         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4768         if (ret)
4769                 goto err_out_snaps;
4770
4771         ret = rbd_dev_mapping_set(rbd_dev);
4772         if (ret)
4773                 goto err_out_snaps;
4774
4775         /* generate unique id: find highest unique id, add one */
4776         rbd_dev_id_get(rbd_dev);
4777
4778         /* Fill in the device name, now that we have its id. */
4779         BUILD_BUG_ON(DEV_NAME_LEN
4780                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4781         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
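             /* e.g. "rbd0" for the device whose id is 0 */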
4782
4783         /* Get our block major device number. */
4784
4785         ret = register_blkdev(0, rbd_dev->name);
4786         if (ret < 0)
4787                 goto err_out_id;
4788         rbd_dev->major = ret;
4789
4790         /* Set up the blkdev mapping. */
4791
4792         ret = rbd_init_disk(rbd_dev);
4793         if (ret)
4794                 goto err_out_blkdev;
4795
4796         ret = rbd_bus_add_dev(rbd_dev);
4797         if (ret)
4798                 goto err_out_disk;
4799
4800         ret = rbd_dev_probe_parent(rbd_dev);
4801         if (ret)
4802                 goto err_out_bus;
4803
4804         /* Everything's ready.  Announce the disk to the world. */
4805
4806         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4807         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4808         add_disk(rbd_dev->disk);
4809
4810         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4811                 (unsigned long long) rbd_dev->mapping.size);
4812
4813         return ret;
4814
4815 err_out_bus:
4816         /* this will also clean up the rest of the rbd_dev state */
4817
4818         rbd_bus_del_dev(rbd_dev);
4819
4820         return ret;
4821 err_out_disk:
4822         rbd_free_disk(rbd_dev);
4823 err_out_blkdev:
4824         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4825 err_out_id:
4826         rbd_dev_id_put(rbd_dev);
4827         rbd_dev_mapping_clear(rbd_dev);
4828 err_out_snaps:
4829         rbd_remove_all_snaps(rbd_dev);
4830
4831         return ret;
4832 }
4833
4834 /*
4835  * Probe for the existence of the header object for the given rbd
4836  * device.  For format 2 images this includes determining the image
4837  * id.
4838  */
4839 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4840 {
4841         int ret;
4842
4843         /*
4844          * Get the id from the image id object.  If it's not a
4845          * format 2 image, we'll get ENOENT back, and we'll assume
4846          * it's a format 1 image.
4847          */
4848         ret = rbd_dev_image_id(rbd_dev);
4849         if (ret)
4850                 return ret;
4851         rbd_assert(rbd_dev->spec->image_id);
4852         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4853
4854         if (rbd_dev->image_format == 1)
4855                 ret = rbd_dev_v1_probe(rbd_dev);
4856         else
4857                 ret = rbd_dev_v2_probe(rbd_dev);
4858         if (ret)
4859                 goto out_err;
4860
4861         ret = rbd_dev_probe_finish(rbd_dev);
4862         if (ret)
4863                 rbd_header_free(&rbd_dev->header);
4864
4865         return ret;
4866 out_err:
4867         kfree(rbd_dev->spec->image_id);
4868         rbd_dev->spec->image_id = NULL;
4869
4870         dout("probe failed, returning %d\n", ret);
4871
4872         return ret;
4873 }
4874
4875 static ssize_t rbd_add(struct bus_type *bus,
4876                        const char *buf,
4877                        size_t count)
4878 {
4879         struct rbd_device *rbd_dev = NULL;
4880         struct ceph_options *ceph_opts = NULL;
4881         struct rbd_options *rbd_opts = NULL;
4882         struct rbd_spec *spec = NULL;
4883         struct rbd_client *rbdc;
4884         struct ceph_osd_client *osdc;
4885         int rc = -ENOMEM;
4886
4887         if (!try_module_get(THIS_MODULE))
4888                 return -ENODEV;
4889
4890         /* parse add command */
4891         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4892         if (rc < 0)
4893                 goto err_out_module;
4894
4895         rbdc = rbd_get_client(ceph_opts);
4896         if (IS_ERR(rbdc)) {
4897                 rc = PTR_ERR(rbdc);
4898                 goto err_out_args;
4899         }
4900         ceph_opts = NULL;       /* rbd_dev client now owns this */
4901
4902         /* pick the pool */
4903         osdc = &rbdc->client->osdc;
4904         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4905         if (rc < 0)
4906                 goto err_out_client;
4907         spec->pool_id = (u64)rc;
4908
4909         /* The ceph file layout needs to fit pool id in 32 bits */
4910
4911         if (spec->pool_id > (u64)U32_MAX) {
4912                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4913                                 (unsigned long long)spec->pool_id, U32_MAX);
4914                 rc = -EIO;
4915                 goto err_out_client;
4916         }
4917         rc = -ENOMEM;
4918         rbd_dev = rbd_dev_create(rbdc, spec);
4919         if (!rbd_dev)
4920                 goto err_out_client;
4921         rbdc = NULL;            /* rbd_dev now owns this */
4922         spec = NULL;            /* rbd_dev now owns this */
4923
4924         rbd_dev->mapping.read_only = rbd_opts->read_only;
4925         kfree(rbd_opts);
4926         rbd_opts = NULL;        /* done with this */
4927
4928         rc = rbd_dev_image_probe(rbd_dev);
4929         if (rc < 0)
4930                 goto err_out_rbd_dev;
4931
4932         return count;
4933 err_out_rbd_dev:
4934         rbd_spec_put(rbd_dev->parent_spec);
4935         kfree(rbd_dev->header_name);
4936         rbd_dev_destroy(rbd_dev);
4937 err_out_client:
4938         rbd_put_client(rbdc);
4939 err_out_args:
4940         if (ceph_opts)
4941                 ceph_destroy_options(ceph_opts);
4942         kfree(rbd_opts);
4943         rbd_spec_put(spec);
4944 err_out_module:
4945         module_put(THIS_MODULE);
4946
4947         dout("Error adding device %s\n", buf);
4948
4949         return (ssize_t)rc;
4950 }
4951
4952 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4953 {
4954         struct list_head *tmp;
4955         struct rbd_device *rbd_dev;
4956
4957         spin_lock(&rbd_dev_list_lock);
4958         list_for_each(tmp, &rbd_dev_list) {
4959                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4960                 if (rbd_dev->dev_id == dev_id) {
4961                         spin_unlock(&rbd_dev_list_lock);
4962                         return rbd_dev;
4963                 }
4964         }
4965         spin_unlock(&rbd_dev_list_lock);
4966         return NULL;
4967 }
4968
4969 static void rbd_dev_release(struct device *dev)
4970 {
4971         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4972
4973         if (rbd_dev->watch_event)
4974                 rbd_dev_header_watch_sync(rbd_dev, 0);
4975
4976         /* clean up and free blkdev */
4977         rbd_free_disk(rbd_dev);
4978         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4979
4980         /* release allocated disk header fields */
4981         rbd_header_free(&rbd_dev->header);
4982
4983         /* done with the id, and with the rbd_dev */
4984         rbd_dev_id_put(rbd_dev);
4985         rbd_dev_mapping_clear(rbd_dev);
4986         rbd_assert(rbd_dev->rbd_client != NULL);
4987         rbd_spec_put(rbd_dev->parent_spec);
4988         kfree(rbd_dev->header_name);
4989         rbd_dev_destroy(rbd_dev);
4990
4991         /* release module ref */
4992         module_put(THIS_MODULE);
4993 }
4994
4995 static void __rbd_remove(struct rbd_device *rbd_dev)
4996 {
4997         rbd_remove_all_snaps(rbd_dev);
4998         rbd_bus_del_dev(rbd_dev);
4999 }
5000
5001 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5002 {
5003         while (rbd_dev->parent_spec) {
5004                 struct rbd_device *first = rbd_dev;
5005                 struct rbd_device *second = first->parent;
5006                 struct rbd_device *third;
5007
5008                 /*
5009                  * Follow to the parent with no grandparent and
5010                  * remove it.
5011                  */
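                     /*
                      * For example, with a chain dev -> p1 -> p2 (where
                      * p2 has no parent), the first pass removes p2, the
                      * next removes p1, and the outer loop terminates.
                      */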
5012                 while (second && (third = second->parent)) {
5013                         first = second;
5014                         second = third;
5015                 }
5016                 __rbd_remove(second);
5017                 rbd_spec_put(first->parent_spec);
5018                 first->parent_spec = NULL;
5019                 first->parent_overlap = 0;
5020                 first->parent = NULL;
5021         }
5022 }
5023
5024 static ssize_t rbd_remove(struct bus_type *bus,
5025                           const char *buf,
5026                           size_t count)
5027 {
5028         struct rbd_device *rbd_dev = NULL;
5029         int target_id, rc;
5030         unsigned long ul;
5031         int ret = count;
5032
5033         rc = strict_strtoul(buf, 10, &ul);
5034         if (rc)
5035                 return rc;
5036
5037         /* convert to int; abort if we lost anything in the conversion */
5038         target_id = (int) ul;
5039         if (target_id != ul)
5040                 return -EINVAL;
5041
5042         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5043
5044         rbd_dev = __rbd_get_dev(target_id);
5045         if (!rbd_dev) {
5046                 ret = -ENOENT;
5047                 goto done;
5048         }
5049
5050         spin_lock_irq(&rbd_dev->lock);
5051         if (rbd_dev->open_count)
5052                 ret = -EBUSY;
5053         else
5054                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5055         spin_unlock_irq(&rbd_dev->lock);
5056         if (ret < 0)
5057                 goto done;
5058
5059         rbd_dev_remove_parent(rbd_dev);
5060
5061         __rbd_remove(rbd_dev);
5062
5063 done:
5064         mutex_unlock(&ctl_mutex);
5065
5066         return ret;
5067 }
5068
5069 /*
5070  * create control files in sysfs
5071  * /sys/bus/rbd/...
5072  */
5073 static int rbd_sysfs_init(void)
5074 {
5075         int ret;
5076
5077         ret = device_register(&rbd_root_dev);
5078         if (ret < 0)
5079                 return ret;
5080
5081         ret = bus_register(&rbd_bus_type);
5082         if (ret < 0)
5083                 device_unregister(&rbd_root_dev);
5084
5085         return ret;
5086 }
5087
5088 static void rbd_sysfs_cleanup(void)
5089 {
5090         bus_unregister(&rbd_bus_type);
5091         device_unregister(&rbd_root_dev);
5092 }
5093
5094 static int __init rbd_init(void)
5095 {
5096         int rc;
5097
5098         if (!libceph_compatible(NULL)) {
5099                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5100
5101                 return -EINVAL;
5102         }
5103         rc = rbd_sysfs_init();
5104         if (rc)
5105                 return rc;
5106         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5107         return 0;
5108 }
5109
5110 static void __exit rbd_exit(void)
5111 {
5112         rbd_sysfs_cleanup();
5113 }
5114
5115 module_init(rbd_init);
5116 module_exit(rbd_exit);
5117
5118 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5119 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5120 MODULE_DESCRIPTION("rados block device");
5121
5122 /* following authorship retained from original osdblk.c */
5123 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5124
5125 MODULE_LICENSE("GPL");