
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
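/*
 * sizeof (RBD_SNAP_DEV_NAME_PREFIX) includes the trailing NUL, so
 * subtracting 1 leaves NAME_MAX minus the five "snap_" characters
 * for the user-visible snapshot name itself.
 */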

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
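/*
 * Arithmetic behind the 510 limit: an encoded snapshot context is a
 * u64 seq plus a u32 count followed by one u64 id per snapshot, so
 * 510 entries encode to 8 + 4 + 510 * 8 = 4092 bytes, the largest
 * such context that still fits in a 4096-byte page.
 */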

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
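/*
 * Sanity check on that width: each byte of an int contributes at
 * most log10(256) < 2.5 decimal digits, hence the 5/2 factor, and
 * the + 1 covers a leading minus sign.  For a 4-byte int this gives
 * 5 * 4 / 2 + 1 = 11, exactly enough for "-2147483648".
 */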

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
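
/*
 * Example (hypothetical values): mapping snapshot "snap1" of image
 * "foo" in pool "rbd" might yield pool_id 2, image_id "1028b4567",
 * snap_id 4, with the three names filled in alongside; mapping the
 * base image instead uses snap_id CEPH_NOSNAP and the snap_name
 * RBD_SNAP_HEAD_NAME ("-").
 */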

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
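
/*
 * Example (illustrative): walking an image request's object requests
 * to total up the bytes transferred, as the completion path further
 * below does:
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 xferred = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 */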

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static struct kmem_cache        *rbd_img_request_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
#ifdef RBD_DEBUG
/* Wrapped in do/while (0) so the expansion behaves as one statement */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client)) {
                /* Propagate the real error rather than -ENOMEM */
                ret = PTR_ERR(rbdc->client);
                goto out_mutex;
        }
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
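
/*
 * Example (illustrative): parse_rbd_opts_token() is handed one
 * comma-separated token at a time, so an options string containing
 * "read_only" (or the alternate spelling "ro") sets
 * rbd_opts->read_only = true, while "rw" or "read_write" sets it
 * back to false.
 */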

/*
 * Get a ceph client with a specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to unlink
 * the client, so the caller must not already hold it when the last
 * reference is dropped.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot header
         * must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the object prefix copied above */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}
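
/*
 * Format 1 on-disk layout assumed by the translation above: a fixed
 * header block, then snap_count snapshot entries (each carrying an id
 * and an image size), then the snapshot names packed back to back as
 * NUL-terminated strings totalling snap_names_len bytes.
 */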

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
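
/*
 * Example (illustrative): with snapc->snaps[] = { 8, 5, 2 } (newest
 * id first), rbd_dev_snap_index() returns 1 for snap_id 5 and
 * BAD_SNAP_INDEX for snap_id 6, which is not in the context.
 */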

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        const char *snap_name = rbd_dev->spec->snap_name;
        u64 snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
                snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
                if (snap_id == CEPH_NOSNAP)
                        return -ENOENT;
        } else {
                snap_id = CEPH_NOSNAP;
        }

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        /* If we are mapping a snapshot it must be marked read-only */

        if (snap_id != CEPH_NOSNAP)
                rbd_dev->mapping.read_only = true;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
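
/*
 * Example (illustrative): with obj_order 22 (4 MiB objects) and an
 * object_prefix of "rb.0.1234", image byte offset 0x00c01000 falls in
 * segment 3, named "rb.0.1234.000000000003", at offset 0x1000 within
 * that object; a 16 KiB request starting there lies entirely inside
 * the segment, so rbd_segment_length() returns the full 16 KiB.
 */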

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        /* Shift a 64-bit 1 so an order of 31 or more can't overflow */
        return 1ULL << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
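
/*
 * Example (illustrative): cloning 8 KiB starting 2 KiB into a chain
 * whose first bio covers 4 KiB takes the last 2 KiB of that bio plus
 * the first 6 KiB of the next one; on return *bio_src points at the
 * second bio and *offset is 0x1800 (6 KiB), the first un-cloned byte.
 */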

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
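
/*
 * The smp_mb() calls in the setters and testers above are presumably
 * meant to pair up so that once a test helper observes a flag as set,
 * the stores made before the flag was set are visible as well.
 */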

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
1447  */
1448 static void img_request_write_set(struct rbd_img_request *img_request)
1449 {
1450         set_bit(IMG_REQ_WRITE, &img_request->flags);
1451         smp_mb();
1452 }
1453
1454 static bool img_request_write_test(struct rbd_img_request *img_request)
1455 {
1456         smp_mb();
1457         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1458 }
1459
1460 static void img_request_child_set(struct rbd_img_request *img_request)
1461 {
1462         set_bit(IMG_REQ_CHILD, &img_request->flags);
1463         smp_mb();
1464 }
1465
1466 static bool img_request_child_test(struct rbd_img_request *img_request)
1467 {
1468         smp_mb();
1469         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1470 }
1471
1472 static void img_request_layered_set(struct rbd_img_request *img_request)
1473 {
1474         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1475         smp_mb();
1476 }
1477
1478 static bool img_request_layered_test(struct rbd_img_request *img_request)
1479 {
1480         smp_mb();
1481         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1482 }
1483
1484 static void
1485 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1486 {
1487         u64 xferred = obj_request->xferred;
1488         u64 length = obj_request->length;
1489
1490         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1491                 obj_request, obj_request->img_request, obj_request->result,
1492                 xferred, length);
1493         /*
1494          * ENOENT means a hole in the image.  We zero-fill the
1495          * entire length of the request.  A short read also implies
1496          * zero-fill to the end of the request.  Either way we
1497          * update the xferred count to indicate the whole request
1498          * was satisfied.
1499          */
1500         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1501         if (obj_request->result == -ENOENT) {
1502                 if (obj_request->type == OBJ_REQUEST_BIO)
1503                         zero_bio_chain(obj_request->bio_list, 0);
1504                 else
1505                         zero_pages(obj_request->pages, 0, length);
1506                 obj_request->result = 0;
1507                 obj_request->xferred = length;
1508         } else if (xferred < length && !obj_request->result) {
1509                 if (obj_request->type == OBJ_REQUEST_BIO)
1510                         zero_bio_chain(obj_request->bio_list, xferred);
1511                 else
1512                         zero_pages(obj_request->pages, xferred, length);
1513                 obj_request->xferred = length;
1514         }
1515         obj_request_done_set(obj_request);
1516 }
1517
1518 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1519 {
1520         dout("%s: obj %p cb %p\n", __func__, obj_request,
1521                 obj_request->callback);
1522         if (obj_request->callback)
1523                 obj_request->callback(obj_request);
1524         else
1525                 complete_all(&obj_request->completion);
1526 }
1527
1528 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1529 {
1530         dout("%s: obj %p\n", __func__, obj_request);
1531         obj_request_done_set(obj_request);
1532 }
1533
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

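/*
 * Completion callback invoked by the osd client when a reply
 * message arrives.  Records the result and transfer count, then
 * dispatches on the opcode of the request's first op.
 */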
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

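/*
 * The format_read/format_write helpers below finalize an osd
 * request via ceph_osdc_build_request().  Reads carry the snapshot
 * id being mapped (or CEPH_NOSNAP); writes instead carry the
 * image's snapshot context and a modification time.
 */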
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}

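/*
 * Allocate an osd request for a single op aimed at the object
 * request's target.  For a write that is part of an image request,
 * the image's snapshot context is supplied as well.
 */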
static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (obj_request_img_data_test(obj_request)) {
                struct rbd_img_request *img_request = obj_request->img_request;

                rbd_assert(write_request ==
                                img_request_write_test(img_request));
                if (write_request)
                        snapc = img_request->snapc;
        }

        /* Allocate and initialize the request, for the single op */

        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;

        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops: a
 * copyup method call and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        struct ceph_snap_context *snapc;
        struct rbd_device *rbd_dev;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;
        rbd_assert(img_request);
        rbd_assert(img_request_write_test(img_request));

        /* Allocate and initialize the request, for the two ops */

        snapc = img_request->snapc;
        rbd_dev = img_request->rbd_dev;
        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
                                                u64 offset, u64 length,
                                                enum obj_request_type type)
{
        struct rbd_obj_request *obj_request;
        size_t size;
        char *name;

        rbd_assert(obj_request_type_valid(type));

        size = strlen(object_name) + 1;
        name = kmalloc(size, GFP_KERNEL);
        if (!name)
                return NULL;

        obj_request = kzalloc(sizeof (*obj_request), GFP_KERNEL);
        if (!obj_request) {
                kfree(name);
                return NULL;
        }

        obj_request->object_name = memcpy(name, object_name, size);
        obj_request->offset = offset;
        obj_request->length = length;
        obj_request->flags = 0;
        obj_request->which = BAD_WHICH;
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
        init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);

        dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
                offset, length, (int)type, obj_request);

        return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
        struct rbd_obj_request *obj_request;

        obj_request = container_of(kref, struct rbd_obj_request, kref);

        dout("%s: obj %p\n", __func__, obj_request);

        rbd_assert(obj_request->img_request == NULL);
        rbd_assert(obj_request->which == BAD_WHICH);

        if (obj_request->osd_req)
                rbd_osd_req_destroy(obj_request->osd_req);

        rbd_assert(obj_request_type_valid(obj_request->type));
        switch (obj_request->type) {
        case OBJ_REQUEST_NODATA:
                break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
                if (obj_request->bio_list)
                        bio_chain_put(obj_request->bio_list);
                break;
        case OBJ_REQUEST_PAGES:
                if (obj_request->pages)
                        ceph_release_page_vector(obj_request->pages,
                                                obj_request->page_count);
                break;
        }

        kfree(obj_request->object_name);
        kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
                                        struct rbd_device *rbd_dev,
                                        u64 offset, u64 length,
                                        bool write_request,
                                        bool child_request)
{
        struct rbd_img_request *img_request;
        struct ceph_snap_context *snapc = NULL;

        img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
        if (!img_request)
                return NULL;

        if (write_request) {
                /*
                 * Capture the snapshot context pointer while still
                 * holding the header semaphore, so the context we
                 * take a reference on is the one we record below.
                 */
                down_read(&rbd_dev->header_rwsem);
                snapc = rbd_dev->header.snapc;
                ceph_get_snap_context(snapc);
                up_read(&rbd_dev->header_rwsem);
        }

        img_request->rq = NULL;
        img_request->rbd_dev = rbd_dev;
        img_request->offset = offset;
        img_request->length = length;
        img_request->flags = 0;
        if (write_request) {
                img_request_write_set(img_request);
                img_request->snapc = snapc;
        } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
        }
        if (child_request)
                img_request_child_set(img_request);
        if (rbd_dev->parent_spec)
                img_request_layered_set(img_request);
        spin_lock_init(&img_request->completion_lock);
        img_request->next_completion = 0;
        img_request->callback = NULL;
        img_request->result = 0;
        img_request->obj_request_count = 0;
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);

        rbd_img_request_get(img_request);       /* Avoid a warning */
        rbd_img_request_put(img_request);       /* TEMPORARY */

        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
                write_request ? "write" : "read", offset, length,
                img_request);

        return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
        struct rbd_img_request *img_request;
        struct rbd_obj_request *obj_request;
        struct rbd_obj_request *next_obj_request;

        img_request = container_of(kref, struct rbd_img_request, kref);

        dout("%s: img %p\n", __func__, img_request);

        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_img_obj_request_del(img_request, obj_request);
        rbd_assert(img_request->obj_request_count == 0);

        if (img_request_write_test(img_request))
                ceph_put_snap_context(img_request->snapc);

        if (img_request_child_test(img_request))
                rbd_obj_request_put(img_request->obj_request);

        kmem_cache_free(rbd_img_request_cache, img_request);
}

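/*
 * Account for completion of a single object request within an
 * image request.  Returns true if more object requests remain to
 * complete: for a child request that is judged by position, and
 * otherwise by what blk_end_request() reports for the Linux
 * block request.
 */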
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        unsigned int xferred;
        int result;
        bool more;

        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;

        rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
        xferred = (unsigned int)obj_request->xferred;
        result = obj_request->result;
        if (result) {
                struct rbd_device *rbd_dev = img_request->rbd_dev;

                rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
                        img_request_write_test(img_request) ? "write" : "read",
                        obj_request->length, obj_request->img_offset,
                        obj_request->offset);
                rbd_warn(rbd_dev, "  result %d xferred %x\n",
                        result, xferred);
                if (!img_request->result)
                        img_request->result = result;
        }

        /* Image object requests don't own their page array */

        if (obj_request->type == OBJ_REQUEST_PAGES) {
                obj_request->pages = NULL;
                obj_request->page_count = 0;
        }

        if (img_request_child_test(img_request)) {
                rbd_assert(img_request->obj_request != NULL);
                more = obj_request->which < img_request->obj_request_count - 1;
        } else {
                rbd_assert(img_request->rq != NULL);
                more = blk_end_request(img_request->rq, result, xferred);
        }

        return more;
}

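/*
 * Per-object completion callback for an image request.  Object
 * requests can complete in any order, but blk_end_request() must
 * see them in order, so completions are only consumed starting at
 * next_completion; an out-of-order completion simply leaves its
 * done flag set for a later pass to pick up.
 */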
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        u32 which = obj_request->which;
        bool more = true;

        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;

        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
        rbd_assert(img_request != NULL);
        rbd_assert(img_request->obj_request_count > 0);
        rbd_assert(which != BAD_WHICH);
        rbd_assert(which < img_request->obj_request_count);
        rbd_assert(which >= img_request->next_completion);

        spin_lock_irq(&img_request->completion_lock);
        if (which != img_request->next_completion)
                goto out;

        for_each_obj_request_from(img_request, obj_request) {
                rbd_assert(more);
                rbd_assert(which < img_request->obj_request_count);

                if (!obj_request_done_test(obj_request))
                        break;
                more = rbd_img_obj_end_request(obj_request);
                which++;
        }

        rbd_assert(more ^ (which == img_request->obj_request_count));
        img_request->next_completion = which;
out:
        spin_unlock_irq(&img_request->completion_lock);

        if (!more)
                rbd_img_request_complete(img_request);
}

/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
                                        enum obj_request_type type,
                                        void *data_desc)
{
        struct rbd_device *rbd_dev = img_request->rbd_dev;
        struct rbd_obj_request *obj_request = NULL;
        struct rbd_obj_request *next_obj_request;
        bool write_request = img_request_write_test(img_request);
        struct bio *bio_list;
        unsigned int bio_offset = 0;
        struct page **pages;
        u64 img_offset;
        u64 resid;
        u16 opcode;

        dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
                (int)type, data_desc);

        opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
        img_offset = img_request->offset;
        resid = img_request->length;
        rbd_assert(resid > 0);

        if (type == OBJ_REQUEST_BIO) {
                bio_list = data_desc;
                rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
        } else {
                rbd_assert(type == OBJ_REQUEST_PAGES);
                pages = data_desc;
        }

        while (resid) {
                struct ceph_osd_request *osd_req;
                const char *object_name;
                u64 offset;
                u64 length;

                object_name = rbd_segment_name(rbd_dev, img_offset);
                if (!object_name)
                        goto out_unwind;
                offset = rbd_segment_offset(rbd_dev, img_offset);
                length = rbd_segment_length(rbd_dev, img_offset, resid);
                obj_request = rbd_obj_request_create(object_name,
                                                offset, length, type);
                kfree(object_name);     /* object request has its own copy */
                if (!obj_request)
                        goto out_unwind;

                if (type == OBJ_REQUEST_BIO) {
                        unsigned int clone_size;

                        rbd_assert(length <= (u64)UINT_MAX);
                        clone_size = (unsigned int)length;
                        obj_request->bio_list =
                                        bio_chain_clone_range(&bio_list,
                                                                &bio_offset,
                                                                clone_size,
                                                                GFP_ATOMIC);
                        if (!obj_request->bio_list)
                                goto out_partial;
                } else {
                        unsigned int page_count;

                        obj_request->pages = pages;
                        page_count = (u32)calc_pages_for(offset, length);
                        obj_request->page_count = page_count;
                        if ((offset + length) & ~PAGE_MASK)
                                page_count--;   /* more on last page */
                        pages += page_count;
                }

                osd_req = rbd_osd_req_create(rbd_dev, write_request,
                                                obj_request);
                if (!osd_req)
                        goto out_partial;
                obj_request->osd_req = osd_req;
                obj_request->callback = rbd_img_obj_callback;

                osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
                                                0, 0);
                if (type == OBJ_REQUEST_BIO)
                        osd_req_op_extent_osd_data_bio(osd_req, 0,
                                        obj_request->bio_list, length);
                else
                        osd_req_op_extent_osd_data_pages(osd_req, 0,
                                        obj_request->pages, length,
                                        offset & ~PAGE_MASK, false, false);

                if (write_request)
                        rbd_osd_req_format_write(obj_request);
                else
                        rbd_osd_req_format_read(obj_request);

                obj_request->img_offset = img_offset;
                rbd_img_obj_request_add(img_request, obj_request);

                img_offset += length;
                resid -= length;
        }

        return 0;

out_partial:
        rbd_obj_request_put(obj_request);
out_unwind:
        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_obj_request_put(obj_request);

        return -ENOMEM;
}

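/*
 * Completion handler for the copyup request built by
 * rbd_img_obj_parent_read_full_callback().  Releases the page
 * vector that held the parent data, then finishes the original
 * write through the normal image object callback.
 */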
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        struct rbd_device *rbd_dev;
        u64 length;
        u32 page_count;

        rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;
        rbd_assert(img_request);

        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev);
        length = (u64)1 << rbd_dev->header.obj_order;
        page_count = (u32)calc_pages_for(0, length);

        rbd_assert(obj_request->copyup_pages);
        ceph_release_page_vector(obj_request->copyup_pages, page_count);
        obj_request->copyup_pages = NULL;

        /*
         * We want the transfer count to reflect the size of the
         * original write request.  There is no such thing as a
         * successful short write, so if the request was successful
         * we can just set it to the originally-requested length.
         */
        if (!obj_request->result)
                obj_request->xferred = obj_request->length;

        /* Finish up with the normal image object callback */

        rbd_img_obj_callback(obj_request);
}

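/*
 * Called when the read of the full parent object completes.  The
 * parent data (in copyup_pages) is attached to a new two-op osd
 * request--a "copyup" class method call followed by the original
 * write--which is then submitted in place of the original request.
 */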
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
        struct rbd_obj_request *orig_request;
        struct ceph_osd_request *osd_req;
        struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
        struct page **pages;
        int result;
        u64 obj_size;
        u64 xferred;

        rbd_assert(img_request_child_test(img_request));

        /* First get what we need from the image request */

        pages = img_request->copyup_pages;
        rbd_assert(pages != NULL);
        img_request->copyup_pages = NULL;

        orig_request = img_request->obj_request;
        rbd_assert(orig_request != NULL);
        rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
        result = img_request->result;
        obj_size = img_request->length;
        xferred = img_request->xferred;

        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev);
        rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

        rbd_img_request_put(img_request);

        if (result)
                goto out_err;

        /* Allocate the new copyup osd request for the original request */

        result = -ENOMEM;
        rbd_assert(!orig_request->osd_req);
        osd_req = rbd_osd_req_create_copyup(orig_request);
        if (!osd_req)
                goto out_err;
        orig_request->osd_req = osd_req;
        orig_request->copyup_pages = pages;

        /* Initialize the copyup op */

        osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
        osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
                                                false, false);

        /* Then the original write request op */

        osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
                                        orig_request->offset,
                                        orig_request->length, 0, 0);
        osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
                                        orig_request->length);

        rbd_osd_req_format_write(orig_request);

        /* All set, send it off. */

        orig_request->callback = rbd_img_obj_copyup_callback;
        osdc = &rbd_dev->rbd_client->client->osdc;
        result = rbd_obj_request_submit(osdc, orig_request);
        if (!result)
                return;
out_err:
        /* Record the error code and complete the request */

        orig_request->result = result;
        orig_request->xferred = 0;
        obj_request_done_set(orig_request);
        rbd_obj_request_complete(orig_request);
}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_img_request *parent_request = NULL;
        struct rbd_device *rbd_dev;
        u64 img_offset;
        u64 length;
        struct page **pages = NULL;
        u32 page_count;
        int result;

        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

        img_request = obj_request->img_request;
        rbd_assert(img_request != NULL);
        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev->parent != NULL);

        /*
         * First things first.  The original osd request is of no
         * use to us any more, we'll need a new one that can hold
         * the two ops in a copyup request.  We'll get that later,
         * but for now we can release the old one.
         */
        rbd_osd_req_destroy(obj_request->osd_req);
        obj_request->osd_req = NULL;

        /*
         * Determine the byte range covered by the object in the
         * child image to which the original request was to be sent.
         */
        img_offset = obj_request->img_offset - obj_request->offset;
        length = (u64)1 << rbd_dev->header.obj_order;

        /*
         * There is no defined parent data beyond the parent
         * overlap, so limit what we read at that boundary if
         * necessary.
         */
        if (img_offset + length > rbd_dev->parent_overlap) {
                rbd_assert(img_offset < rbd_dev->parent_overlap);
                length = rbd_dev->parent_overlap - img_offset;
        }

        /*
         * Allocate a page array big enough to receive the data read
         * from the parent.
         */
        page_count = (u32)calc_pages_for(0, length);
        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
        if (IS_ERR(pages)) {
                result = PTR_ERR(pages);
                pages = NULL;
                goto out_err;
        }

        result = -ENOMEM;
        parent_request = rbd_img_request_create(rbd_dev->parent,
                                                img_offset, length,
                                                false, true);
        if (!parent_request)
                goto out_err;
        rbd_obj_request_get(obj_request);
        parent_request->obj_request = obj_request;

        result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
        if (result)
                goto out_err;
        parent_request->copyup_pages = pages;

        parent_request->callback = rbd_img_obj_parent_read_full_callback;
        result = rbd_img_request_submit(parent_request);
        if (!result)
                return 0;

        parent_request->copyup_pages = NULL;
        parent_request->obj_request = NULL;
        rbd_obj_request_put(obj_request);
out_err:
        if (pages)
                ceph_release_page_vector(pages, page_count);
        if (parent_request)
                rbd_img_request_put(parent_request);
        obj_request->result = result;
        obj_request->xferred = 0;
        obj_request_done_set(obj_request);

        return result;
}

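/*
 * Completion handler for the existence check issued by
 * rbd_img_obj_exists_submit().  Records whether the target object
 * exists, then resubmits the original object request, which can
 * now take either the simple path or the copyup path.
 */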
static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_obj_request *orig_request;
        int result;

        rbd_assert(!obj_request_img_data_test(obj_request));

        /*
         * All we need from the object request is the original
         * request and the result of the STAT op.  Grab those, then
         * we're done with the request.
         */
        orig_request = obj_request->obj_request;
        obj_request->obj_request = NULL;
        rbd_assert(orig_request);
        rbd_assert(orig_request->img_request);

        result = obj_request->result;
        obj_request->result = 0;

        dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
                obj_request, orig_request, result,
                obj_request->xferred, obj_request->length);
        rbd_obj_request_put(obj_request);

        /*
         * Our only purpose here is to determine whether the object
         * exists, and we don't want to treat the non-existence as
         * an error.  If something else comes back, transfer the
         * error to the original request and complete it now.
         */
        if (!result) {
                obj_request_existence_set(orig_request, true);
        } else if (result == -ENOENT) {
                obj_request_existence_set(orig_request, false);
        } else {
                orig_request->result = result;
                goto out;
        }

        /*
         * Resubmit the original request now that we have recorded
         * whether the target object exists.
         */
        orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
        if (orig_request->result)
                rbd_obj_request_complete(orig_request);
        rbd_obj_request_put(orig_request);
}

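/*
 * Issue a STAT op for the target object of a layered write to
 * learn whether that object already exists.  The result is
 * processed by rbd_img_obj_exists_callback().
 */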
static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
        struct rbd_obj_request *stat_request;
        struct rbd_device *rbd_dev;
        struct ceph_osd_client *osdc;
        struct page **pages = NULL;
        u32 page_count;
        size_t size;
        int ret;

        /*
         * The response data for a STAT call consists of:
         *     le64 length;
         *     struct {
         *         le32 tv_sec;
         *         le32 tv_nsec;
         *     } mtime;
         */
        size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
        page_count = (u32)calc_pages_for(0, size);
        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
                                                        OBJ_REQUEST_PAGES);
        if (!stat_request) {
                ceph_release_page_vector(pages, page_count);
                return -ENOMEM;
        }

        rbd_obj_request_get(obj_request);
        stat_request->obj_request = obj_request;
        stat_request->pages = pages;    /* now owned by stat_request */
        stat_request->page_count = page_count;

        rbd_assert(obj_request->img_request);
        rbd_dev = obj_request->img_request->rbd_dev;
        stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
                                                stat_request);
        ret = -ENOMEM;
        if (!stat_request->osd_req)
                goto out;
        stat_request->callback = rbd_img_obj_exists_callback;

        osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
        osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
                                        false, false);
        rbd_osd_req_format_read(stat_request);

        osdc = &rbd_dev->rbd_client->client->osdc;
        ret = rbd_obj_request_submit(osdc, stat_request);
out:
        if (ret) {
                /* Dropping stat_request also releases its page vector */
                rbd_obj_request_put(stat_request);
                rbd_obj_request_put(obj_request);
        }

        return ret;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        struct rbd_device *rbd_dev;
        bool known;

        rbd_assert(obj_request_img_data_test(obj_request));

        img_request = obj_request->img_request;
        rbd_assert(img_request);
        rbd_dev = img_request->rbd_dev;

        /*
         * Only writes to layered images need special handling.
         * Reads and non-layered writes are simple object requests.
         * Layered writes that start beyond the end of the overlap
         * with the parent have no parent data, so they too are
         * simple object requests.  Finally, if the target object is
         * known to already exist, its parent data has already been
         * copied, so a write to the object can also be handled as a
         * simple object request.
         */
        if (!img_request_write_test(img_request) ||
                !img_request_layered_test(img_request) ||
                rbd_dev->parent_overlap <= obj_request->img_offset ||
                ((known = obj_request_known_test(obj_request)) &&
                        obj_request_exists_test(obj_request))) {
                struct ceph_osd_client *osdc;

                osdc = &rbd_dev->rbd_client->client->osdc;

                return rbd_obj_request_submit(osdc, obj_request);
        }

        /*
         * It's a layered write.  The target object might exist but
         * we may not know that yet.  If we know it doesn't exist,
         * start by reading the data for the full target object from
         * the parent so we can use it for a copyup to the target.
         */
        if (known)
                return rbd_img_obj_parent_read_full(obj_request);

        /* We don't know whether the target exists.  Go find out. */

        return rbd_img_obj_exists_submit(obj_request);
}

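/* Submit each of the object requests that make up an image request */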
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
        struct rbd_obj_request *obj_request;
        struct rbd_obj_request *next_obj_request;

        dout("%s: img %p\n", __func__, img_request);
        for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
                int ret;

                ret = rbd_img_obj_request_submit(obj_request);
                if (ret)
                        return ret;
        }

        return 0;
}

static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
        struct rbd_obj_request *obj_request;
        struct rbd_device *rbd_dev;
        u64 obj_end;

        rbd_assert(img_request_child_test(img_request));

        obj_request = img_request->obj_request;
        rbd_assert(obj_request);
        rbd_assert(obj_request->img_request);

        obj_request->result = img_request->result;
        if (obj_request->result)
                goto out;

        /*
         * We need to zero anything beyond the parent overlap
         * boundary.  Since rbd_img_obj_request_read_callback()
         * will zero anything beyond the end of a short read, an
         * easy way to do this is to pretend the data from the
         * parent came up short--ending at the overlap boundary.
         */
        rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
        obj_end = obj_request->img_offset + obj_request->length;
        rbd_dev = obj_request->img_request->rbd_dev;
        if (obj_end > rbd_dev->parent_overlap) {
                u64 xferred = 0;

                if (obj_request->img_offset < rbd_dev->parent_overlap)
                        xferred = rbd_dev->parent_overlap -
                                        obj_request->img_offset;

                obj_request->xferred = min(img_request->xferred, xferred);
        } else {
                obj_request->xferred = img_request->xferred;
        }
out:
        rbd_img_obj_request_read_callback(obj_request);
        rbd_obj_request_complete(obj_request);
}

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
        struct rbd_device *rbd_dev;
        struct rbd_img_request *img_request;
        int result;

        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request != NULL);
        rbd_assert(obj_request->result == (s32) -ENOENT);
        rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

        rbd_dev = obj_request->img_request->rbd_dev;
        rbd_assert(rbd_dev->parent != NULL);
        img_request = rbd_img_request_create(rbd_dev->parent,
                                                obj_request->img_offset,
                                                obj_request->length,
                                                false, true);
        result = -ENOMEM;
        if (!img_request)
                goto out_err;

        rbd_obj_request_get(obj_request);
        img_request->obj_request = obj_request;

        result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
                                        obj_request->bio_list);
        if (result)
                goto out_err;

        img_request->callback = rbd_img_parent_read_callback;
        result = rbd_img_request_submit(img_request);
        if (result)
                goto out_err;

        return;
out_err:
        if (img_request)
                rbd_img_request_put(img_request);
        obj_request->result = result;
        obj_request->xferred = 0;
        obj_request_done_set(obj_request);
}

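/*
 * Acknowledge a notification received on the header object.  The
 * ack is sent asynchronously; the object request is dropped by its
 * callback when the op completes.
 */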
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
{
        struct rbd_obj_request *obj_request;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
                                                        OBJ_REQUEST_NODATA);
        if (!obj_request)
                return -ENOMEM;

        ret = -ENOMEM;
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;
        obj_request->callback = rbd_obj_request_put;

        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
                                        notify_id, 0, 0);
        rbd_osd_req_format_read(obj_request);

        ret = rbd_obj_request_submit(osdc, obj_request);
out:
        if (ret)
                rbd_obj_request_put(obj_request);

        return ret;
}

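/*
 * Callback invoked when the watched header object is updated.
 * Refresh the in-core image metadata, then acknowledge the
 * notification.
 */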
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        struct rbd_device *rbd_dev = (struct rbd_device *)data;

        if (!rbd_dev)
                return;

        dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
                rbd_dev->header_name, (unsigned long long)notify_id,
                (unsigned int)opcode);
        (void)rbd_dev_refresh(rbd_dev);

        rbd_obj_notify_ack(rbd_dev, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        int ret;

        rbd_assert(start ^ !!rbd_dev->watch_event);
        rbd_assert(start ^ !!rbd_dev->watch_request);

        if (start) {
                ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
                                                &rbd_dev->watch_event);
                if (ret < 0)
                        return ret;
                rbd_assert(rbd_dev->watch_event != NULL);
        }

        ret = -ENOMEM;
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
                                                        OBJ_REQUEST_NODATA);
        if (!obj_request)
                goto out_cancel;

        obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
        if (!obj_request->osd_req)
                goto out_cancel;

        if (start)
                ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
        else
                ceph_osdc_unregister_linger_request(osdc,
                                        rbd_dev->watch_request->osd_req);

        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
                                rbd_dev->watch_event->cookie, 0, start);
        rbd_osd_req_format_write(obj_request);

        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                goto out_cancel;
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
                goto out_cancel;
        ret = obj_request->result;
        if (ret)
                goto out_cancel;

        /*
         * A watch request is set to linger, so the underlying osd
         * request won't go away until we unregister it.  We retain
         * a pointer to the object request during that time (in
         * rbd_dev->watch_request), so we'll keep a reference to
         * it.  We'll drop that reference (below) after we've
         * unregistered it.
         */
        if (start) {
                rbd_dev->watch_request = obj_request;

                return 0;
        }

        /* We have successfully torn down the watch request */

        rbd_obj_request_put(rbd_dev->watch_request);
        rbd_dev->watch_request = NULL;
out_cancel:
        /* Cancel the event if we're tearing down, or on error */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        if (obj_request)
                rbd_obj_request_put(obj_request);

        return ret;
}

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const void *outbound,
                             size_t outbound_size,
                             void *inbound,
                             size_t inbound_size)
{
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        struct page **pages;
        u32 page_count;
        int ret;

        /*
         * Method calls are ultimately read operations.  The result
         * should be placed into the inbound buffer provided.  The
         * caller may also supply outbound data--parameters for the
         * object method.  Currently if this is present it will be a
         * snapshot id.
         */
        page_count = (u32)calc_pages_for(0, inbound_size);
        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        ret = -ENOMEM;
        obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
                                                        OBJ_REQUEST_PAGES);
        if (!obj_request)
                goto out;

        obj_request->pages = pages;
        obj_request->page_count = page_count;

        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;

        osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
                                        class_name, method_name);
        if (outbound_size) {
                struct ceph_pagelist *pagelist;

                pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
                if (!pagelist)
                        goto out;

                ceph_pagelist_init(pagelist);
                ceph_pagelist_append(pagelist, outbound, outbound_size);
                osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
                                                pagelist);
        }
        osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
                                        obj_request->pages, inbound_size,
                                        0, false, false);
        rbd_osd_req_format_read(obj_request);

        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                goto out;
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
                goto out;

        ret = obj_request->result;
        if (ret < 0)
                goto out;

        rbd_assert(obj_request->xferred < (u64)INT_MAX);
        ret = (int)obj_request->xferred;
        ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
        if (obj_request)
                rbd_obj_request_put(obj_request);
        else
                ceph_release_page_vector(pages, page_count);

        return ret;
}

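/*
 * To illustrate the calling convention of rbd_obj_method_sync()
 * (this sketch only shows how arguments are passed; the reply
 * layout is defined by the object class, and the buffer sizes
 * here are illustrative assumptions):
 *
 *	char inbound[16];	// sized for the method's reply
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				inbound, sizeof (inbound));
 *
 * A non-negative return is the number of reply bytes copied into
 * the inbound buffer.
 */

/*
 * The block layer's request function.  Pulls requests off the
 * queue, validates them, and maps each one onto an image request
 * built from the request's bio chain.  The queue lock is dropped
 * while each image request is set up and submitted.
 */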
static void rbd_request_fn(struct request_queue *q)
                __releases(q->queue_lock) __acquires(q->queue_lock)
{
        struct rbd_device *rbd_dev = q->queuedata;
        bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
        int result;

        while ((rq = blk_fetch_request(q))) {
                bool write_request = rq_data_dir(rq) == WRITE;
                struct rbd_img_request *img_request;
                u64 offset;
                u64 length;

                /* Ignore any non-FS requests that filter through. */

                if (rq->cmd_type != REQ_TYPE_FS) {
                        dout("%s: non-fs request type %d\n", __func__,
                                (int) rq->cmd_type);
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* Ignore/skip any zero-length requests */

                offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
                length = (u64) blk_rq_bytes(rq);

                if (!length) {
                        dout("%s: zero-length request\n", __func__);
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                spin_unlock_irq(q->queue_lock);

                /* Disallow writes to a read-only device */

                if (write_request) {
                        result = -EROFS;
                        if (read_only)
                                goto end_request;
                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }

                /*
                 * Quit early if the mapped snapshot no longer
                 * exists.  It's still possible the snapshot will
                 * have disappeared by the time our request arrives
                 * at the osd, but there's no sense in sending it if
                 * we already know.
                 */
                if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
                        dout("request for non-existent snapshot");
                        rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
                        result = -ENXIO;
                        goto end_request;
                }

                result = -EINVAL;
                if (offset && length > U64_MAX - offset + 1) {
                        rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
                                offset, length);
                        goto end_request;       /* Shouldn't happen */
                }

                result = -ENOMEM;
                img_request = rbd_img_request_create(rbd_dev, offset, length,
                                                        write_request, false);
                if (!img_request)
                        goto end_request;

                img_request->rq = rq;

                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
                                                rq->bio);
                if (!result)
                        result = rbd_img_request_submit(img_request);
                if (result)
                        rbd_img_request_put(img_request);
end_request:
                spin_lock_irq(q->queue_lock);
                if (result < 0) {
                        rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
                                write_request ? "write" : "read",
                                length, offset, result);

                        __blk_end_request_all(rq, result);
                }
        }
}

/*
 * A queue limit callback.  Makes sure we don't create a bio that
 * spans multiple osd objects.  One exception is a single-page bio,
 * which we handle later at bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        sector_t sector_offset;
        sector_t sectors_per_obj;
        sector_t obj_sector_offset;
        int ret;

        /*
         * Find how far into its rbd object the bio's starting
         * sector falls.  The sector in bmd is partition-relative,
         * so first make it relative to the enclosing device.
         */
        sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
        sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        obj_sector_offset = sector_offset & (sectors_per_obj - 1);

        /*
         * Compute the number of bytes from that offset to the end
         * of the object.  Account for what's already used by the bio.
         */
        ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
        if (ret > bmd->bi_size)
                ret -= bmd->bi_size;
        else
                ret = 0;

        /*
         * Don't send back more than was asked for.  And if the bio
         * was empty, let the whole thing through because:  "Note
         * that a block device *must* allow a single page to be
         * added to an empty bio."
         */
        rbd_assert(bvec->bv_len <= PAGE_SIZE);
        if (ret > (int) bvec->bv_len || !bmd->bi_size)
                ret = (int) bvec->bv_len;

        return ret;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk = rbd_dev->disk;

        if (!disk)
                return;

        rbd_dev->disk = NULL;
        if (disk->flags & GENHD_FL_UP) {
                del_gendisk(disk);
                if (disk->queue)
                        blk_cleanup_queue(disk->queue);
        }
        put_disk(disk);
}

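/*
 * Synchronously read a byte range of an object into the buffer
 * provided.  Returns the number of bytes read on success, or a
 * negative errno.
 */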
2948 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2949                                 const char *object_name,
2950                                 u64 offset, u64 length, void *buf)
2951
2952 {
2953         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2954         struct rbd_obj_request *obj_request;
2955         struct page **pages = NULL;
2956         u32 page_count;
2957         size_t size;
2958         int ret;
2959
2960         page_count = (u32) calc_pages_for(offset, length);
2961         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2962                 if (IS_ERR(pages))
2963                         return PTR_ERR(pages);
2964
2965         ret = -ENOMEM;
2966         obj_request = rbd_obj_request_create(object_name, offset, length,
2967                                                         OBJ_REQUEST_PAGES);
2968         if (!obj_request)
2969                 goto out;
2970
2971         obj_request->pages = pages;
2972         obj_request->page_count = page_count;
2973
2974         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2975         if (!obj_request->osd_req)
2976                 goto out;
2977
2978         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2979                                         offset, length, 0, 0);
2980         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2981                                         obj_request->pages,
2982                                         obj_request->length,
2983                                         obj_request->offset & ~PAGE_MASK,
2984                                         false, false);
2985         rbd_osd_req_format_read(obj_request);
2986
2987         ret = rbd_obj_request_submit(osdc, obj_request);
2988         if (ret)
2989                 goto out;
2990         ret = rbd_obj_request_wait(obj_request);
2991         if (ret)
2992                 goto out;
2993
2994         ret = obj_request->result;
2995         if (ret < 0)
2996                 goto out;
2997
2998         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2999         size = (size_t) obj_request->xferred;
3000         ceph_copy_from_page_vector(pages, buf, 0, size);
3001         rbd_assert(size <= (size_t)INT_MAX);
3002         ret = (int)size;
3003 out:
3004         if (obj_request)
3005                 rbd_obj_request_put(obj_request);
3006         else
3007                 ceph_release_page_vector(pages, page_count);
3008
3009         return ret;
3010 }
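
/*
 * Illustration (not part of the driver): the page-count arithmetic
 * behind calc_pages_for() as used above -- the number of pages spanned
 * by the byte range [off, off + len).  A standalone sketch assuming
 * 4 KB pages; the function name below is hypothetical.
 */
#if 0	/* standalone user-space sketch */
#include <stdio.h>
#include <stdint.h>

#define PG_SHIFT	12	/* 4 KB pages assumed */

static uint32_t pages_for(uint64_t off, uint64_t len)
{
	return ((off + len + (1ULL << PG_SHIFT) - 1) >> PG_SHIFT) -
	       (off >> PG_SHIFT);
}

int main(void)
{
	/* 512 bytes straddling a page boundary span two pages. */
	printf("%u\n", pages_for(4096 - 256, 512));	/* prints 2 */
	return 0;
}
#endif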
3011
3012 /*
3013  * Read the complete header for the given rbd device.
3014  *
3015  * Returns a pointer to a dynamically-allocated buffer containing
3016  * the complete and validated header.  The caller is responsible
3017  * for freeing the returned buffer with kfree().
3019  *
3020  * Returns a pointer-coded errno if a failure occurs.
3021  */
3022 static struct rbd_image_header_ondisk *
3023 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3024 {
3025         struct rbd_image_header_ondisk *ondisk = NULL;
3026         u32 snap_count = 0;
3027         u64 names_size = 0;
3028         u32 want_count;
3029         int ret;
3030
3031         /*
3032          * The complete header will include an array of its 64-bit
3033          * snapshot ids, followed by the names of those snapshots as
3034          * a contiguous block of NUL-terminated strings.  Note that
3035          * the number of snapshots could change by the time we read
3036          * it in, in which case we re-read it.
3037          */
3038         do {
3039                 size_t size;
3040
3041                 kfree(ondisk);
3042
3043                 size = sizeof (*ondisk);
3044                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3045                 size += names_size;
3046                 ondisk = kmalloc(size, GFP_KERNEL);
3047                 if (!ondisk)
3048                         return ERR_PTR(-ENOMEM);
3049
3050                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3051                                        0, size, ondisk);
3052                 if (ret < 0)
3053                         goto out_err;
3054                 if ((size_t)ret < size) {
3055                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3056                                 size, ret);
3057                         ret = -ENXIO;
3058                         goto out_err;
3059                 }
3060                 if (!rbd_dev_ondisk_valid(ondisk)) {
3061                         ret = -ENXIO;
3062                         rbd_warn(rbd_dev, "invalid header");
3063                         goto out_err;
3064                 }
3065
3066                 names_size = le64_to_cpu(ondisk->snap_names_len);
3067                 want_count = snap_count;
3068                 snap_count = le32_to_cpu(ondisk->snap_count);
3069         } while (snap_count != want_count);
3070
3071         return ondisk;
3072
3073 out_err:
3074         kfree(ondisk);
3075
3076         return ERR_PTR(ret);
3077 }
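
/*
 * Illustration (not part of the driver): the optimistic re-read loop
 * above in isolation.  The header is sized from the last-seen snapshot
 * count and re-read whenever the count changed in the meantime; the
 * loop terminates once two successive reads agree.  The fake header
 * source below is a hypothetical stand-in for the OSD read.
 */
#if 0	/* standalone user-space sketch */
#include <stdio.h>

/* A fake header source whose snapshot count changes once mid-probe. */
static unsigned int fake_read_snap_count(void)
{
	static unsigned int calls;

	return calls++ ? 3 : 2;	/* first read sees 2, later reads see 3 */
}

int main(void)
{
	unsigned int snap_count = 0;
	unsigned int want_count;

	do {
		want_count = snap_count;
		/* a real reader would (re)allocate and read here */
		snap_count = fake_read_snap_count();
	} while (snap_count != want_count);

	printf("settled on %u snapshots\n", snap_count);
	return 0;
}
#endif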
3078
3079 /*
3080  * reload the on-disk header
3081  */
3082 static int rbd_read_header(struct rbd_device *rbd_dev,
3083                            struct rbd_image_header *header)
3084 {
3085         struct rbd_image_header_ondisk *ondisk;
3086         int ret;
3087
3088         ondisk = rbd_dev_v1_header_read(rbd_dev);
3089         if (IS_ERR(ondisk))
3090                 return PTR_ERR(ondisk);
3091         ret = rbd_header_from_disk(header, ondisk);
3092         kfree(ondisk);
3093
3094         return ret;
3095 }
3096
3097 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3098 {
3099         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3100                 return;
3101
3102         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3103                 sector_t size;
3104
3105                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3106                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3107                 dout("setting size to %llu sectors", (unsigned long long)size);
3108                 set_capacity(rbd_dev->disk, size);
3109         }
3110 }
3111
3112 /*
3113  * Re-read the complete on-disk header and update the in-memory copy
3114  */
3115 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3116 {
3117         int ret;
3118         struct rbd_image_header h;
3119
3120         ret = rbd_read_header(rbd_dev, &h);
3121         if (ret < 0)
3122                 return ret;
3123
3124         down_write(&rbd_dev->header_rwsem);
3125
3126         /* Update image size, and check for resize of mapped image */
3127         rbd_dev->header.image_size = h.image_size;
3128         rbd_update_mapping_size(rbd_dev);
3129
3130         /* rbd_dev->header.object_prefix shouldn't change */
3131         kfree(rbd_dev->header.snap_sizes);
3132         kfree(rbd_dev->header.snap_names);
3133         /* osd requests may still refer to snapc */
3134         ceph_put_snap_context(rbd_dev->header.snapc);
3135
3137         rbd_dev->header.snapc = h.snapc;
3138         rbd_dev->header.snap_names = h.snap_names;
3139         rbd_dev->header.snap_sizes = h.snap_sizes;
3140         /* Free the extra copy of the object prefix */
3141         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3142                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3143         kfree(h.object_prefix);
3144
3145         up_write(&rbd_dev->header_rwsem);
3146
3147         return ret;
3148 }
3149
3150 /*
3151  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3152  * has disappeared from the (just updated) snapshot context.
3153  */
3154 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3155 {
3156         u64 snap_id;
3157
3158         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3159                 return;
3160
3161         snap_id = rbd_dev->spec->snap_id;
3162         if (snap_id == CEPH_NOSNAP)
3163                 return;
3164
3165         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3166                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3167 }
3168
3169 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3170 {
3171         u64 image_size;
3172         int ret;
3173
3174         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3175         image_size = rbd_dev->header.image_size;
3176         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3177         if (rbd_dev->image_format == 1)
3178                 ret = rbd_dev_v1_refresh(rbd_dev);
3179         else
3180                 ret = rbd_dev_v2_refresh(rbd_dev);
3181
3182         /* If it's a mapped snapshot, validate its EXISTS flag */
3183
3184         rbd_exists_validate(rbd_dev);
3185         mutex_unlock(&ctl_mutex);
3186         if (ret)
3187                 rbd_warn(rbd_dev, "got notification but failed to "
3188                            "update snaps: %d\n", ret);
3189         if (image_size != rbd_dev->header.image_size)
3190                 revalidate_disk(rbd_dev->disk);
3191
3192         return ret;
3193 }
3194
3195 static int rbd_init_disk(struct rbd_device *rbd_dev)
3196 {
3197         struct gendisk *disk;
3198         struct request_queue *q;
3199         u64 segment_size;
3200
3201         /* create gendisk info */
3202         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3203         if (!disk)
3204                 return -ENOMEM;
3205
3206         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3207                  rbd_dev->dev_id);
3208         disk->major = rbd_dev->major;
3209         disk->first_minor = 0;
3210         disk->fops = &rbd_bd_ops;
3211         disk->private_data = rbd_dev;
3212
3213         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3214         if (!q)
3215                 goto out_disk;
3216
3217         /* We use the default size, but let's be explicit about it. */
3218         blk_queue_physical_block_size(q, SECTOR_SIZE);
3219
3220         /* set io sizes to object size */
3221         segment_size = rbd_obj_bytes(&rbd_dev->header);
3222         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3223         blk_queue_max_segment_size(q, segment_size);
3224         blk_queue_io_min(q, segment_size);
3225         blk_queue_io_opt(q, segment_size);
3226
3227         blk_queue_merge_bvec(q, rbd_merge_bvec);
3228         disk->queue = q;
3229
3230         q->queuedata = rbd_dev;
3231
3232         rbd_dev->disk = disk;
3233
3234         return 0;
3235 out_disk:
3236         put_disk(disk);
3237
3238         return -ENOMEM;
3239 }
3240
3241 /*
3242   sysfs
3243 */
3244
3245 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3246 {
3247         return container_of(dev, struct rbd_device, dev);
3248 }
3249
3250 static ssize_t rbd_size_show(struct device *dev,
3251                              struct device_attribute *attr, char *buf)
3252 {
3253         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3254
3255         return sprintf(buf, "%llu\n",
3256                 (unsigned long long)rbd_dev->mapping.size);
3257 }
3258
3259 /*
3260  * Note this shows the features for whatever's mapped, which is not
3261  * necessarily the base image.
3262  */
3263 static ssize_t rbd_features_show(struct device *dev,
3264                              struct device_attribute *attr, char *buf)
3265 {
3266         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3267
3268         return sprintf(buf, "0x%016llx\n",
3269                         (unsigned long long)rbd_dev->mapping.features);
3270 }
3271
3272 static ssize_t rbd_major_show(struct device *dev,
3273                               struct device_attribute *attr, char *buf)
3274 {
3275         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3276
3277         if (rbd_dev->major)
3278                 return sprintf(buf, "%d\n", rbd_dev->major);
3279
3280         return sprintf(buf, "(none)\n");
3281
3282 }
3283
3284 static ssize_t rbd_client_id_show(struct device *dev,
3285                                   struct device_attribute *attr, char *buf)
3286 {
3287         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3288
3289         return sprintf(buf, "client%lld\n",
3290                         ceph_client_id(rbd_dev->rbd_client->client));
3291 }
3292
3293 static ssize_t rbd_pool_show(struct device *dev,
3294                              struct device_attribute *attr, char *buf)
3295 {
3296         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3297
3298         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3299 }
3300
3301 static ssize_t rbd_pool_id_show(struct device *dev,
3302                              struct device_attribute *attr, char *buf)
3303 {
3304         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3305
3306         return sprintf(buf, "%llu\n",
3307                         (unsigned long long) rbd_dev->spec->pool_id);
3308 }
3309
3310 static ssize_t rbd_name_show(struct device *dev,
3311                              struct device_attribute *attr, char *buf)
3312 {
3313         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3314
3315         if (rbd_dev->spec->image_name)
3316                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3317
3318         return sprintf(buf, "(unknown)\n");
3319 }
3320
3321 static ssize_t rbd_image_id_show(struct device *dev,
3322                              struct device_attribute *attr, char *buf)
3323 {
3324         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3325
3326         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3327 }
3328
3329 /*
3330  * Shows the name of the currently-mapped snapshot (or
3331  * RBD_SNAP_HEAD_NAME for the base image).
3332  */
3333 static ssize_t rbd_snap_show(struct device *dev,
3334                              struct device_attribute *attr,
3335                              char *buf)
3336 {
3337         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3338
3339         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3340 }
3341
3342 /*
3343  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3344  * for the parent image.  If there is no parent, simply shows
3345  * "(no parent image)".
3346  */
3347 static ssize_t rbd_parent_show(struct device *dev,
3348                              struct device_attribute *attr,
3349                              char *buf)
3350 {
3351         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3352         struct rbd_spec *spec = rbd_dev->parent_spec;
3353         int count;
3354         char *bufp = buf;
3355
3356         if (!spec)
3357                 return sprintf(buf, "(no parent image)\n");
3358
3359         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3360                         (unsigned long long) spec->pool_id, spec->pool_name);
3361         if (count < 0)
3362                 return count;
3363         bufp += count;
3364
3365         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3366                         spec->image_name ? spec->image_name : "(unknown)");
3367         if (count < 0)
3368                 return count;
3369         bufp += count;
3370
3371         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3372                         (unsigned long long) spec->snap_id, spec->snap_name);
3373         if (count < 0)
3374                 return count;
3375         bufp += count;
3376
3377         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3378         if (count < 0)
3379                 return count;
3380         bufp += count;
3381
3382         return (ssize_t) (bufp - buf);
3383 }
3384
3385 static ssize_t rbd_image_refresh(struct device *dev,
3386                                  struct device_attribute *attr,
3387                                  const char *buf,
3388                                  size_t size)
3389 {
3390         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3391         int ret;
3392
3393         ret = rbd_dev_refresh(rbd_dev);
3394
3395         return ret < 0 ? ret : size;
3396 }
3397
3398 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3399 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3400 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3401 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3402 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3403 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3404 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3405 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3406 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3407 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3408 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3409
3410 static struct attribute *rbd_attrs[] = {
3411         &dev_attr_size.attr,
3412         &dev_attr_features.attr,
3413         &dev_attr_major.attr,
3414         &dev_attr_client_id.attr,
3415         &dev_attr_pool.attr,
3416         &dev_attr_pool_id.attr,
3417         &dev_attr_name.attr,
3418         &dev_attr_image_id.attr,
3419         &dev_attr_current_snap.attr,
3420         &dev_attr_parent.attr,
3421         &dev_attr_refresh.attr,
3422         NULL
3423 };
3424
3425 static struct attribute_group rbd_attr_group = {
3426         .attrs = rbd_attrs,
3427 };
3428
3429 static const struct attribute_group *rbd_attr_groups[] = {
3430         &rbd_attr_group,
3431         NULL
3432 };
3433
3434 static void rbd_sysfs_dev_release(struct device *dev)
3435 {
3436 }
3437
3438 static struct device_type rbd_device_type = {
3439         .name           = "rbd",
3440         .groups         = rbd_attr_groups,
3441         .release        = rbd_sysfs_dev_release,
3442 };
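
/*
 * Illustration (not part of the driver): reading one of the sysfs
 * attributes defined above from user space.  Mapped devices appear
 * under /sys/bus/rbd/devices/<dev-id>/ (see the sysfs-bus-rbd ABI
 * document); device id 0 below is just an example.
 */
#if 0	/* standalone user-space sketch */
#include <stdio.h>

int main(void)
{
	char buf[128];
	FILE *f = fopen("/sys/bus/rbd/devices/0/size", "r");

	if (!f)
		return 1;
	if (fgets(buf, sizeof(buf), f))
		printf("mapped size: %s", buf);
	fclose(f);
	return 0;
}
#endif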
3443
3444 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3445 {
3446         kref_get(&spec->kref);
3447
3448         return spec;
3449 }
3450
3451 static void rbd_spec_free(struct kref *kref);
3452 static void rbd_spec_put(struct rbd_spec *spec)
3453 {
3454         if (spec)
3455                 kref_put(&spec->kref, rbd_spec_free);
3456 }
3457
3458 static struct rbd_spec *rbd_spec_alloc(void)
3459 {
3460         struct rbd_spec *spec;
3461
3462         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3463         if (!spec)
3464                 return NULL;
3465         kref_init(&spec->kref);
3466
3467         return spec;
3468 }
3469
3470 static void rbd_spec_free(struct kref *kref)
3471 {
3472         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3473
3474         kfree(spec->pool_name);
3475         kfree(spec->image_id);
3476         kfree(spec->image_name);
3477         kfree(spec->snap_name);
3478         kfree(spec);
3479 }
3480
3481 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3482                                 struct rbd_spec *spec)
3483 {
3484         struct rbd_device *rbd_dev;
3485
3486         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3487         if (!rbd_dev)
3488                 return NULL;
3489
3490         spin_lock_init(&rbd_dev->lock);
3491         rbd_dev->flags = 0;
3492         INIT_LIST_HEAD(&rbd_dev->node);
3493         init_rwsem(&rbd_dev->header_rwsem);
3494
3495         rbd_dev->spec = spec;
3496         rbd_dev->rbd_client = rbdc;
3497
3498         /* Initialize the layout used for all rbd requests */
3499
3500         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3501         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3502         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3503         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3504
3505         return rbd_dev;
3506 }
3507
3508 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3509 {
3510         rbd_put_client(rbd_dev->rbd_client);
3511         rbd_spec_put(rbd_dev->spec);
3512         kfree(rbd_dev);
3513 }
3514
3515 /*
3516  * Get the size and object order for an image snapshot, or if
3517  * snap_id is CEPH_NOSNAP, gets this information for the base
3518  * image.
3519  */
3520 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3521                                 u8 *order, u64 *snap_size)
3522 {
3523         __le64 snapid = cpu_to_le64(snap_id);
3524         int ret;
3525         struct {
3526                 u8 order;
3527                 __le64 size;
3528         } __attribute__ ((packed)) size_buf = { 0 };
3529
3530         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3531                                 "rbd", "get_size",
3532                                 &snapid, sizeof (snapid),
3533                                 &size_buf, sizeof (size_buf));
3534         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3535         if (ret < 0)
3536                 return ret;
3537         if (ret < sizeof (size_buf))
3538                 return -ERANGE;
3539
3540         if (order)
3541                 *order = size_buf.order;
3542         *snap_size = le64_to_cpu(size_buf.size);
3543
3544         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3545                 (unsigned long long)snap_id, order ? (unsigned int)*order : 0,
3546                 (unsigned long long)*snap_size);
3547
3548         return 0;
3549 }
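
/*
 * Illustration (not part of the driver): decoding the packed
 * "get_size" reply laid out above -- a u8 order followed by a
 * little-endian u64 size, with no padding (hence the packed
 * attribute).  The helper below is a hypothetical user-space
 * stand-in for le64_to_cpu().
 */
#if 0	/* standalone user-space sketch */
#include <stdio.h>
#include <stdint.h>

static uint64_t le64_decode(const unsigned char *p)
{
	uint64_t v = 0;
	int i;

	for (i = 7; i >= 0; i--)
		v = (v << 8) | p[i];
	return v;
}

int main(void)
{
	/* order 22, size 1ULL << 32 (4 GB), little endian */
	const unsigned char reply[9] = { 22, 0, 0, 0, 0, 1, 0, 0, 0 };

	printf("order %u size %llu\n", reply[0],
	       (unsigned long long)le64_decode(reply + 1));
	return 0;
}
#endif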
3550
3551 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3552 {
3553         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3554                                         &rbd_dev->header.obj_order,
3555                                         &rbd_dev->header.image_size);
3556 }
3557
3558 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3559 {
3560         void *reply_buf;
3561         int ret;
3562         void *p;
3563
3564         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3565         if (!reply_buf)
3566                 return -ENOMEM;
3567
3568         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3569                                 "rbd", "get_object_prefix", NULL, 0,
3570                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3571         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3572         if (ret < 0)
3573                 goto out;
3574
3575         p = reply_buf;
3576         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3577                                                 p + ret, NULL, GFP_NOIO);
3578         ret = 0;
3579
3580         if (IS_ERR(rbd_dev->header.object_prefix)) {
3581                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3582                 rbd_dev->header.object_prefix = NULL;
3583         } else {
3584                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3585         }
3586 out:
3587         kfree(reply_buf);
3588
3589         return ret;
3590 }
3591
3592 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3593                 u64 *snap_features)
3594 {
3595         __le64 snapid = cpu_to_le64(snap_id);
3596         struct {
3597                 __le64 features;
3598                 __le64 incompat;
3599         } __attribute__ ((packed)) features_buf = { 0 };
3600         u64 incompat;
3601         int ret;
3602
3603         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3604                                 "rbd", "get_features",
3605                                 &snapid, sizeof (snapid),
3606                                 &features_buf, sizeof (features_buf));
3607         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3608         if (ret < 0)
3609                 return ret;
3610         if (ret < sizeof (features_buf))
3611                 return -ERANGE;
3612
3613         incompat = le64_to_cpu(features_buf.incompat);
3614         if (incompat & ~RBD_FEATURES_SUPPORTED)
3615                 return -ENXIO;
3616
3617         *snap_features = le64_to_cpu(features_buf.features);
3618
3619         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3620                 (unsigned long long)snap_id,
3621                 (unsigned long long)*snap_features,
3622                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3623
3624         return 0;
3625 }
3626
3627 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3628 {
3629         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3630                                                 &rbd_dev->header.features);
3631 }
3632
3633 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3634 {
3635         struct rbd_spec *parent_spec;
3636         size_t size;
3637         void *reply_buf = NULL;
3638         __le64 snapid;
3639         void *p;
3640         void *end;
3641         char *image_id;
3642         u64 overlap;
3643         int ret;
3644
3645         parent_spec = rbd_spec_alloc();
3646         if (!parent_spec)
3647                 return -ENOMEM;
3648
3649         size = sizeof (__le64) +                                /* pool_id */
3650                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3651                 sizeof (__le64) +                               /* snap_id */
3652                 sizeof (__le64);                                /* overlap */
3653         reply_buf = kmalloc(size, GFP_KERNEL);
3654         if (!reply_buf) {
3655                 ret = -ENOMEM;
3656                 goto out_err;
3657         }
3658
3659         snapid = cpu_to_le64(CEPH_NOSNAP);
3660         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3661                                 "rbd", "get_parent",
3662                                 &snapid, sizeof (snapid),
3663                                 reply_buf, size);
3664         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3665         if (ret < 0)
3666                 goto out_err;
3667
3668         p = reply_buf;
3669         end = reply_buf + ret;
3670         ret = -ERANGE;
3671         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3672         if (parent_spec->pool_id == CEPH_NOPOOL)
3673                 goto out;       /* No parent?  No problem. */
3674
3675         /* The ceph file layout needs to fit pool id in 32 bits */
3676
3677         ret = -EIO;
3678         if (parent_spec->pool_id > (u64)U32_MAX) {
3679                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3680                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3681                 goto out_err;
3682         }
3683
3684         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3685         if (IS_ERR(image_id)) {
3686                 ret = PTR_ERR(image_id);
3687                 goto out_err;
3688         }
3689         parent_spec->image_id = image_id;
3690         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3691         ceph_decode_64_safe(&p, end, overlap, out_err);
3692
3693         rbd_dev->parent_overlap = overlap;
3694         rbd_dev->parent_spec = parent_spec;
3695         parent_spec = NULL;     /* rbd_dev now owns this */
3696 out:
3697         ret = 0;
3698 out_err:
3699         kfree(reply_buf);
3700         rbd_spec_put(parent_spec);
3701
3702         return ret;
3703 }
3704
3705 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3706 {
3707         struct {
3708                 __le64 stripe_unit;
3709                 __le64 stripe_count;
3710         } __attribute__ ((packed)) striping_info_buf = { 0 };
3711         size_t size = sizeof (striping_info_buf);
3712         void *p;
3713         u64 obj_size;
3714         u64 stripe_unit;
3715         u64 stripe_count;
3716         int ret;
3717
3718         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3719                                 "rbd", "get_stripe_unit_count", NULL, 0,
3720                                 (char *)&striping_info_buf, size);
3721         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3722         if (ret < 0)
3723                 return ret;
3724         if (ret < size)
3725                 return -ERANGE;
3726
3727         /*
3728          * We don't actually support the "fancy striping" feature
3729          * (STRIPINGV2) yet, but if the striping sizes are the
3730          * defaults the behavior is the same as before.  So find
3731          * out, and only fail if the image has non-default values.
3732          */
3734         obj_size = (u64)1 << rbd_dev->header.obj_order;
3735         p = &striping_info_buf;
3736         stripe_unit = ceph_decode_64(&p);
3737         if (stripe_unit != obj_size) {
3738                 rbd_warn(rbd_dev, "unsupported stripe unit "
3739                                 "(got %llu want %llu)",
3740                                 stripe_unit, obj_size);
3741                 return -EINVAL;
3742         }
3743         stripe_count = ceph_decode_64(&p);
3744         if (stripe_count != 1) {
3745                 rbd_warn(rbd_dev, "unsupported stripe count "
3746                                 "(got %llu want 1)", stripe_count);
3747                 return -EINVAL;
3748         }
3749         rbd_dev->header.stripe_unit = stripe_unit;
3750         rbd_dev->header.stripe_count = stripe_count;
3751
3752         return 0;
3753 }
3754
3755 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3756 {
3757         size_t image_id_size;
3758         char *image_id;
3759         void *p;
3760         void *end;
3761         size_t size;
3762         void *reply_buf = NULL;
3763         size_t len = 0;
3764         char *image_name = NULL;
3765         int ret;
3766
3767         rbd_assert(!rbd_dev->spec->image_name);
3768
3769         len = strlen(rbd_dev->spec->image_id);
3770         image_id_size = sizeof (__le32) + len;
3771         image_id = kmalloc(image_id_size, GFP_KERNEL);
3772         if (!image_id)
3773                 return NULL;
3774
3775         p = image_id;
3776         end = image_id + image_id_size;
3777         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3778
3779         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3780         reply_buf = kmalloc(size, GFP_KERNEL);
3781         if (!reply_buf)
3782                 goto out;
3783
3784         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3785                                 "rbd", "dir_get_name",
3786                                 image_id, image_id_size,
3787                                 reply_buf, size);
3788         if (ret < 0)
3789                 goto out;
3790         p = reply_buf;
3791         end = reply_buf + ret;
3792
3793         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3794         if (IS_ERR(image_name))
3795                 image_name = NULL;
3796         else
3797                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3798 out:
3799         kfree(reply_buf);
3800         kfree(image_id);
3801
3802         return image_name;
3803 }
3804
3805 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3806 {
3807         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3808         const char *snap_name;
3809         u32 which = 0;
3810
3811         /* Skip over names until we find the one we are looking for */
3812
3813         snap_name = rbd_dev->header.snap_names;
3814         while (which < snapc->num_snaps) {
3815                 if (!strcmp(name, snap_name))
3816                         return snapc->snaps[which];
3817                 snap_name += strlen(snap_name) + 1;
3818                 which++;
3819         }
3820         return CEPH_NOSNAP;
3821 }
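
/*
 * Illustration (not part of the driver): the snap_names walk above.
 * A v1 header stores snapshot names as one contiguous block of
 * NUL-terminated strings, indexed in step with the snapshot id array.
 */
#if 0	/* standalone user-space sketch */
#include <stdio.h>
#include <string.h>

int main(void)
{
	const char names[] = "one\0two\0three";
	const char *p = names;
	unsigned int which;

	for (which = 0; which < 3; which++) {
		printf("snap %u: %s\n", which, p);
		p += strlen(p) + 1;
	}
	return 0;
}
#endif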
3822
3823 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3824 {
3825         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3826         u32 which;
3827         bool found = false;
3828         u64 snap_id;
3829
3830         for (which = 0; !found && which < snapc->num_snaps; which++) {
3831                 const char *snap_name;
3832
3833                 snap_id = snapc->snaps[which];
3834                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3835                 if (IS_ERR(snap_name))
3836                         break;
3837                 found = !strcmp(name, snap_name);
3838                 kfree(snap_name);
3839         }
3840         return found ? snap_id : CEPH_NOSNAP;
3841 }
3842
3843 /*
3844  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3845  * no snapshot by that name is found, or if an error occurs.
3846  */
3847 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3848 {
3849         if (rbd_dev->image_format == 1)
3850                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3851
3852         return rbd_v2_snap_id_by_name(rbd_dev, name);
3853 }
3854
3855 /*
3856  * When an rbd image has a parent image, it is identified by the
3857  * pool, image, and snapshot ids (not names).  This function fills
3858  * in the names for those ids.  (It's OK if we can't figure out the
3859  * name for an image id, but the pool and snapshot ids should always
3860  * exist and have names.)  All names in an rbd spec are dynamically
3861  * allocated.
3862  *
3863  * When an image being mapped (not a parent) is probed, we have the
3864  * pool name and pool id, image name and image id, and the snapshot
3865  * name.  The only thing we're missing is the snapshot id.
3866  */
3867 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3868 {
3869         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3870         struct rbd_spec *spec = rbd_dev->spec;
3871         const char *pool_name;
3872         const char *image_name;
3873         const char *snap_name;
3874         int ret;
3875
3876         /*
3877          * An image being mapped will have the pool name (etc.), but
3878          * we need to look up the snapshot id.
3879          */
3880         if (spec->pool_name) {
3881                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3882                         u64 snap_id;
3883
3884                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3885                         if (snap_id == CEPH_NOSNAP)
3886                                 return -ENOENT;
3887                         spec->snap_id = snap_id;
3888                 } else {
3889                         spec->snap_id = CEPH_NOSNAP;
3890                 }
3891
3892                 return 0;
3893         }
3894
3895         /* Get the pool name; we have to make our own copy of this */
3896
3897         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3898         if (!pool_name) {
3899                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3900                 return -EIO;
3901         }
3902         pool_name = kstrdup(pool_name, GFP_KERNEL);
3903         if (!pool_name)
3904                 return -ENOMEM;
3905
3906         /* Fetch the image name; tolerate failure here */
3907
3908         image_name = rbd_dev_image_name(rbd_dev);
3909         if (!image_name)
3910                 rbd_warn(rbd_dev, "unable to get image name");
3911
3912         /* Look up the snapshot name, and make a copy */
3913
3914         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3915         if (!snap_name) {
3916                 ret = -ENOMEM;
3917                 goto out_err;
3918         }
3919
3920         spec->pool_name = pool_name;
3921         spec->image_name = image_name;
3922         spec->snap_name = snap_name;
3923
3924         return 0;
3925 out_err:
3926         kfree(image_name);
3927         kfree(pool_name);
3928
3929         return ret;
3930 }
3931
3932 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3933 {
3934         size_t size;
3935         int ret;
3936         void *reply_buf;
3937         void *p;
3938         void *end;
3939         u64 seq;
3940         u32 snap_count;
3941         struct ceph_snap_context *snapc;
3942         u32 i;
3943
3944         /*
3945          * We'll need room for the seq value (maximum snapshot id),
3946          * snapshot count, and array of that many snapshot ids.
3947          * For now we have a fixed upper limit on the number we're
3948          * prepared to receive.
3949          */
3950         size = sizeof (__le64) + sizeof (__le32) +
3951                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3952         reply_buf = kzalloc(size, GFP_KERNEL);
3953         if (!reply_buf)
3954                 return -ENOMEM;
3955
3956         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3957                                 "rbd", "get_snapcontext", NULL, 0,
3958                                 reply_buf, size);
3959         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3960         if (ret < 0)
3961                 goto out;
3962
3963         p = reply_buf;
3964         end = reply_buf + ret;
3965         ret = -ERANGE;
3966         ceph_decode_64_safe(&p, end, seq, out);
3967         ceph_decode_32_safe(&p, end, snap_count, out);
3968
3969         /*
3970          * Make sure the reported number of snapshot ids wouldn't go
3971          * beyond the end of our buffer.  But before checking that,
3972          * make sure the computed size of the snapshot context we
3973          * allocate is representable in a size_t.
3974          */
3975         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3976                                  / sizeof (u64)) {
3977                 ret = -EINVAL;
3978                 goto out;
3979         }
3980         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3981                 goto out;
3982         ret = 0;
3983
3984         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3985         if (!snapc) {
3986                 ret = -ENOMEM;
3987                 goto out;
3988         }
3989         snapc->seq = seq;
3990         for (i = 0; i < snap_count; i++)
3991                 snapc->snaps[i] = ceph_decode_64(&p);
3992
3993         rbd_dev->header.snapc = snapc;
3994
3995         dout("  snap context seq = %llu, snap_count = %u\n",
3996                 (unsigned long long)seq, (unsigned int)snap_count);
3997 out:
3998         kfree(reply_buf);
3999
4000         return ret;
4001 }
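
/*
 * Illustration (not part of the driver): why RBD_MAX_SNAP_COUNT keeps
 * the reply buffer above within a single 4 KB allocation -- the seq
 * (8 bytes) plus the count (4 bytes) plus 510 snapshot ids (8 bytes
 * each) total 4092 bytes.
 */
#if 0	/* standalone user-space sketch */
#include <stdio.h>

int main(void)
{
	unsigned long size = 8 + 4 + 510UL * 8;

	printf("%lu bytes, fits in 4096: %s\n", size,
	       size <= 4096 ? "yes" : "no");
	return 0;
}
#endif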
4002
4003 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4004                                         u64 snap_id)
4005 {
4006         size_t size;
4007         void *reply_buf;
4008         __le64 snapid;
4009         int ret;
4010         void *p;
4011         void *end;
4012         char *snap_name;
4013
4014         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4015         reply_buf = kmalloc(size, GFP_KERNEL);
4016         if (!reply_buf)
4017                 return ERR_PTR(-ENOMEM);
4018
4019         snapid = cpu_to_le64(snap_id);
4020         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4021                                 "rbd", "get_snapshot_name",
4022                                 &snapid, sizeof (snapid),
4023                                 reply_buf, size);
4024         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4025         if (ret < 0) {
4026                 snap_name = ERR_PTR(ret);
4027                 goto out;
4028         }
4029
4030         p = reply_buf;
4031         end = reply_buf + ret;
4032         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4033         if (IS_ERR(snap_name))
4034                 goto out;
4035
4036         dout("  snap_id 0x%016llx snap_name = %s\n",
4037                 (unsigned long long)snap_id, snap_name);
4038 out:
4039         kfree(reply_buf);
4040
4041         return snap_name;
4042 }
4043
4044 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4045 {
4046         int ret;
4047
4048         down_write(&rbd_dev->header_rwsem);
4049
4050         ret = rbd_dev_v2_image_size(rbd_dev);
4051         if (ret)
4052                 goto out;
4053         rbd_update_mapping_size(rbd_dev);
4054
4055         ret = rbd_dev_v2_snap_context(rbd_dev);
4056         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4059 out:
4060         up_write(&rbd_dev->header_rwsem);
4061
4062         return ret;
4063 }
4064
4065 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4066 {
4067         struct device *dev;
4068         int ret;
4069
4070         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4071
4072         dev = &rbd_dev->dev;
4073         dev->bus = &rbd_bus_type;
4074         dev->type = &rbd_device_type;
4075         dev->parent = &rbd_root_dev;
4076         dev->release = rbd_dev_device_release;
4077         dev_set_name(dev, "%d", rbd_dev->dev_id);
4078         ret = device_register(dev);
4079
4080         mutex_unlock(&ctl_mutex);
4081
4082         return ret;
4083 }
4084
4085 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4086 {
4087         device_unregister(&rbd_dev->dev);
4088 }
4089
4090 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4091
4092 /*
4093  * Get a unique rbd identifier for the given new rbd_dev, and add
4094  * the rbd_dev to the global list.  The minimum rbd id is 1.
4095  */
4096 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4097 {
4098         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4099
4100         spin_lock(&rbd_dev_list_lock);
4101         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4102         spin_unlock(&rbd_dev_list_lock);
4103         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4104                 (unsigned long long) rbd_dev->dev_id);
4105 }
4106
4107 /*
4108  * Remove an rbd_dev from the global list, and record that its
4109  * identifier is no longer in use.
4110  */
4111 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4112 {
4113         struct list_head *tmp;
4114         int rbd_id = rbd_dev->dev_id;
4115         int max_id;
4116
4117         rbd_assert(rbd_id > 0);
4118
4119         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4120                 (unsigned long long) rbd_dev->dev_id);
4121         spin_lock(&rbd_dev_list_lock);
4122         list_del_init(&rbd_dev->node);
4123
4124         /*
4125          * If the id being "put" is not the current maximum, there
4126          * is nothing special we need to do.
4127          */
4128         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4129                 spin_unlock(&rbd_dev_list_lock);
4130                 return;
4131         }
4132
4133         /*
4134          * We need to update the current maximum id.  Search the
4135          * list to find out what it is.  We're more likely to find
4136          * the maximum at the end, so search the list backward.
4137          */
4138         max_id = 0;
4139         list_for_each_prev(tmp, &rbd_dev_list) {
4140                 struct rbd_device *rbd_dev;
4141
4142                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4143                 if (rbd_dev->dev_id > max_id)
4144                         max_id = rbd_dev->dev_id;
4145         }
4146         spin_unlock(&rbd_dev_list_lock);
4147
4148         /*
4149          * The max id could have been updated by rbd_dev_id_get(), in
4150          * which case it now accurately reflects the new maximum.
4151          * Be careful not to overwrite the maximum value in that
4152          * case.
4153          */
4154         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4155         dout("  max dev id has been reset\n");
4156 }
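
/*
 * Illustration (not part of the driver): the "reset the maximum only
 * if nobody raised it meanwhile" pattern used above, with C11 atomics
 * standing in for atomic64_cmpxchg().  The values are hypothetical.
 */
#if 0	/* standalone user-space sketch */
#include <stdatomic.h>
#include <stdio.h>

int main(void)
{
	atomic_long max_id = 5;
	long released_id = 5;	/* the id being put was the maximum */
	long new_max = 3;	/* recomputed from the remaining devices */

	/* Succeeds only if max_id is still the id we released. */
	atomic_compare_exchange_strong(&max_id, &released_id, new_max);
	printf("max id is now %ld\n", atomic_load(&max_id));
	return 0;
}
#endif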
4157
4158 /*
4159  * Skips over white space at *buf, and updates *buf to point to the
4160  * first found non-space character (if any). Returns the length of
4161  * the token (string of non-white space characters) found.  Note
4162  * that *buf must be terminated with '\0'.
4163  */
4164 static inline size_t next_token(const char **buf)
4165 {
4166         /*
4167          * These are the characters that produce nonzero for
4168          * isspace() in the "C" and "POSIX" locales.
4169          */
4170         const char *spaces = " \f\n\r\t\v";
4171
4172         *buf += strspn(*buf, spaces);   /* Find start of token */
4173
4174         return strcspn(*buf, spaces);   /* Return token length */
4175 }
4176
4177 /*
4178  * Finds the next token in *buf, and if the provided token buffer is
4179  * big enough, copies the found token into it.  The result, if
4180  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4181  * must be terminated with '\0' on entry.
4182  *
4183  * Returns the length of the token found (not including the '\0').
4184  * Return value will be 0 if no token is found, and it will be >=
4185  * token_size if the token would not fit.
4186  *
4187  * The *buf pointer will be updated to point beyond the end of the
4188  * found token.  Note that this occurs even if the token buffer is
4189  * too small to hold it.
4190  */
4191 static inline size_t copy_token(const char **buf,
4192                                 char *token,
4193                                 size_t token_size)
4194 {
4195         size_t len;
4196
4197         len = next_token(buf);
4198         if (len < token_size) {
4199                 memcpy(token, *buf, len);
4200                 *(token + len) = '\0';
4201         }
4202         *buf += len;
4203
4204         return len;
4205 }
4206
4207 /*
4208  * Finds the next token in *buf, dynamically allocates a buffer big
4209  * enough to hold a copy of it, and copies the token into the new
4210  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4211  * that a duplicate buffer is created even for a zero-length token.
4212  *
4213  * Returns a pointer to the newly-allocated duplicate, or a null
4214  * pointer if memory for the duplicate was not available.  If
4215  * the lenp argument is a non-null pointer, the length of the token
4216  * (not including the '\0') is returned in *lenp.
4217  *
4218  * If successful, the *buf pointer will be updated to point beyond
4219  * the end of the found token.
4220  *
4221  * Note: uses GFP_KERNEL for allocation.
4222  */
4223 static inline char *dup_token(const char **buf, size_t *lenp)
4224 {
4225         char *dup;
4226         size_t len;
4227
4228         len = next_token(buf);
4229         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4230         if (!dup)
4231                 return NULL;
4232         *(dup + len) = '\0';
4233         *buf += len;
4234
4235         if (lenp)
4236                 *lenp = len;
4237
4238         return dup;
4239 }
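
/*
 * Illustration (not part of the driver): the next_token() idiom above
 * in user space -- strspn() skips leading white space, strcspn()
 * measures the token that follows.  The input line is a made-up
 * example.
 */
#if 0	/* standalone user-space sketch */
#include <stdio.h>
#include <string.h>

int main(void)
{
	const char *buf = "  1.2.3.4:6789 name=admin rbd myimage -";
	const char *spaces = " \f\n\r\t\v";

	while (*buf) {
		size_t len;

		buf += strspn(buf, spaces);
		len = strcspn(buf, spaces);
		if (!len)
			break;
		printf("token: %.*s\n", (int)len, buf);
		buf += len;
	}
	return 0;
}
#endif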
4240
4241 /*
4242  * Parse the options provided for an "rbd add" (i.e., rbd image
4243  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4244  * and the data written is passed here via a NUL-terminated buffer.
4245  * Returns 0 if successful or an error code otherwise.
4246  *
4247  * The information extracted from these options is recorded in
4248  * the other parameters which return dynamically-allocated
4249  * structures:
4250  *  ceph_opts
4251  *      The address of a pointer that will refer to a ceph options
4252  *      structure.  Caller must release the returned pointer using
4253  *      ceph_destroy_options() when it is no longer needed.
4254  *  rbd_opts
4255  *      Address of an rbd options pointer.  Fully initialized by
4256  *      this function; caller must release with kfree().
4257  *  spec
4258  *      Address of an rbd image specification pointer.  Fully
4259  *      initialized by this function based on parsed options.
4260  *      Caller must release with rbd_spec_put().
4261  *
4262  * The options passed take this form:
4263  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4264  * where:
4265  *  <mon_addrs>
4266  *      A comma-separated list of one or more monitor addresses.
4267  *      A monitor address is an ip address, optionally followed
4268  *      by a port number (separated by a colon).
4269  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4270  *  <options>
4271  *      A comma-separated list of ceph and/or rbd options.
4272  *  <pool_name>
4273  *      The name of the rados pool containing the rbd image.
4274  *  <image_name>
4275  *      The name of the image in that pool to map.
4276  *  <snap_name>
4277  *      An optional snapshot name.  If provided, the mapping will
4278  *      present data from the image at the time that snapshot was
4279  *      created.  The image head is used if no snapshot name is
4280  *      provided.  Snapshot mappings are always read-only.
4281  */
4282 static int rbd_add_parse_args(const char *buf,
4283                                 struct ceph_options **ceph_opts,
4284                                 struct rbd_options **opts,
4285                                 struct rbd_spec **rbd_spec)
4286 {
4287         size_t len;
4288         char *options;
4289         const char *mon_addrs;
4290         char *snap_name;
4291         size_t mon_addrs_size;
4292         struct rbd_spec *spec = NULL;
4293         struct rbd_options *rbd_opts = NULL;
4294         struct ceph_options *copts;
4295         int ret;
4296
4297         /* The first four tokens are required */
4298
4299         len = next_token(&buf);
4300         if (!len) {
4301                 rbd_warn(NULL, "no monitor address(es) provided");
4302                 return -EINVAL;
4303         }
4304         mon_addrs = buf;
4305         mon_addrs_size = len + 1;
4306         buf += len;
4307
4308         ret = -EINVAL;
4309         options = dup_token(&buf, NULL);
4310         if (!options)
4311                 return -ENOMEM;
4312         if (!*options) {
4313                 rbd_warn(NULL, "no options provided");
4314                 goto out_err;
4315         }
4316
4317         spec = rbd_spec_alloc();
4318         if (!spec)
4319                 goto out_mem;
4320
4321         spec->pool_name = dup_token(&buf, NULL);
4322         if (!spec->pool_name)
4323                 goto out_mem;
4324         if (!*spec->pool_name) {
4325                 rbd_warn(NULL, "no pool name provided");
4326                 goto out_err;
4327         }
4328
4329         spec->image_name = dup_token(&buf, NULL);
4330         if (!spec->image_name)
4331                 goto out_mem;
4332         if (!*spec->image_name) {
4333                 rbd_warn(NULL, "no image name provided");
4334                 goto out_err;
4335         }
4336
4337         /*
4338          * Snapshot name is optional; default is to use "-"
4339          * (indicating the head/no snapshot).
4340          */
4341         len = next_token(&buf);
4342         if (!len) {
4343                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4344                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4345         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4346                 ret = -ENAMETOOLONG;
4347                 goto out_err;
4348         }
4349         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4350         if (!snap_name)
4351                 goto out_mem;
4352         *(snap_name + len) = '\0';
4353         spec->snap_name = snap_name;
4354
4355         /* Initialize all rbd options to the defaults */
4356
4357         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4358         if (!rbd_opts)
4359                 goto out_mem;
4360
4361         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4362
4363         copts = ceph_parse_options(options, mon_addrs,
4364                                         mon_addrs + mon_addrs_size - 1,
4365                                         parse_rbd_opts_token, rbd_opts);
4366         if (IS_ERR(copts)) {
4367                 ret = PTR_ERR(copts);
4368                 goto out_err;
4369         }
4370         kfree(options);
4371
4372         *ceph_opts = copts;
4373         *opts = rbd_opts;
4374         *rbd_spec = spec;
4375
4376         return 0;
4377 out_mem:
4378         ret = -ENOMEM;
4379 out_err:
4380         kfree(rbd_opts);
4381         rbd_spec_put(spec);
4382         kfree(options);
4383
4384         return ret;
4385 }
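
/*
 * Illustration (not part of the driver): submitting an "rbd add" line
 * in the format parsed above by writing it to /sys/bus/rbd/add.  The
 * monitor address, credentials, pool, and image names are made-up
 * examples; the trailing "-" maps the image head rather than a
 * snapshot.
 */
#if 0	/* standalone user-space sketch */
#include <stdio.h>

int main(void)
{
	const char *line =
		"1.2.3.4:6789 name=admin,secret=<key> rbd myimage -";
	FILE *f = fopen("/sys/bus/rbd/add", "w");

	if (!f)
		return 1;
	fputs(line, f);
	fclose(f);
	return 0;
}
#endif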
4386
4387 /*
4388  * An rbd format 2 image has a unique identifier, distinct from the
4389  * name given to it by the user.  Internally, that identifier is
4390  * what's used to specify the names of objects related to the image.
4391  *
4392  * A special "rbd id" object is used to map an rbd image name to its
4393  * id.  If that object doesn't exist, then there is no v2 rbd image
4394  * with the supplied name.
4395  *
4396  * This function will record the given rbd_dev's image_id field if
4397  * it can be determined, and in that case will return 0.  If any
4398  * errors occur a negative errno will be returned and the rbd_dev's
4399  * image_id field will be unchanged (and should be NULL).
4400  */
4401 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4402 {
4403         int ret;
4404         size_t size;
4405         char *object_name;
4406         void *response;
4407         char *image_id;
4408
4409         /*
4410          * When probing a parent image, the image id is already
4411          * known (and the image name likely is not).  There's no
4412          * need to fetch the image id again in this case.  We
4413          * do still need to set the image format though.
4414          */
4415         if (rbd_dev->spec->image_id) {
4416                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4417
4418                 return 0;
4419         }
4420
4421         /*
4422          * First, see if the format 2 image id file exists, and if
4423          * so, get the image's persistent id from it.
4424          */
4425         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4426         object_name = kmalloc(size, GFP_NOIO);
4427         if (!object_name)
4428                 return -ENOMEM;
4429         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4430         dout("rbd id object name is %s\n", object_name);
4431
4432         /* Response will be an encoded string, which includes a length */
4433
4434         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4435         response = kzalloc(size, GFP_NOIO);
4436         if (!response) {
4437                 ret = -ENOMEM;
4438                 goto out;
4439         }
4440
4441         /* If it doesn't exist we'll assume it's a format 1 image */
4442
4443         ret = rbd_obj_method_sync(rbd_dev, object_name,
4444                                 "rbd", "get_id", NULL, 0,
4445                                 response, RBD_IMAGE_ID_LEN_MAX);
4446         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4447         if (ret == -ENOENT) {
4448                 image_id = kstrdup("", GFP_KERNEL);
4449                 ret = image_id ? 0 : -ENOMEM;
4450                 if (!ret)
4451                         rbd_dev->image_format = 1;
4452         } else if (ret > sizeof (__le32)) {
4453                 void *p = response;
4454
4455                 image_id = ceph_extract_encoded_string(&p, p + ret,
4456                                                 NULL, GFP_NOIO);
4457                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4458                 if (!ret)
4459                         rbd_dev->image_format = 2;
4460         } else {
4461                 ret = -EINVAL;
4462         }
4463
4464         if (!ret) {
4465                 rbd_dev->spec->image_id = image_id;
4466                 dout("image_id is %s\n", image_id);
4467         }
4468 out:
4469         kfree(response);
4470         kfree(object_name);
4471
4472         return ret;
4473 }
4474
4475 /* Undo whatever state changes are made by v1 or v2 image probe */
4476
4477 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4478 {
4479         struct rbd_image_header *header;
4480
4481         rbd_dev_remove_parent(rbd_dev);
4482         rbd_spec_put(rbd_dev->parent_spec);
4483         rbd_dev->parent_spec = NULL;
4484         rbd_dev->parent_overlap = 0;
4485
4486         /* Free dynamic fields from the header, then zero it out */
4487
4488         header = &rbd_dev->header;
4489         ceph_put_snap_context(header->snapc);
4490         kfree(header->snap_sizes);
4491         kfree(header->snap_names);
4492         kfree(header->object_prefix);
4493         memset(header, 0, sizeof (*header));
4494 }
4495
4496 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4497 {
4498         int ret;
4499
4500         /* Populate rbd image metadata */
4501
4502         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4503         if (ret < 0)
4504                 goto out_err;
4505
4506         /* Version 1 images have no parent (no layering) */
4507
4508         rbd_dev->parent_spec = NULL;
4509         rbd_dev->parent_overlap = 0;
4510
4511         dout("discovered version 1 image, header name is %s\n",
4512                 rbd_dev->header_name);
4513
4514         return 0;
4515
4516 out_err:
4517         kfree(rbd_dev->header_name);
4518         rbd_dev->header_name = NULL;
4519         kfree(rbd_dev->spec->image_id);
4520         rbd_dev->spec->image_id = NULL;
4521
4522         return ret;
4523 }
4524
4525 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4526 {
4527         int ret;
4528
4529         ret = rbd_dev_v2_image_size(rbd_dev);
4530         if (ret)
4531                 goto out_err;
4532
4533         /* Get the object prefix (a.k.a. block_name) for the image */
4534
4535         ret = rbd_dev_v2_object_prefix(rbd_dev);
4536         if (ret)
4537                 goto out_err;
4538
4539         /* Get and check the features for the image */
4540
4541         ret = rbd_dev_v2_features(rbd_dev);
4542         if (ret)
4543                 goto out_err;
4544
4545         /* If the image supports layering, get the parent info */
4546
4547         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4548                 ret = rbd_dev_v2_parent_info(rbd_dev);
4549                 if (ret)
4550                         goto out_err;
4551
4552                 /*
4553                  * Don't print a warning for parent images.  We can
4554                  * tell we are probing a parent at this point because
4555                  * its pool name is not yet known (just its pool id).
4556                  */
4557                 if (rbd_dev->spec->pool_name)
4558                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4559                                         "is EXPERIMENTAL!");
4560         }
4561
4562         /* If the image supports fancy striping, get its parameters */
4563
4564         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4565                 ret = rbd_dev_v2_striping_info(rbd_dev);
4566                 if (ret < 0)
4567                         goto out_err;
4568         }
4569
4570         /* crypto and compression type aren't (yet) supported for v2 images */
4571
4572         rbd_dev->header.crypt_type = 0;
4573         rbd_dev->header.comp_type = 0;
4574
4575         /* Get the snapshot context */
4576
4577         ret = rbd_dev_v2_snap_context(rbd_dev);
4578         if (ret)
4579                 goto out_err;
4580
4581         dout("discovered version 2 image, header name is %s\n",
4582                 rbd_dev->header_name);
4583
4584         return 0;
4585 out_err:
4586         rbd_dev->parent_overlap = 0;
4587         rbd_spec_put(rbd_dev->parent_spec);
4588         rbd_dev->parent_spec = NULL;
4589         kfree(rbd_dev->header_name);
4590         rbd_dev->header_name = NULL;
4591         kfree(rbd_dev->header.object_prefix);
4592         rbd_dev->header.object_prefix = NULL;
4593
4594         return ret;
4595 }
4596
4597 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4598 {
4599         struct rbd_device *parent = NULL;
4600         struct rbd_spec *parent_spec;
4601         struct rbd_client *rbdc;
4602         int ret;
4603
4604         if (!rbd_dev->parent_spec)
4605                 return 0;
4606         /*
4607          * We need to pass a reference to the client and the parent
4608          * spec when creating the parent rbd_dev.  Images related by
4609          * parent/child relationships always share both.
4610          */
4611         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4612         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4613
4614         ret = -ENOMEM;
4615         parent = rbd_dev_create(rbdc, parent_spec);
4616         if (!parent)
4617                 goto out_err;
4618
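         /*
          * Probing the parent image ends with another call to
          * rbd_dev_probe_parent(), so the entire ancestor chain is
          * set up recursively before this call returns.
          */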
4619         ret = rbd_dev_image_probe(parent);
4620         if (ret < 0)
4621                 goto out_err;
4622         rbd_dev->parent = parent;
4623
4624         return 0;
4625 out_err:
4626         if (parent) {
4627                 /* rbd_dev_destroy() drops the client and spec refs the
4628                  * parent took; the child's state is our caller's to undo */
4629                 rbd_dev_destroy(parent);
4630         } else {
4631                 rbd_put_client(rbdc);
4632                 rbd_spec_put(parent_spec);
4633         }
4634
4635         return ret;
4636 }
4637
4638 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4639 {
4640         int ret;
4641
4642         ret = rbd_dev_mapping_set(rbd_dev);
4643         if (ret)
4644                 return ret;
4645
4646         /* generate unique id: find highest unique id, add one */
4647         rbd_dev_id_get(rbd_dev);
4648
4649         /* Fill in the device name, now that we have its id. */
4650         BUILD_BUG_ON(DEV_NAME_LEN
4651                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4652         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4653
4654         /* Get our block major device number. */
4655
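         /* A major of 0 asks register_blkdev() to allocate one dynamically */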
4656         ret = register_blkdev(0, rbd_dev->name);
4657         if (ret < 0)
4658                 goto err_out_id;
4659         rbd_dev->major = ret;
4660
4661         /* Set up the blkdev mapping. */
4662
4663         ret = rbd_init_disk(rbd_dev);
4664         if (ret)
4665                 goto err_out_blkdev;
4666
4667         ret = rbd_bus_add_dev(rbd_dev);
4668         if (ret)
4669                 goto err_out_disk;
4670
4671         /* Everything's ready.  Announce the disk to the world. */
4672
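         /* set_capacity() takes the size in 512-byte sectors */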
4673         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4674         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4675         add_disk(rbd_dev->disk);
4676
4677         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4678                 (unsigned long long) rbd_dev->mapping.size);
4679
4680         return ret;
4681
4682 err_out_disk:
4683         rbd_free_disk(rbd_dev);
4684 err_out_blkdev:
4685         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4686 err_out_id:
4687         rbd_dev_id_put(rbd_dev);
4688         rbd_dev_mapping_clear(rbd_dev);
4689
4690         return ret;
4691 }
4692
4693 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4694 {
4695         struct rbd_spec *spec = rbd_dev->spec;
4696         size_t size;
4697
4698         /* Record the header object name for this rbd image. */
4699
4700         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4701
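         /*
          * Format 1 header objects are named "<image_name>.rbd"
          * (RBD_SUFFIX); format 2 uses "rbd_header.<image_id>"
          * (RBD_HEADER_PREFIX).  Each sizeof below includes the
          * terminating NUL, so size covers the full result.
          */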
4702         if (rbd_dev->image_format == 1)
4703                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4704         else
4705                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4706
4707         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4708         if (!rbd_dev->header_name)
4709                 return -ENOMEM;
4710
4711         if (rbd_dev->image_format == 1)
4712                 sprintf(rbd_dev->header_name, "%s%s",
4713                         spec->image_name, RBD_SUFFIX);
4714         else
4715                 sprintf(rbd_dev->header_name, "%s%s",
4716                         RBD_HEADER_PREFIX, spec->image_id);
4717         return 0;
4718 }
4719
4720 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4721 {
4722         int ret;
4723
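         /*
          * Tear down in the same order as rbd_dev_image_probe()'s
          * error path: undo the probe, drop the header watch, then
          * release the header name and image id.
          */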
4724         rbd_dev_unprobe(rbd_dev);
4725         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4726         if (ret)
4727                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4728         kfree(rbd_dev->header_name);
4729         rbd_dev->header_name = NULL;
4730         rbd_dev->image_format = 0;
4731         kfree(rbd_dev->spec->image_id);
4732         rbd_dev->spec->image_id = NULL;
4733
4734         rbd_dev_destroy(rbd_dev);
4735 }
4736
4737 /*
4738  * Probe for the existence of the header object for the given rbd
4739  * device.  For format 2 images this includes determining the image
4740  * id.
4741  */
4742 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4743 {
4744         int ret;
4745         int tmp;
4746
4747         /*
4748          * Get the id from the image id object.  If it's not a
4749          * format 2 image, we'll get ENOENT back, and we'll assume
4750          * it's a format 1 image.
4751          */
4752         ret = rbd_dev_image_id(rbd_dev);
4753         if (ret)
4754                 return ret;
4755         rbd_assert(rbd_dev->spec->image_id);
4756         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4757
4758         ret = rbd_dev_header_name(rbd_dev);
4759         if (ret)
4760                 goto err_out_format;
4761
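         /*
          * Register a watch on the header object so the OSDs can
          * notify us of header changes; the second argument selects
          * register (1) versus unregister (0).
          */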
4762         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4763         if (ret)
4764                 goto out_header_name;
4765
4766         if (rbd_dev->image_format == 1)
4767                 ret = rbd_dev_v1_probe(rbd_dev);
4768         else
4769                 ret = rbd_dev_v2_probe(rbd_dev);
4770         if (ret)
4771                 goto err_out_watch;
4772
4773         ret = rbd_dev_spec_update(rbd_dev);
4774         if (ret)
4775                 goto err_out_probe;
4776
4777         ret = rbd_dev_probe_parent(rbd_dev);
4778         if (!ret)
4779                 return 0;
4780
4781 err_out_probe:
4782         rbd_dev_unprobe(rbd_dev);
4783 err_out_watch:
4784         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4785         if (tmp)
4786                 rbd_warn(rbd_dev, "unable to tear down watch request");
4787 out_header_name:
4788         kfree(rbd_dev->header_name);
4789         rbd_dev->header_name = NULL;
4790 err_out_format:
4791         rbd_dev->image_format = 0;
4792         kfree(rbd_dev->spec->image_id);
4793         rbd_dev->spec->image_id = NULL;
4794
4795         dout("probe failed, returning %d\n", ret);
4796
4797         return ret;
4798 }
4799
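 /*
  * Handle a write to /sys/bus/rbd/add.  The buffer is expected to
  * look like "<mon_addrs> <options> <pool_name> <image_name>
  * [<snap_name>]"; for example (illustrative values only):
  *
  *   $ echo "192.168.0.1:6789 name=admin rbd myimage" > /sys/bus/rbd/add
  *
  * See Documentation/ABI/testing/sysfs-bus-rbd for the authoritative
  * format.
  */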
4800 static ssize_t rbd_add(struct bus_type *bus,
4801                        const char *buf,
4802                        size_t count)
4803 {
4804         struct rbd_device *rbd_dev = NULL;
4805         struct ceph_options *ceph_opts = NULL;
4806         struct rbd_options *rbd_opts = NULL;
4807         struct rbd_spec *spec = NULL;
4808         struct rbd_client *rbdc;
4809         struct ceph_osd_client *osdc;
4810         int rc = -ENOMEM;
4811
4812         if (!try_module_get(THIS_MODULE))
4813                 return -ENODEV;
4814
4815         /* parse add command */
4816         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4817         if (rc < 0)
4818                 goto err_out_module;
4819
4820         rbdc = rbd_get_client(ceph_opts);
4821         if (IS_ERR(rbdc)) {
4822                 rc = PTR_ERR(rbdc);
4823                 goto err_out_args;
4824         }
4825         ceph_opts = NULL;       /* rbd_get_client() now owns or freed this */
4826
4827         /* pick the pool */
4828         osdc = &rbdc->client->osdc;
4829         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4830         if (rc < 0)
4831                 goto err_out_client;
4832         spec->pool_id = (u64)rc;
4833
4834         /* The ceph file layout needs the pool id to fit in 32 bits */
4835
4836         if (spec->pool_id > (u64)U32_MAX) {
4837                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4838                                 (unsigned long long)spec->pool_id, U32_MAX);
4839                 rc = -EIO;
4840                 goto err_out_client;
4841         }
4842
4843         rbd_dev = rbd_dev_create(rbdc, spec);
4844         if (!rbd_dev)
4845                 goto err_out_client;
4846         rbdc = NULL;            /* rbd_dev now owns this */
4847         spec = NULL;            /* rbd_dev now owns this */
4848
4849         rbd_dev->mapping.read_only = rbd_opts->read_only;
4850         kfree(rbd_opts);
4851         rbd_opts = NULL;        /* done with this */
4852
4853         rc = rbd_dev_image_probe(rbd_dev);
4854         if (rc < 0)
4855                 goto err_out_rbd_dev;
4856
4857         rc = rbd_dev_device_setup(rbd_dev);
4858         if (!rc)
4859                 return count;
4860
4861         rbd_dev_image_release(rbd_dev);
4862 err_out_rbd_dev:
4863         rbd_dev_destroy(rbd_dev);
4864 err_out_client:
4865         rbd_put_client(rbdc);
4866 err_out_args:
4867         if (ceph_opts)
4868                 ceph_destroy_options(ceph_opts);
4869         kfree(rbd_opts);
4870         rbd_spec_put(spec);
4871 err_out_module:
4872         module_put(THIS_MODULE);
4873
4874         dout("Error adding device %s\n", buf);
4875
4876         return (ssize_t)rc;
4877 }
4878
4879 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4880 {
4881         struct list_head *tmp;
4882         struct rbd_device *rbd_dev;
4883
4884         spin_lock(&rbd_dev_list_lock);
4885         list_for_each(tmp, &rbd_dev_list) {
4886                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4887                 if (rbd_dev->dev_id == dev_id) {
4888                         spin_unlock(&rbd_dev_list_lock);
4889                         return rbd_dev;
4890                 }
4891         }
4892         spin_unlock(&rbd_dev_list_lock);
4893         return NULL;
4894 }
4895
4896 static void rbd_dev_device_release(struct device *dev)
4897 {
4898         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4899
4900         rbd_free_disk(rbd_dev);
4901         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4902         rbd_dev_clear_mapping(rbd_dev);
4903         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4904         rbd_dev->major = 0;
4905         rbd_dev_id_put(rbd_dev);
4906         rbd_dev_mapping_clear(rbd_dev);
4907 }
4908
4909 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4910 {
4911         while (rbd_dev->parent) {
4912                 struct rbd_device *first = rbd_dev;
4913                 struct rbd_device *second = first->parent;
4914                 struct rbd_device *third;
4915
4916                 /*
4917                  * Walk down the chain to the ancestor that has no
4918                  * parent of its own, and remove it.
4919                  */
4920                 while (second && (third = second->parent)) {
4921                         first = second;
4922                         second = third;
4923                 }
4924                 rbd_assert(second);
4925                 rbd_dev_image_release(second);
4926                 first->parent = NULL;
4927                 first->parent_overlap = 0;
4928
4929                 rbd_assert(first->parent_spec);
4930                 rbd_spec_put(first->parent_spec);
4931                 first->parent_spec = NULL;
4932         }
4933 }
4934
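 /*
  * Handle a write to /sys/bus/rbd/remove.  The buffer holds the
  * decimal id of the device to remove (the N in /dev/rbdN), for
  * example (illustrative): echo 1 > /sys/bus/rbd/remove
  */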
4935 static ssize_t rbd_remove(struct bus_type *bus,
4936                           const char *buf,
4937                           size_t count)
4938 {
4939         struct rbd_device *rbd_dev = NULL;
4940         int target_id;
4941         unsigned long ul;
4942         int ret;
4943
4944         ret = kstrtoul(buf, 10, &ul);
4945         if (ret)
4946                 return ret;
4947
4948         /* convert to int; abort if we lost anything in the conversion */
4949         target_id = (int) ul;
4950         if (target_id != ul)
4951                 return -EINVAL;
4952
4953         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4954
4955         rbd_dev = __rbd_get_dev(target_id);
4956         if (!rbd_dev) {
4957                 ret = -ENOENT;
4958                 goto done;
4959         }
4960
4961         spin_lock_irq(&rbd_dev->lock);
4962         if (rbd_dev->open_count)
4963                 ret = -EBUSY;
4964         else
4965                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4966         spin_unlock_irq(&rbd_dev->lock);
4967         if (ret < 0)
4968                 goto done;
4969         ret = count;
4970         rbd_bus_del_dev(rbd_dev);
4971         rbd_dev_image_release(rbd_dev);
4972         module_put(THIS_MODULE);
4973 done:
4974         mutex_unlock(&ctl_mutex);
4975
4976         return ret;
4977 }
4978
4979 /*
4980  * create control files in sysfs
4981  * /sys/bus/rbd/...
4982  */
4983 static int rbd_sysfs_init(void)
4984 {
4985         int ret;
4986
4987         ret = device_register(&rbd_root_dev);
4988         if (ret < 0)
4989                 return ret;
4990
4991         ret = bus_register(&rbd_bus_type);
4992         if (ret < 0)
4993                 device_unregister(&rbd_root_dev);
4994
4995         return ret;
4996 }
4997
4998 static void rbd_sysfs_cleanup(void)
4999 {
5000         bus_unregister(&rbd_bus_type);
5001         device_unregister(&rbd_root_dev);
5002 }
5003
5004 static int rbd_slab_init(void)
5005 {
5006         rbd_assert(!rbd_img_request_cache);
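         /* Backs the rbd_img_request allocated for each image request */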
5007         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5008                                         sizeof (struct rbd_img_request),
5009                                         __alignof__(struct rbd_img_request),
5010                                         0, NULL);
5011         if (rbd_img_request_cache)
5012                 return 0;
5013
5014         return -ENOMEM;
5015 }
5016
5017 static void rbd_slab_exit(void)
5018 {
5019         rbd_assert(rbd_img_request_cache);
5020         kmem_cache_destroy(rbd_img_request_cache);
5021         rbd_img_request_cache = NULL;
5022 }
5023
5024 static int __init rbd_init(void)
5025 {
5026         int rc;
5027
5028         if (!libceph_compatible(NULL)) {
5029                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5030
5031                 return -EINVAL;
5032         }
5033         rc = rbd_slab_init();
5034         if (rc)
5035                 return rc;
5036         rc = rbd_sysfs_init();
5037         if (rc)
5038                 rbd_slab_exit();
5039         else
5040                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5041
5042         return rc;
5043 }
5044
5045 static void __exit rbd_exit(void)
5046 {
5047         rbd_sysfs_cleanup();
5048         rbd_slab_exit();
5049 }
5050
5051 module_init(rbd_init);
5052 module_exit(rbd_exit);
5053
5054 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5055 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5056 MODULE_DESCRIPTION("rados block device");
5057
5058 /* following authorship retained from original osdblk.c */
5059 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5060
5061 MODULE_LICENSE("GPL");