/*
 * drivers/block/rbd.c
 * Snapshot as of commit "rbd: don't destroy rbd_dev in device release function"
 */
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* Snapshot device names get this prefix; cap the snapshot name so the
 * prefixed result still fits in NAME_MAX. */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

/* Name used to map the base (unsnapshotted) image, snap_id CEPH_NOSNAP */
#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
/* Upper bound on decimal digits needed to print an int, plus sign. */
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
93
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;    /* prefix of all data object names */
        u64 features;           /* RBD_FEATURE_* bits for this image */
        __u8 obj_order;         /* object size is (1 << obj_order) bytes */
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;         /* image size in bytes */
        struct ceph_snap_context *snapc;
        char *snap_names;       /* consecutive NUL-terminated names */
        u64 *snap_sizes;        /* one entry per snapshot in snapc */

        /* format 2 striping parameters (STRIPINGV2 feature) */
        u64 stripe_unit;
        u64 stripe_count;

        u64 obj_version;        /* version of header object last read */
};
116
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;        /* numeric id of the rados pool */
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;    /* may be NULL (see above) */

        u64             snap_id;        /* CEPH_NOSNAP for the base image */
        const char      *snap_name;

        struct kref     kref;           /* shared between parent and child */
};
154
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;   /* see rbd_put_client() */
        struct list_head        node;   /* entry on rbd_client_list */
};
163
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* How an object request carries its data payload (if any). */
enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

/* Bit numbers for rbd_obj_request->flags. */
enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};
182
/* A single OSD object request, possibly one of several backing an
 * image (block-layer) request. */
struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;          /* enum obj_req_flags bits */

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        /* Payload; which member is valid is determined by 'type'. */
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages; /* layered-write copyup data */

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;         /* 0 or first error */

        rbd_obj_callback_t      callback;       /* async completion hook */
        struct completion       completion;     /* for synchronous waiters */

        struct kref             kref;
};
237
/* Bit numbers for rbd_img_request->flags. */
enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

/* An image-level I/O request, fanned out into one or more object
 * requests (one per backing object the byte range touches). */
struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;  /* enum img_req_flags bits */
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};
269
/* Iterators over an image request's object requests; the _safe form
 * walks in reverse and tolerates removal of the current entry. */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
276
/* One snapshot of an image, as tracked on rbd_device->snaps. */
struct rbd_snap {
        const char              *name;
        u64                     size;   /* image size at snapshot time */
        struct list_head        node;   /* entry on rbd_device->snaps */
        u64                     id;
        u64                     features;
};

/* Properties of whatever (image head or snapshot) is currently mapped. */
struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};
290
/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;    /* shared ceph connection */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;          /* identity of mapped image */

        char                    *header_name;   /* name of header object */

        struct ceph_file_layout layout;

        /* watch registered on the header object for update notifies */
        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        /* layering: identity and extent of the parent image, if any */
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;           /* entry on rbd_dev_list */

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};
336
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
348
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations (defined later in this file). */
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

/* sysfs control files: /sys/bus/rbd/{add,remove}, write-only for root */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
380
/*
 * The root "device" all rbd devices hang off of in sysfs.  It is
 * statically allocated, so its release callback has nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
389
/*
 * Log a warning, prefixed with the most specific identification of
 * the device available: disk name, then image name, then image id,
 * then the raw rbd_dev pointer.  A NULL rbd_dev is allowed.
 */
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
416
#ifdef RBD_DEBUG
/*
 * Verify an invariant; log and BUG() if it does not hold.
 *
 * Wrapped in do { } while (0) so the macro expands to a single
 * statement; the previous bare-if form mis-bound a following "else"
 * when used in an unbraced if/else body.
 */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
429
/* Forward declarations for request submission, layered-image parent
 * handling, and header refresh (defined later in this file). */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
436
/*
 * Block device open callback.
 *
 * Returns -EROFS for a write open of a read-only mapping, -ENOENT if
 * the mapping is being torn down.  The REMOVING flag and open_count
 * are checked/updated together under rbd_dev->lock so removal cannot
 * race with a new open.  On success a reference is taken on the
 * embedded struct device, keeping the rbd_dev alive while open.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}
461
/*
 * Block device release callback: undo what rbd_open() did.
 * Drops the open count (asserting it was non-zero) and releases the
 * device reference taken at open time.
 */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}
478
/* Block-layer entry points for rbd devices. */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
484
485 /*
486  * Initialize an rbd client instance.
487  * We own *ceph_opts.
488  */
489 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
490 {
491         struct rbd_client *rbdc;
492         int ret = -ENOMEM;
493
494         dout("%s:\n", __func__);
495         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
496         if (!rbdc)
497                 goto out_opt;
498
499         kref_init(&rbdc->kref);
500         INIT_LIST_HEAD(&rbdc->node);
501
502         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
503
504         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
505         if (IS_ERR(rbdc->client))
506                 goto out_mutex;
507         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
508
509         ret = ceph_open_session(rbdc->client);
510         if (ret < 0)
511                 goto out_err;
512
513         spin_lock(&rbd_client_list_lock);
514         list_add_tail(&rbdc->node, &rbd_client_list);
515         spin_unlock(&rbd_client_list_lock);
516
517         mutex_unlock(&ctl_mutex);
518         dout("%s: rbdc %p\n", __func__, rbdc);
519
520         return rbdc;
521
522 out_err:
523         ceph_destroy_client(rbdc->client);
524 out_mutex:
525         mutex_unlock(&ctl_mutex);
526         kfree(rbdc);
527 out_opt:
528         if (ceph_opts)
529                 ceph_destroy_options(ceph_opts);
530         dout("%s: error %d\n", __func__, ret);
531
532         return ERR_PTR(ret);
533 }
534
/*
 * Take an additional reference on an rbd client and return it.
 * NOTE(review): the double-underscore prefix suggests the caller is
 * expected to hold rbd_client_list_lock (as rbd_client_find() does)
 * or otherwise guarantee rbdc stays valid -- confirm at call sites.
 */
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}
541
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 *
 * Returns NULL when no match exists or when the options request an
 * unshared client (CEPH_OPT_NOSHARE).  The reference is taken while
 * rbd_client_list_lock is held, so the returned client cannot be
 * freed out from under the caller.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        /* client_node is only meaningful when the loop broke early */
        return found ? client_node : NULL;
}
567
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument, those
 * between Opt_last_int and Opt_last_string take a string, and those
 * between Opt_last_string and Opt_last_bool are boolean flags.  See
 * parse_rbd_opts_token().
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

/* Parsed per-mapping options. */
struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false
598
/*
 * Parse a single mount option token (callback for ceph_parse_options).
 * @c: the option text, @private: the struct rbd_options to fill in.
 * Returns 0 on success, -EINVAL for an unrecognized token, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        /* Classify the token by the sentinel ranges in the enum. */
        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                /* match_token() only returns values from the table */
                rbd_assert(false);
                break;
        }
        return 0;
}
639
/*
 * Get a ceph client with specific addr and configuration; if one does
 * not already exist, create it.  Ownership of ceph_opts passes to the
 * (new or existing) client in either case.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Reusing an existing client; the options are now redundant. */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
656
/*
 * Destroy ceph client (kref release callback).
 *
 * Acquires rbd_client_list_lock itself to unlink the client, so the
 * caller must NOT hold it (the original comment claiming otherwise
 * was stale and would have implied a self-deadlock).
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}
674
/* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */

/*
 * Allocate a zeroed snapshot context with room for snap_count ids,
 * initialized with a single reference.  Returns NULL on allocation
 * failure.  Callers are expected to have validated snap_count (see
 * rbd_dev_ondisk_valid()) so the size computation cannot overflow.
 */
static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
{
        struct ceph_snap_context *snapc;
        size_t size;

        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (snapc->snaps[0]);
        snapc = kzalloc(size, GFP_KERNEL);
        if (!snapc)
                return NULL;

        atomic_set(&snapc->nref, 1);
        snapc->num_snaps = snap_count;

        return snapc;
}
693
/* Thin wrappers over the libceph snap-context refcounting, discarding
 * the returned pointer so these can be used as void statements. */
static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
{
        (void)ceph_get_snap_context(snapc);
}

static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
{
        ceph_put_snap_context(snapc);
}
703
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.  A NULL rbdc is tolerated (no-op).
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}
713
714 static bool rbd_image_format_valid(u32 image_format)
715 {
716         return image_format == 1 || image_format == 2;
717 }
718
/*
 * Sanity-check an on-disk (v1) image header before trusting any of
 * its fields: magic text, object order bounds, and that the snapshot
 * count and name-blob length cannot overflow later size_t arithmetic.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire the snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
757
758 /*
759  * Create a new header structure, translate header format from the on-disk
760  * header.
761  */
762 static int rbd_header_from_disk(struct rbd_image_header *header,
763                                  struct rbd_image_header_ondisk *ondisk)
764 {
765         u32 snap_count;
766         size_t len;
767         size_t size;
768         u32 i;
769
770         memset(header, 0, sizeof (*header));
771
772         snap_count = le32_to_cpu(ondisk->snap_count);
773
774         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
775         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
776         if (!header->object_prefix)
777                 return -ENOMEM;
778         memcpy(header->object_prefix, ondisk->object_prefix, len);
779         header->object_prefix[len] = '\0';
780
781         if (snap_count) {
782                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
783
784                 /* Save a copy of the snapshot names */
785
786                 if (snap_names_len > (u64) SIZE_MAX)
787                         return -EIO;
788                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
789                 if (!header->snap_names)
790                         goto out_err;
791                 /*
792                  * Note that rbd_dev_v1_header_read() guarantees
793                  * the ondisk buffer we're working with has
794                  * snap_names_len bytes beyond the end of the
795                  * snapshot id array, this memcpy() is safe.
796                  */
797                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
798                         snap_names_len);
799
800                 /* Record each snapshot's size */
801
802                 size = snap_count * sizeof (*header->snap_sizes);
803                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
804                 if (!header->snap_sizes)
805                         goto out_err;
806                 for (i = 0; i < snap_count; i++)
807                         header->snap_sizes[i] =
808                                 le64_to_cpu(ondisk->snaps[i].image_size);
809         } else {
810                 header->snap_names = NULL;
811                 header->snap_sizes = NULL;
812         }
813
814         header->features = 0;   /* No features support in v1 images */
815         header->obj_order = ondisk->options.order;
816         header->crypt_type = ondisk->options.crypt_type;
817         header->comp_type = ondisk->options.comp_type;
818
819         /* Allocate and fill in the snapshot context */
820
821         header->image_size = le64_to_cpu(ondisk->image_size);
822
823         header->snapc = rbd_snap_context_create(snap_count);
824         if (!header->snapc)
825                 goto out_err;
826         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
827         for (i = 0; i < snap_count; i++)
828                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
829
830         return 0;
831
832 out_err:
833         kfree(header->snap_sizes);
834         header->snap_sizes = NULL;
835         kfree(header->snap_names);
836         header->snap_names = NULL;
837         kfree(header->object_prefix);
838         header->object_prefix = NULL;
839
840         return -ENOMEM;
841 }
842
/*
 * Map a snapshot id to its name.  CEPH_NOSNAP maps to the reserved
 * head name "-"; an unknown id returns NULL.  Walks the device's
 * snapshot list without locking -- presumably callers serialize
 * against snaps-list updates (TODO confirm at call sites).
 */
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}
856
/*
 * Look up a snapshot on the device's snapshot list by name.
 * Returns NULL if no snapshot has that name.
 */
static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}
868
869 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
870 {
871         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
872                     sizeof (RBD_SNAP_HEAD_NAME))) {
873                 rbd_dev->mapping.size = rbd_dev->header.image_size;
874                 rbd_dev->mapping.features = rbd_dev->header.features;
875         } else {
876                 struct rbd_snap *snap;
877
878                 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
879                 if (!snap)
880                         return -ENOENT;
881                 rbd_dev->mapping.size = snap->size;
882                 rbd_dev->mapping.features = snap->features;
883                 rbd_dev->mapping.read_only = true;
884         }
885
886         return 0;
887 }
888
/* Reset the mapping to its unmapped state (empty and read-only). */
static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}
895
/*
 * NOTE(review): byte-for-byte duplicate of rbd_dev_mapping_clear()
 * above; one of the two should be removed and its callers redirected
 * to the other.
 */
static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}
902
/*
 * Release everything an image header owns and NULL the pointers so
 * a repeated call is harmless.  The snapshot context reference is
 * dropped (it is refcounted, so it may outlive this header).
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	rbd_snap_context_put(header->snapc);
	header->snapc = NULL;
}
914
/*
 * Build the name of the object holding the segment that contains
 * the given image offset: "<object_prefix>.<segment, 12 hex digits>".
 *
 * Returns a GFP_NOIO allocation the caller must kfree(), or NULL on
 * allocation or formatting failure.
 */
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	/* Segment number is the image offset divided by the object size */
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	/* ret > MAX_OBJ_NAME_SIZE means the name would have been truncated */
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
936
937 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
938 {
939         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
940
941         return offset & (segment_size - 1);
942 }
943
/*
 * Clamp an (offset, length) extent so it does not cross the end of
 * the segment containing the offset.  Returns the clamped length.
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;	/* offset within the segment */

	/* Guard against offset + length wrapping below */
	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
957
958 /*
959  * returns the size of an object in the image
960  */
961 static u64 rbd_obj_bytes(struct rbd_image_header *header)
962 {
963         return 1 << header->obj_order;
964 }
965
966 /*
967  * bio helpers
968  */
969
970 static void bio_chain_put(struct bio *chain)
971 {
972         struct bio *tmp;
973
974         while (chain) {
975                 tmp = chain;
976                 chain = chain->bi_next;
977                 bio_put(tmp);
978         }
979 }
980
/*
 * zeros a bio chain, starting at specific offset
 *
 * Bytes before start_ofs are left untouched; everything from
 * start_ofs to the end of the chain is cleared.  Segments are
 * mapped with bvec_kmap_irq(), so this is safe in atomic context.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;		/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs (or segment start) on */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
1007
1008 /*
1009  * similar to zero_bio_chain(), zeros data defined by a page array,
1010  * starting at the given byte offset from the start of the array and
1011  * continuing up to the given end offset.  The pages array is
1012  * assumed to be big enough to hold all bytes up to the end.
1013  */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	/* First page touched by the zeroing range */
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		/* Zero from offset to the end of this page, or to "end" */
		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
1038
1039 /*
1040  * Clone a portion of a bio, starting at the given byte offset
1041  * and continuing for the number of bytes indicated.
1042  */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;		/* offset into first affected segment */
	unsigned short end_idx;
	unsigned short vcnt;		/* number of segments in the clone */
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	/* Reject empty or out-of-range clones */
	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	/* resid is now the number of bytes used in the last segment */
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		/* Single segment: both trims collapse into the length */
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
1119
1120 /*
1121  * Clone a portion of a bio chain, starting at the given byte offset
1122  * into the first bio in the source chain and continuing for the
1123  * number of bytes indicated.  The result is another bio chain of
1124  * exactly the given length, or a null pointer on error.
1125  *
1126  * The bio_src and offset parameters are both in-out.  On entry they
1127  * refer to the first source bio and the offset into that bio where
1128  * the start of data to be cloned is located.
1129  *
1130  * On return, bio_src is updated to refer to the bio in the source
1131  * chain that contains first un-cloned byte, and *offset will
1132  * contain the offset of that byte within that bio.
1133  */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;	/* head of the cloned chain */
	struct bio **end;		/* where to link the next clone */

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		/* Clone as much of this bio as the request still needs */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		/* Advance to the next source bio when this one is used up */
		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	/* Report where the next un-cloned byte lives */
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
1182
1183 /*
1184  * The default/initial value for all object request flags is 0.  For
1185  * each flag, once its value is set to 1 it is never reset to 0
1186  * again.
1187  */
/*
 * Mark an object request as carrying image data.  Setting the flag
 * twice indicates a logic error, so warn if it was already set.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}
1198
/* Test the IMG_DATA flag; the barrier orders it against prior stores. */
static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}
1204
/*
 * Mark an object request done.  Being marked done twice indicates a
 * logic error, so warn (with the device if one is associated).
 */
static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}
1216
/* Test the DONE flag; the barrier orders it against prior stores. */
static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}
1222
1223 /*
1224  * This sets the KNOWN flag after (possibly) setting the EXISTS
1225  * flag.  The latter is set based on the "exists" value provided.
1226  *
1227  * Note that for our purposes once an object exists it never goes
1228  * away again.  It's possible that the response from two existence
1229  * checks are separated by the creation of the target object, and
1230  * the first ("doesn't exist") response arrives *after* the second
1231  * ("does exist").  In that case we ignore the second one.
1232  */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	/* EXISTS must be visible before KNOWN; smp_mb publishes both */
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}
1241
/* Test the KNOWN flag; the barrier pairs with obj_request_existence_set(). */
static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}
1247
/* Test the EXISTS flag; only meaningful once KNOWN is set. */
static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
1253
/* Take a reference on an object request. */
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}
1260
static void rbd_obj_request_destroy(struct kref *kref);
/* Drop a reference; the last put destroys the object request. */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1269
/* Take a reference on an image request. */
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}
1276
static void rbd_img_request_destroy(struct kref *kref);
/* Drop a reference; the last put destroys the image request. */
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
1285
/*
 * Attach an object request to an image request.  The image request
 * takes over the object's original reference; the object gets the
 * next sequential "which" slot and is appended to the image's list.
 */
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}
1302
/*
 * Detach an object request from its image request and drop the
 * reference the image request held.  The assertion that "which"
 * equals the decremented count means only the most recently added
 * object request may be removed.
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
1321
1322 static bool obj_request_type_valid(enum obj_request_type type)
1323 {
1324         switch (type) {
1325         case OBJ_REQUEST_NODATA:
1326         case OBJ_REQUEST_BIO:
1327         case OBJ_REQUEST_PAGES:
1328                 return true;
1329         default:
1330                 return false;
1331         }
1332 }
1333
/* Hand an object request's osd request to the osd client. */
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
1341
/*
 * Finish an image request: total up the bytes transferred by its
 * object requests (on success), then invoke the completion callback,
 * or drop the request's reference if there is none.
 */
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
1367
1368 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1369
/* Sleep (interruptibly) until the object request completes. */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
1376
1377 /*
1378  * The default/initial value for all image request flags is 0.  Each
1379  * is conditionally set to 1 at image request initialization time
1380  * and currently never change thereafter.
1381  */
/* Mark an image request as a write; barrier publishes the flag. */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}
1387
/* Test the WRITE flag; barrier pairs with img_request_write_set(). */
static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}
1393
/* Mark an image request as a child request; barrier publishes the flag. */
static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}
1399
/* Test the CHILD flag; barrier pairs with img_request_child_set(). */
static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}
1405
/* Mark an image request as layered; barrier publishes the flag. */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}
1411
/* Test the LAYERED flag; barrier pairs with img_request_layered_set(). */
static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
1417
/*
 * Completion handling for a read that belongs to an image request.
 * Holes (ENOENT) and short reads are zero-filled so the request is
 * always reported as fully satisfied.
 */
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		/* Successful short read: zero the tail */
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}
1451
/*
 * Deliver completion for an object request: run its callback if one
 * is set, otherwise wake any waiter blocked in rbd_obj_request_wait().
 */
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}
1461
/* Op completion that needs no processing beyond marking done. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
1467
/*
 * Completion handling for an osd READ op.  A read of a layered
 * image that hits a hole inside the parent overlap is redirected to
 * the parent image; other image reads get zero-fill handling; a
 * bare (non-image) read is simply marked done.
 */
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		/*
		 * NOTE(review): the "img_request &&" guard below implies
		 * img_request may be NULL here, yet rbd_dev is read from
		 * it unconditionally on the following line -- confirm
		 * img_request is always non-NULL once IMG_DATA is set.
		 */
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
1491
/* Completion handling for an osd WRITE op. */
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}
1503
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
1513
/*
 * Main completion callback installed on every osd request we build
 * (r_callback).  Records the result and transfer count on the
 * associated object request, dispatches to the per-opcode handler,
 * and completes the object request if the handler marked it done.
 */
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	/* Image-data requests have a valid "which"; others must not */
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	/* Only 1-op (normal) and 2-op (copyup) requests are built here */
	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	/* Dispatch on the first op's opcode */
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	/* Handlers that defer (e.g. parent reads) don't mark done yet */
	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
1566
/*
 * Finalize a read osd request: no snapshot context, and the snap id
 * comes from the image request (or CEPH_NOSNAP for a bare request).
 */
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}
1579
/*
 * Finalize a write osd request: the snapshot context comes from the
 * image request (if any) and the modification time is stamped now.
 */
static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
1593
/*
 * Allocate and initialize a single-op osd request for an object
 * request.  Writes carry the image request's snapshot context and
 * the WRITE|ONDISK flags; reads carry the READ flag.  Returns NULL
 * on allocation failure (allocated with GFP_ATOMIC).
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		/* The caller's read/write intent must match the image request */
		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	/* Target object name must fit in the fixed-size r_oid buffer */
	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1635
1636 /*
1637  * Create a copyup osd request based on the information in the
1638  * object request supplied.  A copyup request has two osd ops,
1639  * a copyup method call, and a "normal" write request.
1640  */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	/* Copyup only makes sense for a write that is part of an image */
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	/* Target object name must fit in the fixed-size r_oid buffer */
	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1676
1677
/* Drop our reference on an osd request (freed when the last is put). */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1682
1683 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1684
/*
 * Allocate an object request.  The object name is copied into the
 * same allocation (immediately after the struct), so a single
 * kfree() in rbd_obj_request_destroy() releases both.  Returns NULL
 * on allocation failure.
 */
static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	/* Name storage lives in the tail of the allocation */
	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;	/* not part of an image request yet */
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
1716
/*
 * kref release function for object requests.  The request must
 * already be detached from any image request.  Releases the osd
 * request and the attached data (bio chain or page vector), then
 * frees the request itself (which also frees the embedded name).
 */
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}
1748
1749 /*
1750  * Caller is responsible for filling in the list of object requests
1751  * that comprises the image request, and the Linux request pointer
1752  * (if there is one).
1753  */
1754 static struct rbd_img_request *rbd_img_request_create(
1755                                         struct rbd_device *rbd_dev,
1756                                         u64 offset, u64 length,
1757                                         bool write_request,
1758                                         bool child_request)
1759 {
1760         struct rbd_img_request *img_request;
1761
1762         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1763         if (!img_request)
1764                 return NULL;
1765
1766         if (write_request) {
1767                 down_read(&rbd_dev->header_rwsem);
1768                 rbd_snap_context_get(rbd_dev->header.snapc);
1769                 up_read(&rbd_dev->header_rwsem);
1770         }
1771
1772         img_request->rq = NULL;
1773         img_request->rbd_dev = rbd_dev;
1774         img_request->offset = offset;
1775         img_request->length = length;
1776         img_request->flags = 0;
1777         if (write_request) {
1778                 img_request_write_set(img_request);
1779                 img_request->snapc = rbd_dev->header.snapc;
1780         } else {
1781                 img_request->snap_id = rbd_dev->spec->snap_id;
1782         }
1783         if (child_request)
1784                 img_request_child_set(img_request);
1785         if (rbd_dev->parent_spec)
1786                 img_request_layered_set(img_request);
1787         spin_lock_init(&img_request->completion_lock);
1788         img_request->next_completion = 0;
1789         img_request->callback = NULL;
1790         img_request->result = 0;
1791         img_request->obj_request_count = 0;
1792         INIT_LIST_HEAD(&img_request->obj_requests);
1793         kref_init(&img_request->kref);
1794
1795         rbd_img_request_get(img_request);       /* Avoid a warning */
1796         rbd_img_request_put(img_request);       /* TEMPORARY */
1797
1798         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1799                 write_request ? "write" : "read", offset, length,
1800                 img_request);
1801
1802         return img_request;
1803 }
1804
/*
 * Final release of an image request, called when its kref drops to
 * zero.  Unlinks and drops all remaining object requests, releases
 * the snapshot context reference taken for writes, and drops the
 * reference a child (parent-read) request holds on its originating
 * object request.
 */
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	/* Remove (and put) every object request still linked in */
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	/* Matches the rbd_snap_context_get() in rbd_img_request_create() */
	if (img_request_write_test(img_request))
		rbd_snap_context_put(img_request->snapc);

	/* Matches the rbd_obj_request_get() taken when the child
	 * request was attached to its originating object request */
	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kfree(img_request);
}
1827
/*
 * Account for the completion of one object request within an image
 * request: record the first error seen, hand completed bytes back to
 * the block layer for top-level requests, and report whether more
 * completions are still expected.  Returns true if the image request
 * is not yet fully complete.
 */
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		/* Only the first error is recorded on the image request */
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		/* A child request has no blk request; it is done when
		 * its last object request has been ended */
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		/* blk_end_request() returns true while part of the
		 * block-layer request is still outstanding */
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}
1871
/*
 * Per-object completion callback for image requests.  Object requests
 * may complete in any order, but they are handed to
 * rbd_img_obj_end_request() strictly in index ("which") order, under
 * the image request's completion lock.  A completion that arrives out
 * of order is simply recorded (done flag) and processed later, when
 * the requests before it complete.
 */
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	/* Not the next expected completion?  Leave it for later. */
	if (which != img_request->next_completion)
		goto out;

	/* End this request and any consecutive ones already marked done */
	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	/* "no more" must coincide exactly with having ended the last one */
	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
1910
1911 /*
1912  * Split up an image request into one or more object requests, each
1913  * to a different object.  The "type" parameter indicates whether
1914  * "data_desc" is the pointer to the head of a list of bio
1915  * structures, or the base of a page array.  In either case this
1916  * function assumes data_desc describes memory sufficient to hold
1917  * all data described by the image request.
1918  */
1919 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1920                                         enum obj_request_type type,
1921                                         void *data_desc)
1922 {
1923         struct rbd_device *rbd_dev = img_request->rbd_dev;
1924         struct rbd_obj_request *obj_request = NULL;
1925         struct rbd_obj_request *next_obj_request;
1926         bool write_request = img_request_write_test(img_request);
1927         struct bio *bio_list;
1928         unsigned int bio_offset = 0;
1929         struct page **pages;
1930         u64 img_offset;
1931         u64 resid;
1932         u16 opcode;
1933
1934         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1935                 (int)type, data_desc);
1936
1937         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1938         img_offset = img_request->offset;
1939         resid = img_request->length;
1940         rbd_assert(resid > 0);
1941
1942         if (type == OBJ_REQUEST_BIO) {
1943                 bio_list = data_desc;
1944                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1945         } else {
1946                 rbd_assert(type == OBJ_REQUEST_PAGES);
1947                 pages = data_desc;
1948         }
1949
1950         while (resid) {
1951                 struct ceph_osd_request *osd_req;
1952                 const char *object_name;
1953                 u64 offset;
1954                 u64 length;
1955
1956                 object_name = rbd_segment_name(rbd_dev, img_offset);
1957                 if (!object_name)
1958                         goto out_unwind;
1959                 offset = rbd_segment_offset(rbd_dev, img_offset);
1960                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1961                 obj_request = rbd_obj_request_create(object_name,
1962                                                 offset, length, type);
1963                 kfree(object_name);     /* object request has its own copy */
1964                 if (!obj_request)
1965                         goto out_unwind;
1966
1967                 if (type == OBJ_REQUEST_BIO) {
1968                         unsigned int clone_size;
1969
1970                         rbd_assert(length <= (u64)UINT_MAX);
1971                         clone_size = (unsigned int)length;
1972                         obj_request->bio_list =
1973                                         bio_chain_clone_range(&bio_list,
1974                                                                 &bio_offset,
1975                                                                 clone_size,
1976                                                                 GFP_ATOMIC);
1977                         if (!obj_request->bio_list)
1978                                 goto out_partial;
1979                 } else {
1980                         unsigned int page_count;
1981
1982                         obj_request->pages = pages;
1983                         page_count = (u32)calc_pages_for(offset, length);
1984                         obj_request->page_count = page_count;
1985                         if ((offset + length) & ~PAGE_MASK)
1986                                 page_count--;   /* more on last page */
1987                         pages += page_count;
1988                 }
1989
1990                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1991                                                 obj_request);
1992                 if (!osd_req)
1993                         goto out_partial;
1994                 obj_request->osd_req = osd_req;
1995                 obj_request->callback = rbd_img_obj_callback;
1996
1997                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1998                                                 0, 0);
1999                 if (type == OBJ_REQUEST_BIO)
2000                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2001                                         obj_request->bio_list, length);
2002                 else
2003                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2004                                         obj_request->pages, length,
2005                                         offset & ~PAGE_MASK, false, false);
2006
2007                 if (write_request)
2008                         rbd_osd_req_format_write(obj_request);
2009                 else
2010                         rbd_osd_req_format_read(obj_request);
2011
2012                 obj_request->img_offset = img_offset;
2013                 rbd_img_obj_request_add(img_request, obj_request);
2014
2015                 img_offset += length;
2016                 resid -= length;
2017         }
2018
2019         return 0;
2020
2021 out_partial:
2022         rbd_obj_request_put(obj_request);
2023 out_unwind:
2024         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2025                 rbd_obj_request_put(obj_request);
2026
2027         return -ENOMEM;
2028 }
2029
/*
 * Completion callback for the copyup+write osd request issued on
 * behalf of an original layered-image write.  Releases the page
 * vector that carried the parent data, fixes up the transfer count,
 * and then finishes through the normal image object callback.
 */
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	u64 length;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	/* The copyup page vector always covers one full backing object */
	length = (u64)1 << rbd_dev->header.obj_order;
	page_count = (u32)calc_pages_for(0, length);

	rbd_assert(obj_request->copyup_pages);
	ceph_release_page_vector(obj_request->copyup_pages, page_count);
	obj_request->copyup_pages = NULL;

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}
2065
/*
 * Completion callback for the parent-image read issued by
 * rbd_img_obj_parent_read_full().  On success, builds a two-op osd
 * request for the original object request -- a "copyup" class call
 * carrying the parent data followed by the original write -- and
 * submits it.  On any failure, records the error on the original
 * request and completes it.
 */
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	int result;
	u64 obj_size;
	u64 xferred;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
	result = img_request->result;
	obj_size = img_request->length;
	xferred = img_request->xferred;

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

	/* Done with the parent read; if this drops its last reference,
	 * rbd_img_request_destroy() also drops its ref on orig_request */
	rbd_img_request_put(img_request);

	if (result)
		goto out_err;

	/* Allocate the new copyup osd request for the original request */

	result = -ENOMEM;
	rbd_assert(!orig_request->osd_req);
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	/* NOTE(review): if osd_req creation fails, "pages" has no owner
	 * here and appears to leak -- verify against the callers */
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
						false, false);

	/* Then the original write request op */

	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					orig_request->offset,
					orig_request->length, 0, 0);
	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
					orig_request->length);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	result = rbd_obj_request_submit(osdc, orig_request);
	if (!result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}
2143
2144 /*
2145  * Read from the parent image the range of data that covers the
2146  * entire target of the given object request.  This is used for
2147  * satisfying a layered image write request when the target of an
2148  * object request from the image request does not exist.
2149  *
2150  * A page array big enough to hold the returned data is allocated
2151  * and supplied to rbd_img_request_fill() as the "data descriptor."
2152  * When the read completes, this page array will be transferred to
2153  * the original object request for the copyup operation.
2154  *
2155  * If an error occurs, record it as the result of the original
2156  * object request and mark it done so it gets completed.
2157  */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * First things first.  The original osd request is of no
	 * use to use any more, we'll need a new one that can hold
	 * the two ops in a copyup request.  We'll get that later,
	 * but for now we can release the old one.
	 */
	rbd_osd_req_destroy(obj_request->osd_req);
	obj_request->osd_req = NULL;

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length,
						false, true);
	if (!parent_request)
		goto out_err;
	/* The child request holds a ref on the original object request;
	 * rbd_img_request_destroy() drops it */
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	/* Record the pages for the completion callback, which hands
	 * them to the copyup request */
	parent_request->copyup_pages = pages;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	/* Submit failed: undo the transfers above before the puts below */
	parent_request->copyup_pages = NULL;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}
2248
2249 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2250 {
2251         struct rbd_obj_request *orig_request;
2252         int result;
2253
2254         rbd_assert(!obj_request_img_data_test(obj_request));
2255
2256         /*
2257          * All we need from the object request is the original
2258          * request and the result of the STAT op.  Grab those, then
2259          * we're done with the request.
2260          */
2261         orig_request = obj_request->obj_request;
2262         obj_request->obj_request = NULL;
2263         rbd_assert(orig_request);
2264         rbd_assert(orig_request->img_request);
2265
2266         result = obj_request->result;
2267         obj_request->result = 0;
2268
2269         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2270                 obj_request, orig_request, result,
2271                 obj_request->xferred, obj_request->length);
2272         rbd_obj_request_put(obj_request);
2273
2274         rbd_assert(orig_request);
2275         rbd_assert(orig_request->img_request);
2276
2277         /*
2278          * Our only purpose here is to determine whether the object
2279          * exists, and we don't want to treat the non-existence as
2280          * an error.  If something else comes back, transfer the
2281          * error to the original request and complete it now.
2282          */
2283         if (!result) {
2284                 obj_request_existence_set(orig_request, true);
2285         } else if (result == -ENOENT) {
2286                 obj_request_existence_set(orig_request, false);
2287         } else if (result) {
2288                 orig_request->result = result;
2289                 goto out;
2290         }
2291
2292         /*
2293          * Resubmit the original request now that we have recorded
2294          * whether the target object exists.
2295          */
2296         orig_request->result = rbd_img_obj_request_submit(orig_request);
2297 out:
2298         if (orig_request->result)
2299                 rbd_obj_request_complete(orig_request);
2300         rbd_obj_request_put(orig_request);
2301 }
2302
2303 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2304 {
2305         struct rbd_obj_request *stat_request;
2306         struct rbd_device *rbd_dev;
2307         struct ceph_osd_client *osdc;
2308         struct page **pages = NULL;
2309         u32 page_count;
2310         size_t size;
2311         int ret;
2312
2313         /*
2314          * The response data for a STAT call consists of:
2315          *     le64 length;
2316          *     struct {
2317          *         le32 tv_sec;
2318          *         le32 tv_nsec;
2319          *     } mtime;
2320          */
2321         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2322         page_count = (u32)calc_pages_for(0, size);
2323         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2324         if (IS_ERR(pages))
2325                 return PTR_ERR(pages);
2326
2327         ret = -ENOMEM;
2328         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2329                                                         OBJ_REQUEST_PAGES);
2330         if (!stat_request)
2331                 goto out;
2332
2333         rbd_obj_request_get(obj_request);
2334         stat_request->obj_request = obj_request;
2335         stat_request->pages = pages;
2336         stat_request->page_count = page_count;
2337
2338         rbd_assert(obj_request->img_request);
2339         rbd_dev = obj_request->img_request->rbd_dev;
2340         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2341                                                 stat_request);
2342         if (!stat_request->osd_req)
2343                 goto out;
2344         stat_request->callback = rbd_img_obj_exists_callback;
2345
2346         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2347         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2348                                         false, false);
2349         rbd_osd_req_format_read(stat_request);
2350
2351         osdc = &rbd_dev->rbd_client->client->osdc;
2352         ret = rbd_obj_request_submit(osdc, stat_request);
2353 out:
2354         if (ret)
2355                 rbd_obj_request_put(obj_request);
2356
2357         return ret;
2358 }
2359
2360 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2361 {
2362         struct rbd_img_request *img_request;
2363         struct rbd_device *rbd_dev;
2364         bool known;
2365
2366         rbd_assert(obj_request_img_data_test(obj_request));
2367
2368         img_request = obj_request->img_request;
2369         rbd_assert(img_request);
2370         rbd_dev = img_request->rbd_dev;
2371
2372         /*
2373          * Only writes to layered images need special handling.
2374          * Reads and non-layered writes are simple object requests.
2375          * Layered writes that start beyond the end of the overlap
2376          * with the parent have no parent data, so they too are
2377          * simple object requests.  Finally, if the target object is
2378          * known to already exist, its parent data has already been
2379          * copied, so a write to the object can also be handled as a
2380          * simple object request.
2381          */
2382         if (!img_request_write_test(img_request) ||
2383                 !img_request_layered_test(img_request) ||
2384                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2385                 ((known = obj_request_known_test(obj_request)) &&
2386                         obj_request_exists_test(obj_request))) {
2387
2388                 struct rbd_device *rbd_dev;
2389                 struct ceph_osd_client *osdc;
2390
2391                 rbd_dev = obj_request->img_request->rbd_dev;
2392                 osdc = &rbd_dev->rbd_client->client->osdc;
2393
2394                 return rbd_obj_request_submit(osdc, obj_request);
2395         }
2396
2397         /*
2398          * It's a layered write.  The target object might exist but
2399          * we may not know that yet.  If we know it doesn't exist,
2400          * start by reading the data for the full target object from
2401          * the parent so we can use it for a copyup to the target.
2402          */
2403         if (known)
2404                 return rbd_img_obj_parent_read_full(obj_request);
2405
2406         /* We don't know whether the target exists.  Go find out. */
2407
2408         return rbd_img_obj_exists_submit(obj_request);
2409 }
2410
2411 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2412 {
2413         struct rbd_obj_request *obj_request;
2414         struct rbd_obj_request *next_obj_request;
2415
2416         dout("%s: img %p\n", __func__, img_request);
2417         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2418                 int ret;
2419
2420                 ret = rbd_img_obj_request_submit(obj_request);
2421                 if (ret)
2422                         return ret;
2423         }
2424
2425         return 0;
2426 }
2427
/*
 * Completion callback for the parent-image read issued by
 * rbd_img_parent_read().  Transfers the result to the originating
 * object request, clamping the transfer count at the parent overlap
 * boundary so that data beyond the overlap reads back as zeros.
 */
static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;

	rbd_assert(img_request_child_test(img_request));

	obj_request = img_request->obj_request;
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);

	obj_request->result = img_request->result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	rbd_dev = obj_request->img_request->rbd_dev;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		/* Only the part below the overlap carries parent data */
		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_request->xferred, xferred);
	} else {
		obj_request->xferred = img_request->xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}
2469
/*
 * An object read on a layered image came back -ENOENT: the object
 * does not exist in the child, so satisfy the read from the parent
 * image instead.  On any setup failure the object request is
 * completed with the error recorded.
 */
static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev;
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	rbd_dev = obj_request->img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);
	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_img_request_create(rbd_dev->parent,
						obj_request->img_offset,
						obj_request->length,
						false, true);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	/* The child image request holds a ref on this object request;
	 * rbd_img_request_destroy() drops it */
	rbd_obj_request_get(obj_request);
	img_request->obj_request = obj_request;

	/* Reuse the original request's bio list for the parent read */
	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
					obj_request->bio_list);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}
2513
/*
 * Send a NOTIFY_ACK for the given notify_id on the header object.
 * Fire-and-forget: the request cleans itself up via the
 * rbd_obj_request_put callback when it completes, so the caller does
 * not wait for the ack to be acknowledged.
 */
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver, u64 notify_id)
{
        struct rbd_obj_request *obj_request;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
                                                        OBJ_REQUEST_NODATA);
        if (!obj_request)
                return -ENOMEM;

        ret = -ENOMEM;
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;
        /* Drop our reference when the ack completes. */
        obj_request->callback = rbd_obj_request_put;

        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
                                        notify_id, ver, 0);
        rbd_osd_req_format_read(obj_request);

        ret = rbd_obj_request_submit(osdc, obj_request);
out:
        /* On any error we still own the reference; drop it here. */
        if (ret)
                rbd_obj_request_put(obj_request);

        return ret;
}
2543
2544 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2545 {
2546         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2547         u64 hver;
2548
2549         if (!rbd_dev)
2550                 return;
2551
2552         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2553                 rbd_dev->header_name, (unsigned long long) notify_id,
2554                 (unsigned int) opcode);
2555         (void)rbd_dev_refresh(rbd_dev, &hver);
2556
2557         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2558 }
2559
2560 /*
2561  * Request sync osd watch/unwatch.  The value of "start" determines
2562  * whether a watch request is being initiated or torn down.
2563  */
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated (non-zero) or torn down
 * (zero).  The asserts below require that a watch event/request exists
 * exactly when tearing down and not when starting.
 *
 * Returns 0 on success or a negative errno.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        int ret;

        rbd_assert(start ^ !!rbd_dev->watch_event);
        rbd_assert(start ^ !!rbd_dev->watch_request);

        if (start) {
                ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
                                                &rbd_dev->watch_event);
                if (ret < 0)
                        return ret;
                rbd_assert(rbd_dev->watch_event != NULL);
        }

        ret = -ENOMEM;
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
                                                        OBJ_REQUEST_NODATA);
        if (!obj_request)
                goto out_cancel;

        obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
        if (!obj_request->osd_req)
                goto out_cancel;

        /*
         * When starting, the new request lingers so the osd keeps the
         * watch registered across reconnects; when stopping, the old
         * lingering request is unregistered first.
         */
        if (start)
                ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
        else
                ceph_osdc_unregister_linger_request(osdc,
                                        rbd_dev->watch_request->osd_req);

        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
                                rbd_dev->watch_event->cookie,
                                rbd_dev->header.obj_version, start);
        rbd_osd_req_format_write(obj_request);

        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                goto out_cancel;
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
                goto out_cancel;
        ret = obj_request->result;
        if (ret)
                goto out_cancel;

        /*
         * A watch request is set to linger, so the underlying osd
         * request won't go away until we unregister it.  We retain
         * a pointer to the object request during that time (in
         * rbd_dev->watch_request), so we'll keep a reference to
         * it.  We'll drop that reference (below) after we've
         * unregistered it.
         */
        if (start) {
                rbd_dev->watch_request = obj_request;

                return 0;
        }

        /* We have successfully torn down the watch request */

        rbd_obj_request_put(rbd_dev->watch_request);
        rbd_dev->watch_request = NULL;
out_cancel:
        /* Cancel the event if we're tearing down, or on error */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        if (obj_request)
                rbd_obj_request_put(obj_request);

        return ret;
}
2639
2640 /*
2641  * Synchronous osd object method call.  Returns the number of bytes
2642  * returned in the outbound buffer, or a negative error code.
2643  */
/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.  If
 * @version is non-NULL it receives the object version from the reply.
 *
 * Ownership: once the page vector is attached to obj_request, putting
 * the request frees the pages; the explicit release on the out path is
 * only for the case where obj_request was never created.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const void *outbound,
                             size_t outbound_size,
                             void *inbound,
                             size_t inbound_size,
                             u64 *version)
{
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        struct page **pages;
        u32 page_count;
        int ret;

        /*
         * Method calls are ultimately read operations.  The result
         * should placed into the inbound buffer provided.  They
         * also supply outbound data--parameters for the object
         * method.  Currently if this is present it will be a
         * snapshot id.
         */
        page_count = (u32)calc_pages_for(0, inbound_size);
        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        ret = -ENOMEM;
        obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
                                                        OBJ_REQUEST_PAGES);
        if (!obj_request)
                goto out;

        obj_request->pages = pages;
        obj_request->page_count = page_count;

        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;

        osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
                                        class_name, method_name);
        if (outbound_size) {
                struct ceph_pagelist *pagelist;

                pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
                if (!pagelist)
                        goto out;       /* ret is still -ENOMEM here */

                ceph_pagelist_init(pagelist);
                /* NOTE(review): append return value is ignored — confirm
                 * it cannot fail for this size, or check it. */
                ceph_pagelist_append(pagelist, outbound, outbound_size);
                osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
                                                pagelist);
        }
        osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
                                        obj_request->pages, inbound_size,
                                        0, false, false);
        rbd_osd_req_format_read(obj_request);

        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                goto out;
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
                goto out;

        ret = obj_request->result;
        if (ret < 0)
                goto out;

        rbd_assert(obj_request->xferred < (u64)INT_MAX);
        ret = (int)obj_request->xferred;
        ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
        if (version)
                *version = obj_request->version;
out:
        if (obj_request)
                rbd_obj_request_put(obj_request);
        else
                ceph_release_page_vector(pages, page_count);

        return ret;
}
2728
/*
 * Block-layer request function.  Pulls requests off the queue, turns
 * each into an image request, and submits it.  The queue lock is
 * dropped while a request is being built and submitted (see the
 * __releases/__acquires annotations) and re-taken before completing a
 * failed request or fetching the next one.  Successfully submitted
 * requests are completed later by the image request callback.
 */
static void rbd_request_fn(struct request_queue *q)
                __releases(q->queue_lock) __acquires(q->queue_lock)
{
        struct rbd_device *rbd_dev = q->queuedata;
        bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
        int result;

        while ((rq = blk_fetch_request(q))) {
                bool write_request = rq_data_dir(rq) == WRITE;
                struct rbd_img_request *img_request;
                u64 offset;
                u64 length;

                /* Ignore any non-FS requests that filter through. */

                if (rq->cmd_type != REQ_TYPE_FS) {
                        dout("%s: non-fs request type %d\n", __func__,
                                (int) rq->cmd_type);
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* Ignore/skip any zero-length requests */

                offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
                length = (u64) blk_rq_bytes(rq);

                if (!length) {
                        dout("%s: zero-length request\n", __func__);
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                spin_unlock_irq(q->queue_lock);

                /* Disallow writes to a read-only device */

                if (write_request) {
                        result = -EROFS;
                        if (read_only)
                                goto end_request;
                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }

                /*
                 * Quit early if the mapped snapshot no longer
                 * exists.  It's still possible the snapshot will
                 * have disappeared by the time our request arrives
                 * at the osd, but there's no sense in sending it if
                 * we already know.
                 */
                if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
                        dout("request for non-existent snapshot");
                        rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
                        result = -ENXIO;
                        goto end_request;
                }

                /* Reject ranges whose end would wrap past U64_MAX. */
                result = -EINVAL;
                if (offset && length > U64_MAX - offset + 1) {
                        rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
                                offset, length);
                        goto end_request;       /* Shouldn't happen */
                }

                result = -ENOMEM;
                img_request = rbd_img_request_create(rbd_dev, offset, length,
                                                        write_request, false);
                if (!img_request)
                        goto end_request;

                img_request->rq = rq;

                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
                                                rq->bio);
                if (!result)
                        result = rbd_img_request_submit(img_request);
                if (result)
                        rbd_img_request_put(img_request);
end_request:
                spin_lock_irq(q->queue_lock);
                if (result < 0) {
                        rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
                                write_request ? "write" : "read",
                                length, offset, result);

                        __blk_end_request_all(rq, result);
                }
        }
}
2820
2821 /*
2822  * a queue callback. Makes sure that we don't create a bio that spans across
2823  * multiple osd objects. One exception would be with a single page bios,
2824  * which we handle later at bio_chain_clone_range()
2825  */
/*
 * Merge-bvec queue callback: returns the number of bytes of @bvec that
 * may be added to the bio described by @bmd without crossing an rbd
 * object boundary (objects are 2^obj_order bytes).  An empty bio is
 * always allowed one page, per the block layer's contract.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        sector_t sector_offset;
        sector_t sectors_per_obj;
        sector_t obj_sector_offset;
        int ret;

        /*
         * Find how far into its rbd object the partition-relative
         * bio start sector is to offset relative to the enclosing
         * device.
         */
        sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
        sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        obj_sector_offset = sector_offset & (sectors_per_obj - 1);

        /*
         * Compute the number of bytes from that offset to the end
         * of the object.  Account for what's already used by the bio.
         */
        ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
        if (ret > bmd->bi_size)
                ret -= bmd->bi_size;
        else
                ret = 0;

        /*
         * Don't send back more than was asked for.  And if the bio
         * was empty, let the whole thing through because:  "Note
         * that a block device *must* allow a single page to be
         * added to an empty bio."
         */
        rbd_assert(bvec->bv_len <= PAGE_SIZE);
        if (ret > (int) bvec->bv_len || !bmd->bi_size)
                ret = (int) bvec->bv_len;

        return ret;
}
2866
/*
 * Tear down the gendisk for an rbd device, if one exists.  The order
 * matters: clear rbd_dev->disk first, unregister the disk and clean up
 * its queue only if it was actually added (GENHD_FL_UP), then drop the
 * final disk reference.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk = rbd_dev->disk;

        if (!disk)
                return;

        rbd_dev->disk = NULL;
        if (disk->flags & GENHD_FL_UP) {
                del_gendisk(disk);
                if (disk->queue)
                        blk_cleanup_queue(disk->queue);
        }
        put_disk(disk);
}
2882
2883 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2884                                 const char *object_name,
2885                                 u64 offset, u64 length,
2886                                 void *buf, u64 *version)
2887
2888 {
2889         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2890         struct rbd_obj_request *obj_request;
2891         struct page **pages = NULL;
2892         u32 page_count;
2893         size_t size;
2894         int ret;
2895
2896         page_count = (u32) calc_pages_for(offset, length);
2897         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2898         if (IS_ERR(pages))
2899                 ret = PTR_ERR(pages);
2900
2901         ret = -ENOMEM;
2902         obj_request = rbd_obj_request_create(object_name, offset, length,
2903                                                         OBJ_REQUEST_PAGES);
2904         if (!obj_request)
2905                 goto out;
2906
2907         obj_request->pages = pages;
2908         obj_request->page_count = page_count;
2909
2910         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2911         if (!obj_request->osd_req)
2912                 goto out;
2913
2914         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2915                                         offset, length, 0, 0);
2916         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2917                                         obj_request->pages,
2918                                         obj_request->length,
2919                                         obj_request->offset & ~PAGE_MASK,
2920                                         false, false);
2921         rbd_osd_req_format_read(obj_request);
2922
2923         ret = rbd_obj_request_submit(osdc, obj_request);
2924         if (ret)
2925                 goto out;
2926         ret = rbd_obj_request_wait(obj_request);
2927         if (ret)
2928                 goto out;
2929
2930         ret = obj_request->result;
2931         if (ret < 0)
2932                 goto out;
2933
2934         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2935         size = (size_t) obj_request->xferred;
2936         ceph_copy_from_page_vector(pages, buf, 0, size);
2937         rbd_assert(size <= (size_t) INT_MAX);
2938         ret = (int) size;
2939         if (version)
2940                 *version = obj_request->version;
2941 out:
2942         if (obj_request)
2943                 rbd_obj_request_put(obj_request);
2944         else
2945                 ceph_release_page_vector(pages, page_count);
2946
2947         return ret;
2948 }
2949
2950 /*
2951  * Read the complete header for the given rbd device.
2952  *
2953  * Returns a pointer to a dynamically-allocated buffer containing
2954  * the complete and validated header.  Caller can pass the address
2955  * of a variable that will be filled in with the version of the
2956  * header object at the time it was read.
2957  *
2958  * Returns a pointer-coded errno if a failure occurs.
2959  */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;     /* first pass: header only, no snaps */
        u64 names_size = 0;
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                /* kfree(NULL) is a no-op on the first iteration. */
                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
                                       0, size, ondisk, version);
                if (ret < 0)
                        goto out_err;
                /* A short read means the object is smaller than expected. */
                if ((size_t)ret < size) {
                        ret = -ENXIO;
                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
                                size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        rbd_warn(rbd_dev, "invalid header");
                        goto out_err;
                }

                /* Retry with the sizes the header just told us about. */
                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}
3016
3017 /*
3018  * reload the ondisk the header
3019  */
3020 static int rbd_read_header(struct rbd_device *rbd_dev,
3021                            struct rbd_image_header *header)
3022 {
3023         struct rbd_image_header_ondisk *ondisk;
3024         u64 ver = 0;
3025         int ret;
3026
3027         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3028         if (IS_ERR(ondisk))
3029                 return PTR_ERR(ondisk);
3030         ret = rbd_header_from_disk(header, ondisk);
3031         if (ret >= 0)
3032                 header->obj_version = ver;
3033         kfree(ondisk);
3034
3035         return ret;
3036 }
3037
3038 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3039 {
3040         struct rbd_snap *snap;
3041         struct rbd_snap *next;
3042
3043         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3044                 list_del(&snap->node);
3045                 rbd_snap_destroy(snap);
3046         }
3047 }
3048
3049 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3050 {
3051         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3052                 return;
3053
3054         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3055                 sector_t size;
3056
3057                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3058                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3059                 dout("setting size to %llu sectors", (unsigned long long)size);
3060                 set_capacity(rbd_dev->disk, size);
3061         }
3062 }
3063
3064 /*
3065  * only read the first part of the ondisk header, without the snaps info
3066  */
3067 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3068 {
3069         int ret;
3070         struct rbd_image_header h;
3071
3072         ret = rbd_read_header(rbd_dev, &h);
3073         if (ret < 0)
3074                 return ret;
3075
3076         down_write(&rbd_dev->header_rwsem);
3077
3078         /* Update image size, and check for resize of mapped image */
3079         rbd_dev->header.image_size = h.image_size;
3080         rbd_update_mapping_size(rbd_dev);
3081
3082         /* rbd_dev->header.object_prefix shouldn't change */
3083         kfree(rbd_dev->header.snap_sizes);
3084         kfree(rbd_dev->header.snap_names);
3085         /* osd requests may still refer to snapc */
3086         rbd_snap_context_put(rbd_dev->header.snapc);
3087
3088         if (hver)
3089                 *hver = h.obj_version;
3090         rbd_dev->header.obj_version = h.obj_version;
3091         rbd_dev->header.image_size = h.image_size;
3092         rbd_dev->header.snapc = h.snapc;
3093         rbd_dev->header.snap_names = h.snap_names;
3094         rbd_dev->header.snap_sizes = h.snap_sizes;
3095         /* Free the extra copy of the object prefix */
3096         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3097                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3098         kfree(h.object_prefix);
3099
3100         ret = rbd_dev_snaps_update(rbd_dev);
3101
3102         up_write(&rbd_dev->header_rwsem);
3103
3104         return ret;
3105 }
3106
3107 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3108 {
3109         int ret;
3110
3111         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3112         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3113         if (rbd_dev->image_format == 1)
3114                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3115         else
3116                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3117         mutex_unlock(&ctl_mutex);
3118         revalidate_disk(rbd_dev->disk);
3119         if (ret)
3120                 rbd_warn(rbd_dev, "got notification but failed to "
3121                            " update snaps: %d\n", ret);
3122
3123         return ret;
3124 }
3125
/*
 * Allocate and configure the gendisk and request queue for an rbd
 * device.  The disk is not added here; on success rbd_dev->disk is set
 * and the caller is responsible for add_disk/teardown (see
 * rbd_free_disk()).  Returns 0 or -ENOMEM.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        u64 segment_size;

        /* create gendisk info */
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                return -ENOMEM;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        /* keep bios from spanning object boundaries (see rbd_merge_bvec) */
        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;

        return 0;
out_disk:
        put_disk(disk);

        return -ENOMEM;
}
3171
3172 /*
3173   sysfs
3174 */
3175
/* Map the struct device embedded in an rbd_device back to its container. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
3180
3181 static ssize_t rbd_size_show(struct device *dev,
3182                              struct device_attribute *attr, char *buf)
3183 {
3184         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3185
3186         return sprintf(buf, "%llu\n",
3187                 (unsigned long long)rbd_dev->mapping.size);
3188 }
3189
3190 /*
3191  * Note this shows the features for whatever's mapped, which is not
3192  * necessarily the base image.
3193  */
3194 static ssize_t rbd_features_show(struct device *dev,
3195                              struct device_attribute *attr, char *buf)
3196 {
3197         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3198
3199         return sprintf(buf, "0x%016llx\n",
3200                         (unsigned long long)rbd_dev->mapping.features);
3201 }
3202
3203 static ssize_t rbd_major_show(struct device *dev,
3204                               struct device_attribute *attr, char *buf)
3205 {
3206         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3207
3208         if (rbd_dev->major)
3209                 return sprintf(buf, "%d\n", rbd_dev->major);
3210
3211         return sprintf(buf, "(none)\n");
3212
3213 }
3214
3215 static ssize_t rbd_client_id_show(struct device *dev,
3216                                   struct device_attribute *attr, char *buf)
3217 {
3218         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3219
3220         return sprintf(buf, "client%lld\n",
3221                         ceph_client_id(rbd_dev->rbd_client->client));
3222 }
3223
3224 static ssize_t rbd_pool_show(struct device *dev,
3225                              struct device_attribute *attr, char *buf)
3226 {
3227         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3228
3229         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3230 }
3231
3232 static ssize_t rbd_pool_id_show(struct device *dev,
3233                              struct device_attribute *attr, char *buf)
3234 {
3235         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3236
3237         return sprintf(buf, "%llu\n",
3238                         (unsigned long long) rbd_dev->spec->pool_id);
3239 }
3240
3241 static ssize_t rbd_name_show(struct device *dev,
3242                              struct device_attribute *attr, char *buf)
3243 {
3244         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3245
3246         if (rbd_dev->spec->image_name)
3247                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3248
3249         return sprintf(buf, "(unknown)\n");
3250 }
3251
3252 static ssize_t rbd_image_id_show(struct device *dev,
3253                              struct device_attribute *attr, char *buf)
3254 {
3255         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3256
3257         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3258 }
3259
3260 /*
3261  * Shows the name of the currently-mapped snapshot (or
3262  * RBD_SNAP_HEAD_NAME for the base image).
3263  */
3264 static ssize_t rbd_snap_show(struct device *dev,
3265                              struct device_attribute *attr,
3266                              char *buf)
3267 {
3268         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3269
3270         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3271 }
3272
3273 /*
3274  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3275  * for the parent image.  If there is no parent, simply shows
3276  * "(no parent image)".
3277  */
/*
 * sysfs "parent": pool, image, snapshot, and overlap of the parent
 * image, one per line, or "(no parent image)" when there is none.
 * The negative-count checks on sprintf are defensive; bufp tracks the
 * write position through the buffer.
 */
static ssize_t rbd_parent_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        struct rbd_spec *spec = rbd_dev->parent_spec;
        int count;
        char *bufp = buf;

        if (!spec)
                return sprintf(buf, "(no parent image)\n");

        count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
                        (unsigned long long) spec->pool_id, spec->pool_name);
        if (count < 0)
                return count;
        bufp += count;

        count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
                        spec->image_name ? spec->image_name : "(unknown)");
        if (count < 0)
                return count;
        bufp += count;

        count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
                        (unsigned long long) spec->snap_id, spec->snap_name);
        if (count < 0)
                return count;
        bufp += count;

        count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
        if (count < 0)
                return count;
        bufp += count;

        return (ssize_t) (bufp - buf);
}
3315
3316 static ssize_t rbd_image_refresh(struct device *dev,
3317                                  struct device_attribute *attr,
3318                                  const char *buf,
3319                                  size_t size)
3320 {
3321         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3322         int ret;
3323
3324         ret = rbd_dev_refresh(rbd_dev, NULL);
3325
3326         return ret < 0 ? ret : size;
3327 }
3328
/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is write-only and triggers a header re-read.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3340
/* NULL-terminated attribute list published for each mapped device */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL	/* sentinel */
};
3355
/* Group the attributes so the driver core creates them with the device */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type below */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
3364
/*
 * Type-level release callback; intentionally a no-op.
 * NOTE(review): rbd_bus_add_dev() installs rbd_dev_device_release as
 * the per-device dev->release, so actual teardown is expected to
 * happen there rather than in this type callback — confirm against
 * driver-core release precedence.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

/* Device type tying the sysfs attribute groups to each rbd device */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
3374
3375 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3376 {
3377         kref_get(&spec->kref);
3378
3379         return spec;
3380 }
3381
3382 static void rbd_spec_free(struct kref *kref);
3383 static void rbd_spec_put(struct rbd_spec *spec)
3384 {
3385         if (spec)
3386                 kref_put(&spec->kref, rbd_spec_free);
3387 }
3388
3389 static struct rbd_spec *rbd_spec_alloc(void)
3390 {
3391         struct rbd_spec *spec;
3392
3393         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3394         if (!spec)
3395                 return NULL;
3396         kref_init(&spec->kref);
3397
3398         return spec;
3399 }
3400
3401 static void rbd_spec_free(struct kref *kref)
3402 {
3403         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3404
3405         kfree(spec->pool_name);
3406         kfree(spec->image_id);
3407         kfree(spec->image_name);
3408         kfree(spec->snap_name);
3409         kfree(spec);
3410 }
3411
3412 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3413                                 struct rbd_spec *spec)
3414 {
3415         struct rbd_device *rbd_dev;
3416
3417         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3418         if (!rbd_dev)
3419                 return NULL;
3420
3421         spin_lock_init(&rbd_dev->lock);
3422         rbd_dev->flags = 0;
3423         INIT_LIST_HEAD(&rbd_dev->node);
3424         INIT_LIST_HEAD(&rbd_dev->snaps);
3425         init_rwsem(&rbd_dev->header_rwsem);
3426
3427         rbd_dev->spec = spec;
3428         rbd_dev->rbd_client = rbdc;
3429
3430         /* Initialize the layout used for all rbd requests */
3431
3432         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3433         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3434         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3435         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3436
3437         return rbd_dev;
3438 }
3439
3440 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3441 {
3442         rbd_put_client(rbd_dev->rbd_client);
3443         rbd_spec_put(rbd_dev->spec);
3444         kfree(rbd_dev);
3445 }
3446
/* Free a snapshot entry; the snap owns its name string, so free that too. */
static void rbd_snap_destroy(struct rbd_snap *snap)
{
	kfree(snap->name);
	kfree(snap);
}
3452
3453 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3454                                                 const char *snap_name,
3455                                                 u64 snap_id, u64 snap_size,
3456                                                 u64 snap_features)
3457 {
3458         struct rbd_snap *snap;
3459
3460         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3461         if (!snap)
3462                 return ERR_PTR(-ENOMEM);
3463
3464         snap->name = snap_name;
3465         snap->id = snap_id;
3466         snap->size = snap_size;
3467         snap->features = snap_features;
3468
3469         return snap;
3470 }
3471
3472 /*
3473  * Returns a dynamically-allocated snapshot name if successful, or a
3474  * pointer-coded error otherwise.
3475  */
3476 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3477                 u64 *snap_size, u64 *snap_features)
3478 {
3479         char *snap_name;
3480         int i;
3481
3482         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3483
3484         /* Skip over names until we find the one we are looking for */
3485
3486         snap_name = rbd_dev->header.snap_names;
3487         for (i = 0; i < which; i++)
3488                 snap_name += strlen(snap_name) + 1;
3489
3490         snap_name = kstrdup(snap_name, GFP_KERNEL);
3491         if (!snap_name)
3492                 return ERR_PTR(-ENOMEM);
3493
3494         *snap_size = rbd_dev->header.snap_sizes[which];
3495         *snap_features = 0;     /* No features for v1 */
3496
3497         return snap_name;
3498 }
3499
3500 /*
3501  * Get the size and object order for an image snapshot, or if
3502  * snap_id is CEPH_NOSNAP, gets this information for the base
3503  * image.
3504  */
3505 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3506                                 u8 *order, u64 *snap_size)
3507 {
3508         __le64 snapid = cpu_to_le64(snap_id);
3509         int ret;
3510         struct {
3511                 u8 order;
3512                 __le64 size;
3513         } __attribute__ ((packed)) size_buf = { 0 };
3514
3515         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3516                                 "rbd", "get_size",
3517                                 &snapid, sizeof (snapid),
3518                                 &size_buf, sizeof (size_buf), NULL);
3519         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3520         if (ret < 0)
3521                 return ret;
3522         if (ret < sizeof (size_buf))
3523                 return -ERANGE;
3524
3525         if (order)
3526                 *order = size_buf.order;
3527         *snap_size = le64_to_cpu(size_buf.size);
3528
3529         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3530                 (unsigned long long)snap_id, (unsigned int)*order,
3531                 (unsigned long long)*snap_size);
3532
3533         return 0;
3534 }
3535
/* Fetch the base image's size and object order into the in-core header. */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
3542
/*
 * Fetch the image's object name prefix from its header object and
 * store a dynamically-allocated copy in header.object_prefix.
 * Returns 0 on success or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix", NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	/* On success ret is the reply length; decode the string from it */
	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}
3576
3577 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3578                 u64 *snap_features)
3579 {
3580         __le64 snapid = cpu_to_le64(snap_id);
3581         struct {
3582                 __le64 features;
3583                 __le64 incompat;
3584         } __attribute__ ((packed)) features_buf = { 0 };
3585         u64 incompat;
3586         int ret;
3587
3588         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3589                                 "rbd", "get_features",
3590                                 &snapid, sizeof (snapid),
3591                                 &features_buf, sizeof (features_buf), NULL);
3592         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3593         if (ret < 0)
3594                 return ret;
3595         if (ret < sizeof (features_buf))
3596                 return -ERANGE;
3597
3598         incompat = le64_to_cpu(features_buf.incompat);
3599         if (incompat & ~RBD_FEATURES_SUPPORTED)
3600                 return -ENXIO;
3601
3602         *snap_features = le64_to_cpu(features_buf.features);
3603
3604         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3605                 (unsigned long long)snap_id,
3606                 (unsigned long long)*snap_features,
3607                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3608
3609         return 0;
3610 }
3611
/* Fetch the base image's feature bits into the in-core header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
3617
/*
 * Query the OSDs for this image's parent (clone origin), recording
 * the result in rbd_dev->parent_spec and rbd_dev->parent_overlap.
 * An image with no parent (pool id CEPH_NOPOOL) is not an error;
 * rbd_dev is left untouched and 0 is returned.
 */
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				&snapid, sizeof (snapid),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	/* Decode pool id, image id, snap id, and overlap, in that order */
	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (parent_spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
			(unsigned long long)parent_spec->pool_id, U32_MAX);
		goto out_err;
	}

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	/* Success path falls through to the common cleanup below */
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}
3689
/*
 * Read the image's stripe unit and count and verify they match the
 * only layout we handle (stripe unit == object size, stripe count
 * == 1, i.e. the pre-STRIPINGV2 defaults).  On success the values
 * are recorded in the in-core header.
 */
static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_stripe_unit_count", NULL, 0,
				(char *)&striping_info_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */
	ret = -EINVAL;
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit "
				"(got %llu want %llu)",
				stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count "
				"(got %llu want 1)", stripe_count);
		return -EINVAL;
	}
	rbd_dev->header.stripe_unit = stripe_unit;
	rbd_dev->header.stripe_count = stripe_count;

	return 0;
}
3739
/*
 * Look up the image name for this image's id in the pool's
 * RBD_DIRECTORY object.  Returns a dynamically-allocated name, or
 * NULL on any failure (the caller treats a missing name as
 * non-fatal -- see rbd_dev_spec_update()).
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	/* Build the encoded (length-prefixed) image id request buffer */
	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	/* Errors here just leave image_name NULL for the caller */
	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}
3789
/*
 * When an rbd image has a parent image, it is identified by the
 * pool, image, and snapshot ids (not names).  This function fills
 * in the names for those ids.  (It's OK if we can't figure out the
 * name for an image id, but the pool and snapshot ids should always
 * exist and have names.)  All names in an rbd spec are dynamically
 * allocated.
 *
 * When an image being mapped (not a parent) is probed, we have the
 * pool name and pool id, image name and image id, and the snapshot
 * name.  The only thing we're missing is the snapshot id.
 *
 * The set of snapshots for an image is not known until they have
 * been read by rbd_dev_snaps_update(), so we can't completely fill
 * in this information until after that has been called.
 */
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	/*
	 * An image being mapped will have the pool name (etc.), but
	 * we need to look up the snapshot id.
	 */
	if (spec->pool_name) {
		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
			struct rbd_snap *snap;

			snap = snap_by_name(rbd_dev, spec->snap_name);
			if (!snap)
				return -ENOENT;
			spec->snap_id = snap->id;
		} else {
			spec->snap_id = CEPH_NOSNAP;
		}

		return 0;
	}

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name, and make a copy */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (!snap_name) {
		rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
		ret = -EIO;
		goto out_err;
	}
	snap_name = kstrdup(snap_name, GFP_KERNEL);
	if (!snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}

	/* The spec takes ownership of all three allocated strings */
	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;
out_err:
	kfree(image_name);
	kfree(pool_name);

	return ret;
}
3876
/*
 * Fetch the image's snapshot context (seq number plus the array of
 * snapshot ids) from the OSDs and install it as header.snapc.
 * The header object version is returned through *ver if non-NULL.
 *
 * NOTE(review): any previous header.snapc is overwritten here
 * without being released -- presumably the caller drops the old
 * context first; verify against the refresh paths.
 */
static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext", NULL, 0,
				reply_buf, size, ver);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = rbd_snap_context_create(snap_count);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}
3947
/*
 * Fetch the name of format 2 snapshot number "which" from the OSDs.
 * Returns a dynamically-allocated name on success, or a
 * pointer-coded error.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				&snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	/* An ERR_PTR from extraction is returned to the caller as-is */
	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout("  snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)le64_to_cpu(snap_id), snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}
3988
3989 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3990                 u64 *snap_size, u64 *snap_features)
3991 {
3992         u64 snap_id;
3993         u64 size;
3994         u64 features;
3995         char *snap_name;
3996         int ret;
3997
3998         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3999         snap_id = rbd_dev->header.snapc->snaps[which];
4000         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
4001         if (ret)
4002                 goto out_err;
4003
4004         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
4005         if (ret)
4006                 goto out_err;
4007
4008         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
4009         if (!IS_ERR(snap_name)) {
4010                 *snap_size = size;
4011                 *snap_features = features;
4012         }
4013
4014         return snap_name;
4015 out_err:
4016         return ERR_PTR(ret);
4017 }
4018
4019 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4020                 u64 *snap_size, u64 *snap_features)
4021 {
4022         if (rbd_dev->image_format == 1)
4023                 return rbd_dev_v1_snap_info(rbd_dev, which,
4024                                         snap_size, snap_features);
4025         if (rbd_dev->image_format == 2)
4026                 return rbd_dev_v2_snap_info(rbd_dev, which,
4027                                         snap_size, snap_features);
4028         return ERR_PTR(-EINVAL);
4029 }
4030
/*
 * Re-read a format 2 image's size and snapshot context from the
 * OSDs and bring the in-core snapshot list up to date.  The header
 * rwsem is held for write for the whole update.
 */
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out;
	rbd_update_mapping_size(rbd_dev);

	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);
	if (ret)
		goto out;
	ret = rbd_dev_snaps_update(rbd_dev);
	dout("rbd_dev_snaps_update returned %d\n", ret);
	if (ret)
		goto out;	/* reaches "out" either way; kept for symmetry */
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}
4055
4056 /*
4057  * Scan the rbd device's current snapshot list and compare it to the
4058  * newly-received snapshot context.  Remove any existing snapshots
4059  * not present in the new snapshot context.  Add a new snapshot for
4060  * any snaphots in the snapshot context not in the current list.
4061  * And verify there are no changes to snapshots we already know
4062  * about.
4063  *
4064  * Assumes the snapshots in the snapshot context are sorted by
4065  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4066  * are also maintained in that order.)
4067  *
4068  * Note that any error occurs while updating the snapshot list
4069  * aborts the update, and the entire list is cleared.  The snapshot
4070  * list becomes inconsistent at that point anyway, so it might as
4071  * well be empty.
4072  */
4073 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4074 {
4075         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4076         const u32 snap_count = snapc->num_snaps;
4077         struct list_head *head = &rbd_dev->snaps;
4078         struct list_head *links = head->next;
4079         u32 index = 0;
4080         int ret = 0;
4081
4082         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4083         while (index < snap_count || links != head) {
4084                 u64 snap_id;
4085                 struct rbd_snap *snap;
4086                 char *snap_name;
4087                 u64 snap_size = 0;
4088                 u64 snap_features = 0;
4089
4090                 snap_id = index < snap_count ? snapc->snaps[index]
4091                                              : CEPH_NOSNAP;
4092                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4093                                      : NULL;
4094                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4095
4096                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4097                         struct list_head *next = links->next;
4098
4099                         /*
4100                          * A previously-existing snapshot is not in
4101                          * the new snap context.
4102                          *
4103                          * If the now-missing snapshot is the one
4104                          * the image represents, clear its existence
4105                          * flag so we can avoid sending any more
4106                          * requests to it.
4107                          */
4108                         if (rbd_dev->spec->snap_id == snap->id)
4109                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4110                         dout("removing %ssnap id %llu\n",
4111                                 rbd_dev->spec->snap_id == snap->id ?
4112                                                         "mapped " : "",
4113                                 (unsigned long long)snap->id);
4114
4115                         list_del(&snap->node);
4116                         rbd_snap_destroy(snap);
4117
4118                         /* Done with this list entry; advance */
4119
4120                         links = next;
4121                         continue;
4122                 }
4123
4124                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4125                                         &snap_size, &snap_features);
4126                 if (IS_ERR(snap_name)) {
4127                         ret = PTR_ERR(snap_name);
4128                         dout("failed to get snap info, error %d\n", ret);
4129                         goto out_err;
4130                 }
4131
4132                 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4133                         (unsigned long long)snap_id);
4134                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4135                         struct rbd_snap *new_snap;
4136
4137                         /* We haven't seen this snapshot before */
4138
4139                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4140                                         snap_id, snap_size, snap_features);
4141                         if (IS_ERR(new_snap)) {
4142                                 ret = PTR_ERR(new_snap);
4143                                 dout("  failed to add dev, error %d\n", ret);
4144                                 goto out_err;
4145                         }
4146
4147                         /* New goes before existing, or at end of list */
4148
4149                         dout("  added dev%s\n", snap ? "" : " at end\n");
4150                         if (snap)
4151                                 list_add_tail(&new_snap->node, &snap->node);
4152                         else
4153                                 list_add_tail(&new_snap->node, head);
4154                 } else {
4155                         /* Already have this one */
4156
4157                         dout("  already present\n");
4158
4159                         rbd_assert(snap->size == snap_size);
4160                         rbd_assert(!strcmp(snap->name, snap_name));
4161                         rbd_assert(snap->features == snap_features);
4162
4163                         /* Done with this list entry; advance */
4164
4165                         links = links->next;
4166                 }
4167
4168                 /* Advance to the next entry in the snapshot context */
4169
4170                 index++;
4171         }
4172         dout("%s: done\n", __func__);
4173
4174         return 0;
4175 out_err:
4176         rbd_remove_all_snaps(rbd_dev);
4177
4178         return ret;
4179 }
4180
4181 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4182 {
4183         struct device *dev;
4184         int ret;
4185
4186         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4187
4188         dev = &rbd_dev->dev;
4189         dev->bus = &rbd_bus_type;
4190         dev->type = &rbd_device_type;
4191         dev->parent = &rbd_root_dev;
4192         dev->release = rbd_dev_device_release;
4193         dev_set_name(dev, "%d", rbd_dev->dev_id);
4194         ret = device_register(dev);
4195
4196         mutex_unlock(&ctl_mutex);
4197
4198         return ret;
4199 }
4200
/*
 * Unregister the rbd device from the bus.  When the last reference
 * on the embedded struct device is dropped, the release callback
 * installed by rbd_bus_add_dev() (rbd_dev_device_release) runs.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
4205
/* Highest device id handed out so far; ids start at 1 (see rbd_dev_id_get()) */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4207
4208 /*
4209  * Get a unique rbd identifier for the given new rbd_dev, and add
4210  * the rbd_dev to the global list.  The minimum rbd id is 1.
4211  */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	/* atomic64_inc_return() means ids are unique and never 0 */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
4222
4223 /*
4224  * Remove an rbd_dev from the global list, and record that its
4225  * identifier is no longer in use.
4226  */
4227 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4228 {
4229         struct list_head *tmp;
4230         int rbd_id = rbd_dev->dev_id;
4231         int max_id;
4232
4233         rbd_assert(rbd_id > 0);
4234
4235         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4236                 (unsigned long long) rbd_dev->dev_id);
4237         spin_lock(&rbd_dev_list_lock);
4238         list_del_init(&rbd_dev->node);
4239
4240         /*
4241          * If the id being "put" is not the current maximum, there
4242          * is nothing special we need to do.
4243          */
4244         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4245                 spin_unlock(&rbd_dev_list_lock);
4246                 return;
4247         }
4248
4249         /*
4250          * We need to update the current maximum id.  Search the
4251          * list to find out what it is.  We're more likely to find
4252          * the maximum at the end, so search the list backward.
4253          */
4254         max_id = 0;
4255         list_for_each_prev(tmp, &rbd_dev_list) {
4256                 struct rbd_device *rbd_dev;
4257
4258                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4259                 if (rbd_dev->dev_id > max_id)
4260                         max_id = rbd_dev->dev_id;
4261         }
4262         spin_unlock(&rbd_dev_list_lock);
4263
4264         /*
4265          * The max id could have been updated by rbd_dev_id_get(), in
4266          * which case it now accurately reflects the new maximum.
4267          * Be careful not to overwrite the maximum value in that
4268          * case.
4269          */
4270         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4271         dout("  max dev id has been reset\n");
4272 }
4273
4274 /*
4275  * Skips over white space at *buf, and updates *buf to point to the
4276  * first found non-space character (if any). Returns the length of
4277  * the token (string of non-white space characters) found.  Note
4278  * that *buf must be terminated with '\0'.
4279  */
static inline size_t next_token(const char **buf)
{
	/*
	 * Characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	size_t skip;

	skip = strspn(*buf, spaces);	/* Leading whitespace run */
	*buf += skip;			/* Advance to start of token */

	return strcspn(*buf, spaces);	/* Length of the token found */
}
4292
4293 /*
4294  * Finds the next token in *buf, and if the provided token buffer is
4295  * big enough, copies the found token into it.  The result, if
4296  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4297  * must be terminated with '\0' on entry.
4298  *
4299  * Returns the length of the token found (not including the '\0').
4300  * Return value will be 0 if no token is found, and it will be >=
4301  * token_size if the token would not fit.
4302  *
4303  * The *buf pointer will be updated to point beyond the end of the
4304  * found token.  Note that this occurs even if the token buffer is
4305  * too small to hold it.
4306  */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	/* Copy only if the caller's buffer can hold token + NUL */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	/* *buf advances past the token even when it didn't fit */
	*buf += len;

	return len;
}
4322
4323 /*
4324  * Finds the next token in *buf, dynamically allocates a buffer big
4325  * enough to hold a copy of it, and copies the token into the new
4326  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4327  * that a duplicate buffer is created even for a zero-length token.
4328  *
4329  * Returns a pointer to the newly-allocated duplicate, or a null
4330  * pointer if memory for the duplicate was not available.  If
4331  * the lenp argument is a non-null pointer, the length of the token
4332  * (not including the '\0') is returned in *lenp.
4333  *
4334  * If successful, the *buf pointer will be updated to point beyond
4335  * the end of the found token.
4336  *
4337  * Note: uses GFP_KERNEL for allocation.
4338  */
4339 static inline char *dup_token(const char **buf, size_t *lenp)
4340 {
4341         char *dup;
4342         size_t len;
4343
4344         len = next_token(buf);
4345         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4346         if (!dup)
4347                 return NULL;
4348         *(dup + len) = '\0';
4349         *buf += len;
4350
4351         if (lenp)
4352                 *lenp = len;
4353
4354         return dup;
4355 }
4356
4357 /*
4358  * Parse the options provided for an "rbd add" (i.e., rbd image
4359  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4360  * and the data written is passed here via a NUL-terminated buffer.
4361  * Returns 0 if successful or an error code otherwise.
4362  *
4363  * The information extracted from these options is recorded in
4364  * the other parameters which return dynamically-allocated
4365  * structures:
4366  *  ceph_opts
4367  *      The address of a pointer that will refer to a ceph options
4368  *      structure.  Caller must release the returned pointer using
4369  *      ceph_destroy_options() when it is no longer needed.
4370  *  rbd_opts
4371  *      Address of an rbd options pointer.  Fully initialized by
4372  *      this function; caller must release with kfree().
4373  *  spec
4374  *      Address of an rbd image specification pointer.  Fully
4375  *      initialized by this function based on parsed options.
4376  *      Caller must release with rbd_spec_put().
4377  *
4378  * The options passed take this form:
4379  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4380  * where:
4381  *  <mon_addrs>
4382  *      A comma-separated list of one or more monitor addresses.
4383  *      A monitor address is an ip address, optionally followed
4384  *      by a port number (separated by a colon).
4385  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4386  *  <options>
4387  *      A comma-separated list of ceph and/or rbd options.
4388  *  <pool_name>
4389  *      The name of the rados pool containing the rbd image.
4390  *  <image_name>
4391  *      The name of the image in that pool to map.
4392  *  <snap_id>
4393  *      An optional snapshot id.  If provided, the mapping will
4394  *      present data from the image at the time that snapshot was
4395  *      created.  The image head is used if no snapshot id is
4396  *      provided.  Snapshot mappings are always read-only.
4397  */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	char *snap_name;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	/*
	 * mon_addrs points into the caller's buffer and is NOT
	 * NUL-terminated; its extent is carried in mon_addrs_size
	 * and passed to ceph_parse_options() below.
	 */
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	/* Default error for the empty-token checks that follow */
	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	/* Copy len + 1 bytes, then force NUL termination */
	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_mem;
	*(snap_name + len) = '\0';
	spec->snap_name = snap_name;

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	/* Success: hand ownership of all three structures to the caller */
	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	/*
	 * NOTE(review): the name strings hung off spec are assumed
	 * to be freed by rbd_spec_put() (defined elsewhere in this
	 * file) -- confirm when touching this cleanup path.
	 */
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
4502
4503 /*
4504  * An rbd format 2 image has a unique identifier, distinct from the
4505  * name given to it by the user.  Internally, that identifier is
4506  * what's used to specify the names of objects related to the image.
4507  *
4508  * A special "rbd id" object is used to map an rbd image name to its
4509  * id.  If that object doesn't exist, then there is no v2 rbd image
4510  * with the supplied name.
4511  *
4512  * This function will record the given rbd_dev's image_id field if
4513  * it can be determined, and in that case will return 0.  If any
4514  * errors occur a negative errno will be returned and the rbd_dev's
4515  * image_id field will be unchanged (and should be NULL).
4516  */
4517 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4518 {
4519         int ret;
4520         size_t size;
4521         char *object_name;
4522         void *response;
4523         char *image_id;
4524
4525         /*
4526          * When probing a parent image, the image id is already
4527          * known (and the image name likely is not).  There's no
4528          * need to fetch the image id again in this case.  We
4529          * do still need to set the image format though.
4530          */
4531         if (rbd_dev->spec->image_id) {
4532                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4533
4534                 return 0;
4535         }
4536
4537         /*
4538          * First, see if the format 2 image id file exists, and if
4539          * so, get the image's persistent id from it.
4540          */
4541         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4542         object_name = kmalloc(size, GFP_NOIO);
4543         if (!object_name)
4544                 return -ENOMEM;
4545         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4546         dout("rbd id object name is %s\n", object_name);
4547
4548         /* Response will be an encoded string, which includes a length */
4549
4550         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4551         response = kzalloc(size, GFP_NOIO);
4552         if (!response) {
4553                 ret = -ENOMEM;
4554                 goto out;
4555         }
4556
4557         /* If it doesn't exist we'll assume it's a format 1 image */
4558
4559         ret = rbd_obj_method_sync(rbd_dev, object_name,
4560                                 "rbd", "get_id", NULL, 0,
4561                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4562         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4563         if (ret == -ENOENT) {
4564                 image_id = kstrdup("", GFP_KERNEL);
4565                 ret = image_id ? 0 : -ENOMEM;
4566                 if (!ret)
4567                         rbd_dev->image_format = 1;
4568         } else if (ret > sizeof (__le32)) {
4569                 void *p = response;
4570
4571                 image_id = ceph_extract_encoded_string(&p, p + ret,
4572                                                 NULL, GFP_NOIO);
4573                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4574                 if (!ret)
4575                         rbd_dev->image_format = 2;
4576         } else {
4577                 ret = -EINVAL;
4578         }
4579
4580         if (!ret) {
4581                 rbd_dev->spec->image_id = image_id;
4582                 dout("image_id is %s\n", image_id);
4583         }
4584 out:
4585         kfree(response);
4586         kfree(object_name);
4587
4588         return ret;
4589 }
4590
/*
 * Probe a format 1 ("old format") image: read the on-disk header
 * into rbd_dev->header.  On failure, releases the header object
 * name and image id recorded by earlier probe steps.
 */
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}
4619
/*
 * Probe a format 2 image: fetch its size, object prefix, features,
 * parent info (if layered), striping parameters (if striped), and
 * snapshot context from the image's header object.  On failure,
 * releases everything gathered here plus the header object name
 * and parent spec.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	int ret;
	u64 ver = 0;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out_err;
		rbd_warn(rbd_dev, "WARNING: kernel support for "
					"layered rbd images is EXPERIMENTAL!");
	}

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	/* Undo everything this function may have set up */
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
4686
4687 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4688 {
4689         struct rbd_device *parent = NULL;
4690         struct rbd_spec *parent_spec;
4691         struct rbd_client *rbdc;
4692         int ret;
4693
4694         if (!rbd_dev->parent_spec)
4695                 return 0;
4696         /*
4697          * We need to pass a reference to the client and the parent
4698          * spec when creating the parent rbd_dev.  Images related by
4699          * parent/child relationships always share both.
4700          */
4701         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4702         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4703
4704         ret = -ENOMEM;
4705         parent = rbd_dev_create(rbdc, parent_spec);
4706         if (!parent)
4707                 goto out_err;
4708
4709         ret = rbd_dev_image_probe(parent);
4710         if (ret < 0)
4711                 goto out_err;
4712         rbd_dev->parent = parent;
4713
4714         return 0;
4715 out_err:
4716         if (parent) {
4717                 rbd_spec_put(rbd_dev->parent_spec);
4718                 kfree(rbd_dev->header_name);
4719                 rbd_dev_destroy(parent);
4720         } else {
4721                 rbd_put_client(rbdc);
4722                 rbd_spec_put(parent_spec);
4723         }
4724
4725         return ret;
4726 }
4727
/*
 * Set up the mapped block device for a probed image: establish the
 * mapping, assign a device id and name, grab a block major number,
 * create the gendisk, register on the rbd bus, then announce the
 * disk.  Each step is undone in reverse order on failure.
 */
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		return ret;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number (0 requests a dynamic one). */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/* Everything's ready.  Announce the disk to the world. */

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	/* ret is 0 here (rbd_bus_add_dev() succeeded) */
	return ret;

err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	return ret;
}
4782
4783 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4784 {
4785         struct rbd_spec *spec = rbd_dev->spec;
4786         size_t size;
4787
4788         /* Record the header object name for this rbd image. */
4789
4790         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4791
4792         if (rbd_dev->image_format == 1)
4793                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4794         else
4795                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4796
4797         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4798         if (!rbd_dev->header_name)
4799                 return -ENOMEM;
4800
4801         if (rbd_dev->image_format == 1)
4802                 sprintf(rbd_dev->header_name, "%s%s",
4803                         spec->image_name, RBD_SUFFIX);
4804         else
4805                 sprintf(rbd_dev->header_name, "%s%s",
4806                         RBD_HEADER_PREFIX, spec->image_id);
4807         return 0;
4808 }
4809
/*
 * Release the in-memory image state: the header, the parent spec
 * reference, the header object name, and finally the rbd_dev
 * structure itself (via rbd_dev_destroy()).
 */
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_header_free(&rbd_dev->header);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_dev_destroy(rbd_dev);
}
4818
4819 /*
4820  * Probe for the existence of the header object for the given rbd
4821  * device.  For format 2 images this includes determining the image
4822  * id.
4823  */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
{
	int ret;
	int tmp;	/* watch-teardown result; doesn't override ret */

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	/* Register a watch on the header object before reading it */
	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto out_header_name;

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* Recursively probes the parent image, if there is one */
	ret = rbd_dev_probe_parent(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_device_setup(rbd_dev);
	if (ret)
		goto err_out_parent;

	return ret;
	/* Error unwind: undo the steps above in reverse order */
err_out_parent:
	rbd_dev_remove_parent(rbd_dev);
	rbd_header_free(&rbd_dev->header);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);
err_out_watch:
	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (tmp)
		rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}
4893
4894 static ssize_t rbd_add(struct bus_type *bus,
4895                        const char *buf,
4896                        size_t count)
4897 {
4898         struct rbd_device *rbd_dev = NULL;
4899         struct ceph_options *ceph_opts = NULL;
4900         struct rbd_options *rbd_opts = NULL;
4901         struct rbd_spec *spec = NULL;
4902         struct rbd_client *rbdc;
4903         struct ceph_osd_client *osdc;
4904         int rc = -ENOMEM;
4905
4906         if (!try_module_get(THIS_MODULE))
4907                 return -ENODEV;
4908
4909         /* parse add command */
4910         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4911         if (rc < 0)
4912                 goto err_out_module;
4913
4914         rbdc = rbd_get_client(ceph_opts);
4915         if (IS_ERR(rbdc)) {
4916                 rc = PTR_ERR(rbdc);
4917                 goto err_out_args;
4918         }
4919         ceph_opts = NULL;       /* rbd_dev client now owns this */
4920
4921         /* pick the pool */
4922         osdc = &rbdc->client->osdc;
4923         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4924         if (rc < 0)
4925                 goto err_out_client;
4926         spec->pool_id = (u64)rc;
4927
4928         /* The ceph file layout needs to fit pool id in 32 bits */
4929
4930         if (spec->pool_id > (u64)U32_MAX) {
4931                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4932                                 (unsigned long long)spec->pool_id, U32_MAX);
4933                 rc = -EIO;
4934                 goto err_out_client;
4935         }
4936
4937         rbd_dev = rbd_dev_create(rbdc, spec);
4938         if (!rbd_dev)
4939                 goto err_out_client;
4940         rbdc = NULL;            /* rbd_dev now owns this */
4941         spec = NULL;            /* rbd_dev now owns this */
4942
4943         rbd_dev->mapping.read_only = rbd_opts->read_only;
4944         kfree(rbd_opts);
4945         rbd_opts = NULL;        /* done with this */
4946
4947         rc = rbd_dev_image_probe(rbd_dev);
4948         if (rc < 0)
4949                 goto err_out_rbd_dev;
4950
4951         return count;
4952 err_out_rbd_dev:
4953         kfree(rbd_dev->header_name);
4954         rbd_dev_destroy(rbd_dev);
4955 err_out_client:
4956         rbd_put_client(rbdc);
4957 err_out_args:
4958         if (ceph_opts)
4959                 ceph_destroy_options(ceph_opts);
4960         kfree(rbd_opts);
4961         rbd_spec_put(spec);
4962 err_out_module:
4963         module_put(THIS_MODULE);
4964
4965         dout("Error adding device %s\n", buf);
4966
4967         return (ssize_t)rc;
4968 }
4969
4970 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4971 {
4972         struct list_head *tmp;
4973         struct rbd_device *rbd_dev;
4974
4975         spin_lock(&rbd_dev_list_lock);
4976         list_for_each(tmp, &rbd_dev_list) {
4977                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4978                 if (rbd_dev->dev_id == dev_id) {
4979                         spin_unlock(&rbd_dev_list_lock);
4980                         return rbd_dev;
4981                 }
4982         }
4983         spin_unlock(&rbd_dev_list_lock);
4984         return NULL;
4985 }
4986
/*
 * Release callback for the embedded struct device (installed by
 * rbd_bus_add_dev()); runs once the last reference is dropped
 * after device_unregister().  Tears down the disk, block major,
 * device id and mapping, then releases the image state.
 */
static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_clear_mapping(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	/* Frees rbd_dev itself; dev is dangling after this */
	rbd_dev_image_release(rbd_dev);
}
5001
5002 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5003 {
5004         while (rbd_dev->parent) {
5005                 struct rbd_device *first = rbd_dev;
5006                 struct rbd_device *second = first->parent;
5007                 struct rbd_device *third;
5008                 int ret;
5009
5010                 /*
5011                  * Follow to the parent with no grandparent and
5012                  * remove it.
5013                  */
5014                 while (second && (third = second->parent)) {
5015                         first = second;
5016                         second = third;
5017                 }
5018                 rbd_assert(second);
5019                 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
5020                 if (ret)
5021                         rbd_warn(rbd_dev,
5022                                 "failed to cancel watch event (%d)\n", ret);
5023                 rbd_remove_all_snaps(second);
5024                 rbd_bus_del_dev(second);
5025                 first->parent = NULL;
5026                 first->parent_overlap = 0;
5027
5028                 rbd_assert(first->parent_spec);
5029                 rbd_spec_put(first->parent_spec);
5030                 first->parent_spec = NULL;
5031         }
5032 }
5033
5034 static ssize_t rbd_remove(struct bus_type *bus,
5035                           const char *buf,
5036                           size_t count)
5037 {
5038         struct rbd_device *rbd_dev = NULL;
5039         int target_id;
5040         unsigned long ul;
5041         int ret;
5042
5043         ret = strict_strtoul(buf, 10, &ul);
5044         if (ret)
5045                 return ret;
5046
5047         /* convert to int; abort if we lost anything in the conversion */
5048         target_id = (int) ul;
5049         if (target_id != ul)
5050                 return -EINVAL;
5051
5052         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5053
5054         rbd_dev = __rbd_get_dev(target_id);
5055         if (!rbd_dev) {
5056                 ret = -ENOENT;
5057                 goto done;
5058         }
5059
5060         spin_lock_irq(&rbd_dev->lock);
5061         if (rbd_dev->open_count)
5062                 ret = -EBUSY;
5063         else
5064                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5065         spin_unlock_irq(&rbd_dev->lock);
5066         if (ret < 0)
5067                 goto done;
5068
5069         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
5070         if (ret) {
5071                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
5072                 clear_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5073                 smp_mb();
5074                 return ret;
5075         }
5076         ret = count;
5077
5078         rbd_dev_remove_parent(rbd_dev);
5079
5080         rbd_remove_all_snaps(rbd_dev);
5081         rbd_bus_del_dev(rbd_dev);
5082         module_put(THIS_MODULE);
5083 done:
5084         mutex_unlock(&ctl_mutex);
5085
5086         return ret;
5087 }
5088
5089 /*
5090  * create control files in sysfs
5091  * /sys/bus/rbd/...
5092  */
5093 static int rbd_sysfs_init(void)
5094 {
5095         int ret;
5096
5097         ret = device_register(&rbd_root_dev);
5098         if (ret < 0)
5099                 return ret;
5100
5101         ret = bus_register(&rbd_bus_type);
5102         if (ret < 0)
5103                 device_unregister(&rbd_root_dev);
5104
5105         return ret;
5106 }
5107
/* Remove the sysfs control files; inverse of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
5113
/*
 * Module init: verify libceph compatibility, then create the
 * /sys/bus/rbd control files.
 */
static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}
5129
/* Module exit: remove the sysfs control files */
static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
5134
5135 module_init(rbd_init);
5136 module_exit(rbd_exit);
5137
5138 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5139 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5140 MODULE_DESCRIPTION("rados block device");
5141
5142 /* following authorship retained from original osdblk.c */
5143 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5144
5145 MODULE_LICENSE("GPL");