]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: allocate image requests with a slab allocator
[~andy/linux] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44
45 #include "rbd_types.h"
46
47 #define RBD_DEBUG       /* Activate rbd_assert() calls */
48
49 /*
50  * The basic unit of block I/O is a sector.  It is interpreted in a
51  * number of contexts in Linux (blk, bio, genhd), but the default is
52  * universally 512 bytes.  These symbols are just slightly more
53  * meaningful than the bare numbers they represent.
54  */
55 #define SECTOR_SHIFT    9
56 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
57
58 #define RBD_DRV_NAME "rbd"
59 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
60
61 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
62
63 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
64 #define RBD_MAX_SNAP_NAME_LEN   \
65                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
66
67 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
68
69 #define RBD_SNAP_HEAD_NAME      "-"
70
71 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
72
73 /* This allows a single page to hold an image name sent by OSD */
74 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
75 #define RBD_IMAGE_ID_LEN_MAX    64
76
77 #define RBD_OBJ_PREFIX_LEN_MAX  64
78
79 /* Feature bits */
80
81 #define RBD_FEATURE_LAYERING    (1<<0)
82 #define RBD_FEATURE_STRIPINGV2  (1<<1)
83 #define RBD_FEATURES_ALL \
84             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
85
86 /* Features supported by this (client software) implementation. */
87
88 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
89
90 /*
91  * An RBD device name will be "rbd#", where the "rbd" comes from
92  * RBD_DRV_NAME above, and # is a unique integer identifier.
93  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
94  * enough to hold all possible device names.
95  */
96 #define DEV_NAME_LEN            32
97 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
98
99 /*
100  * block device image metadata (in-memory version)
101  */
102 struct rbd_image_header {
103         /* These four fields never change for a given rbd image */
104         char *object_prefix;
105         u64 features;
106         __u8 obj_order;
107         __u8 crypt_type;
108         __u8 comp_type;
109
110         /* The remaining fields need to be updated occasionally */
111         u64 image_size;
112         struct ceph_snap_context *snapc;
113         char *snap_names;
114         u64 *snap_sizes;
115
116         u64 stripe_unit;
117         u64 stripe_count;
118 };
119
120 /*
121  * An rbd image specification.
122  *
123  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
124  * identify an image.  Each rbd_dev structure includes a pointer to
125  * an rbd_spec structure that encapsulates this identity.
126  *
127  * Each of the id's in an rbd_spec has an associated name.  For a
128  * user-mapped image, the names are supplied and the id's associated
129  * with them are looked up.  For a layered image, a parent image is
130  * defined by the tuple, and the names are looked up.
131  *
132  * An rbd_dev structure contains a parent_spec pointer which is
133  * non-null if the image it represents is a child in a layered
134  * image.  This pointer will refer to the rbd_spec structure used
135  * by the parent rbd_dev for its own identity (i.e., the structure
136  * is shared between the parent and child).
137  *
138  * Since these structures are populated once, during the discovery
139  * phase of image construction, they are effectively immutable so
140  * we make no effort to synchronize access to them.
141  *
142  * Note that code herein does not assume the image name is known (it
143  * could be a null pointer).
144  */
145 struct rbd_spec {
146         u64             pool_id;
147         const char      *pool_name;
148
149         const char      *image_id;
150         const char      *image_name;
151
152         u64             snap_id;
153         const char      *snap_name;
154
155         struct kref     kref;
156 };
157
158 /*
159  * an instance of the client.  multiple devices may share an rbd client.
160  */
161 struct rbd_client {
162         struct ceph_client      *client;
163         struct kref             kref;
164         struct list_head        node;
165 };
166
167 struct rbd_img_request;
168 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
169
170 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
171
172 struct rbd_obj_request;
173 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
174
175 enum obj_request_type {
176         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
177 };
178
179 enum obj_req_flags {
180         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
181         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
182         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
183         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
184 };
185
186 struct rbd_obj_request {
187         const char              *object_name;
188         u64                     offset;         /* object start byte */
189         u64                     length;         /* bytes from offset */
190         unsigned long           flags;
191
192         /*
193          * An object request associated with an image will have its
194          * img_data flag set; a standalone object request will not.
195          *
196          * A standalone object request will have which == BAD_WHICH
197          * and a null obj_request pointer.
198          *
199          * An object request initiated in support of a layered image
200          * object (to check for its existence before a write) will
201          * have which == BAD_WHICH and a non-null obj_request pointer.
202          *
203          * Finally, an object request for rbd image data will have
204          * which != BAD_WHICH, and will have a non-null img_request
205          * pointer.  The value of which will be in the range
206          * 0..(img_request->obj_request_count-1).
207          */
208         union {
209                 struct rbd_obj_request  *obj_request;   /* STAT op */
210                 struct {
211                         struct rbd_img_request  *img_request;
212                         u64                     img_offset;
213                         /* links for img_request->obj_requests list */
214                         struct list_head        links;
215                 };
216         };
217         u32                     which;          /* posn image request list */
218
219         enum obj_request_type   type;
220         union {
221                 struct bio      *bio_list;
222                 struct {
223                         struct page     **pages;
224                         u32             page_count;
225                 };
226         };
227         struct page             **copyup_pages;
228
229         struct ceph_osd_request *osd_req;
230
231         u64                     xferred;        /* bytes transferred */
232         int                     result;
233
234         rbd_obj_callback_t      callback;
235         struct completion       completion;
236
237         struct kref             kref;
238 };
239
240 enum img_req_flags {
241         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
242         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
243         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
244 };
245
246 struct rbd_img_request {
247         struct rbd_device       *rbd_dev;
248         u64                     offset; /* starting image byte offset */
249         u64                     length; /* byte count from offset */
250         unsigned long           flags;
251         union {
252                 u64                     snap_id;        /* for reads */
253                 struct ceph_snap_context *snapc;        /* for writes */
254         };
255         union {
256                 struct request          *rq;            /* block request */
257                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
258         };
259         struct page             **copyup_pages;
260         spinlock_t              completion_lock;/* protects next_completion */
261         u32                     next_completion;
262         rbd_img_callback_t      callback;
263         u64                     xferred;/* aggregate bytes transferred */
264         int                     result; /* first nonzero obj_request result */
265
266         u32                     obj_request_count;
267         struct list_head        obj_requests;   /* rbd_obj_request structs */
268
269         struct kref             kref;
270 };
271
272 #define for_each_obj_request(ireq, oreq) \
273         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
274 #define for_each_obj_request_from(ireq, oreq) \
275         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
276 #define for_each_obj_request_safe(ireq, oreq, n) \
277         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
278
279 struct rbd_mapping {
280         u64                     size;
281         u64                     features;
282         bool                    read_only;
283 };
284
285 /*
286  * a single device
287  */
288 struct rbd_device {
289         int                     dev_id;         /* blkdev unique id */
290
291         int                     major;          /* blkdev assigned major */
292         struct gendisk          *disk;          /* blkdev's gendisk and rq */
293
294         u32                     image_format;   /* Either 1 or 2 */
295         struct rbd_client       *rbd_client;
296
297         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
298
299         spinlock_t              lock;           /* queue, flags, open_count */
300
301         struct rbd_image_header header;
302         unsigned long           flags;          /* possibly lock protected */
303         struct rbd_spec         *spec;
304
305         char                    *header_name;
306
307         struct ceph_file_layout layout;
308
309         struct ceph_osd_event   *watch_event;
310         struct rbd_obj_request  *watch_request;
311
312         struct rbd_spec         *parent_spec;
313         u64                     parent_overlap;
314         struct rbd_device       *parent;
315
316         /* protects updating the header */
317         struct rw_semaphore     header_rwsem;
318
319         struct rbd_mapping      mapping;
320
321         struct list_head        node;
322
323         /* sysfs related */
324         struct device           dev;
325         unsigned long           open_count;     /* protected by lock */
326 };
327
328 /*
329  * Flag bits for rbd_dev->flags.  If atomicity is required,
330  * rbd_dev->lock is used to protect access.
331  *
332  * Currently, only the "removing" flag (which is coupled with the
333  * "open_count" field) requires atomic access.
334  */
335 enum rbd_dev_flags {
336         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
337         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
338 };
339
340 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
341
342 static LIST_HEAD(rbd_dev_list);    /* devices */
343 static DEFINE_SPINLOCK(rbd_dev_list_lock);
344
345 static LIST_HEAD(rbd_client_list);              /* clients */
346 static DEFINE_SPINLOCK(rbd_client_list_lock);
347
348 static struct kmem_cache        *rbd_img_request_cache;
349
350 static int rbd_img_request_submit(struct rbd_img_request *img_request);
351
352 static void rbd_dev_device_release(struct device *dev);
353
354 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
355                        size_t count);
356 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
357                           size_t count);
358 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
359
360 static struct bus_attribute rbd_bus_attrs[] = {
361         __ATTR(add, S_IWUSR, NULL, rbd_add),
362         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
363         __ATTR_NULL
364 };
365
366 static struct bus_type rbd_bus_type = {
367         .name           = "rbd",
368         .bus_attrs      = rbd_bus_attrs,
369 };
370
371 static void rbd_root_dev_release(struct device *dev)
372 {
373 }
374
375 static struct device rbd_root_dev = {
376         .init_name =    "rbd",
377         .release =      rbd_root_dev_release,
378 };
379
380 static __printf(2, 3)
381 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
382 {
383         struct va_format vaf;
384         va_list args;
385
386         va_start(args, fmt);
387         vaf.fmt = fmt;
388         vaf.va = &args;
389
390         if (!rbd_dev)
391                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
392         else if (rbd_dev->disk)
393                 printk(KERN_WARNING "%s: %s: %pV\n",
394                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
395         else if (rbd_dev->spec && rbd_dev->spec->image_name)
396                 printk(KERN_WARNING "%s: image %s: %pV\n",
397                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
398         else if (rbd_dev->spec && rbd_dev->spec->image_id)
399                 printk(KERN_WARNING "%s: id %s: %pV\n",
400                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
401         else    /* punt */
402                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
403                         RBD_DRV_NAME, rbd_dev, &vaf);
404         va_end(args);
405 }
406
407 #ifdef RBD_DEBUG
408 #define rbd_assert(expr)                                                \
409                 if (unlikely(!(expr))) {                                \
410                         printk(KERN_ERR "\nAssertion failure in %s() "  \
411                                                 "at line %d:\n\n"       \
412                                         "\trbd_assert(%s);\n\n",        \
413                                         __func__, __LINE__, #expr);     \
414                         BUG();                                          \
415                 }
416 #else /* !RBD_DEBUG */
417 #  define rbd_assert(expr)      ((void) 0)
418 #endif /* !RBD_DEBUG */
419
420 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
421 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
422 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
423
424 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
425 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
426 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
427                                         u64 snap_id);
428 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
429                                 u8 *order, u64 *snap_size);
430 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
431                 u64 *snap_features);
432 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
433
434 static int rbd_open(struct block_device *bdev, fmode_t mode)
435 {
436         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
437         bool removing = false;
438
439         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
440                 return -EROFS;
441
442         spin_lock_irq(&rbd_dev->lock);
443         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
444                 removing = true;
445         else
446                 rbd_dev->open_count++;
447         spin_unlock_irq(&rbd_dev->lock);
448         if (removing)
449                 return -ENOENT;
450
451         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
452         (void) get_device(&rbd_dev->dev);
453         set_device_ro(bdev, rbd_dev->mapping.read_only);
454         mutex_unlock(&ctl_mutex);
455
456         return 0;
457 }
458
459 static int rbd_release(struct gendisk *disk, fmode_t mode)
460 {
461         struct rbd_device *rbd_dev = disk->private_data;
462         unsigned long open_count_before;
463
464         spin_lock_irq(&rbd_dev->lock);
465         open_count_before = rbd_dev->open_count--;
466         spin_unlock_irq(&rbd_dev->lock);
467         rbd_assert(open_count_before > 0);
468
469         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
470         put_device(&rbd_dev->dev);
471         mutex_unlock(&ctl_mutex);
472
473         return 0;
474 }
475
476 static const struct block_device_operations rbd_bd_ops = {
477         .owner                  = THIS_MODULE,
478         .open                   = rbd_open,
479         .release                = rbd_release,
480 };
481
482 /*
483  * Initialize an rbd client instance.
484  * We own *ceph_opts.
485  */
486 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
487 {
488         struct rbd_client *rbdc;
489         int ret = -ENOMEM;
490
491         dout("%s:\n", __func__);
492         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
493         if (!rbdc)
494                 goto out_opt;
495
496         kref_init(&rbdc->kref);
497         INIT_LIST_HEAD(&rbdc->node);
498
499         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
500
501         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
502         if (IS_ERR(rbdc->client))
503                 goto out_mutex;
504         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
505
506         ret = ceph_open_session(rbdc->client);
507         if (ret < 0)
508                 goto out_err;
509
510         spin_lock(&rbd_client_list_lock);
511         list_add_tail(&rbdc->node, &rbd_client_list);
512         spin_unlock(&rbd_client_list_lock);
513
514         mutex_unlock(&ctl_mutex);
515         dout("%s: rbdc %p\n", __func__, rbdc);
516
517         return rbdc;
518
519 out_err:
520         ceph_destroy_client(rbdc->client);
521 out_mutex:
522         mutex_unlock(&ctl_mutex);
523         kfree(rbdc);
524 out_opt:
525         if (ceph_opts)
526                 ceph_destroy_options(ceph_opts);
527         dout("%s: error %d\n", __func__, ret);
528
529         return ERR_PTR(ret);
530 }
531
532 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
533 {
534         kref_get(&rbdc->kref);
535
536         return rbdc;
537 }
538
539 /*
540  * Find a ceph client with specific addr and configuration.  If
541  * found, bump its reference count.
542  */
543 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
544 {
545         struct rbd_client *client_node;
546         bool found = false;
547
548         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
549                 return NULL;
550
551         spin_lock(&rbd_client_list_lock);
552         list_for_each_entry(client_node, &rbd_client_list, node) {
553                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
554                         __rbd_get_client(client_node);
555
556                         found = true;
557                         break;
558                 }
559         }
560         spin_unlock(&rbd_client_list_lock);
561
562         return found ? client_node : NULL;
563 }
564
565 /*
566  * mount options
567  */
568 enum {
569         Opt_last_int,
570         /* int args above */
571         Opt_last_string,
572         /* string args above */
573         Opt_read_only,
574         Opt_read_write,
575         /* Boolean args above */
576         Opt_last_bool,
577 };
578
579 static match_table_t rbd_opts_tokens = {
580         /* int args above */
581         /* string args above */
582         {Opt_read_only, "read_only"},
583         {Opt_read_only, "ro"},          /* Alternate spelling */
584         {Opt_read_write, "read_write"},
585         {Opt_read_write, "rw"},         /* Alternate spelling */
586         /* Boolean args above */
587         {-1, NULL}
588 };
589
590 struct rbd_options {
591         bool    read_only;
592 };
593
594 #define RBD_READ_ONLY_DEFAULT   false
595
596 static int parse_rbd_opts_token(char *c, void *private)
597 {
598         struct rbd_options *rbd_opts = private;
599         substring_t argstr[MAX_OPT_ARGS];
600         int token, intval, ret;
601
602         token = match_token(c, rbd_opts_tokens, argstr);
603         if (token < 0)
604                 return -EINVAL;
605
606         if (token < Opt_last_int) {
607                 ret = match_int(&argstr[0], &intval);
608                 if (ret < 0) {
609                         pr_err("bad mount option arg (not int) "
610                                "at '%s'\n", c);
611                         return ret;
612                 }
613                 dout("got int token %d val %d\n", token, intval);
614         } else if (token > Opt_last_int && token < Opt_last_string) {
615                 dout("got string token %d val %s\n", token,
616                      argstr[0].from);
617         } else if (token > Opt_last_string && token < Opt_last_bool) {
618                 dout("got Boolean token %d\n", token);
619         } else {
620                 dout("got token %d\n", token);
621         }
622
623         switch (token) {
624         case Opt_read_only:
625                 rbd_opts->read_only = true;
626                 break;
627         case Opt_read_write:
628                 rbd_opts->read_only = false;
629                 break;
630         default:
631                 rbd_assert(false);
632                 break;
633         }
634         return 0;
635 }
636
637 /*
638  * Get a ceph client with specific addr and configuration, if one does
639  * not exist create it.
640  */
641 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
642 {
643         struct rbd_client *rbdc;
644
645         rbdc = rbd_client_find(ceph_opts);
646         if (rbdc)       /* using an existing client */
647                 ceph_destroy_options(ceph_opts);
648         else
649                 rbdc = rbd_client_create(ceph_opts);
650
651         return rbdc;
652 }
653
654 /*
655  * Destroy ceph client
656  *
657  * Caller must hold rbd_client_list_lock.
658  */
659 static void rbd_client_release(struct kref *kref)
660 {
661         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
662
663         dout("%s: rbdc %p\n", __func__, rbdc);
664         spin_lock(&rbd_client_list_lock);
665         list_del(&rbdc->node);
666         spin_unlock(&rbd_client_list_lock);
667
668         ceph_destroy_client(rbdc->client);
669         kfree(rbdc);
670 }
671
672 /*
673  * Drop reference to ceph client node. If it's not referenced anymore, release
674  * it.
675  */
676 static void rbd_put_client(struct rbd_client *rbdc)
677 {
678         if (rbdc)
679                 kref_put(&rbdc->kref, rbd_client_release);
680 }
681
682 static bool rbd_image_format_valid(u32 image_format)
683 {
684         return image_format == 1 || image_format == 2;
685 }
686
687 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
688 {
689         size_t size;
690         u32 snap_count;
691
692         /* The header has to start with the magic rbd header text */
693         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
694                 return false;
695
696         /* The bio layer requires at least sector-sized I/O */
697
698         if (ondisk->options.order < SECTOR_SHIFT)
699                 return false;
700
701         /* If we use u64 in a few spots we may be able to loosen this */
702
703         if (ondisk->options.order > 8 * sizeof (int) - 1)
704                 return false;
705
706         /*
707          * The size of a snapshot header has to fit in a size_t, and
708          * that limits the number of snapshots.
709          */
710         snap_count = le32_to_cpu(ondisk->snap_count);
711         size = SIZE_MAX - sizeof (struct ceph_snap_context);
712         if (snap_count > size / sizeof (__le64))
713                 return false;
714
715         /*
716          * Not only that, but the size of the entire the snapshot
717          * header must also be representable in a size_t.
718          */
719         size -= snap_count * sizeof (__le64);
720         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
721                 return false;
722
723         return true;
724 }
725
726 /*
727  * Create a new header structure, translate header format from the on-disk
728  * header.
729  */
730 static int rbd_header_from_disk(struct rbd_image_header *header,
731                                  struct rbd_image_header_ondisk *ondisk)
732 {
733         u32 snap_count;
734         size_t len;
735         size_t size;
736         u32 i;
737
738         memset(header, 0, sizeof (*header));
739
740         snap_count = le32_to_cpu(ondisk->snap_count);
741
742         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
743         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
744         if (!header->object_prefix)
745                 return -ENOMEM;
746         memcpy(header->object_prefix, ondisk->object_prefix, len);
747         header->object_prefix[len] = '\0';
748
749         if (snap_count) {
750                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
751
752                 /* Save a copy of the snapshot names */
753
754                 if (snap_names_len > (u64) SIZE_MAX)
755                         return -EIO;
756                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
757                 if (!header->snap_names)
758                         goto out_err;
759                 /*
760                  * Note that rbd_dev_v1_header_read() guarantees
761                  * the ondisk buffer we're working with has
762                  * snap_names_len bytes beyond the end of the
763                  * snapshot id array, this memcpy() is safe.
764                  */
765                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
766                         snap_names_len);
767
768                 /* Record each snapshot's size */
769
770                 size = snap_count * sizeof (*header->snap_sizes);
771                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
772                 if (!header->snap_sizes)
773                         goto out_err;
774                 for (i = 0; i < snap_count; i++)
775                         header->snap_sizes[i] =
776                                 le64_to_cpu(ondisk->snaps[i].image_size);
777         } else {
778                 header->snap_names = NULL;
779                 header->snap_sizes = NULL;
780         }
781
782         header->features = 0;   /* No features support in v1 images */
783         header->obj_order = ondisk->options.order;
784         header->crypt_type = ondisk->options.crypt_type;
785         header->comp_type = ondisk->options.comp_type;
786
787         /* Allocate and fill in the snapshot context */
788
789         header->image_size = le64_to_cpu(ondisk->image_size);
790
791         header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
792         if (!header->snapc)
793                 goto out_err;
794         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
795         for (i = 0; i < snap_count; i++)
796                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
797
798         return 0;
799
800 out_err:
801         kfree(header->snap_sizes);
802         header->snap_sizes = NULL;
803         kfree(header->snap_names);
804         header->snap_names = NULL;
805         kfree(header->object_prefix);
806         header->object_prefix = NULL;
807
808         return -ENOMEM;
809 }
810
811 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
812 {
813         const char *snap_name;
814
815         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
816
817         /* Skip over names until we find the one we are looking for */
818
819         snap_name = rbd_dev->header.snap_names;
820         while (which--)
821                 snap_name += strlen(snap_name) + 1;
822
823         return kstrdup(snap_name, GFP_KERNEL);
824 }
825
826 /*
827  * Snapshot id comparison function for use with qsort()/bsearch().
828  * Note that result is for snapshots in *descending* order.
829  */
830 static int snapid_compare_reverse(const void *s1, const void *s2)
831 {
832         u64 snap_id1 = *(u64 *)s1;
833         u64 snap_id2 = *(u64 *)s2;
834
835         if (snap_id1 < snap_id2)
836                 return 1;
837         return snap_id1 == snap_id2 ? 0 : -1;
838 }
839
840 /*
841  * Search a snapshot context to see if the given snapshot id is
842  * present.
843  *
844  * Returns the position of the snapshot id in the array if it's found,
845  * or BAD_SNAP_INDEX otherwise.
846  *
847  * Note: The snapshot array is in kept sorted (by the osd) in
848  * reverse order, highest snapshot id first.
849  */
850 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
851 {
852         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
853         u64 *found;
854
855         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
856                                 sizeof (snap_id), snapid_compare_reverse);
857
858         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
859 }
860
861 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
862                                         u64 snap_id)
863 {
864         u32 which;
865
866         which = rbd_dev_snap_index(rbd_dev, snap_id);
867         if (which == BAD_SNAP_INDEX)
868                 return NULL;
869
870         return _rbd_dev_v1_snap_name(rbd_dev, which);
871 }
872
873 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
874 {
875         if (snap_id == CEPH_NOSNAP)
876                 return RBD_SNAP_HEAD_NAME;
877
878         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
879         if (rbd_dev->image_format == 1)
880                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
881
882         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
883 }
884
885 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
886                                 u64 *snap_size)
887 {
888         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
889         if (snap_id == CEPH_NOSNAP) {
890                 *snap_size = rbd_dev->header.image_size;
891         } else if (rbd_dev->image_format == 1) {
892                 u32 which;
893
894                 which = rbd_dev_snap_index(rbd_dev, snap_id);
895                 if (which == BAD_SNAP_INDEX)
896                         return -ENOENT;
897
898                 *snap_size = rbd_dev->header.snap_sizes[which];
899         } else {
900                 u64 size = 0;
901                 int ret;
902
903                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
904                 if (ret)
905                         return ret;
906
907                 *snap_size = size;
908         }
909         return 0;
910 }
911
912 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
913                         u64 *snap_features)
914 {
915         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
916         if (snap_id == CEPH_NOSNAP) {
917                 *snap_features = rbd_dev->header.features;
918         } else if (rbd_dev->image_format == 1) {
919                 *snap_features = 0;     /* No features for format 1 */
920         } else {
921                 u64 features = 0;
922                 int ret;
923
924                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
925                 if (ret)
926                         return ret;
927
928                 *snap_features = features;
929         }
930         return 0;
931 }
932
933 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
934 {
935         const char *snap_name = rbd_dev->spec->snap_name;
936         u64 snap_id;
937         u64 size = 0;
938         u64 features = 0;
939         int ret;
940
941         if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
942                 snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
943                 if (snap_id == CEPH_NOSNAP)
944                         return -ENOENT;
945         } else {
946                 snap_id = CEPH_NOSNAP;
947         }
948
949         ret = rbd_snap_size(rbd_dev, snap_id, &size);
950         if (ret)
951                 return ret;
952         ret = rbd_snap_features(rbd_dev, snap_id, &features);
953         if (ret)
954                 return ret;
955
956         rbd_dev->mapping.size = size;
957         rbd_dev->mapping.features = features;
958
959         /* If we are mapping a snapshot it must be marked read-only */
960
961         if (snap_id != CEPH_NOSNAP)
962                 rbd_dev->mapping.read_only = true;
963
964         return 0;
965 }
966
967 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
968 {
969         rbd_dev->mapping.size = 0;
970         rbd_dev->mapping.features = 0;
971         rbd_dev->mapping.read_only = true;
972 }
973
974 static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
975 {
976         rbd_dev->mapping.size = 0;
977         rbd_dev->mapping.features = 0;
978         rbd_dev->mapping.read_only = true;
979 }
980
981 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
982 {
983         char *name;
984         u64 segment;
985         int ret;
986
987         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
988         if (!name)
989                 return NULL;
990         segment = offset >> rbd_dev->header.obj_order;
991         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
992                         rbd_dev->header.object_prefix, segment);
993         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
994                 pr_err("error formatting segment name for #%llu (%d)\n",
995                         segment, ret);
996                 kfree(name);
997                 name = NULL;
998         }
999
1000         return name;
1001 }
1002
1003 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1004 {
1005         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1006
1007         return offset & (segment_size - 1);
1008 }
1009
1010 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1011                                 u64 offset, u64 length)
1012 {
1013         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1014
1015         offset &= segment_size - 1;
1016
1017         rbd_assert(length <= U64_MAX - offset);
1018         if (offset + length > segment_size)
1019                 length = segment_size - offset;
1020
1021         return length;
1022 }
1023
1024 /*
1025  * returns the size of an object in the image
1026  */
1027 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1028 {
1029         return 1 << header->obj_order;
1030 }
1031
1032 /*
1033  * bio helpers
1034  */
1035
1036 static void bio_chain_put(struct bio *chain)
1037 {
1038         struct bio *tmp;
1039
1040         while (chain) {
1041                 tmp = chain;
1042                 chain = chain->bi_next;
1043                 bio_put(tmp);
1044         }
1045 }
1046
1047 /*
1048  * zeros a bio chain, starting at specific offset
1049  */
1050 static void zero_bio_chain(struct bio *chain, int start_ofs)
1051 {
1052         struct bio_vec *bv;
1053         unsigned long flags;
1054         void *buf;
1055         int i;
1056         int pos = 0;
1057
1058         while (chain) {
1059                 bio_for_each_segment(bv, chain, i) {
1060                         if (pos + bv->bv_len > start_ofs) {
1061                                 int remainder = max(start_ofs - pos, 0);
1062                                 buf = bvec_kmap_irq(bv, &flags);
1063                                 memset(buf + remainder, 0,
1064                                        bv->bv_len - remainder);
1065                                 bvec_kunmap_irq(buf, &flags);
1066                         }
1067                         pos += bv->bv_len;
1068                 }
1069
1070                 chain = chain->bi_next;
1071         }
1072 }
1073
1074 /*
1075  * similar to zero_bio_chain(), zeros data defined by a page array,
1076  * starting at the given byte offset from the start of the array and
1077  * continuing up to the given end offset.  The pages array is
1078  * assumed to be big enough to hold all bytes up to the end.
1079  */
1080 static void zero_pages(struct page **pages, u64 offset, u64 end)
1081 {
1082         struct page **page = &pages[offset >> PAGE_SHIFT];
1083
1084         rbd_assert(end > offset);
1085         rbd_assert(end - offset <= (u64)SIZE_MAX);
1086         while (offset < end) {
1087                 size_t page_offset;
1088                 size_t length;
1089                 unsigned long flags;
1090                 void *kaddr;
1091
1092                 page_offset = (size_t)(offset & ~PAGE_MASK);
1093                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1094                 local_irq_save(flags);
1095                 kaddr = kmap_atomic(*page);
1096                 memset(kaddr + page_offset, 0, length);
1097                 kunmap_atomic(kaddr);
1098                 local_irq_restore(flags);
1099
1100                 offset += length;
1101                 page++;
1102         }
1103 }
1104
1105 /*
1106  * Clone a portion of a bio, starting at the given byte offset
1107  * and continuing for the number of bytes indicated.
1108  */
1109 static struct bio *bio_clone_range(struct bio *bio_src,
1110                                         unsigned int offset,
1111                                         unsigned int len,
1112                                         gfp_t gfpmask)
1113 {
1114         struct bio_vec *bv;
1115         unsigned int resid;
1116         unsigned short idx;
1117         unsigned int voff;
1118         unsigned short end_idx;
1119         unsigned short vcnt;
1120         struct bio *bio;
1121
1122         /* Handle the easy case for the caller */
1123
1124         if (!offset && len == bio_src->bi_size)
1125                 return bio_clone(bio_src, gfpmask);
1126
1127         if (WARN_ON_ONCE(!len))
1128                 return NULL;
1129         if (WARN_ON_ONCE(len > bio_src->bi_size))
1130                 return NULL;
1131         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1132                 return NULL;
1133
1134         /* Find first affected segment... */
1135
1136         resid = offset;
1137         __bio_for_each_segment(bv, bio_src, idx, 0) {
1138                 if (resid < bv->bv_len)
1139                         break;
1140                 resid -= bv->bv_len;
1141         }
1142         voff = resid;
1143
1144         /* ...and the last affected segment */
1145
1146         resid += len;
1147         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1148                 if (resid <= bv->bv_len)
1149                         break;
1150                 resid -= bv->bv_len;
1151         }
1152         vcnt = end_idx - idx + 1;
1153
1154         /* Build the clone */
1155
1156         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1157         if (!bio)
1158                 return NULL;    /* ENOMEM */
1159
1160         bio->bi_bdev = bio_src->bi_bdev;
1161         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1162         bio->bi_rw = bio_src->bi_rw;
1163         bio->bi_flags |= 1 << BIO_CLONED;
1164
1165         /*
1166          * Copy over our part of the bio_vec, then update the first
1167          * and last (or only) entries.
1168          */
1169         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1170                         vcnt * sizeof (struct bio_vec));
1171         bio->bi_io_vec[0].bv_offset += voff;
1172         if (vcnt > 1) {
1173                 bio->bi_io_vec[0].bv_len -= voff;
1174                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1175         } else {
1176                 bio->bi_io_vec[0].bv_len = len;
1177         }
1178
1179         bio->bi_vcnt = vcnt;
1180         bio->bi_size = len;
1181         bio->bi_idx = 0;
1182
1183         return bio;
1184 }
1185
1186 /*
1187  * Clone a portion of a bio chain, starting at the given byte offset
1188  * into the first bio in the source chain and continuing for the
1189  * number of bytes indicated.  The result is another bio chain of
1190  * exactly the given length, or a null pointer on error.
1191  *
1192  * The bio_src and offset parameters are both in-out.  On entry they
1193  * refer to the first source bio and the offset into that bio where
1194  * the start of data to be cloned is located.
1195  *
1196  * On return, bio_src is updated to refer to the bio in the source
1197  * chain that contains first un-cloned byte, and *offset will
1198  * contain the offset of that byte within that bio.
1199  */
1200 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1201                                         unsigned int *offset,
1202                                         unsigned int len,
1203                                         gfp_t gfpmask)
1204 {
1205         struct bio *bi = *bio_src;
1206         unsigned int off = *offset;
1207         struct bio *chain = NULL;
1208         struct bio **end;
1209
1210         /* Build up a chain of clone bios up to the limit */
1211
1212         if (!bi || off >= bi->bi_size || !len)
1213                 return NULL;            /* Nothing to clone */
1214
1215         end = &chain;
1216         while (len) {
1217                 unsigned int bi_size;
1218                 struct bio *bio;
1219
1220                 if (!bi) {
1221                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1222                         goto out_err;   /* EINVAL; ran out of bio's */
1223                 }
1224                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1225                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1226                 if (!bio)
1227                         goto out_err;   /* ENOMEM */
1228
1229                 *end = bio;
1230                 end = &bio->bi_next;
1231
1232                 off += bi_size;
1233                 if (off == bi->bi_size) {
1234                         bi = bi->bi_next;
1235                         off = 0;
1236                 }
1237                 len -= bi_size;
1238         }
1239         *bio_src = bi;
1240         *offset = off;
1241
1242         return chain;
1243 out_err:
1244         bio_chain_put(chain);
1245
1246         return NULL;
1247 }
1248
1249 /*
1250  * The default/initial value for all object request flags is 0.  For
1251  * each flag, once its value is set to 1 it is never reset to 0
1252  * again.
1253  */
1254 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1255 {
1256         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1257                 struct rbd_device *rbd_dev;
1258
1259                 rbd_dev = obj_request->img_request->rbd_dev;
1260                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1261                         obj_request);
1262         }
1263 }
1264
1265 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1266 {
1267         smp_mb();
1268         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1269 }
1270
1271 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1272 {
1273         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1274                 struct rbd_device *rbd_dev = NULL;
1275
1276                 if (obj_request_img_data_test(obj_request))
1277                         rbd_dev = obj_request->img_request->rbd_dev;
1278                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1279                         obj_request);
1280         }
1281 }
1282
1283 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1284 {
1285         smp_mb();
1286         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1287 }
1288
1289 /*
1290  * This sets the KNOWN flag after (possibly) setting the EXISTS
1291  * flag.  The latter is set based on the "exists" value provided.
1292  *
1293  * Note that for our purposes once an object exists it never goes
1294  * away again.  It's possible that the response from two existence
1295  * checks are separated by the creation of the target object, and
1296  * the first ("doesn't exist") response arrives *after* the second
1297  * ("does exist").  In that case we ignore the second one.
1298  */
1299 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1300                                 bool exists)
1301 {
1302         if (exists)
1303                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1304         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1305         smp_mb();
1306 }
1307
1308 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1309 {
1310         smp_mb();
1311         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1312 }
1313
1314 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1315 {
1316         smp_mb();
1317         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1318 }
1319
1320 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1321 {
1322         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1323                 atomic_read(&obj_request->kref.refcount));
1324         kref_get(&obj_request->kref);
1325 }
1326
1327 static void rbd_obj_request_destroy(struct kref *kref);
1328 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1329 {
1330         rbd_assert(obj_request != NULL);
1331         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1332                 atomic_read(&obj_request->kref.refcount));
1333         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1334 }
1335
1336 static void rbd_img_request_get(struct rbd_img_request *img_request)
1337 {
1338         dout("%s: img %p (was %d)\n", __func__, img_request,
1339                 atomic_read(&img_request->kref.refcount));
1340         kref_get(&img_request->kref);
1341 }
1342
1343 static void rbd_img_request_destroy(struct kref *kref);
1344 static void rbd_img_request_put(struct rbd_img_request *img_request)
1345 {
1346         rbd_assert(img_request != NULL);
1347         dout("%s: img %p (was %d)\n", __func__, img_request,
1348                 atomic_read(&img_request->kref.refcount));
1349         kref_put(&img_request->kref, rbd_img_request_destroy);
1350 }
1351
1352 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1353                                         struct rbd_obj_request *obj_request)
1354 {
1355         rbd_assert(obj_request->img_request == NULL);
1356
1357         /* Image request now owns object's original reference */
1358         obj_request->img_request = img_request;
1359         obj_request->which = img_request->obj_request_count;
1360         rbd_assert(!obj_request_img_data_test(obj_request));
1361         obj_request_img_data_set(obj_request);
1362         rbd_assert(obj_request->which != BAD_WHICH);
1363         img_request->obj_request_count++;
1364         list_add_tail(&obj_request->links, &img_request->obj_requests);
1365         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1366                 obj_request->which);
1367 }
1368
1369 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1370                                         struct rbd_obj_request *obj_request)
1371 {
1372         rbd_assert(obj_request->which != BAD_WHICH);
1373
1374         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1375                 obj_request->which);
1376         list_del(&obj_request->links);
1377         rbd_assert(img_request->obj_request_count > 0);
1378         img_request->obj_request_count--;
1379         rbd_assert(obj_request->which == img_request->obj_request_count);
1380         obj_request->which = BAD_WHICH;
1381         rbd_assert(obj_request_img_data_test(obj_request));
1382         rbd_assert(obj_request->img_request == img_request);
1383         obj_request->img_request = NULL;
1384         obj_request->callback = NULL;
1385         rbd_obj_request_put(obj_request);
1386 }
1387
1388 static bool obj_request_type_valid(enum obj_request_type type)
1389 {
1390         switch (type) {
1391         case OBJ_REQUEST_NODATA:
1392         case OBJ_REQUEST_BIO:
1393         case OBJ_REQUEST_PAGES:
1394                 return true;
1395         default:
1396                 return false;
1397         }
1398 }
1399
1400 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1401                                 struct rbd_obj_request *obj_request)
1402 {
1403         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1404
1405         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1406 }
1407
1408 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1409 {
1410
1411         dout("%s: img %p\n", __func__, img_request);
1412
1413         /*
1414          * If no error occurred, compute the aggregate transfer
1415          * count for the image request.  We could instead use
1416          * atomic64_cmpxchg() to update it as each object request
1417          * completes; not clear which way is better off hand.
1418          */
1419         if (!img_request->result) {
1420                 struct rbd_obj_request *obj_request;
1421                 u64 xferred = 0;
1422
1423                 for_each_obj_request(img_request, obj_request)
1424                         xferred += obj_request->xferred;
1425                 img_request->xferred = xferred;
1426         }
1427
1428         if (img_request->callback)
1429                 img_request->callback(img_request);
1430         else
1431                 rbd_img_request_put(img_request);
1432 }
1433
1434 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1435
1436 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1437 {
1438         dout("%s: obj %p\n", __func__, obj_request);
1439
1440         return wait_for_completion_interruptible(&obj_request->completion);
1441 }
1442
1443 /*
1444  * The default/initial value for all image request flags is 0.  Each
1445  * is conditionally set to 1 at image request initialization time
1446  * and currently never change thereafter.
1447  */
1448 static void img_request_write_set(struct rbd_img_request *img_request)
1449 {
1450         set_bit(IMG_REQ_WRITE, &img_request->flags);
1451         smp_mb();
1452 }
1453
1454 static bool img_request_write_test(struct rbd_img_request *img_request)
1455 {
1456         smp_mb();
1457         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1458 }
1459
1460 static void img_request_child_set(struct rbd_img_request *img_request)
1461 {
1462         set_bit(IMG_REQ_CHILD, &img_request->flags);
1463         smp_mb();
1464 }
1465
1466 static bool img_request_child_test(struct rbd_img_request *img_request)
1467 {
1468         smp_mb();
1469         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1470 }
1471
1472 static void img_request_layered_set(struct rbd_img_request *img_request)
1473 {
1474         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1475         smp_mb();
1476 }
1477
1478 static bool img_request_layered_test(struct rbd_img_request *img_request)
1479 {
1480         smp_mb();
1481         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1482 }
1483
1484 static void
1485 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1486 {
1487         u64 xferred = obj_request->xferred;
1488         u64 length = obj_request->length;
1489
1490         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1491                 obj_request, obj_request->img_request, obj_request->result,
1492                 xferred, length);
1493         /*
1494          * ENOENT means a hole in the image.  We zero-fill the
1495          * entire length of the request.  A short read also implies
1496          * zero-fill to the end of the request.  Either way we
1497          * update the xferred count to indicate the whole request
1498          * was satisfied.
1499          */
1500         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1501         if (obj_request->result == -ENOENT) {
1502                 if (obj_request->type == OBJ_REQUEST_BIO)
1503                         zero_bio_chain(obj_request->bio_list, 0);
1504                 else
1505                         zero_pages(obj_request->pages, 0, length);
1506                 obj_request->result = 0;
1507                 obj_request->xferred = length;
1508         } else if (xferred < length && !obj_request->result) {
1509                 if (obj_request->type == OBJ_REQUEST_BIO)
1510                         zero_bio_chain(obj_request->bio_list, xferred);
1511                 else
1512                         zero_pages(obj_request->pages, xferred, length);
1513                 obj_request->xferred = length;
1514         }
1515         obj_request_done_set(obj_request);
1516 }
1517
1518 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1519 {
1520         dout("%s: obj %p cb %p\n", __func__, obj_request,
1521                 obj_request->callback);
1522         if (obj_request->callback)
1523                 obj_request->callback(obj_request);
1524         else
1525                 complete_all(&obj_request->completion);
1526 }
1527
1528 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1529 {
1530         dout("%s: obj %p\n", __func__, obj_request);
1531         obj_request_done_set(obj_request);
1532 }
1533
1534 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1535 {
1536         struct rbd_img_request *img_request = NULL;
1537         struct rbd_device *rbd_dev = NULL;
1538         bool layered = false;
1539
1540         if (obj_request_img_data_test(obj_request)) {
1541                 img_request = obj_request->img_request;
1542                 layered = img_request && img_request_layered_test(img_request);
1543                 rbd_dev = img_request->rbd_dev;
1544         }
1545
1546         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1547                 obj_request, img_request, obj_request->result,
1548                 obj_request->xferred, obj_request->length);
1549         if (layered && obj_request->result == -ENOENT &&
1550                         obj_request->img_offset < rbd_dev->parent_overlap)
1551                 rbd_img_parent_read(obj_request);
1552         else if (img_request)
1553                 rbd_img_obj_request_read_callback(obj_request);
1554         else
1555                 obj_request_done_set(obj_request);
1556 }
1557
1558 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1559 {
1560         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1561                 obj_request->result, obj_request->length);
1562         /*
1563          * There is no such thing as a successful short write.  Set
1564          * it to our originally-requested length.
1565          */
1566         obj_request->xferred = obj_request->length;
1567         obj_request_done_set(obj_request);
1568 }
1569
1570 /*
1571  * For a simple stat call there's nothing to do.  We'll do more if
1572  * this is part of a write sequence for a layered image.
1573  */
1574 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1575 {
1576         dout("%s: obj %p\n", __func__, obj_request);
1577         obj_request_done_set(obj_request);
1578 }
1579
1580 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1581                                 struct ceph_msg *msg)
1582 {
1583         struct rbd_obj_request *obj_request = osd_req->r_priv;
1584         u16 opcode;
1585
1586         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1587         rbd_assert(osd_req == obj_request->osd_req);
1588         if (obj_request_img_data_test(obj_request)) {
1589                 rbd_assert(obj_request->img_request);
1590                 rbd_assert(obj_request->which != BAD_WHICH);
1591         } else {
1592                 rbd_assert(obj_request->which == BAD_WHICH);
1593         }
1594
1595         if (osd_req->r_result < 0)
1596                 obj_request->result = osd_req->r_result;
1597
1598         BUG_ON(osd_req->r_num_ops > 2);
1599
1600         /*
1601          * We support a 64-bit length, but ultimately it has to be
1602          * passed to blk_end_request(), which takes an unsigned int.
1603          */
1604         obj_request->xferred = osd_req->r_reply_op_len[0];
1605         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1606         opcode = osd_req->r_ops[0].op;
1607         switch (opcode) {
1608         case CEPH_OSD_OP_READ:
1609                 rbd_osd_read_callback(obj_request);
1610                 break;
1611         case CEPH_OSD_OP_WRITE:
1612                 rbd_osd_write_callback(obj_request);
1613                 break;
1614         case CEPH_OSD_OP_STAT:
1615                 rbd_osd_stat_callback(obj_request);
1616                 break;
1617         case CEPH_OSD_OP_CALL:
1618         case CEPH_OSD_OP_NOTIFY_ACK:
1619         case CEPH_OSD_OP_WATCH:
1620                 rbd_osd_trivial_callback(obj_request);
1621                 break;
1622         default:
1623                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1624                         obj_request->object_name, (unsigned short) opcode);
1625                 break;
1626         }
1627
1628         if (obj_request_done_test(obj_request))
1629                 rbd_obj_request_complete(obj_request);
1630 }
1631
1632 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1633 {
1634         struct rbd_img_request *img_request = obj_request->img_request;
1635         struct ceph_osd_request *osd_req = obj_request->osd_req;
1636         u64 snap_id;
1637
1638         rbd_assert(osd_req != NULL);
1639
1640         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1641         ceph_osdc_build_request(osd_req, obj_request->offset,
1642                         NULL, snap_id, NULL);
1643 }
1644
1645 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1646 {
1647         struct rbd_img_request *img_request = obj_request->img_request;
1648         struct ceph_osd_request *osd_req = obj_request->osd_req;
1649         struct ceph_snap_context *snapc;
1650         struct timespec mtime = CURRENT_TIME;
1651
1652         rbd_assert(osd_req != NULL);
1653
1654         snapc = img_request ? img_request->snapc : NULL;
1655         ceph_osdc_build_request(osd_req, obj_request->offset,
1656                         snapc, CEPH_NOSNAP, &mtime);
1657 }
1658
1659 static struct ceph_osd_request *rbd_osd_req_create(
1660                                         struct rbd_device *rbd_dev,
1661                                         bool write_request,
1662                                         struct rbd_obj_request *obj_request)
1663 {
1664         struct ceph_snap_context *snapc = NULL;
1665         struct ceph_osd_client *osdc;
1666         struct ceph_osd_request *osd_req;
1667
1668         if (obj_request_img_data_test(obj_request)) {
1669                 struct rbd_img_request *img_request = obj_request->img_request;
1670
1671                 rbd_assert(write_request ==
1672                                 img_request_write_test(img_request));
1673                 if (write_request)
1674                         snapc = img_request->snapc;
1675         }
1676
1677         /* Allocate and initialize the request, for the single op */
1678
1679         osdc = &rbd_dev->rbd_client->client->osdc;
1680         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1681         if (!osd_req)
1682                 return NULL;    /* ENOMEM */
1683
1684         if (write_request)
1685                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1686         else
1687                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1688
1689         osd_req->r_callback = rbd_osd_req_callback;
1690         osd_req->r_priv = obj_request;
1691
1692         osd_req->r_oid_len = strlen(obj_request->object_name);
1693         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1694         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1695
1696         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1697
1698         return osd_req;
1699 }
1700
1701 /*
1702  * Create a copyup osd request based on the information in the
1703  * object request supplied.  A copyup request has two osd ops,
1704  * a copyup method call, and a "normal" write request.
1705  */
1706 static struct ceph_osd_request *
1707 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1708 {
1709         struct rbd_img_request *img_request;
1710         struct ceph_snap_context *snapc;
1711         struct rbd_device *rbd_dev;
1712         struct ceph_osd_client *osdc;
1713         struct ceph_osd_request *osd_req;
1714
1715         rbd_assert(obj_request_img_data_test(obj_request));
1716         img_request = obj_request->img_request;
1717         rbd_assert(img_request);
1718         rbd_assert(img_request_write_test(img_request));
1719
1720         /* Allocate and initialize the request, for the two ops */
1721
1722         snapc = img_request->snapc;
1723         rbd_dev = img_request->rbd_dev;
1724         osdc = &rbd_dev->rbd_client->client->osdc;
1725         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1726         if (!osd_req)
1727                 return NULL;    /* ENOMEM */
1728
1729         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1730         osd_req->r_callback = rbd_osd_req_callback;
1731         osd_req->r_priv = obj_request;
1732
1733         osd_req->r_oid_len = strlen(obj_request->object_name);
1734         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1735         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1736
1737         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1738
1739         return osd_req;
1740 }
1741
1742
1743 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1744 {
1745         ceph_osdc_put_request(osd_req);
1746 }
1747
1748 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1749
1750 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1751                                                 u64 offset, u64 length,
1752                                                 enum obj_request_type type)
1753 {
1754         struct rbd_obj_request *obj_request;
1755         size_t size;
1756         char *name;
1757
1758         rbd_assert(obj_request_type_valid(type));
1759
1760         size = strlen(object_name) + 1;
1761         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1762         if (!obj_request)
1763                 return NULL;
1764
1765         name = (char *)(obj_request + 1);
1766         obj_request->object_name = memcpy(name, object_name, size);
1767         obj_request->offset = offset;
1768         obj_request->length = length;
1769         obj_request->flags = 0;
1770         obj_request->which = BAD_WHICH;
1771         obj_request->type = type;
1772         INIT_LIST_HEAD(&obj_request->links);
1773         init_completion(&obj_request->completion);
1774         kref_init(&obj_request->kref);
1775
1776         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1777                 offset, length, (int)type, obj_request);
1778
1779         return obj_request;
1780 }
1781
1782 static void rbd_obj_request_destroy(struct kref *kref)
1783 {
1784         struct rbd_obj_request *obj_request;
1785
1786         obj_request = container_of(kref, struct rbd_obj_request, kref);
1787
1788         dout("%s: obj %p\n", __func__, obj_request);
1789
1790         rbd_assert(obj_request->img_request == NULL);
1791         rbd_assert(obj_request->which == BAD_WHICH);
1792
1793         if (obj_request->osd_req)
1794                 rbd_osd_req_destroy(obj_request->osd_req);
1795
1796         rbd_assert(obj_request_type_valid(obj_request->type));
1797         switch (obj_request->type) {
1798         case OBJ_REQUEST_NODATA:
1799                 break;          /* Nothing to do */
1800         case OBJ_REQUEST_BIO:
1801                 if (obj_request->bio_list)
1802                         bio_chain_put(obj_request->bio_list);
1803                 break;
1804         case OBJ_REQUEST_PAGES:
1805                 if (obj_request->pages)
1806                         ceph_release_page_vector(obj_request->pages,
1807                                                 obj_request->page_count);
1808                 break;
1809         }
1810
1811         kfree(obj_request);
1812 }
1813
1814 /*
1815  * Caller is responsible for filling in the list of object requests
1816  * that comprises the image request, and the Linux request pointer
1817  * (if there is one).
1818  */
1819 static struct rbd_img_request *rbd_img_request_create(
1820                                         struct rbd_device *rbd_dev,
1821                                         u64 offset, u64 length,
1822                                         bool write_request,
1823                                         bool child_request)
1824 {
1825         struct rbd_img_request *img_request;
1826
1827         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1828         if (!img_request)
1829                 return NULL;
1830
1831         if (write_request) {
1832                 down_read(&rbd_dev->header_rwsem);
1833                 ceph_get_snap_context(rbd_dev->header.snapc);
1834                 up_read(&rbd_dev->header_rwsem);
1835         }
1836
1837         img_request->rq = NULL;
1838         img_request->rbd_dev = rbd_dev;
1839         img_request->offset = offset;
1840         img_request->length = length;
1841         img_request->flags = 0;
1842         if (write_request) {
1843                 img_request_write_set(img_request);
1844                 img_request->snapc = rbd_dev->header.snapc;
1845         } else {
1846                 img_request->snap_id = rbd_dev->spec->snap_id;
1847         }
1848         if (child_request)
1849                 img_request_child_set(img_request);
1850         if (rbd_dev->parent_spec)
1851                 img_request_layered_set(img_request);
1852         spin_lock_init(&img_request->completion_lock);
1853         img_request->next_completion = 0;
1854         img_request->callback = NULL;
1855         img_request->result = 0;
1856         img_request->obj_request_count = 0;
1857         INIT_LIST_HEAD(&img_request->obj_requests);
1858         kref_init(&img_request->kref);
1859
1860         rbd_img_request_get(img_request);       /* Avoid a warning */
1861         rbd_img_request_put(img_request);       /* TEMPORARY */
1862
1863         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1864                 write_request ? "write" : "read", offset, length,
1865                 img_request);
1866
1867         return img_request;
1868 }
1869
1870 static void rbd_img_request_destroy(struct kref *kref)
1871 {
1872         struct rbd_img_request *img_request;
1873         struct rbd_obj_request *obj_request;
1874         struct rbd_obj_request *next_obj_request;
1875
1876         img_request = container_of(kref, struct rbd_img_request, kref);
1877
1878         dout("%s: img %p\n", __func__, img_request);
1879
1880         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1881                 rbd_img_obj_request_del(img_request, obj_request);
1882         rbd_assert(img_request->obj_request_count == 0);
1883
1884         if (img_request_write_test(img_request))
1885                 ceph_put_snap_context(img_request->snapc);
1886
1887         if (img_request_child_test(img_request))
1888                 rbd_obj_request_put(img_request->obj_request);
1889
1890         kmem_cache_free(rbd_img_request_cache, img_request);
1891 }
1892
1893 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1894 {
1895         struct rbd_img_request *img_request;
1896         unsigned int xferred;
1897         int result;
1898         bool more;
1899
1900         rbd_assert(obj_request_img_data_test(obj_request));
1901         img_request = obj_request->img_request;
1902
1903         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1904         xferred = (unsigned int)obj_request->xferred;
1905         result = obj_request->result;
1906         if (result) {
1907                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1908
1909                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1910                         img_request_write_test(img_request) ? "write" : "read",
1911                         obj_request->length, obj_request->img_offset,
1912                         obj_request->offset);
1913                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1914                         result, xferred);
1915                 if (!img_request->result)
1916                         img_request->result = result;
1917         }
1918
1919         /* Image object requests don't own their page array */
1920
1921         if (obj_request->type == OBJ_REQUEST_PAGES) {
1922                 obj_request->pages = NULL;
1923                 obj_request->page_count = 0;
1924         }
1925
1926         if (img_request_child_test(img_request)) {
1927                 rbd_assert(img_request->obj_request != NULL);
1928                 more = obj_request->which < img_request->obj_request_count - 1;
1929         } else {
1930                 rbd_assert(img_request->rq != NULL);
1931                 more = blk_end_request(img_request->rq, result, xferred);
1932         }
1933
1934         return more;
1935 }
1936
1937 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1938 {
1939         struct rbd_img_request *img_request;
1940         u32 which = obj_request->which;
1941         bool more = true;
1942
1943         rbd_assert(obj_request_img_data_test(obj_request));
1944         img_request = obj_request->img_request;
1945
1946         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1947         rbd_assert(img_request != NULL);
1948         rbd_assert(img_request->obj_request_count > 0);
1949         rbd_assert(which != BAD_WHICH);
1950         rbd_assert(which < img_request->obj_request_count);
1951         rbd_assert(which >= img_request->next_completion);
1952
1953         spin_lock_irq(&img_request->completion_lock);
1954         if (which != img_request->next_completion)
1955                 goto out;
1956
1957         for_each_obj_request_from(img_request, obj_request) {
1958                 rbd_assert(more);
1959                 rbd_assert(which < img_request->obj_request_count);
1960
1961                 if (!obj_request_done_test(obj_request))
1962                         break;
1963                 more = rbd_img_obj_end_request(obj_request);
1964                 which++;
1965         }
1966
1967         rbd_assert(more ^ (which == img_request->obj_request_count));
1968         img_request->next_completion = which;
1969 out:
1970         spin_unlock_irq(&img_request->completion_lock);
1971
1972         if (!more)
1973                 rbd_img_request_complete(img_request);
1974 }
1975
1976 /*
1977  * Split up an image request into one or more object requests, each
1978  * to a different object.  The "type" parameter indicates whether
1979  * "data_desc" is the pointer to the head of a list of bio
1980  * structures, or the base of a page array.  In either case this
1981  * function assumes data_desc describes memory sufficient to hold
1982  * all data described by the image request.
1983  */
1984 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1985                                         enum obj_request_type type,
1986                                         void *data_desc)
1987 {
1988         struct rbd_device *rbd_dev = img_request->rbd_dev;
1989         struct rbd_obj_request *obj_request = NULL;
1990         struct rbd_obj_request *next_obj_request;
1991         bool write_request = img_request_write_test(img_request);
1992         struct bio *bio_list;
1993         unsigned int bio_offset = 0;
1994         struct page **pages;
1995         u64 img_offset;
1996         u64 resid;
1997         u16 opcode;
1998
1999         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2000                 (int)type, data_desc);
2001
2002         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2003         img_offset = img_request->offset;
2004         resid = img_request->length;
2005         rbd_assert(resid > 0);
2006
2007         if (type == OBJ_REQUEST_BIO) {
2008                 bio_list = data_desc;
2009                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2010         } else {
2011                 rbd_assert(type == OBJ_REQUEST_PAGES);
2012                 pages = data_desc;
2013         }
2014
2015         while (resid) {
2016                 struct ceph_osd_request *osd_req;
2017                 const char *object_name;
2018                 u64 offset;
2019                 u64 length;
2020
2021                 object_name = rbd_segment_name(rbd_dev, img_offset);
2022                 if (!object_name)
2023                         goto out_unwind;
2024                 offset = rbd_segment_offset(rbd_dev, img_offset);
2025                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2026                 obj_request = rbd_obj_request_create(object_name,
2027                                                 offset, length, type);
2028                 kfree(object_name);     /* object request has its own copy */
2029                 if (!obj_request)
2030                         goto out_unwind;
2031
2032                 if (type == OBJ_REQUEST_BIO) {
2033                         unsigned int clone_size;
2034
2035                         rbd_assert(length <= (u64)UINT_MAX);
2036                         clone_size = (unsigned int)length;
2037                         obj_request->bio_list =
2038                                         bio_chain_clone_range(&bio_list,
2039                                                                 &bio_offset,
2040                                                                 clone_size,
2041                                                                 GFP_ATOMIC);
2042                         if (!obj_request->bio_list)
2043                                 goto out_partial;
2044                 } else {
2045                         unsigned int page_count;
2046
2047                         obj_request->pages = pages;
2048                         page_count = (u32)calc_pages_for(offset, length);
2049                         obj_request->page_count = page_count;
2050                         if ((offset + length) & ~PAGE_MASK)
2051                                 page_count--;   /* more on last page */
2052                         pages += page_count;
2053                 }
2054
2055                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2056                                                 obj_request);
2057                 if (!osd_req)
2058                         goto out_partial;
2059                 obj_request->osd_req = osd_req;
2060                 obj_request->callback = rbd_img_obj_callback;
2061
2062                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2063                                                 0, 0);
2064                 if (type == OBJ_REQUEST_BIO)
2065                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2066                                         obj_request->bio_list, length);
2067                 else
2068                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2069                                         obj_request->pages, length,
2070                                         offset & ~PAGE_MASK, false, false);
2071
2072                 if (write_request)
2073                         rbd_osd_req_format_write(obj_request);
2074                 else
2075                         rbd_osd_req_format_read(obj_request);
2076
2077                 obj_request->img_offset = img_offset;
2078                 rbd_img_obj_request_add(img_request, obj_request);
2079
2080                 img_offset += length;
2081                 resid -= length;
2082         }
2083
2084         return 0;
2085
2086 out_partial:
2087         rbd_obj_request_put(obj_request);
2088 out_unwind:
2089         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2090                 rbd_obj_request_put(obj_request);
2091
2092         return -ENOMEM;
2093 }
2094
2095 static void
2096 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2097 {
2098         struct rbd_img_request *img_request;
2099         struct rbd_device *rbd_dev;
2100         u64 length;
2101         u32 page_count;
2102
2103         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2104         rbd_assert(obj_request_img_data_test(obj_request));
2105         img_request = obj_request->img_request;
2106         rbd_assert(img_request);
2107
2108         rbd_dev = img_request->rbd_dev;
2109         rbd_assert(rbd_dev);
2110         length = (u64)1 << rbd_dev->header.obj_order;
2111         page_count = (u32)calc_pages_for(0, length);
2112
2113         rbd_assert(obj_request->copyup_pages);
2114         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2115         obj_request->copyup_pages = NULL;
2116
2117         /*
2118          * We want the transfer count to reflect the size of the
2119          * original write request.  There is no such thing as a
2120          * successful short write, so if the request was successful
2121          * we can just set it to the originally-requested length.
2122          */
2123         if (!obj_request->result)
2124                 obj_request->xferred = obj_request->length;
2125
2126         /* Finish up with the normal image object callback */
2127
2128         rbd_img_obj_callback(obj_request);
2129 }
2130
2131 static void
2132 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2133 {
2134         struct rbd_obj_request *orig_request;
2135         struct ceph_osd_request *osd_req;
2136         struct ceph_osd_client *osdc;
2137         struct rbd_device *rbd_dev;
2138         struct page **pages;
2139         int result;
2140         u64 obj_size;
2141         u64 xferred;
2142
2143         rbd_assert(img_request_child_test(img_request));
2144
2145         /* First get what we need from the image request */
2146
2147         pages = img_request->copyup_pages;
2148         rbd_assert(pages != NULL);
2149         img_request->copyup_pages = NULL;
2150
2151         orig_request = img_request->obj_request;
2152         rbd_assert(orig_request != NULL);
2153         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2154         result = img_request->result;
2155         obj_size = img_request->length;
2156         xferred = img_request->xferred;
2157
2158         rbd_dev = img_request->rbd_dev;
2159         rbd_assert(rbd_dev);
2160         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2161
2162         rbd_img_request_put(img_request);
2163
2164         if (result)
2165                 goto out_err;
2166
2167         /* Allocate the new copyup osd request for the original request */
2168
2169         result = -ENOMEM;
2170         rbd_assert(!orig_request->osd_req);
2171         osd_req = rbd_osd_req_create_copyup(orig_request);
2172         if (!osd_req)
2173                 goto out_err;
2174         orig_request->osd_req = osd_req;
2175         orig_request->copyup_pages = pages;
2176
2177         /* Initialize the copyup op */
2178
2179         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2180         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2181                                                 false, false);
2182
2183         /* Then the original write request op */
2184
2185         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2186                                         orig_request->offset,
2187                                         orig_request->length, 0, 0);
2188         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2189                                         orig_request->length);
2190
2191         rbd_osd_req_format_write(orig_request);
2192
2193         /* All set, send it off. */
2194
2195         orig_request->callback = rbd_img_obj_copyup_callback;
2196         osdc = &rbd_dev->rbd_client->client->osdc;
2197         result = rbd_obj_request_submit(osdc, orig_request);
2198         if (!result)
2199                 return;
2200 out_err:
2201         /* Record the error code and complete the request */
2202
2203         orig_request->result = result;
2204         orig_request->xferred = 0;
2205         obj_request_done_set(orig_request);
2206         rbd_obj_request_complete(orig_request);
2207 }
2208
2209 /*
2210  * Read from the parent image the range of data that covers the
2211  * entire target of the given object request.  This is used for
2212  * satisfying a layered image write request when the target of an
2213  * object request from the image request does not exist.
2214  *
2215  * A page array big enough to hold the returned data is allocated
2216  * and supplied to rbd_img_request_fill() as the "data descriptor."
2217  * When the read completes, this page array will be transferred to
2218  * the original object request for the copyup operation.
2219  *
2220  * If an error occurs, record it as the result of the original
2221  * object request and mark it done so it gets completed.
2222  */
2223 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2224 {
2225         struct rbd_img_request *img_request = NULL;
2226         struct rbd_img_request *parent_request = NULL;
2227         struct rbd_device *rbd_dev;
2228         u64 img_offset;
2229         u64 length;
2230         struct page **pages = NULL;
2231         u32 page_count;
2232         int result;
2233
2234         rbd_assert(obj_request_img_data_test(obj_request));
2235         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2236
2237         img_request = obj_request->img_request;
2238         rbd_assert(img_request != NULL);
2239         rbd_dev = img_request->rbd_dev;
2240         rbd_assert(rbd_dev->parent != NULL);
2241
2242         /*
2243          * First things first.  The original osd request is of no
2244          * use to use any more, we'll need a new one that can hold
2245          * the two ops in a copyup request.  We'll get that later,
2246          * but for now we can release the old one.
2247          */
2248         rbd_osd_req_destroy(obj_request->osd_req);
2249         obj_request->osd_req = NULL;
2250
2251         /*
2252          * Determine the byte range covered by the object in the
2253          * child image to which the original request was to be sent.
2254          */
2255         img_offset = obj_request->img_offset - obj_request->offset;
2256         length = (u64)1 << rbd_dev->header.obj_order;
2257
2258         /*
2259          * There is no defined parent data beyond the parent
2260          * overlap, so limit what we read at that boundary if
2261          * necessary.
2262          */
2263         if (img_offset + length > rbd_dev->parent_overlap) {
2264                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2265                 length = rbd_dev->parent_overlap - img_offset;
2266         }
2267
2268         /*
2269          * Allocate a page array big enough to receive the data read
2270          * from the parent.
2271          */
2272         page_count = (u32)calc_pages_for(0, length);
2273         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2274         if (IS_ERR(pages)) {
2275                 result = PTR_ERR(pages);
2276                 pages = NULL;
2277                 goto out_err;
2278         }
2279
2280         result = -ENOMEM;
2281         parent_request = rbd_img_request_create(rbd_dev->parent,
2282                                                 img_offset, length,
2283                                                 false, true);
2284         if (!parent_request)
2285                 goto out_err;
2286         rbd_obj_request_get(obj_request);
2287         parent_request->obj_request = obj_request;
2288
2289         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2290         if (result)
2291                 goto out_err;
2292         parent_request->copyup_pages = pages;
2293
2294         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2295         result = rbd_img_request_submit(parent_request);
2296         if (!result)
2297                 return 0;
2298
2299         parent_request->copyup_pages = NULL;
2300         parent_request->obj_request = NULL;
2301         rbd_obj_request_put(obj_request);
2302 out_err:
2303         if (pages)
2304                 ceph_release_page_vector(pages, page_count);
2305         if (parent_request)
2306                 rbd_img_request_put(parent_request);
2307         obj_request->result = result;
2308         obj_request->xferred = 0;
2309         obj_request_done_set(obj_request);
2310
2311         return result;
2312 }
2313
2314 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2315 {
2316         struct rbd_obj_request *orig_request;
2317         int result;
2318
2319         rbd_assert(!obj_request_img_data_test(obj_request));
2320
2321         /*
2322          * All we need from the object request is the original
2323          * request and the result of the STAT op.  Grab those, then
2324          * we're done with the request.
2325          */
2326         orig_request = obj_request->obj_request;
2327         obj_request->obj_request = NULL;
2328         rbd_assert(orig_request);
2329         rbd_assert(orig_request->img_request);
2330
2331         result = obj_request->result;
2332         obj_request->result = 0;
2333
2334         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2335                 obj_request, orig_request, result,
2336                 obj_request->xferred, obj_request->length);
2337         rbd_obj_request_put(obj_request);
2338
2339         rbd_assert(orig_request);
2340         rbd_assert(orig_request->img_request);
2341
2342         /*
2343          * Our only purpose here is to determine whether the object
2344          * exists, and we don't want to treat the non-existence as
2345          * an error.  If something else comes back, transfer the
2346          * error to the original request and complete it now.
2347          */
2348         if (!result) {
2349                 obj_request_existence_set(orig_request, true);
2350         } else if (result == -ENOENT) {
2351                 obj_request_existence_set(orig_request, false);
2352         } else if (result) {
2353                 orig_request->result = result;
2354                 goto out;
2355         }
2356
2357         /*
2358          * Resubmit the original request now that we have recorded
2359          * whether the target object exists.
2360          */
2361         orig_request->result = rbd_img_obj_request_submit(orig_request);
2362 out:
2363         if (orig_request->result)
2364                 rbd_obj_request_complete(orig_request);
2365         rbd_obj_request_put(orig_request);
2366 }
2367
2368 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2369 {
2370         struct rbd_obj_request *stat_request;
2371         struct rbd_device *rbd_dev;
2372         struct ceph_osd_client *osdc;
2373         struct page **pages = NULL;
2374         u32 page_count;
2375         size_t size;
2376         int ret;
2377
2378         /*
2379          * The response data for a STAT call consists of:
2380          *     le64 length;
2381          *     struct {
2382          *         le32 tv_sec;
2383          *         le32 tv_nsec;
2384          *     } mtime;
2385          */
2386         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2387         page_count = (u32)calc_pages_for(0, size);
2388         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2389         if (IS_ERR(pages))
2390                 return PTR_ERR(pages);
2391
2392         ret = -ENOMEM;
2393         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2394                                                         OBJ_REQUEST_PAGES);
2395         if (!stat_request)
2396                 goto out;
2397
2398         rbd_obj_request_get(obj_request);
2399         stat_request->obj_request = obj_request;
2400         stat_request->pages = pages;
2401         stat_request->page_count = page_count;
2402
2403         rbd_assert(obj_request->img_request);
2404         rbd_dev = obj_request->img_request->rbd_dev;
2405         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2406                                                 stat_request);
2407         if (!stat_request->osd_req)
2408                 goto out;
2409         stat_request->callback = rbd_img_obj_exists_callback;
2410
2411         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2412         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2413                                         false, false);
2414         rbd_osd_req_format_read(stat_request);
2415
2416         osdc = &rbd_dev->rbd_client->client->osdc;
2417         ret = rbd_obj_request_submit(osdc, stat_request);
2418 out:
2419         if (ret)
2420                 rbd_obj_request_put(obj_request);
2421
2422         return ret;
2423 }
2424
2425 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2426 {
2427         struct rbd_img_request *img_request;
2428         struct rbd_device *rbd_dev;
2429         bool known;
2430
2431         rbd_assert(obj_request_img_data_test(obj_request));
2432
2433         img_request = obj_request->img_request;
2434         rbd_assert(img_request);
2435         rbd_dev = img_request->rbd_dev;
2436
2437         /*
2438          * Only writes to layered images need special handling.
2439          * Reads and non-layered writes are simple object requests.
2440          * Layered writes that start beyond the end of the overlap
2441          * with the parent have no parent data, so they too are
2442          * simple object requests.  Finally, if the target object is
2443          * known to already exist, its parent data has already been
2444          * copied, so a write to the object can also be handled as a
2445          * simple object request.
2446          */
2447         if (!img_request_write_test(img_request) ||
2448                 !img_request_layered_test(img_request) ||
2449                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2450                 ((known = obj_request_known_test(obj_request)) &&
2451                         obj_request_exists_test(obj_request))) {
2452
2453                 struct rbd_device *rbd_dev;
2454                 struct ceph_osd_client *osdc;
2455
2456                 rbd_dev = obj_request->img_request->rbd_dev;
2457                 osdc = &rbd_dev->rbd_client->client->osdc;
2458
2459                 return rbd_obj_request_submit(osdc, obj_request);
2460         }
2461
2462         /*
2463          * It's a layered write.  The target object might exist but
2464          * we may not know that yet.  If we know it doesn't exist,
2465          * start by reading the data for the full target object from
2466          * the parent so we can use it for a copyup to the target.
2467          */
2468         if (known)
2469                 return rbd_img_obj_parent_read_full(obj_request);
2470
2471         /* We don't know whether the target exists.  Go find out. */
2472
2473         return rbd_img_obj_exists_submit(obj_request);
2474 }
2475
2476 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2477 {
2478         struct rbd_obj_request *obj_request;
2479         struct rbd_obj_request *next_obj_request;
2480
2481         dout("%s: img %p\n", __func__, img_request);
2482         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2483                 int ret;
2484
2485                 ret = rbd_img_obj_request_submit(obj_request);
2486                 if (ret)
2487                         return ret;
2488         }
2489
2490         return 0;
2491 }
2492
2493 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2494 {
2495         struct rbd_obj_request *obj_request;
2496         struct rbd_device *rbd_dev;
2497         u64 obj_end;
2498
2499         rbd_assert(img_request_child_test(img_request));
2500
2501         obj_request = img_request->obj_request;
2502         rbd_assert(obj_request);
2503         rbd_assert(obj_request->img_request);
2504
2505         obj_request->result = img_request->result;
2506         if (obj_request->result)
2507                 goto out;
2508
2509         /*
2510          * We need to zero anything beyond the parent overlap
2511          * boundary.  Since rbd_img_obj_request_read_callback()
2512          * will zero anything beyond the end of a short read, an
2513          * easy way to do this is to pretend the data from the
2514          * parent came up short--ending at the overlap boundary.
2515          */
2516         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2517         obj_end = obj_request->img_offset + obj_request->length;
2518         rbd_dev = obj_request->img_request->rbd_dev;
2519         if (obj_end > rbd_dev->parent_overlap) {
2520                 u64 xferred = 0;
2521
2522                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2523                         xferred = rbd_dev->parent_overlap -
2524                                         obj_request->img_offset;
2525
2526                 obj_request->xferred = min(img_request->xferred, xferred);
2527         } else {
2528                 obj_request->xferred = img_request->xferred;
2529         }
2530 out:
2531         rbd_img_obj_request_read_callback(obj_request);
2532         rbd_obj_request_complete(obj_request);
2533 }
2534
2535 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2536 {
2537         struct rbd_device *rbd_dev;
2538         struct rbd_img_request *img_request;
2539         int result;
2540
2541         rbd_assert(obj_request_img_data_test(obj_request));
2542         rbd_assert(obj_request->img_request != NULL);
2543         rbd_assert(obj_request->result == (s32) -ENOENT);
2544         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2545
2546         rbd_dev = obj_request->img_request->rbd_dev;
2547         rbd_assert(rbd_dev->parent != NULL);
2548         /* rbd_read_finish(obj_request, obj_request->length); */
2549         img_request = rbd_img_request_create(rbd_dev->parent,
2550                                                 obj_request->img_offset,
2551                                                 obj_request->length,
2552                                                 false, true);
2553         result = -ENOMEM;
2554         if (!img_request)
2555                 goto out_err;
2556
2557         rbd_obj_request_get(obj_request);
2558         img_request->obj_request = obj_request;
2559
2560         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2561                                         obj_request->bio_list);
2562         if (result)
2563                 goto out_err;
2564
2565         img_request->callback = rbd_img_parent_read_callback;
2566         result = rbd_img_request_submit(img_request);
2567         if (result)
2568                 goto out_err;
2569
2570         return;
2571 out_err:
2572         if (img_request)
2573                 rbd_img_request_put(img_request);
2574         obj_request->result = result;
2575         obj_request->xferred = 0;
2576         obj_request_done_set(obj_request);
2577 }
2578
2579 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2580 {
2581         struct rbd_obj_request *obj_request;
2582         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2583         int ret;
2584
2585         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2586                                                         OBJ_REQUEST_NODATA);
2587         if (!obj_request)
2588                 return -ENOMEM;
2589
2590         ret = -ENOMEM;
2591         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2592         if (!obj_request->osd_req)
2593                 goto out;
2594         obj_request->callback = rbd_obj_request_put;
2595
2596         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2597                                         notify_id, 0, 0);
2598         rbd_osd_req_format_read(obj_request);
2599
2600         ret = rbd_obj_request_submit(osdc, obj_request);
2601 out:
2602         if (ret)
2603                 rbd_obj_request_put(obj_request);
2604
2605         return ret;
2606 }
2607
2608 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2609 {
2610         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2611
2612         if (!rbd_dev)
2613                 return;
2614
2615         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2616                 rbd_dev->header_name, (unsigned long long)notify_id,
2617                 (unsigned int)opcode);
2618         (void)rbd_dev_refresh(rbd_dev);
2619
2620         rbd_obj_notify_ack(rbd_dev, notify_id);
2621 }
2622
2623 /*
2624  * Request sync osd watch/unwatch.  The value of "start" determines
2625  * whether a watch request is being initiated or torn down.
2626  */
2627 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2628 {
2629         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2630         struct rbd_obj_request *obj_request;
2631         int ret;
2632
2633         rbd_assert(start ^ !!rbd_dev->watch_event);
2634         rbd_assert(start ^ !!rbd_dev->watch_request);
2635
2636         if (start) {
2637                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2638                                                 &rbd_dev->watch_event);
2639                 if (ret < 0)
2640                         return ret;
2641                 rbd_assert(rbd_dev->watch_event != NULL);
2642         }
2643
2644         ret = -ENOMEM;
2645         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2646                                                         OBJ_REQUEST_NODATA);
2647         if (!obj_request)
2648                 goto out_cancel;
2649
2650         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2651         if (!obj_request->osd_req)
2652                 goto out_cancel;
2653
2654         if (start)
2655                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2656         else
2657                 ceph_osdc_unregister_linger_request(osdc,
2658                                         rbd_dev->watch_request->osd_req);
2659
2660         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2661                                 rbd_dev->watch_event->cookie, 0, start);
2662         rbd_osd_req_format_write(obj_request);
2663
2664         ret = rbd_obj_request_submit(osdc, obj_request);
2665         if (ret)
2666                 goto out_cancel;
2667         ret = rbd_obj_request_wait(obj_request);
2668         if (ret)
2669                 goto out_cancel;
2670         ret = obj_request->result;
2671         if (ret)
2672                 goto out_cancel;
2673
2674         /*
2675          * A watch request is set to linger, so the underlying osd
2676          * request won't go away until we unregister it.  We retain
2677          * a pointer to the object request during that time (in
2678          * rbd_dev->watch_request), so we'll keep a reference to
2679          * it.  We'll drop that reference (below) after we've
2680          * unregistered it.
2681          */
2682         if (start) {
2683                 rbd_dev->watch_request = obj_request;
2684
2685                 return 0;
2686         }
2687
2688         /* We have successfully torn down the watch request */
2689
2690         rbd_obj_request_put(rbd_dev->watch_request);
2691         rbd_dev->watch_request = NULL;
2692 out_cancel:
2693         /* Cancel the event if we're tearing down, or on error */
2694         ceph_osdc_cancel_event(rbd_dev->watch_event);
2695         rbd_dev->watch_event = NULL;
2696         if (obj_request)
2697                 rbd_obj_request_put(obj_request);
2698
2699         return ret;
2700 }
2701
2702 /*
2703  * Synchronous osd object method call.  Returns the number of bytes
2704  * returned in the outbound buffer, or a negative error code.
2705  */
2706 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2707                              const char *object_name,
2708                              const char *class_name,
2709                              const char *method_name,
2710                              const void *outbound,
2711                              size_t outbound_size,
2712                              void *inbound,
2713                              size_t inbound_size)
2714 {
2715         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2716         struct rbd_obj_request *obj_request;
2717         struct page **pages;
2718         u32 page_count;
2719         int ret;
2720
2721         /*
2722          * Method calls are ultimately read operations.  The result
2723          * should placed into the inbound buffer provided.  They
2724          * also supply outbound data--parameters for the object
2725          * method.  Currently if this is present it will be a
2726          * snapshot id.
2727          */
2728         page_count = (u32)calc_pages_for(0, inbound_size);
2729         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2730         if (IS_ERR(pages))
2731                 return PTR_ERR(pages);
2732
2733         ret = -ENOMEM;
2734         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2735                                                         OBJ_REQUEST_PAGES);
2736         if (!obj_request)
2737                 goto out;
2738
2739         obj_request->pages = pages;
2740         obj_request->page_count = page_count;
2741
2742         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2743         if (!obj_request->osd_req)
2744                 goto out;
2745
2746         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2747                                         class_name, method_name);
2748         if (outbound_size) {
2749                 struct ceph_pagelist *pagelist;
2750
2751                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2752                 if (!pagelist)
2753                         goto out;
2754
2755                 ceph_pagelist_init(pagelist);
2756                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2757                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2758                                                 pagelist);
2759         }
2760         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2761                                         obj_request->pages, inbound_size,
2762                                         0, false, false);
2763         rbd_osd_req_format_read(obj_request);
2764
2765         ret = rbd_obj_request_submit(osdc, obj_request);
2766         if (ret)
2767                 goto out;
2768         ret = rbd_obj_request_wait(obj_request);
2769         if (ret)
2770                 goto out;
2771
2772         ret = obj_request->result;
2773         if (ret < 0)
2774                 goto out;
2775
2776         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2777         ret = (int)obj_request->xferred;
2778         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2779 out:
2780         if (obj_request)
2781                 rbd_obj_request_put(obj_request);
2782         else
2783                 ceph_release_page_vector(pages, page_count);
2784
2785         return ret;
2786 }
2787
2788 static void rbd_request_fn(struct request_queue *q)
2789                 __releases(q->queue_lock) __acquires(q->queue_lock)
2790 {
2791         struct rbd_device *rbd_dev = q->queuedata;
2792         bool read_only = rbd_dev->mapping.read_only;
2793         struct request *rq;
2794         int result;
2795
2796         while ((rq = blk_fetch_request(q))) {
2797                 bool write_request = rq_data_dir(rq) == WRITE;
2798                 struct rbd_img_request *img_request;
2799                 u64 offset;
2800                 u64 length;
2801
2802                 /* Ignore any non-FS requests that filter through. */
2803
2804                 if (rq->cmd_type != REQ_TYPE_FS) {
2805                         dout("%s: non-fs request type %d\n", __func__,
2806                                 (int) rq->cmd_type);
2807                         __blk_end_request_all(rq, 0);
2808                         continue;
2809                 }
2810
2811                 /* Ignore/skip any zero-length requests */
2812
2813                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2814                 length = (u64) blk_rq_bytes(rq);
2815
2816                 if (!length) {
2817                         dout("%s: zero-length request\n", __func__);
2818                         __blk_end_request_all(rq, 0);
2819                         continue;
2820                 }
2821
2822                 spin_unlock_irq(q->queue_lock);
2823
2824                 /* Disallow writes to a read-only device */
2825
2826                 if (write_request) {
2827                         result = -EROFS;
2828                         if (read_only)
2829                                 goto end_request;
2830                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2831                 }
2832
2833                 /*
2834                  * Quit early if the mapped snapshot no longer
2835                  * exists.  It's still possible the snapshot will
2836                  * have disappeared by the time our request arrives
2837                  * at the osd, but there's no sense in sending it if
2838                  * we already know.
2839                  */
2840                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2841                         dout("request for non-existent snapshot");
2842                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2843                         result = -ENXIO;
2844                         goto end_request;
2845                 }
2846
2847                 result = -EINVAL;
2848                 if (offset && length > U64_MAX - offset + 1) {
2849                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2850                                 offset, length);
2851                         goto end_request;       /* Shouldn't happen */
2852                 }
2853
2854                 result = -ENOMEM;
2855                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2856                                                         write_request, false);
2857                 if (!img_request)
2858                         goto end_request;
2859
2860                 img_request->rq = rq;
2861
2862                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2863                                                 rq->bio);
2864                 if (!result)
2865                         result = rbd_img_request_submit(img_request);
2866                 if (result)
2867                         rbd_img_request_put(img_request);
2868 end_request:
2869                 spin_lock_irq(q->queue_lock);
2870                 if (result < 0) {
2871                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2872                                 write_request ? "write" : "read",
2873                                 length, offset, result);
2874
2875                         __blk_end_request_all(rq, result);
2876                 }
2877         }
2878 }
2879
2880 /*
2881  * a queue callback. Makes sure that we don't create a bio that spans across
2882  * multiple osd objects. One exception would be with a single page bios,
2883  * which we handle later at bio_chain_clone_range()
2884  */
2885 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2886                           struct bio_vec *bvec)
2887 {
2888         struct rbd_device *rbd_dev = q->queuedata;
2889         sector_t sector_offset;
2890         sector_t sectors_per_obj;
2891         sector_t obj_sector_offset;
2892         int ret;
2893
2894         /*
2895          * Find how far into its rbd object the partition-relative
2896          * bio start sector is to offset relative to the enclosing
2897          * device.
2898          */
2899         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2900         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2901         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2902
2903         /*
2904          * Compute the number of bytes from that offset to the end
2905          * of the object.  Account for what's already used by the bio.
2906          */
2907         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2908         if (ret > bmd->bi_size)
2909                 ret -= bmd->bi_size;
2910         else
2911                 ret = 0;
2912
2913         /*
2914          * Don't send back more than was asked for.  And if the bio
2915          * was empty, let the whole thing through because:  "Note
2916          * that a block device *must* allow a single page to be
2917          * added to an empty bio."
2918          */
2919         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2920         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2921                 ret = (int) bvec->bv_len;
2922
2923         return ret;
2924 }
2925
2926 static void rbd_free_disk(struct rbd_device *rbd_dev)
2927 {
2928         struct gendisk *disk = rbd_dev->disk;
2929
2930         if (!disk)
2931                 return;
2932
2933         rbd_dev->disk = NULL;
2934         if (disk->flags & GENHD_FL_UP) {
2935                 del_gendisk(disk);
2936                 if (disk->queue)
2937                         blk_cleanup_queue(disk->queue);
2938         }
2939         put_disk(disk);
2940 }
2941
2942 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2943                                 const char *object_name,
2944                                 u64 offset, u64 length, void *buf)
2945
2946 {
2947         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2948         struct rbd_obj_request *obj_request;
2949         struct page **pages = NULL;
2950         u32 page_count;
2951         size_t size;
2952         int ret;
2953
2954         page_count = (u32) calc_pages_for(offset, length);
2955         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2956         if (IS_ERR(pages))
2957                 ret = PTR_ERR(pages);
2958
2959         ret = -ENOMEM;
2960         obj_request = rbd_obj_request_create(object_name, offset, length,
2961                                                         OBJ_REQUEST_PAGES);
2962         if (!obj_request)
2963                 goto out;
2964
2965         obj_request->pages = pages;
2966         obj_request->page_count = page_count;
2967
2968         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2969         if (!obj_request->osd_req)
2970                 goto out;
2971
2972         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2973                                         offset, length, 0, 0);
2974         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2975                                         obj_request->pages,
2976                                         obj_request->length,
2977                                         obj_request->offset & ~PAGE_MASK,
2978                                         false, false);
2979         rbd_osd_req_format_read(obj_request);
2980
2981         ret = rbd_obj_request_submit(osdc, obj_request);
2982         if (ret)
2983                 goto out;
2984         ret = rbd_obj_request_wait(obj_request);
2985         if (ret)
2986                 goto out;
2987
2988         ret = obj_request->result;
2989         if (ret < 0)
2990                 goto out;
2991
2992         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2993         size = (size_t) obj_request->xferred;
2994         ceph_copy_from_page_vector(pages, buf, 0, size);
2995         rbd_assert(size <= (size_t)INT_MAX);
2996         ret = (int)size;
2997 out:
2998         if (obj_request)
2999                 rbd_obj_request_put(obj_request);
3000         else
3001                 ceph_release_page_vector(pages, page_count);
3002
3003         return ret;
3004 }
3005
3006 /*
3007  * Read the complete header for the given rbd device.
3008  *
3009  * Returns a pointer to a dynamically-allocated buffer containing
3010  * the complete and validated header.  Caller can pass the address
3011  * of a variable that will be filled in with the version of the
3012  * header object at the time it was read.
3013  *
3014  * Returns a pointer-coded errno if a failure occurs.
3015  */
3016 static struct rbd_image_header_ondisk *
3017 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3018 {
3019         struct rbd_image_header_ondisk *ondisk = NULL;
3020         u32 snap_count = 0;
3021         u64 names_size = 0;
3022         u32 want_count;
3023         int ret;
3024
3025         /*
3026          * The complete header will include an array of its 64-bit
3027          * snapshot ids, followed by the names of those snapshots as
3028          * a contiguous block of NUL-terminated strings.  Note that
3029          * the number of snapshots could change by the time we read
3030          * it in, in which case we re-read it.
3031          */
3032         do {
3033                 size_t size;
3034
3035                 kfree(ondisk);
3036
3037                 size = sizeof (*ondisk);
3038                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3039                 size += names_size;
3040                 ondisk = kmalloc(size, GFP_KERNEL);
3041                 if (!ondisk)
3042                         return ERR_PTR(-ENOMEM);
3043
3044                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3045                                        0, size, ondisk);
3046                 if (ret < 0)
3047                         goto out_err;
3048                 if ((size_t)ret < size) {
3049                         ret = -ENXIO;
3050                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3051                                 size, ret);
3052                         goto out_err;
3053                 }
3054                 if (!rbd_dev_ondisk_valid(ondisk)) {
3055                         ret = -ENXIO;
3056                         rbd_warn(rbd_dev, "invalid header");
3057                         goto out_err;
3058                 }
3059
3060                 names_size = le64_to_cpu(ondisk->snap_names_len);
3061                 want_count = snap_count;
3062                 snap_count = le32_to_cpu(ondisk->snap_count);
3063         } while (snap_count != want_count);
3064
3065         return ondisk;
3066
3067 out_err:
3068         kfree(ondisk);
3069
3070         return ERR_PTR(ret);
3071 }
3072
3073 /*
3074  * reload the ondisk the header
3075  */
3076 static int rbd_read_header(struct rbd_device *rbd_dev,
3077                            struct rbd_image_header *header)
3078 {
3079         struct rbd_image_header_ondisk *ondisk;
3080         int ret;
3081
3082         ondisk = rbd_dev_v1_header_read(rbd_dev);
3083         if (IS_ERR(ondisk))
3084                 return PTR_ERR(ondisk);
3085         ret = rbd_header_from_disk(header, ondisk);
3086         kfree(ondisk);
3087
3088         return ret;
3089 }
3090
3091 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3092 {
3093         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3094                 return;
3095
3096         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3097                 sector_t size;
3098
3099                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3100                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3101                 dout("setting size to %llu sectors", (unsigned long long)size);
3102                 set_capacity(rbd_dev->disk, size);
3103         }
3104 }
3105
3106 /*
3107  * only read the first part of the ondisk header, without the snaps info
3108  */
3109 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3110 {
3111         int ret;
3112         struct rbd_image_header h;
3113
3114         ret = rbd_read_header(rbd_dev, &h);
3115         if (ret < 0)
3116                 return ret;
3117
3118         down_write(&rbd_dev->header_rwsem);
3119
3120         /* Update image size, and check for resize of mapped image */
3121         rbd_dev->header.image_size = h.image_size;
3122         rbd_update_mapping_size(rbd_dev);
3123
3124         /* rbd_dev->header.object_prefix shouldn't change */
3125         kfree(rbd_dev->header.snap_sizes);
3126         kfree(rbd_dev->header.snap_names);
3127         /* osd requests may still refer to snapc */
3128         ceph_put_snap_context(rbd_dev->header.snapc);
3129
3130         rbd_dev->header.image_size = h.image_size;
3131         rbd_dev->header.snapc = h.snapc;
3132         rbd_dev->header.snap_names = h.snap_names;
3133         rbd_dev->header.snap_sizes = h.snap_sizes;
3134         /* Free the extra copy of the object prefix */
3135         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3136                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3137         kfree(h.object_prefix);
3138
3139         up_write(&rbd_dev->header_rwsem);
3140
3141         return ret;
3142 }
3143
3144 /*
3145  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3146  * has disappeared from the (just updated) snapshot context.
3147  */
3148 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3149 {
3150         u64 snap_id;
3151
3152         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3153                 return;
3154
3155         snap_id = rbd_dev->spec->snap_id;
3156         if (snap_id == CEPH_NOSNAP)
3157                 return;
3158
3159         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3160                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3161 }
3162
3163 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3164 {
3165         u64 image_size;
3166         int ret;
3167
3168         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3169         image_size = rbd_dev->header.image_size;
3170         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3171         if (rbd_dev->image_format == 1)
3172                 ret = rbd_dev_v1_refresh(rbd_dev);
3173         else
3174                 ret = rbd_dev_v2_refresh(rbd_dev);
3175
3176         /* If it's a mapped snapshot, validate its EXISTS flag */
3177
3178         rbd_exists_validate(rbd_dev);
3179         mutex_unlock(&ctl_mutex);
3180         if (ret)
3181                 rbd_warn(rbd_dev, "got notification but failed to "
3182                            " update snaps: %d\n", ret);
3183         if (image_size != rbd_dev->header.image_size)
3184                 revalidate_disk(rbd_dev->disk);
3185
3186         return ret;
3187 }
3188
3189 static int rbd_init_disk(struct rbd_device *rbd_dev)
3190 {
3191         struct gendisk *disk;
3192         struct request_queue *q;
3193         u64 segment_size;
3194
3195         /* create gendisk info */
3196         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3197         if (!disk)
3198                 return -ENOMEM;
3199
3200         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3201                  rbd_dev->dev_id);
3202         disk->major = rbd_dev->major;
3203         disk->first_minor = 0;
3204         disk->fops = &rbd_bd_ops;
3205         disk->private_data = rbd_dev;
3206
3207         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3208         if (!q)
3209                 goto out_disk;
3210
3211         /* We use the default size, but let's be explicit about it. */
3212         blk_queue_physical_block_size(q, SECTOR_SIZE);
3213
3214         /* set io sizes to object size */
3215         segment_size = rbd_obj_bytes(&rbd_dev->header);
3216         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3217         blk_queue_max_segment_size(q, segment_size);
3218         blk_queue_io_min(q, segment_size);
3219         blk_queue_io_opt(q, segment_size);
3220
3221         blk_queue_merge_bvec(q, rbd_merge_bvec);
3222         disk->queue = q;
3223
3224         q->queuedata = rbd_dev;
3225
3226         rbd_dev->disk = disk;
3227
3228         return 0;
3229 out_disk:
3230         put_disk(disk);
3231
3232         return -ENOMEM;
3233 }
3234
3235 /*
3236   sysfs
3237 */
3238
3239 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3240 {
3241         return container_of(dev, struct rbd_device, dev);
3242 }
3243
3244 static ssize_t rbd_size_show(struct device *dev,
3245                              struct device_attribute *attr, char *buf)
3246 {
3247         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3248
3249         return sprintf(buf, "%llu\n",
3250                 (unsigned long long)rbd_dev->mapping.size);
3251 }
3252
3253 /*
3254  * Note this shows the features for whatever's mapped, which is not
3255  * necessarily the base image.
3256  */
3257 static ssize_t rbd_features_show(struct device *dev,
3258                              struct device_attribute *attr, char *buf)
3259 {
3260         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3261
3262         return sprintf(buf, "0x%016llx\n",
3263                         (unsigned long long)rbd_dev->mapping.features);
3264 }
3265
3266 static ssize_t rbd_major_show(struct device *dev,
3267                               struct device_attribute *attr, char *buf)
3268 {
3269         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270
3271         if (rbd_dev->major)
3272                 return sprintf(buf, "%d\n", rbd_dev->major);
3273
3274         return sprintf(buf, "(none)\n");
3275
3276 }
3277
3278 static ssize_t rbd_client_id_show(struct device *dev,
3279                                   struct device_attribute *attr, char *buf)
3280 {
3281         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3282
3283         return sprintf(buf, "client%lld\n",
3284                         ceph_client_id(rbd_dev->rbd_client->client));
3285 }
3286
3287 static ssize_t rbd_pool_show(struct device *dev,
3288                              struct device_attribute *attr, char *buf)
3289 {
3290         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3291
3292         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3293 }
3294
3295 static ssize_t rbd_pool_id_show(struct device *dev,
3296                              struct device_attribute *attr, char *buf)
3297 {
3298         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3299
3300         return sprintf(buf, "%llu\n",
3301                         (unsigned long long) rbd_dev->spec->pool_id);
3302 }
3303
3304 static ssize_t rbd_name_show(struct device *dev,
3305                              struct device_attribute *attr, char *buf)
3306 {
3307         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3308
3309         if (rbd_dev->spec->image_name)
3310                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3311
3312         return sprintf(buf, "(unknown)\n");
3313 }
3314
3315 static ssize_t rbd_image_id_show(struct device *dev,
3316                              struct device_attribute *attr, char *buf)
3317 {
3318         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3319
3320         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3321 }
3322
3323 /*
3324  * Shows the name of the currently-mapped snapshot (or
3325  * RBD_SNAP_HEAD_NAME for the base image).
3326  */
3327 static ssize_t rbd_snap_show(struct device *dev,
3328                              struct device_attribute *attr,
3329                              char *buf)
3330 {
3331         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3332
3333         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3334 }
3335
3336 /*
3337  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3338  * for the parent image.  If there is no parent, simply shows
3339  * "(no parent image)".
3340  */
3341 static ssize_t rbd_parent_show(struct device *dev,
3342                              struct device_attribute *attr,
3343                              char *buf)
3344 {
3345         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3346         struct rbd_spec *spec = rbd_dev->parent_spec;
3347         int count;
3348         char *bufp = buf;
3349
3350         if (!spec)
3351                 return sprintf(buf, "(no parent image)\n");
3352
3353         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3354                         (unsigned long long) spec->pool_id, spec->pool_name);
3355         if (count < 0)
3356                 return count;
3357         bufp += count;
3358
3359         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3360                         spec->image_name ? spec->image_name : "(unknown)");
3361         if (count < 0)
3362                 return count;
3363         bufp += count;
3364
3365         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3366                         (unsigned long long) spec->snap_id, spec->snap_name);
3367         if (count < 0)
3368                 return count;
3369         bufp += count;
3370
3371         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3372         if (count < 0)
3373                 return count;
3374         bufp += count;
3375
3376         return (ssize_t) (bufp - buf);
3377 }
3378
3379 static ssize_t rbd_image_refresh(struct device *dev,
3380                                  struct device_attribute *attr,
3381                                  const char *buf,
3382                                  size_t size)
3383 {
3384         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3385         int ret;
3386
3387         ret = rbd_dev_refresh(rbd_dev);
3388
3389         return ret < 0 ? ret : size;
3390 }
3391
3392 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3393 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3394 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3395 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3396 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3397 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3398 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3399 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3400 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3401 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3402 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3403
3404 static struct attribute *rbd_attrs[] = {
3405         &dev_attr_size.attr,
3406         &dev_attr_features.attr,
3407         &dev_attr_major.attr,
3408         &dev_attr_client_id.attr,
3409         &dev_attr_pool.attr,
3410         &dev_attr_pool_id.attr,
3411         &dev_attr_name.attr,
3412         &dev_attr_image_id.attr,
3413         &dev_attr_current_snap.attr,
3414         &dev_attr_parent.attr,
3415         &dev_attr_refresh.attr,
3416         NULL
3417 };
3418
3419 static struct attribute_group rbd_attr_group = {
3420         .attrs = rbd_attrs,
3421 };
3422
3423 static const struct attribute_group *rbd_attr_groups[] = {
3424         &rbd_attr_group,
3425         NULL
3426 };
3427
3428 static void rbd_sysfs_dev_release(struct device *dev)
3429 {
3430 }
3431
3432 static struct device_type rbd_device_type = {
3433         .name           = "rbd",
3434         .groups         = rbd_attr_groups,
3435         .release        = rbd_sysfs_dev_release,
3436 };
3437
3438 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3439 {
3440         kref_get(&spec->kref);
3441
3442         return spec;
3443 }
3444
3445 static void rbd_spec_free(struct kref *kref);
3446 static void rbd_spec_put(struct rbd_spec *spec)
3447 {
3448         if (spec)
3449                 kref_put(&spec->kref, rbd_spec_free);
3450 }
3451
3452 static struct rbd_spec *rbd_spec_alloc(void)
3453 {
3454         struct rbd_spec *spec;
3455
3456         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3457         if (!spec)
3458                 return NULL;
3459         kref_init(&spec->kref);
3460
3461         return spec;
3462 }
3463
3464 static void rbd_spec_free(struct kref *kref)
3465 {
3466         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3467
3468         kfree(spec->pool_name);
3469         kfree(spec->image_id);
3470         kfree(spec->image_name);
3471         kfree(spec->snap_name);
3472         kfree(spec);
3473 }
3474
3475 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3476                                 struct rbd_spec *spec)
3477 {
3478         struct rbd_device *rbd_dev;
3479
3480         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3481         if (!rbd_dev)
3482                 return NULL;
3483
3484         spin_lock_init(&rbd_dev->lock);
3485         rbd_dev->flags = 0;
3486         INIT_LIST_HEAD(&rbd_dev->node);
3487         init_rwsem(&rbd_dev->header_rwsem);
3488
3489         rbd_dev->spec = spec;
3490         rbd_dev->rbd_client = rbdc;
3491
3492         /* Initialize the layout used for all rbd requests */
3493
3494         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3495         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3496         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3497         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3498
3499         return rbd_dev;
3500 }
3501
3502 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3503 {
3504         rbd_put_client(rbd_dev->rbd_client);
3505         rbd_spec_put(rbd_dev->spec);
3506         kfree(rbd_dev);
3507 }
3508
3509 /*
3510  * Get the size and object order for an image snapshot, or if
3511  * snap_id is CEPH_NOSNAP, gets this information for the base
3512  * image.
3513  */
3514 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3515                                 u8 *order, u64 *snap_size)
3516 {
3517         __le64 snapid = cpu_to_le64(snap_id);
3518         int ret;
3519         struct {
3520                 u8 order;
3521                 __le64 size;
3522         } __attribute__ ((packed)) size_buf = { 0 };
3523
3524         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3525                                 "rbd", "get_size",
3526                                 &snapid, sizeof (snapid),
3527                                 &size_buf, sizeof (size_buf));
3528         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3529         if (ret < 0)
3530                 return ret;
3531         if (ret < sizeof (size_buf))
3532                 return -ERANGE;
3533
3534         if (order)
3535                 *order = size_buf.order;
3536         *snap_size = le64_to_cpu(size_buf.size);
3537
3538         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3539                 (unsigned long long)snap_id, (unsigned int)*order,
3540                 (unsigned long long)*snap_size);
3541
3542         return 0;
3543 }
3544
3545 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3546 {
3547         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3548                                         &rbd_dev->header.obj_order,
3549                                         &rbd_dev->header.image_size);
3550 }
3551
3552 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3553 {
3554         void *reply_buf;
3555         int ret;
3556         void *p;
3557
3558         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3559         if (!reply_buf)
3560                 return -ENOMEM;
3561
3562         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3563                                 "rbd", "get_object_prefix", NULL, 0,
3564                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3565         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3566         if (ret < 0)
3567                 goto out;
3568
3569         p = reply_buf;
3570         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3571                                                 p + ret, NULL, GFP_NOIO);
3572         ret = 0;
3573
3574         if (IS_ERR(rbd_dev->header.object_prefix)) {
3575                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3576                 rbd_dev->header.object_prefix = NULL;
3577         } else {
3578                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3579         }
3580 out:
3581         kfree(reply_buf);
3582
3583         return ret;
3584 }
3585
3586 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3587                 u64 *snap_features)
3588 {
3589         __le64 snapid = cpu_to_le64(snap_id);
3590         struct {
3591                 __le64 features;
3592                 __le64 incompat;
3593         } __attribute__ ((packed)) features_buf = { 0 };
3594         u64 incompat;
3595         int ret;
3596
3597         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3598                                 "rbd", "get_features",
3599                                 &snapid, sizeof (snapid),
3600                                 &features_buf, sizeof (features_buf));
3601         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3602         if (ret < 0)
3603                 return ret;
3604         if (ret < sizeof (features_buf))
3605                 return -ERANGE;
3606
3607         incompat = le64_to_cpu(features_buf.incompat);
3608         if (incompat & ~RBD_FEATURES_SUPPORTED)
3609                 return -ENXIO;
3610
3611         *snap_features = le64_to_cpu(features_buf.features);
3612
3613         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3614                 (unsigned long long)snap_id,
3615                 (unsigned long long)*snap_features,
3616                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3617
3618         return 0;
3619 }
3620
3621 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3622 {
3623         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3624                                                 &rbd_dev->header.features);
3625 }
3626
3627 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3628 {
3629         struct rbd_spec *parent_spec;
3630         size_t size;
3631         void *reply_buf = NULL;
3632         __le64 snapid;
3633         void *p;
3634         void *end;
3635         char *image_id;
3636         u64 overlap;
3637         int ret;
3638
3639         parent_spec = rbd_spec_alloc();
3640         if (!parent_spec)
3641                 return -ENOMEM;
3642
3643         size = sizeof (__le64) +                                /* pool_id */
3644                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3645                 sizeof (__le64) +                               /* snap_id */
3646                 sizeof (__le64);                                /* overlap */
3647         reply_buf = kmalloc(size, GFP_KERNEL);
3648         if (!reply_buf) {
3649                 ret = -ENOMEM;
3650                 goto out_err;
3651         }
3652
3653         snapid = cpu_to_le64(CEPH_NOSNAP);
3654         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3655                                 "rbd", "get_parent",
3656                                 &snapid, sizeof (snapid),
3657                                 reply_buf, size);
3658         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3659         if (ret < 0)
3660                 goto out_err;
3661
3662         p = reply_buf;
3663         end = reply_buf + ret;
3664         ret = -ERANGE;
3665         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3666         if (parent_spec->pool_id == CEPH_NOPOOL)
3667                 goto out;       /* No parent?  No problem. */
3668
3669         /* The ceph file layout needs to fit pool id in 32 bits */
3670
3671         ret = -EIO;
3672         if (parent_spec->pool_id > (u64)U32_MAX) {
3673                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3674                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3675                 goto out_err;
3676         }
3677
3678         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3679         if (IS_ERR(image_id)) {
3680                 ret = PTR_ERR(image_id);
3681                 goto out_err;
3682         }
3683         parent_spec->image_id = image_id;
3684         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3685         ceph_decode_64_safe(&p, end, overlap, out_err);
3686
3687         rbd_dev->parent_overlap = overlap;
3688         rbd_dev->parent_spec = parent_spec;
3689         parent_spec = NULL;     /* rbd_dev now owns this */
3690 out:
3691         ret = 0;
3692 out_err:
3693         kfree(reply_buf);
3694         rbd_spec_put(parent_spec);
3695
3696         return ret;
3697 }
3698
3699 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3700 {
3701         struct {
3702                 __le64 stripe_unit;
3703                 __le64 stripe_count;
3704         } __attribute__ ((packed)) striping_info_buf = { 0 };
3705         size_t size = sizeof (striping_info_buf);
3706         void *p;
3707         u64 obj_size;
3708         u64 stripe_unit;
3709         u64 stripe_count;
3710         int ret;
3711
3712         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3713                                 "rbd", "get_stripe_unit_count", NULL, 0,
3714                                 (char *)&striping_info_buf, size);
3715         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3716         if (ret < 0)
3717                 return ret;
3718         if (ret < size)
3719                 return -ERANGE;
3720
3721         /*
3722          * We don't actually support the "fancy striping" feature
3723          * (STRIPINGV2) yet, but if the striping sizes are the
3724          * defaults the behavior is the same as before.  So find
3725          * out, and only fail if the image has non-default values.
3726          */
3727         ret = -EINVAL;
3728         obj_size = (u64)1 << rbd_dev->header.obj_order;
3729         p = &striping_info_buf;
3730         stripe_unit = ceph_decode_64(&p);
3731         if (stripe_unit != obj_size) {
3732                 rbd_warn(rbd_dev, "unsupported stripe unit "
3733                                 "(got %llu want %llu)",
3734                                 stripe_unit, obj_size);
3735                 return -EINVAL;
3736         }
3737         stripe_count = ceph_decode_64(&p);
3738         if (stripe_count != 1) {
3739                 rbd_warn(rbd_dev, "unsupported stripe count "
3740                                 "(got %llu want 1)", stripe_count);
3741                 return -EINVAL;
3742         }
3743         rbd_dev->header.stripe_unit = stripe_unit;
3744         rbd_dev->header.stripe_count = stripe_count;
3745
3746         return 0;
3747 }
3748
3749 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3750 {
3751         size_t image_id_size;
3752         char *image_id;
3753         void *p;
3754         void *end;
3755         size_t size;
3756         void *reply_buf = NULL;
3757         size_t len = 0;
3758         char *image_name = NULL;
3759         int ret;
3760
3761         rbd_assert(!rbd_dev->spec->image_name);
3762
3763         len = strlen(rbd_dev->spec->image_id);
3764         image_id_size = sizeof (__le32) + len;
3765         image_id = kmalloc(image_id_size, GFP_KERNEL);
3766         if (!image_id)
3767                 return NULL;
3768
3769         p = image_id;
3770         end = image_id + image_id_size;
3771         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3772
3773         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3774         reply_buf = kmalloc(size, GFP_KERNEL);
3775         if (!reply_buf)
3776                 goto out;
3777
3778         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3779                                 "rbd", "dir_get_name",
3780                                 image_id, image_id_size,
3781                                 reply_buf, size);
3782         if (ret < 0)
3783                 goto out;
3784         p = reply_buf;
3785         end = reply_buf + ret;
3786
3787         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3788         if (IS_ERR(image_name))
3789                 image_name = NULL;
3790         else
3791                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3792 out:
3793         kfree(reply_buf);
3794         kfree(image_id);
3795
3796         return image_name;
3797 }
3798
3799 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3800 {
3801         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3802         const char *snap_name;
3803         u32 which = 0;
3804
3805         /* Skip over names until we find the one we are looking for */
3806
3807         snap_name = rbd_dev->header.snap_names;
3808         while (which < snapc->num_snaps) {
3809                 if (!strcmp(name, snap_name))
3810                         return snapc->snaps[which];
3811                 snap_name += strlen(snap_name) + 1;
3812                 which++;
3813         }
3814         return CEPH_NOSNAP;
3815 }
3816
3817 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3818 {
3819         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3820         u32 which;
3821         bool found = false;
3822         u64 snap_id;
3823
3824         for (which = 0; !found && which < snapc->num_snaps; which++) {
3825                 const char *snap_name;
3826
3827                 snap_id = snapc->snaps[which];
3828                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3829                 if (IS_ERR(snap_name))
3830                         break;
3831                 found = !strcmp(name, snap_name);
3832                 kfree(snap_name);
3833         }
3834         return found ? snap_id : CEPH_NOSNAP;
3835 }
3836
3837 /*
3838  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3839  * no snapshot by that name is found, or if an error occurs.
3840  */
3841 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3842 {
3843         if (rbd_dev->image_format == 1)
3844                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3845
3846         return rbd_v2_snap_id_by_name(rbd_dev, name);
3847 }
3848
3849 /*
3850  * When an rbd image has a parent image, it is identified by the
3851  * pool, image, and snapshot ids (not names).  This function fills
3852  * in the names for those ids.  (It's OK if we can't figure out the
3853  * name for an image id, but the pool and snapshot ids should always
3854  * exist and have names.)  All names in an rbd spec are dynamically
3855  * allocated.
3856  *
3857  * When an image being mapped (not a parent) is probed, we have the
3858  * pool name and pool id, image name and image id, and the snapshot
3859  * name.  The only thing we're missing is the snapshot id.
3860  */
3861 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3862 {
3863         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3864         struct rbd_spec *spec = rbd_dev->spec;
3865         const char *pool_name;
3866         const char *image_name;
3867         const char *snap_name;
3868         int ret;
3869
3870         /*
3871          * An image being mapped will have the pool name (etc.), but
3872          * we need to look up the snapshot id.
3873          */
3874         if (spec->pool_name) {
3875                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3876                         u64 snap_id;
3877
3878                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3879                         if (snap_id == CEPH_NOSNAP)
3880                                 return -ENOENT;
3881                         spec->snap_id = snap_id;
3882                 } else {
3883                         spec->snap_id = CEPH_NOSNAP;
3884                 }
3885
3886                 return 0;
3887         }
3888
3889         /* Get the pool name; we have to make our own copy of this */
3890
3891         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3892         if (!pool_name) {
3893                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3894                 return -EIO;
3895         }
3896         pool_name = kstrdup(pool_name, GFP_KERNEL);
3897         if (!pool_name)
3898                 return -ENOMEM;
3899
3900         /* Fetch the image name; tolerate failure here */
3901
3902         image_name = rbd_dev_image_name(rbd_dev);
3903         if (!image_name)
3904                 rbd_warn(rbd_dev, "unable to get image name");
3905
3906         /* Look up the snapshot name, and make a copy */
3907
3908         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3909         if (!snap_name) {
3910                 ret = -ENOMEM;
3911                 goto out_err;
3912         }
3913
3914         spec->pool_name = pool_name;
3915         spec->image_name = image_name;
3916         spec->snap_name = snap_name;
3917
3918         return 0;
3919 out_err:
3920         kfree(image_name);
3921         kfree(pool_name);
3922
3923         return ret;
3924 }
3925
3926 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3927 {
3928         size_t size;
3929         int ret;
3930         void *reply_buf;
3931         void *p;
3932         void *end;
3933         u64 seq;
3934         u32 snap_count;
3935         struct ceph_snap_context *snapc;
3936         u32 i;
3937
3938         /*
3939          * We'll need room for the seq value (maximum snapshot id),
3940          * snapshot count, and array of that many snapshot ids.
3941          * For now we have a fixed upper limit on the number we're
3942          * prepared to receive.
3943          */
3944         size = sizeof (__le64) + sizeof (__le32) +
3945                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3946         reply_buf = kzalloc(size, GFP_KERNEL);
3947         if (!reply_buf)
3948                 return -ENOMEM;
3949
3950         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3951                                 "rbd", "get_snapcontext", NULL, 0,
3952                                 reply_buf, size);
3953         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3954         if (ret < 0)
3955                 goto out;
3956
3957         p = reply_buf;
3958         end = reply_buf + ret;
3959         ret = -ERANGE;
3960         ceph_decode_64_safe(&p, end, seq, out);
3961         ceph_decode_32_safe(&p, end, snap_count, out);
3962
3963         /*
3964          * Make sure the reported number of snapshot ids wouldn't go
3965          * beyond the end of our buffer.  But before checking that,
3966          * make sure the computed size of the snapshot context we
3967          * allocate is representable in a size_t.
3968          */
3969         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3970                                  / sizeof (u64)) {
3971                 ret = -EINVAL;
3972                 goto out;
3973         }
3974         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3975                 goto out;
3976         ret = 0;
3977
3978         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3979         if (!snapc) {
3980                 ret = -ENOMEM;
3981                 goto out;
3982         }
3983         snapc->seq = seq;
3984         for (i = 0; i < snap_count; i++)
3985                 snapc->snaps[i] = ceph_decode_64(&p);
3986
3987         rbd_dev->header.snapc = snapc;
3988
3989         dout("  snap context seq = %llu, snap_count = %u\n",
3990                 (unsigned long long)seq, (unsigned int)snap_count);
3991 out:
3992         kfree(reply_buf);
3993
3994         return ret;
3995 }
3996
3997 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3998                                         u64 snap_id)
3999 {
4000         size_t size;
4001         void *reply_buf;
4002         __le64 snapid;
4003         int ret;
4004         void *p;
4005         void *end;
4006         char *snap_name;
4007
4008         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4009         reply_buf = kmalloc(size, GFP_KERNEL);
4010         if (!reply_buf)
4011                 return ERR_PTR(-ENOMEM);
4012
4013         snapid = cpu_to_le64(snap_id);
4014         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4015                                 "rbd", "get_snapshot_name",
4016                                 &snapid, sizeof (snapid),
4017                                 reply_buf, size);
4018         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4019         if (ret < 0) {
4020                 snap_name = ERR_PTR(ret);
4021                 goto out;
4022         }
4023
4024         p = reply_buf;
4025         end = reply_buf + ret;
4026         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4027         if (IS_ERR(snap_name))
4028                 goto out;
4029
4030         dout("  snap_id 0x%016llx snap_name = %s\n",
4031                 (unsigned long long)snap_id, snap_name);
4032 out:
4033         kfree(reply_buf);
4034
4035         return snap_name;
4036 }
4037
4038 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4039 {
4040         int ret;
4041
4042         down_write(&rbd_dev->header_rwsem);
4043
4044         ret = rbd_dev_v2_image_size(rbd_dev);
4045         if (ret)
4046                 goto out;
4047         rbd_update_mapping_size(rbd_dev);
4048
4049         ret = rbd_dev_v2_snap_context(rbd_dev);
4050         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4051         if (ret)
4052                 goto out;
4053 out:
4054         up_write(&rbd_dev->header_rwsem);
4055
4056         return ret;
4057 }
4058
4059 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4060 {
4061         struct device *dev;
4062         int ret;
4063
4064         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4065
4066         dev = &rbd_dev->dev;
4067         dev->bus = &rbd_bus_type;
4068         dev->type = &rbd_device_type;
4069         dev->parent = &rbd_root_dev;
4070         dev->release = rbd_dev_device_release;
4071         dev_set_name(dev, "%d", rbd_dev->dev_id);
4072         ret = device_register(dev);
4073
4074         mutex_unlock(&ctl_mutex);
4075
4076         return ret;
4077 }
4078
4079 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4080 {
4081         device_unregister(&rbd_dev->dev);
4082 }
4083
4084 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4085
4086 /*
4087  * Get a unique rbd identifier for the given new rbd_dev, and add
4088  * the rbd_dev to the global list.  The minimum rbd id is 1.
4089  */
4090 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4091 {
4092         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4093
4094         spin_lock(&rbd_dev_list_lock);
4095         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4096         spin_unlock(&rbd_dev_list_lock);
4097         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4098                 (unsigned long long) rbd_dev->dev_id);
4099 }
4100
4101 /*
4102  * Remove an rbd_dev from the global list, and record that its
4103  * identifier is no longer in use.
4104  */
4105 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4106 {
4107         struct list_head *tmp;
4108         int rbd_id = rbd_dev->dev_id;
4109         int max_id;
4110
4111         rbd_assert(rbd_id > 0);
4112
4113         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4114                 (unsigned long long) rbd_dev->dev_id);
4115         spin_lock(&rbd_dev_list_lock);
4116         list_del_init(&rbd_dev->node);
4117
4118         /*
4119          * If the id being "put" is not the current maximum, there
4120          * is nothing special we need to do.
4121          */
4122         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4123                 spin_unlock(&rbd_dev_list_lock);
4124                 return;
4125         }
4126
4127         /*
4128          * We need to update the current maximum id.  Search the
4129          * list to find out what it is.  We're more likely to find
4130          * the maximum at the end, so search the list backward.
4131          */
4132         max_id = 0;
4133         list_for_each_prev(tmp, &rbd_dev_list) {
4134                 struct rbd_device *rbd_dev;
4135
4136                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4137                 if (rbd_dev->dev_id > max_id)
4138                         max_id = rbd_dev->dev_id;
4139         }
4140         spin_unlock(&rbd_dev_list_lock);
4141
4142         /*
4143          * The max id could have been updated by rbd_dev_id_get(), in
4144          * which case it now accurately reflects the new maximum.
4145          * Be careful not to overwrite the maximum value in that
4146          * case.
4147          */
4148         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4149         dout("  max dev id has been reset\n");
4150 }
4151
4152 /*
4153  * Skips over white space at *buf, and updates *buf to point to the
4154  * first found non-space character (if any). Returns the length of
4155  * the token (string of non-white space characters) found.  Note
4156  * that *buf must be terminated with '\0'.
4157  */
4158 static inline size_t next_token(const char **buf)
4159 {
4160         /*
4161         * These are the characters that produce nonzero for
4162         * isspace() in the "C" and "POSIX" locales.
4163         */
4164         const char *spaces = " \f\n\r\t\v";
4165
4166         *buf += strspn(*buf, spaces);   /* Find start of token */
4167
4168         return strcspn(*buf, spaces);   /* Return token length */
4169 }
4170
4171 /*
4172  * Finds the next token in *buf, and if the provided token buffer is
4173  * big enough, copies the found token into it.  The result, if
4174  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4175  * must be terminated with '\0' on entry.
4176  *
4177  * Returns the length of the token found (not including the '\0').
4178  * Return value will be 0 if no token is found, and it will be >=
4179  * token_size if the token would not fit.
4180  *
4181  * The *buf pointer will be updated to point beyond the end of the
4182  * found token.  Note that this occurs even if the token buffer is
4183  * too small to hold it.
4184  */
4185 static inline size_t copy_token(const char **buf,
4186                                 char *token,
4187                                 size_t token_size)
4188 {
4189         size_t len;
4190
4191         len = next_token(buf);
4192         if (len < token_size) {
4193                 memcpy(token, *buf, len);
4194                 *(token + len) = '\0';
4195         }
4196         *buf += len;
4197
4198         return len;
4199 }
4200
4201 /*
4202  * Finds the next token in *buf, dynamically allocates a buffer big
4203  * enough to hold a copy of it, and copies the token into the new
4204  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4205  * that a duplicate buffer is created even for a zero-length token.
4206  *
4207  * Returns a pointer to the newly-allocated duplicate, or a null
4208  * pointer if memory for the duplicate was not available.  If
4209  * the lenp argument is a non-null pointer, the length of the token
4210  * (not including the '\0') is returned in *lenp.
4211  *
4212  * If successful, the *buf pointer will be updated to point beyond
4213  * the end of the found token.
4214  *
4215  * Note: uses GFP_KERNEL for allocation.
4216  */
4217 static inline char *dup_token(const char **buf, size_t *lenp)
4218 {
4219         char *dup;
4220         size_t len;
4221
4222         len = next_token(buf);
4223         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4224         if (!dup)
4225                 return NULL;
4226         *(dup + len) = '\0';
4227         *buf += len;
4228
4229         if (lenp)
4230                 *lenp = len;
4231
4232         return dup;
4233 }
4234
4235 /*
4236  * Parse the options provided for an "rbd add" (i.e., rbd image
4237  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4238  * and the data written is passed here via a NUL-terminated buffer.
4239  * Returns 0 if successful or an error code otherwise.
4240  *
4241  * The information extracted from these options is recorded in
4242  * the other parameters which return dynamically-allocated
4243  * structures:
4244  *  ceph_opts
4245  *      The address of a pointer that will refer to a ceph options
4246  *      structure.  Caller must release the returned pointer using
4247  *      ceph_destroy_options() when it is no longer needed.
4248  *  rbd_opts
4249  *      Address of an rbd options pointer.  Fully initialized by
4250  *      this function; caller must release with kfree().
4251  *  spec
4252  *      Address of an rbd image specification pointer.  Fully
4253  *      initialized by this function based on parsed options.
4254  *      Caller must release with rbd_spec_put().
4255  *
4256  * The options passed take this form:
4257  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4258  * where:
4259  *  <mon_addrs>
4260  *      A comma-separated list of one or more monitor addresses.
4261  *      A monitor address is an ip address, optionally followed
4262  *      by a port number (separated by a colon).
4263  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4264  *  <options>
4265  *      A comma-separated list of ceph and/or rbd options.
4266  *  <pool_name>
4267  *      The name of the rados pool containing the rbd image.
4268  *  <image_name>
4269  *      The name of the image in that pool to map.
4270  *  <snap_id>
4271  *      An optional snapshot id.  If provided, the mapping will
4272  *      present data from the image at the time that snapshot was
4273  *      created.  The image head is used if no snapshot id is
4274  *      provided.  Snapshot mappings are always read-only.
4275  */
4276 static int rbd_add_parse_args(const char *buf,
4277                                 struct ceph_options **ceph_opts,
4278                                 struct rbd_options **opts,
4279                                 struct rbd_spec **rbd_spec)
4280 {
4281         size_t len;
4282         char *options;
4283         const char *mon_addrs;
4284         char *snap_name;
4285         size_t mon_addrs_size;
4286         struct rbd_spec *spec = NULL;
4287         struct rbd_options *rbd_opts = NULL;
4288         struct ceph_options *copts;
4289         int ret;
4290
4291         /* The first four tokens are required */
4292
4293         len = next_token(&buf);
4294         if (!len) {
4295                 rbd_warn(NULL, "no monitor address(es) provided");
4296                 return -EINVAL;
4297         }
4298         mon_addrs = buf;
4299         mon_addrs_size = len + 1;
4300         buf += len;
4301
4302         ret = -EINVAL;
4303         options = dup_token(&buf, NULL);
4304         if (!options)
4305                 return -ENOMEM;
4306         if (!*options) {
4307                 rbd_warn(NULL, "no options provided");
4308                 goto out_err;
4309         }
4310
4311         spec = rbd_spec_alloc();
4312         if (!spec)
4313                 goto out_mem;
4314
4315         spec->pool_name = dup_token(&buf, NULL);
4316         if (!spec->pool_name)
4317                 goto out_mem;
4318         if (!*spec->pool_name) {
4319                 rbd_warn(NULL, "no pool name provided");
4320                 goto out_err;
4321         }
4322
4323         spec->image_name = dup_token(&buf, NULL);
4324         if (!spec->image_name)
4325                 goto out_mem;
4326         if (!*spec->image_name) {
4327                 rbd_warn(NULL, "no image name provided");
4328                 goto out_err;
4329         }
4330
4331         /*
4332          * Snapshot name is optional; default is to use "-"
4333          * (indicating the head/no snapshot).
4334          */
4335         len = next_token(&buf);
4336         if (!len) {
4337                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4338                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4339         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4340                 ret = -ENAMETOOLONG;
4341                 goto out_err;
4342         }
4343         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4344         if (!snap_name)
4345                 goto out_mem;
4346         *(snap_name + len) = '\0';
4347         spec->snap_name = snap_name;
4348
4349         /* Initialize all rbd options to the defaults */
4350
4351         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4352         if (!rbd_opts)
4353                 goto out_mem;
4354
4355         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4356
4357         copts = ceph_parse_options(options, mon_addrs,
4358                                         mon_addrs + mon_addrs_size - 1,
4359                                         parse_rbd_opts_token, rbd_opts);
4360         if (IS_ERR(copts)) {
4361                 ret = PTR_ERR(copts);
4362                 goto out_err;
4363         }
4364         kfree(options);
4365
4366         *ceph_opts = copts;
4367         *opts = rbd_opts;
4368         *rbd_spec = spec;
4369
4370         return 0;
4371 out_mem:
4372         ret = -ENOMEM;
4373 out_err:
4374         kfree(rbd_opts);
4375         rbd_spec_put(spec);
4376         kfree(options);
4377
4378         return ret;
4379 }
4380
4381 /*
4382  * An rbd format 2 image has a unique identifier, distinct from the
4383  * name given to it by the user.  Internally, that identifier is
4384  * what's used to specify the names of objects related to the image.
4385  *
4386  * A special "rbd id" object is used to map an rbd image name to its
4387  * id.  If that object doesn't exist, then there is no v2 rbd image
4388  * with the supplied name.
4389  *
4390  * This function will record the given rbd_dev's image_id field if
4391  * it can be determined, and in that case will return 0.  If any
4392  * errors occur a negative errno will be returned and the rbd_dev's
4393  * image_id field will be unchanged (and should be NULL).
4394  */
4395 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4396 {
4397         int ret;
4398         size_t size;
4399         char *object_name;
4400         void *response;
4401         char *image_id;
4402
4403         /*
4404          * When probing a parent image, the image id is already
4405          * known (and the image name likely is not).  There's no
4406          * need to fetch the image id again in this case.  We
4407          * do still need to set the image format though.
4408          */
4409         if (rbd_dev->spec->image_id) {
4410                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4411
4412                 return 0;
4413         }
4414
4415         /*
4416          * First, see if the format 2 image id file exists, and if
4417          * so, get the image's persistent id from it.
4418          */
4419         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4420         object_name = kmalloc(size, GFP_NOIO);
4421         if (!object_name)
4422                 return -ENOMEM;
4423         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4424         dout("rbd id object name is %s\n", object_name);
4425
4426         /* Response will be an encoded string, which includes a length */
4427
4428         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4429         response = kzalloc(size, GFP_NOIO);
4430         if (!response) {
4431                 ret = -ENOMEM;
4432                 goto out;
4433         }
4434
4435         /* If it doesn't exist we'll assume it's a format 1 image */
4436
4437         ret = rbd_obj_method_sync(rbd_dev, object_name,
4438                                 "rbd", "get_id", NULL, 0,
4439                                 response, RBD_IMAGE_ID_LEN_MAX);
4440         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4441         if (ret == -ENOENT) {
4442                 image_id = kstrdup("", GFP_KERNEL);
4443                 ret = image_id ? 0 : -ENOMEM;
4444                 if (!ret)
4445                         rbd_dev->image_format = 1;
4446         } else if (ret > sizeof (__le32)) {
4447                 void *p = response;
4448
4449                 image_id = ceph_extract_encoded_string(&p, p + ret,
4450                                                 NULL, GFP_NOIO);
4451                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4452                 if (!ret)
4453                         rbd_dev->image_format = 2;
4454         } else {
4455                 ret = -EINVAL;
4456         }
4457
4458         if (!ret) {
4459                 rbd_dev->spec->image_id = image_id;
4460                 dout("image_id is %s\n", image_id);
4461         }
4462 out:
4463         kfree(response);
4464         kfree(object_name);
4465
4466         return ret;
4467 }
4468
4469 /* Undo whatever state changes are made by v1 or v2 image probe */
4470
4471 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4472 {
4473         struct rbd_image_header *header;
4474
4475         rbd_dev_remove_parent(rbd_dev);
4476         rbd_spec_put(rbd_dev->parent_spec);
4477         rbd_dev->parent_spec = NULL;
4478         rbd_dev->parent_overlap = 0;
4479
4480         /* Free dynamic fields from the header, then zero it out */
4481
4482         header = &rbd_dev->header;
4483         ceph_put_snap_context(header->snapc);
4484         kfree(header->snap_sizes);
4485         kfree(header->snap_names);
4486         kfree(header->object_prefix);
4487         memset(header, 0, sizeof (*header));
4488 }
4489
4490 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4491 {
4492         int ret;
4493
4494         /* Populate rbd image metadata */
4495
4496         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4497         if (ret < 0)
4498                 goto out_err;
4499
4500         /* Version 1 images have no parent (no layering) */
4501
4502         rbd_dev->parent_spec = NULL;
4503         rbd_dev->parent_overlap = 0;
4504
4505         dout("discovered version 1 image, header name is %s\n",
4506                 rbd_dev->header_name);
4507
4508         return 0;
4509
4510 out_err:
4511         kfree(rbd_dev->header_name);
4512         rbd_dev->header_name = NULL;
4513         kfree(rbd_dev->spec->image_id);
4514         rbd_dev->spec->image_id = NULL;
4515
4516         return ret;
4517 }
4518
4519 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4520 {
4521         int ret;
4522
4523         ret = rbd_dev_v2_image_size(rbd_dev);
4524         if (ret)
4525                 goto out_err;
4526
4527         /* Get the object prefix (a.k.a. block_name) for the image */
4528
4529         ret = rbd_dev_v2_object_prefix(rbd_dev);
4530         if (ret)
4531                 goto out_err;
4532
4533         /* Get the and check features for the image */
4534
4535         ret = rbd_dev_v2_features(rbd_dev);
4536         if (ret)
4537                 goto out_err;
4538
4539         /* If the image supports layering, get the parent info */
4540
4541         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4542                 ret = rbd_dev_v2_parent_info(rbd_dev);
4543                 if (ret)
4544                         goto out_err;
4545
4546                 /*
4547                  * Don't print a warning for parent images.  We can
4548                  * tell this point because we won't know its pool
4549                  * name yet (just its pool id).
4550                  */
4551                 if (rbd_dev->spec->pool_name)
4552                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4553                                         "is EXPERIMENTAL!");
4554         }
4555
4556         /* If the image supports fancy striping, get its parameters */
4557
4558         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4559                 ret = rbd_dev_v2_striping_info(rbd_dev);
4560                 if (ret < 0)
4561                         goto out_err;
4562         }
4563
4564         /* crypto and compression type aren't (yet) supported for v2 images */
4565
4566         rbd_dev->header.crypt_type = 0;
4567         rbd_dev->header.comp_type = 0;
4568
4569         /* Get the snapshot context, plus the header version */
4570
4571         ret = rbd_dev_v2_snap_context(rbd_dev);
4572         if (ret)
4573                 goto out_err;
4574
4575         dout("discovered version 2 image, header name is %s\n",
4576                 rbd_dev->header_name);
4577
4578         return 0;
4579 out_err:
4580         rbd_dev->parent_overlap = 0;
4581         rbd_spec_put(rbd_dev->parent_spec);
4582         rbd_dev->parent_spec = NULL;
4583         kfree(rbd_dev->header_name);
4584         rbd_dev->header_name = NULL;
4585         kfree(rbd_dev->header.object_prefix);
4586         rbd_dev->header.object_prefix = NULL;
4587
4588         return ret;
4589 }
4590
4591 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4592 {
4593         struct rbd_device *parent = NULL;
4594         struct rbd_spec *parent_spec;
4595         struct rbd_client *rbdc;
4596         int ret;
4597
4598         if (!rbd_dev->parent_spec)
4599                 return 0;
4600         /*
4601          * We need to pass a reference to the client and the parent
4602          * spec when creating the parent rbd_dev.  Images related by
4603          * parent/child relationships always share both.
4604          */
4605         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4606         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4607
4608         ret = -ENOMEM;
4609         parent = rbd_dev_create(rbdc, parent_spec);
4610         if (!parent)
4611                 goto out_err;
4612
4613         ret = rbd_dev_image_probe(parent);
4614         if (ret < 0)
4615                 goto out_err;
4616         rbd_dev->parent = parent;
4617
4618         return 0;
4619 out_err:
4620         if (parent) {
4621                 rbd_spec_put(rbd_dev->parent_spec);
4622                 kfree(rbd_dev->header_name);
4623                 rbd_dev_destroy(parent);
4624         } else {
4625                 rbd_put_client(rbdc);
4626                 rbd_spec_put(parent_spec);
4627         }
4628
4629         return ret;
4630 }
4631
4632 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4633 {
4634         int ret;
4635
4636         ret = rbd_dev_mapping_set(rbd_dev);
4637         if (ret)
4638                 return ret;
4639
4640         /* generate unique id: find highest unique id, add one */
4641         rbd_dev_id_get(rbd_dev);
4642
4643         /* Fill in the device name, now that we have its id. */
4644         BUILD_BUG_ON(DEV_NAME_LEN
4645                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4646         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4647
4648         /* Get our block major device number. */
4649
4650         ret = register_blkdev(0, rbd_dev->name);
4651         if (ret < 0)
4652                 goto err_out_id;
4653         rbd_dev->major = ret;
4654
4655         /* Set up the blkdev mapping. */
4656
4657         ret = rbd_init_disk(rbd_dev);
4658         if (ret)
4659                 goto err_out_blkdev;
4660
4661         ret = rbd_bus_add_dev(rbd_dev);
4662         if (ret)
4663                 goto err_out_disk;
4664
4665         /* Everything's ready.  Announce the disk to the world. */
4666
4667         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4668         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4669         add_disk(rbd_dev->disk);
4670
4671         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4672                 (unsigned long long) rbd_dev->mapping.size);
4673
4674         return ret;
4675
4676 err_out_disk:
4677         rbd_free_disk(rbd_dev);
4678 err_out_blkdev:
4679         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4680 err_out_id:
4681         rbd_dev_id_put(rbd_dev);
4682         rbd_dev_mapping_clear(rbd_dev);
4683
4684         return ret;
4685 }
4686
4687 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4688 {
4689         struct rbd_spec *spec = rbd_dev->spec;
4690         size_t size;
4691
4692         /* Record the header object name for this rbd image. */
4693
4694         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4695
4696         if (rbd_dev->image_format == 1)
4697                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4698         else
4699                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4700
4701         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4702         if (!rbd_dev->header_name)
4703                 return -ENOMEM;
4704
4705         if (rbd_dev->image_format == 1)
4706                 sprintf(rbd_dev->header_name, "%s%s",
4707                         spec->image_name, RBD_SUFFIX);
4708         else
4709                 sprintf(rbd_dev->header_name, "%s%s",
4710                         RBD_HEADER_PREFIX, spec->image_id);
4711         return 0;
4712 }
4713
4714 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4715 {
4716         int ret;
4717
4718         rbd_dev_unprobe(rbd_dev);
4719         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4720         if (ret)
4721                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4722         kfree(rbd_dev->header_name);
4723         rbd_dev->header_name = NULL;
4724         rbd_dev->image_format = 0;
4725         kfree(rbd_dev->spec->image_id);
4726         rbd_dev->spec->image_id = NULL;
4727
4728         rbd_dev_destroy(rbd_dev);
4729 }
4730
4731 /*
4732  * Probe for the existence of the header object for the given rbd
4733  * device.  For format 2 images this includes determining the image
4734  * id.
4735  */
4736 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4737 {
4738         int ret;
4739         int tmp;
4740
4741         /*
4742          * Get the id from the image id object.  If it's not a
4743          * format 2 image, we'll get ENOENT back, and we'll assume
4744          * it's a format 1 image.
4745          */
4746         ret = rbd_dev_image_id(rbd_dev);
4747         if (ret)
4748                 return ret;
4749         rbd_assert(rbd_dev->spec->image_id);
4750         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4751
4752         ret = rbd_dev_header_name(rbd_dev);
4753         if (ret)
4754                 goto err_out_format;
4755
4756         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4757         if (ret)
4758                 goto out_header_name;
4759
4760         if (rbd_dev->image_format == 1)
4761                 ret = rbd_dev_v1_probe(rbd_dev);
4762         else
4763                 ret = rbd_dev_v2_probe(rbd_dev);
4764         if (ret)
4765                 goto err_out_watch;
4766
4767         ret = rbd_dev_spec_update(rbd_dev);
4768         if (ret)
4769                 goto err_out_probe;
4770
4771         ret = rbd_dev_probe_parent(rbd_dev);
4772         if (!ret)
4773                 return 0;
4774
4775 err_out_probe:
4776         rbd_dev_unprobe(rbd_dev);
4777 err_out_watch:
4778         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4779         if (tmp)
4780                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4781 out_header_name:
4782         kfree(rbd_dev->header_name);
4783         rbd_dev->header_name = NULL;
4784 err_out_format:
4785         rbd_dev->image_format = 0;
4786         kfree(rbd_dev->spec->image_id);
4787         rbd_dev->spec->image_id = NULL;
4788
4789         dout("probe failed, returning %d\n", ret);
4790
4791         return ret;
4792 }
4793
4794 static ssize_t rbd_add(struct bus_type *bus,
4795                        const char *buf,
4796                        size_t count)
4797 {
4798         struct rbd_device *rbd_dev = NULL;
4799         struct ceph_options *ceph_opts = NULL;
4800         struct rbd_options *rbd_opts = NULL;
4801         struct rbd_spec *spec = NULL;
4802         struct rbd_client *rbdc;
4803         struct ceph_osd_client *osdc;
4804         int rc = -ENOMEM;
4805
4806         if (!try_module_get(THIS_MODULE))
4807                 return -ENODEV;
4808
4809         /* parse add command */
4810         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4811         if (rc < 0)
4812                 goto err_out_module;
4813
4814         rbdc = rbd_get_client(ceph_opts);
4815         if (IS_ERR(rbdc)) {
4816                 rc = PTR_ERR(rbdc);
4817                 goto err_out_args;
4818         }
4819         ceph_opts = NULL;       /* rbd_dev client now owns this */
4820
4821         /* pick the pool */
4822         osdc = &rbdc->client->osdc;
4823         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4824         if (rc < 0)
4825                 goto err_out_client;
4826         spec->pool_id = (u64)rc;
4827
4828         /* The ceph file layout needs to fit pool id in 32 bits */
4829
4830         if (spec->pool_id > (u64)U32_MAX) {
4831                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4832                                 (unsigned long long)spec->pool_id, U32_MAX);
4833                 rc = -EIO;
4834                 goto err_out_client;
4835         }
4836
4837         rbd_dev = rbd_dev_create(rbdc, spec);
4838         if (!rbd_dev)
4839                 goto err_out_client;
4840         rbdc = NULL;            /* rbd_dev now owns this */
4841         spec = NULL;            /* rbd_dev now owns this */
4842
4843         rbd_dev->mapping.read_only = rbd_opts->read_only;
4844         kfree(rbd_opts);
4845         rbd_opts = NULL;        /* done with this */
4846
4847         rc = rbd_dev_image_probe(rbd_dev);
4848         if (rc < 0)
4849                 goto err_out_rbd_dev;
4850
4851         rc = rbd_dev_device_setup(rbd_dev);
4852         if (!rc)
4853                 return count;
4854
4855         rbd_dev_image_release(rbd_dev);
4856 err_out_rbd_dev:
4857         rbd_dev_destroy(rbd_dev);
4858 err_out_client:
4859         rbd_put_client(rbdc);
4860 err_out_args:
4861         if (ceph_opts)
4862                 ceph_destroy_options(ceph_opts);
4863         kfree(rbd_opts);
4864         rbd_spec_put(spec);
4865 err_out_module:
4866         module_put(THIS_MODULE);
4867
4868         dout("Error adding device %s\n", buf);
4869
4870         return (ssize_t)rc;
4871 }
4872
4873 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4874 {
4875         struct list_head *tmp;
4876         struct rbd_device *rbd_dev;
4877
4878         spin_lock(&rbd_dev_list_lock);
4879         list_for_each(tmp, &rbd_dev_list) {
4880                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4881                 if (rbd_dev->dev_id == dev_id) {
4882                         spin_unlock(&rbd_dev_list_lock);
4883                         return rbd_dev;
4884                 }
4885         }
4886         spin_unlock(&rbd_dev_list_lock);
4887         return NULL;
4888 }
4889
4890 static void rbd_dev_device_release(struct device *dev)
4891 {
4892         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4893
4894         rbd_free_disk(rbd_dev);
4895         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4896         rbd_dev_clear_mapping(rbd_dev);
4897         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4898         rbd_dev->major = 0;
4899         rbd_dev_id_put(rbd_dev);
4900         rbd_dev_mapping_clear(rbd_dev);
4901 }
4902
4903 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4904 {
4905         while (rbd_dev->parent) {
4906                 struct rbd_device *first = rbd_dev;
4907                 struct rbd_device *second = first->parent;
4908                 struct rbd_device *third;
4909
4910                 /*
4911                  * Follow to the parent with no grandparent and
4912                  * remove it.
4913                  */
4914                 while (second && (third = second->parent)) {
4915                         first = second;
4916                         second = third;
4917                 }
4918                 rbd_assert(second);
4919                 rbd_dev_image_release(second);
4920                 first->parent = NULL;
4921                 first->parent_overlap = 0;
4922
4923                 rbd_assert(first->parent_spec);
4924                 rbd_spec_put(first->parent_spec);
4925                 first->parent_spec = NULL;
4926         }
4927 }
4928
4929 static ssize_t rbd_remove(struct bus_type *bus,
4930                           const char *buf,
4931                           size_t count)
4932 {
4933         struct rbd_device *rbd_dev = NULL;
4934         int target_id;
4935         unsigned long ul;
4936         int ret;
4937
4938         ret = strict_strtoul(buf, 10, &ul);
4939         if (ret)
4940                 return ret;
4941
4942         /* convert to int; abort if we lost anything in the conversion */
4943         target_id = (int) ul;
4944         if (target_id != ul)
4945                 return -EINVAL;
4946
4947         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4948
4949         rbd_dev = __rbd_get_dev(target_id);
4950         if (!rbd_dev) {
4951                 ret = -ENOENT;
4952                 goto done;
4953         }
4954
4955         spin_lock_irq(&rbd_dev->lock);
4956         if (rbd_dev->open_count)
4957                 ret = -EBUSY;
4958         else
4959                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4960         spin_unlock_irq(&rbd_dev->lock);
4961         if (ret < 0)
4962                 goto done;
4963         ret = count;
4964         rbd_bus_del_dev(rbd_dev);
4965         rbd_dev_image_release(rbd_dev);
4966         module_put(THIS_MODULE);
4967 done:
4968         mutex_unlock(&ctl_mutex);
4969
4970         return ret;
4971 }
4972
4973 /*
4974  * create control files in sysfs
4975  * /sys/bus/rbd/...
4976  */
4977 static int rbd_sysfs_init(void)
4978 {
4979         int ret;
4980
4981         ret = device_register(&rbd_root_dev);
4982         if (ret < 0)
4983                 return ret;
4984
4985         ret = bus_register(&rbd_bus_type);
4986         if (ret < 0)
4987                 device_unregister(&rbd_root_dev);
4988
4989         return ret;
4990 }
4991
4992 static void rbd_sysfs_cleanup(void)
4993 {
4994         bus_unregister(&rbd_bus_type);
4995         device_unregister(&rbd_root_dev);
4996 }
4997
4998 static int rbd_slab_init(void)
4999 {
5000         rbd_assert(!rbd_img_request_cache);
5001         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5002                                         sizeof (struct rbd_img_request),
5003                                         __alignof__(struct rbd_img_request),
5004                                         0, NULL);
5005         if (rbd_img_request_cache)
5006                 return 0;
5007
5008         return -ENOMEM;
5009 }
5010
5011 static void rbd_slab_exit(void)
5012 {
5013         rbd_assert(rbd_img_request_cache);
5014         kmem_cache_destroy(rbd_img_request_cache);
5015         rbd_img_request_cache = NULL;
5016 }
5017
5018 static int __init rbd_init(void)
5019 {
5020         int rc;
5021
5022         if (!libceph_compatible(NULL)) {
5023                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5024
5025                 return -EINVAL;
5026         }
5027         rc = rbd_slab_init();
5028         if (rc)
5029                 return rc;
5030         rc = rbd_sysfs_init();
5031         if (rc)
5032                 rbd_slab_exit();
5033         else
5034                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5035
5036         return rc;
5037 }
5038
5039 static void __exit rbd_exit(void)
5040 {
5041         rbd_sysfs_cleanup();
5042         rbd_slab_exit();
5043 }
5044
5045 module_init(rbd_init);
5046 module_exit(rbd_exit);
5047
5048 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5049 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5050 MODULE_DESCRIPTION("rados block device");
5051
5052 /* following authorship retained from original osdblk.c */
5053 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5054
5055 MODULE_LICENSE("GPL");