]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: allocate object requests with a slab allocator
[~andy/linux] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44
45 #include "rbd_types.h"
46
#define RBD_DEBUG       /* Selects the checking form of rbd_assert() */

/*
 * Block I/O is counted in sectors.  Several Linux layers (blk, bio,
 * genhd) each have their own notion of a sector, but 512 bytes is
 * the universal default.  These names just read better than the
 * bare numbers.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* A snapshot name plus its "snap_" prefix must fit within NAME_MAX */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* lets the largest snapc fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* not a valid snap array index */

/* Sized so an image name sent by an OSD fits in a single page */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Image feature bits */

#define RBD_FEATURE_LAYERING    (1 << 0)
#define RBD_FEATURE_STRIPINGV2  (1 << 1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* The subset of features this client implementation supports */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An rbd device is named "rbd#": RBD_DRV_NAME followed by a unique
 * integer id.  MAX_INT_FORMAT_WIDTH is used to make sure
 * DEV_NAME_LEN is large enough for every possible device name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
98
99 /*
100  * block device image metadata (in-memory version)
101  */
102 struct rbd_image_header {
103         /* These four fields never change for a given rbd image */
104         char *object_prefix;
105         u64 features;
106         __u8 obj_order;
107         __u8 crypt_type;
108         __u8 comp_type;
109
110         /* The remaining fields need to be updated occasionally */
111         u64 image_size;
112         struct ceph_snap_context *snapc;
113         char *snap_names;
114         u64 *snap_sizes;
115
116         u64 stripe_unit;
117         u64 stripe_count;
118 };
119
120 /*
121  * An rbd image specification.
122  *
123  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
124  * identify an image.  Each rbd_dev structure includes a pointer to
125  * an rbd_spec structure that encapsulates this identity.
126  *
127  * Each of the id's in an rbd_spec has an associated name.  For a
128  * user-mapped image, the names are supplied and the id's associated
129  * with them are looked up.  For a layered image, a parent image is
130  * defined by the tuple, and the names are looked up.
131  *
132  * An rbd_dev structure contains a parent_spec pointer which is
133  * non-null if the image it represents is a child in a layered
134  * image.  This pointer will refer to the rbd_spec structure used
135  * by the parent rbd_dev for its own identity (i.e., the structure
136  * is shared between the parent and child).
137  *
138  * Since these structures are populated once, during the discovery
139  * phase of image construction, they are effectively immutable so
140  * we make no effort to synchronize access to them.
141  *
142  * Note that code herein does not assume the image name is known (it
143  * could be a null pointer).
144  */
145 struct rbd_spec {
146         u64             pool_id;
147         const char      *pool_name;
148
149         const char      *image_id;
150         const char      *image_name;
151
152         u64             snap_id;
153         const char      *snap_name;
154
155         struct kref     kref;
156 };
157
158 /*
159  * an instance of the client.  multiple devices may share an rbd client.
160  */
161 struct rbd_client {
162         struct ceph_client      *client;
163         struct kref             kref;
164         struct list_head        node;
165 };
166
/* Completion callback invoked with an image request */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

/* Completion callback invoked with an object request */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
174
/* Recorded in the "type" field of struct rbd_obj_request */
enum obj_request_type {
        OBJ_REQUEST_NODATA,
        OBJ_REQUEST_BIO,
        OBJ_REQUEST_PAGES
};
178
/* Object request flags; each entry notes what 0 and 1 stand for */
enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion: 0 = not done, 1 = done */
        OBJ_REQ_IMG_DATA,       /* usage: 0 = standalone, 1 = image data */
        OBJ_REQ_KNOWN,          /* is EXISTS meaningful: 0 = no, 1 = yes */
        OBJ_REQ_EXISTS,         /* does the target exist: 0 = no, 1 = yes */
};
185
186 struct rbd_obj_request {
187         const char              *object_name;
188         u64                     offset;         /* object start byte */
189         u64                     length;         /* bytes from offset */
190         unsigned long           flags;
191
192         /*
193          * An object request associated with an image will have its
194          * img_data flag set; a standalone object request will not.
195          *
196          * A standalone object request will have which == BAD_WHICH
197          * and a null obj_request pointer.
198          *
199          * An object request initiated in support of a layered image
200          * object (to check for its existence before a write) will
201          * have which == BAD_WHICH and a non-null obj_request pointer.
202          *
203          * Finally, an object request for rbd image data will have
204          * which != BAD_WHICH, and will have a non-null img_request
205          * pointer.  The value of which will be in the range
206          * 0..(img_request->obj_request_count-1).
207          */
208         union {
209                 struct rbd_obj_request  *obj_request;   /* STAT op */
210                 struct {
211                         struct rbd_img_request  *img_request;
212                         u64                     img_offset;
213                         /* links for img_request->obj_requests list */
214                         struct list_head        links;
215                 };
216         };
217         u32                     which;          /* posn image request list */
218
219         enum obj_request_type   type;
220         union {
221                 struct bio      *bio_list;
222                 struct {
223                         struct page     **pages;
224                         u32             page_count;
225                 };
226         };
227         struct page             **copyup_pages;
228
229         struct ceph_osd_request *osd_req;
230
231         u64                     xferred;        /* bytes transferred */
232         int                     result;
233
234         rbd_obj_callback_t      callback;
235         struct completion       completion;
236
237         struct kref             kref;
238 };
239
/* Image request flags; each entry notes what 0 and 1 stand for */
enum img_req_flags {
        IMG_REQ_WRITE,          /* direction: 0 = read, 1 = write */
        IMG_REQ_CHILD,          /* initiator: 0 = block layer, 1 = child image */
        IMG_REQ_LAYERED,        /* ENOENT handling: 0 = normal, 1 = layered */
};
245
246 struct rbd_img_request {
247         struct rbd_device       *rbd_dev;
248         u64                     offset; /* starting image byte offset */
249         u64                     length; /* byte count from offset */
250         unsigned long           flags;
251         union {
252                 u64                     snap_id;        /* for reads */
253                 struct ceph_snap_context *snapc;        /* for writes */
254         };
255         union {
256                 struct request          *rq;            /* block request */
257                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
258         };
259         struct page             **copyup_pages;
260         spinlock_t              completion_lock;/* protects next_completion */
261         u32                     next_completion;
262         rbd_img_callback_t      callback;
263         u64                     xferred;/* aggregate bytes transferred */
264         int                     result; /* first nonzero obj_request result */
265
266         u32                     obj_request_count;
267         struct list_head        obj_requests;   /* rbd_obj_request structs */
268
269         struct kref             kref;
270 };
271
/* Visit each object request linked on an image request's list */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)

/* Same walk, but starting from the entry oreq already refers to */
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)

/* Variant built on the _safe list iterator; note it walks in reverse */
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
278
279 struct rbd_mapping {
280         u64                     size;
281         u64                     features;
282         bool                    read_only;
283 };
284
285 /*
286  * a single device
287  */
288 struct rbd_device {
289         int                     dev_id;         /* blkdev unique id */
290
291         int                     major;          /* blkdev assigned major */
292         struct gendisk          *disk;          /* blkdev's gendisk and rq */
293
294         u32                     image_format;   /* Either 1 or 2 */
295         struct rbd_client       *rbd_client;
296
297         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
298
299         spinlock_t              lock;           /* queue, flags, open_count */
300
301         struct rbd_image_header header;
302         unsigned long           flags;          /* possibly lock protected */
303         struct rbd_spec         *spec;
304
305         char                    *header_name;
306
307         struct ceph_file_layout layout;
308
309         struct ceph_osd_event   *watch_event;
310         struct rbd_obj_request  *watch_request;
311
312         struct rbd_spec         *parent_spec;
313         u64                     parent_overlap;
314         struct rbd_device       *parent;
315
316         /* protects updating the header */
317         struct rw_semaphore     header_rwsem;
318
319         struct rbd_mapping      mapping;
320
321         struct list_head        node;
322
323         /* sysfs related */
324         struct device           dev;
325         unsigned long           open_count;     /* protected by lock */
326 };
327
/*
 * Bit numbers used in rbd_dev->flags.  Where an access has to be
 * atomic, rbd_dev->lock provides the protection.
 *
 * At present only the "removing" flag needs that; it is tied to
 * the "open_count" field.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* the mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* removal of this mapping is under way */
};
339
static DEFINE_MUTEX(ctl_mutex);   /* Serializes open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);                 /* all devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* all clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);   /* taken around list updates */

/* Slab caches for image and object requests */
static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
350
351 static int rbd_img_request_submit(struct rbd_img_request *img_request);
352
353 static void rbd_dev_device_release(struct device *dev);
354
355 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
356                        size_t count);
357 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
358                           size_t count);
359 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
360
361 static struct bus_attribute rbd_bus_attrs[] = {
362         __ATTR(add, S_IWUSR, NULL, rbd_add),
363         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
364         __ATTR_NULL
365 };
366
367 static struct bus_type rbd_bus_type = {
368         .name           = "rbd",
369         .bus_attrs      = rbd_bus_attrs,
370 };
371
/*
 * Release callback for rbd_root_dev.  Deliberately does nothing:
 * rbd_root_dev is a statically defined object.
 */
static void rbd_root_dev_release(struct device *dev)
{
        /* nothing to free */
}
375
376 static struct device rbd_root_dev = {
377         .init_name =    "rbd",
378         .release =      rbd_root_dev_release,
379 };
380
381 static __printf(2, 3)
382 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
383 {
384         struct va_format vaf;
385         va_list args;
386
387         va_start(args, fmt);
388         vaf.fmt = fmt;
389         vaf.va = &args;
390
391         if (!rbd_dev)
392                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
393         else if (rbd_dev->disk)
394                 printk(KERN_WARNING "%s: %s: %pV\n",
395                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
396         else if (rbd_dev->spec && rbd_dev->spec->image_name)
397                 printk(KERN_WARNING "%s: image %s: %pV\n",
398                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
399         else if (rbd_dev->spec && rbd_dev->spec->image_id)
400                 printk(KERN_WARNING "%s: id %s: %pV\n",
401                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
402         else    /* punt */
403                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
404                         RBD_DRV_NAME, rbd_dev, &vaf);
405         va_end(args);
406 }
407
/*
 * rbd_assert(expr): with RBD_DEBUG defined, log the failing expression
 * together with the function name and line number, then BUG().
 * Without RBD_DEBUG it expands to a no-op.
 *
 * The checking form is wrapped in do { } while (0) so that the macro
 * behaves as a single statement; a bare "if" expansion would capture
 * the "else" of an enclosing unbraced if/else.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
420
421 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
422 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
423 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
424
425 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
426 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
427 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
428                                         u64 snap_id);
429 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
430                                 u8 *order, u64 *snap_size);
431 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
432                 u64 *snap_features);
433 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
434
435 static int rbd_open(struct block_device *bdev, fmode_t mode)
436 {
437         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
438         bool removing = false;
439
440         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
441                 return -EROFS;
442
443         spin_lock_irq(&rbd_dev->lock);
444         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
445                 removing = true;
446         else
447                 rbd_dev->open_count++;
448         spin_unlock_irq(&rbd_dev->lock);
449         if (removing)
450                 return -ENOENT;
451
452         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
453         (void) get_device(&rbd_dev->dev);
454         set_device_ro(bdev, rbd_dev->mapping.read_only);
455         mutex_unlock(&ctl_mutex);
456
457         return 0;
458 }
459
460 static int rbd_release(struct gendisk *disk, fmode_t mode)
461 {
462         struct rbd_device *rbd_dev = disk->private_data;
463         unsigned long open_count_before;
464
465         spin_lock_irq(&rbd_dev->lock);
466         open_count_before = rbd_dev->open_count--;
467         spin_unlock_irq(&rbd_dev->lock);
468         rbd_assert(open_count_before > 0);
469
470         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
471         put_device(&rbd_dev->dev);
472         mutex_unlock(&ctl_mutex);
473
474         return 0;
475 }
476
477 static const struct block_device_operations rbd_bd_ops = {
478         .owner                  = THIS_MODULE,
479         .open                   = rbd_open,
480         .release                = rbd_release,
481 };
482
483 /*
484  * Initialize an rbd client instance.
485  * We own *ceph_opts.
486  */
487 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
488 {
489         struct rbd_client *rbdc;
490         int ret = -ENOMEM;
491
492         dout("%s:\n", __func__);
493         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
494         if (!rbdc)
495                 goto out_opt;
496
497         kref_init(&rbdc->kref);
498         INIT_LIST_HEAD(&rbdc->node);
499
500         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
501
502         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
503         if (IS_ERR(rbdc->client))
504                 goto out_mutex;
505         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
506
507         ret = ceph_open_session(rbdc->client);
508         if (ret < 0)
509                 goto out_err;
510
511         spin_lock(&rbd_client_list_lock);
512         list_add_tail(&rbdc->node, &rbd_client_list);
513         spin_unlock(&rbd_client_list_lock);
514
515         mutex_unlock(&ctl_mutex);
516         dout("%s: rbdc %p\n", __func__, rbdc);
517
518         return rbdc;
519
520 out_err:
521         ceph_destroy_client(rbdc->client);
522 out_mutex:
523         mutex_unlock(&ctl_mutex);
524         kfree(rbdc);
525 out_opt:
526         if (ceph_opts)
527                 ceph_destroy_options(ceph_opts);
528         dout("%s: error %d\n", __func__, ret);
529
530         return ERR_PTR(ret);
531 }
532
533 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
534 {
535         kref_get(&rbdc->kref);
536
537         return rbdc;
538 }
539
540 /*
541  * Find a ceph client with specific addr and configuration.  If
542  * found, bump its reference count.
543  */
544 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
545 {
546         struct rbd_client *client_node;
547         bool found = false;
548
549         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
550                 return NULL;
551
552         spin_lock(&rbd_client_list_lock);
553         list_for_each_entry(client_node, &rbd_client_list, node) {
554                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
555                         __rbd_get_client(client_node);
556
557                         found = true;
558                         break;
559                 }
560         }
561         spin_unlock(&rbd_client_list_lock);
562
563         return found ? client_node : NULL;
564 }
565
/*
 * Mount option tokens.  The Opt_last_* entries are boundary markers:
 * parse_rbd_opts_token() classifies a token as int, string or
 * Boolean by comparing it against them.
 */
enum {
        /* int-valued options would precede this marker */
        Opt_last_int,
        /* string-valued options would precede this marker */
        Opt_last_string,
        Opt_read_only,
        Opt_read_write,
        /* Boolean options precede this marker */
        Opt_last_bool,
};
579
580 static match_table_t rbd_opts_tokens = {
581         /* int args above */
582         /* string args above */
583         {Opt_read_only, "read_only"},
584         {Opt_read_only, "ro"},          /* Alternate spelling */
585         {Opt_read_write, "read_write"},
586         {Opt_read_write, "rw"},         /* Alternate spelling */
587         /* Boolean args above */
588         {-1, NULL}
589 };
590
591 struct rbd_options {
592         bool    read_only;
593 };
594
595 #define RBD_READ_ONLY_DEFAULT   false
596
597 static int parse_rbd_opts_token(char *c, void *private)
598 {
599         struct rbd_options *rbd_opts = private;
600         substring_t argstr[MAX_OPT_ARGS];
601         int token, intval, ret;
602
603         token = match_token(c, rbd_opts_tokens, argstr);
604         if (token < 0)
605                 return -EINVAL;
606
607         if (token < Opt_last_int) {
608                 ret = match_int(&argstr[0], &intval);
609                 if (ret < 0) {
610                         pr_err("bad mount option arg (not int) "
611                                "at '%s'\n", c);
612                         return ret;
613                 }
614                 dout("got int token %d val %d\n", token, intval);
615         } else if (token > Opt_last_int && token < Opt_last_string) {
616                 dout("got string token %d val %s\n", token,
617                      argstr[0].from);
618         } else if (token > Opt_last_string && token < Opt_last_bool) {
619                 dout("got Boolean token %d\n", token);
620         } else {
621                 dout("got token %d\n", token);
622         }
623
624         switch (token) {
625         case Opt_read_only:
626                 rbd_opts->read_only = true;
627                 break;
628         case Opt_read_write:
629                 rbd_opts->read_only = false;
630                 break;
631         default:
632                 rbd_assert(false);
633                 break;
634         }
635         return 0;
636 }
637
/*
 * Return a client matching the given options: an existing one (with
 * an added reference) when rbd_client_find() has a match, otherwise
 * whatever rbd_client_create() returns.  In both cases the caller's
 * ceph_opts are passed on or destroyed here.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Reusing an existing client, so these options are not needed */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
654
655 /*
656  * Destroy ceph client
657  *
658  * Caller must hold rbd_client_list_lock.
659  */
660 static void rbd_client_release(struct kref *kref)
661 {
662         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
663
664         dout("%s: rbdc %p\n", __func__, rbdc);
665         spin_lock(&rbd_client_list_lock);
666         list_del(&rbdc->node);
667         spin_unlock(&rbd_client_list_lock);
668
669         ceph_destroy_client(rbdc->client);
670         kfree(rbdc);
671 }
672
673 /*
674  * Drop reference to ceph client node. If it's not referenced anymore, release
675  * it.
676  */
677 static void rbd_put_client(struct rbd_client *rbdc)
678 {
679         if (rbdc)
680                 kref_put(&rbdc->kref, rbd_client_release);
681 }
682
683 static bool rbd_image_format_valid(u32 image_format)
684 {
685         return image_format == 1 || image_format == 2;
686 }
687
688 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
689 {
690         size_t size;
691         u32 snap_count;
692
693         /* The header has to start with the magic rbd header text */
694         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
695                 return false;
696
697         /* The bio layer requires at least sector-sized I/O */
698
699         if (ondisk->options.order < SECTOR_SHIFT)
700                 return false;
701
702         /* If we use u64 in a few spots we may be able to loosen this */
703
704         if (ondisk->options.order > 8 * sizeof (int) - 1)
705                 return false;
706
707         /*
708          * The size of a snapshot header has to fit in a size_t, and
709          * that limits the number of snapshots.
710          */
711         snap_count = le32_to_cpu(ondisk->snap_count);
712         size = SIZE_MAX - sizeof (struct ceph_snap_context);
713         if (snap_count > size / sizeof (__le64))
714                 return false;
715
716         /*
717          * Not only that, but the size of the entire the snapshot
718          * header must also be representable in a size_t.
719          */
720         size -= snap_count * sizeof (__le64);
721         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
722                 return false;
723
724         return true;
725 }
726
727 /*
728  * Create a new header structure, translate header format from the on-disk
729  * header.
730  */
731 static int rbd_header_from_disk(struct rbd_image_header *header,
732                                  struct rbd_image_header_ondisk *ondisk)
733 {
734         u32 snap_count;
735         size_t len;
736         size_t size;
737         u32 i;
738
739         memset(header, 0, sizeof (*header));
740
741         snap_count = le32_to_cpu(ondisk->snap_count);
742
743         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
745         if (!header->object_prefix)
746                 return -ENOMEM;
747         memcpy(header->object_prefix, ondisk->object_prefix, len);
748         header->object_prefix[len] = '\0';
749
750         if (snap_count) {
751                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
752
753                 /* Save a copy of the snapshot names */
754
755                 if (snap_names_len > (u64) SIZE_MAX)
756                         return -EIO;
757                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
758                 if (!header->snap_names)
759                         goto out_err;
760                 /*
761                  * Note that rbd_dev_v1_header_read() guarantees
762                  * the ondisk buffer we're working with has
763                  * snap_names_len bytes beyond the end of the
764                  * snapshot id array, this memcpy() is safe.
765                  */
766                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
767                         snap_names_len);
768
769                 /* Record each snapshot's size */
770
771                 size = snap_count * sizeof (*header->snap_sizes);
772                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
773                 if (!header->snap_sizes)
774                         goto out_err;
775                 for (i = 0; i < snap_count; i++)
776                         header->snap_sizes[i] =
777                                 le64_to_cpu(ondisk->snaps[i].image_size);
778         } else {
779                 header->snap_names = NULL;
780                 header->snap_sizes = NULL;
781         }
782
783         header->features = 0;   /* No features support in v1 images */
784         header->obj_order = ondisk->options.order;
785         header->crypt_type = ondisk->options.crypt_type;
786         header->comp_type = ondisk->options.comp_type;
787
788         /* Allocate and fill in the snapshot context */
789
790         header->image_size = le64_to_cpu(ondisk->image_size);
791
792         header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
793         if (!header->snapc)
794                 goto out_err;
795         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
796         for (i = 0; i < snap_count; i++)
797                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
798
799         return 0;
800
801 out_err:
802         kfree(header->snap_sizes);
803         header->snap_sizes = NULL;
804         kfree(header->snap_names);
805         header->snap_names = NULL;
806         kfree(header->object_prefix);
807         header->object_prefix = NULL;
808
809         return -ENOMEM;
810 }
811
812 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
813 {
814         const char *snap_name;
815
816         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
817
818         /* Skip over names until we find the one we are looking for */
819
820         snap_name = rbd_dev->header.snap_names;
821         while (which--)
822                 snap_name += strlen(snap_name) + 1;
823
824         return kstrdup(snap_name, GFP_KERNEL);
825 }
826
827 /*
828  * Snapshot id comparison function for use with qsort()/bsearch().
829  * Note that result is for snapshots in *descending* order.
830  */
831 static int snapid_compare_reverse(const void *s1, const void *s2)
832 {
833         u64 snap_id1 = *(u64 *)s1;
834         u64 snap_id2 = *(u64 *)s2;
835
836         if (snap_id1 < snap_id2)
837                 return 1;
838         return snap_id1 == snap_id2 ? 0 : -1;
839 }
840
841 /*
842  * Search a snapshot context to see if the given snapshot id is
843  * present.
844  *
845  * Returns the position of the snapshot id in the array if it's found,
846  * or BAD_SNAP_INDEX otherwise.
847  *
848  * Note: The snapshot array is in kept sorted (by the osd) in
849  * reverse order, highest snapshot id first.
850  */
851 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
852 {
853         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
854         u64 *found;
855
856         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
857                                 sizeof (snap_id), snapid_compare_reverse);
858
859         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
860 }
861
862 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
863                                         u64 snap_id)
864 {
865         u32 which;
866
867         which = rbd_dev_snap_index(rbd_dev, snap_id);
868         if (which == BAD_SNAP_INDEX)
869                 return NULL;
870
871         return _rbd_dev_v1_snap_name(rbd_dev, which);
872 }
873
874 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
875 {
876         if (snap_id == CEPH_NOSNAP)
877                 return RBD_SNAP_HEAD_NAME;
878
879         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
880         if (rbd_dev->image_format == 1)
881                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
882
883         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
884 }
885
886 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
887                                 u64 *snap_size)
888 {
889         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
890         if (snap_id == CEPH_NOSNAP) {
891                 *snap_size = rbd_dev->header.image_size;
892         } else if (rbd_dev->image_format == 1) {
893                 u32 which;
894
895                 which = rbd_dev_snap_index(rbd_dev, snap_id);
896                 if (which == BAD_SNAP_INDEX)
897                         return -ENOENT;
898
899                 *snap_size = rbd_dev->header.snap_sizes[which];
900         } else {
901                 u64 size = 0;
902                 int ret;
903
904                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
905                 if (ret)
906                         return ret;
907
908                 *snap_size = size;
909         }
910         return 0;
911 }
912
913 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
914                         u64 *snap_features)
915 {
916         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
917         if (snap_id == CEPH_NOSNAP) {
918                 *snap_features = rbd_dev->header.features;
919         } else if (rbd_dev->image_format == 1) {
920                 *snap_features = 0;     /* No features for format 1 */
921         } else {
922                 u64 features = 0;
923                 int ret;
924
925                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
926                 if (ret)
927                         return ret;
928
929                 *snap_features = features;
930         }
931         return 0;
932 }
933
934 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
935 {
936         const char *snap_name = rbd_dev->spec->snap_name;
937         u64 snap_id;
938         u64 size = 0;
939         u64 features = 0;
940         int ret;
941
942         if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
943                 snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
944                 if (snap_id == CEPH_NOSNAP)
945                         return -ENOENT;
946         } else {
947                 snap_id = CEPH_NOSNAP;
948         }
949
950         ret = rbd_snap_size(rbd_dev, snap_id, &size);
951         if (ret)
952                 return ret;
953         ret = rbd_snap_features(rbd_dev, snap_id, &features);
954         if (ret)
955                 return ret;
956
957         rbd_dev->mapping.size = size;
958         rbd_dev->mapping.features = features;
959
960         /* If we are mapping a snapshot it must be marked read-only */
961
962         if (snap_id != CEPH_NOSNAP)
963                 rbd_dev->mapping.read_only = true;
964
965         return 0;
966 }
967
968 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
969 {
970         rbd_dev->mapping.size = 0;
971         rbd_dev->mapping.features = 0;
972         rbd_dev->mapping.read_only = true;
973 }
974
/*
 * Reset the mapping: zero size, no features, read-only.  This does
 * exactly what rbd_dev_mapping_clear() does, so defer to it.
 */
static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev_mapping_clear(rbd_dev);
}
981
982 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
983 {
984         char *name;
985         u64 segment;
986         int ret;
987
988         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
989         if (!name)
990                 return NULL;
991         segment = offset >> rbd_dev->header.obj_order;
992         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
993                         rbd_dev->header.object_prefix, segment);
994         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
995                 pr_err("error formatting segment name for #%llu (%d)\n",
996                         segment, ret);
997                 kfree(name);
998                 name = NULL;
999         }
1000
1001         return name;
1002 }
1003
1004 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1005 {
1006         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1007
1008         return offset & (segment_size - 1);
1009 }
1010
1011 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1012                                 u64 offset, u64 length)
1013 {
1014         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1015
1016         offset &= segment_size - 1;
1017
1018         rbd_assert(length <= U64_MAX - offset);
1019         if (offset + length > segment_size)
1020                 length = segment_size - offset;
1021
1022         return length;
1023 }
1024
1025 /*
1026  * returns the size of an object in the image
1027  */
1028 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1029 {
1030         return 1 << header->obj_order;
1031 }
1032
1033 /*
1034  * bio helpers
1035  */
1036
1037 static void bio_chain_put(struct bio *chain)
1038 {
1039         struct bio *tmp;
1040
1041         while (chain) {
1042                 tmp = chain;
1043                 chain = chain->bi_next;
1044                 bio_put(tmp);
1045         }
1046 }
1047
1048 /*
1049  * zeros a bio chain, starting at specific offset
1050  */
1051 static void zero_bio_chain(struct bio *chain, int start_ofs)
1052 {
1053         struct bio_vec *bv;
1054         unsigned long flags;
1055         void *buf;
1056         int i;
1057         int pos = 0;
1058
1059         while (chain) {
1060                 bio_for_each_segment(bv, chain, i) {
1061                         if (pos + bv->bv_len > start_ofs) {
1062                                 int remainder = max(start_ofs - pos, 0);
1063                                 buf = bvec_kmap_irq(bv, &flags);
1064                                 memset(buf + remainder, 0,
1065                                        bv->bv_len - remainder);
1066                                 bvec_kunmap_irq(buf, &flags);
1067                         }
1068                         pos += bv->bv_len;
1069                 }
1070
1071                 chain = chain->bi_next;
1072         }
1073 }
1074
1075 /*
1076  * similar to zero_bio_chain(), zeros data defined by a page array,
1077  * starting at the given byte offset from the start of the array and
1078  * continuing up to the given end offset.  The pages array is
1079  * assumed to be big enough to hold all bytes up to the end.
1080  */
1081 static void zero_pages(struct page **pages, u64 offset, u64 end)
1082 {
1083         struct page **page = &pages[offset >> PAGE_SHIFT];
1084
1085         rbd_assert(end > offset);
1086         rbd_assert(end - offset <= (u64)SIZE_MAX);
1087         while (offset < end) {
1088                 size_t page_offset;
1089                 size_t length;
1090                 unsigned long flags;
1091                 void *kaddr;
1092
1093                 page_offset = (size_t)(offset & ~PAGE_MASK);
1094                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1095                 local_irq_save(flags);
1096                 kaddr = kmap_atomic(*page);
1097                 memset(kaddr + page_offset, 0, length);
1098                 kunmap_atomic(kaddr);
1099                 local_irq_restore(flags);
1100
1101                 offset += length;
1102                 page++;
1103         }
1104 }
1105
1106 /*
1107  * Clone a portion of a bio, starting at the given byte offset
1108  * and continuing for the number of bytes indicated.
1109  */
1110 static struct bio *bio_clone_range(struct bio *bio_src,
1111                                         unsigned int offset,
1112                                         unsigned int len,
1113                                         gfp_t gfpmask)
1114 {
1115         struct bio_vec *bv;
1116         unsigned int resid;
1117         unsigned short idx;
1118         unsigned int voff;
1119         unsigned short end_idx;
1120         unsigned short vcnt;
1121         struct bio *bio;
1122
1123         /* Handle the easy case for the caller */
1124
1125         if (!offset && len == bio_src->bi_size)
1126                 return bio_clone(bio_src, gfpmask);
1127
1128         if (WARN_ON_ONCE(!len))
1129                 return NULL;
1130         if (WARN_ON_ONCE(len > bio_src->bi_size))
1131                 return NULL;
1132         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1133                 return NULL;
1134
1135         /* Find first affected segment... */
1136
1137         resid = offset;
1138         __bio_for_each_segment(bv, bio_src, idx, 0) {
1139                 if (resid < bv->bv_len)
1140                         break;
1141                 resid -= bv->bv_len;
1142         }
1143         voff = resid;
1144
1145         /* ...and the last affected segment */
1146
1147         resid += len;
1148         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1149                 if (resid <= bv->bv_len)
1150                         break;
1151                 resid -= bv->bv_len;
1152         }
1153         vcnt = end_idx - idx + 1;
1154
1155         /* Build the clone */
1156
1157         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1158         if (!bio)
1159                 return NULL;    /* ENOMEM */
1160
1161         bio->bi_bdev = bio_src->bi_bdev;
1162         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1163         bio->bi_rw = bio_src->bi_rw;
1164         bio->bi_flags |= 1 << BIO_CLONED;
1165
1166         /*
1167          * Copy over our part of the bio_vec, then update the first
1168          * and last (or only) entries.
1169          */
1170         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1171                         vcnt * sizeof (struct bio_vec));
1172         bio->bi_io_vec[0].bv_offset += voff;
1173         if (vcnt > 1) {
1174                 bio->bi_io_vec[0].bv_len -= voff;
1175                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1176         } else {
1177                 bio->bi_io_vec[0].bv_len = len;
1178         }
1179
1180         bio->bi_vcnt = vcnt;
1181         bio->bi_size = len;
1182         bio->bi_idx = 0;
1183
1184         return bio;
1185 }
1186
1187 /*
1188  * Clone a portion of a bio chain, starting at the given byte offset
1189  * into the first bio in the source chain and continuing for the
1190  * number of bytes indicated.  The result is another bio chain of
1191  * exactly the given length, or a null pointer on error.
1192  *
1193  * The bio_src and offset parameters are both in-out.  On entry they
1194  * refer to the first source bio and the offset into that bio where
1195  * the start of data to be cloned is located.
1196  *
1197  * On return, bio_src is updated to refer to the bio in the source
1198  * chain that contains first un-cloned byte, and *offset will
1199  * contain the offset of that byte within that bio.
1200  */
1201 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1202                                         unsigned int *offset,
1203                                         unsigned int len,
1204                                         gfp_t gfpmask)
1205 {
1206         struct bio *bi = *bio_src;
1207         unsigned int off = *offset;
1208         struct bio *chain = NULL;
1209         struct bio **end;
1210
1211         /* Build up a chain of clone bios up to the limit */
1212
1213         if (!bi || off >= bi->bi_size || !len)
1214                 return NULL;            /* Nothing to clone */
1215
1216         end = &chain;
1217         while (len) {
1218                 unsigned int bi_size;
1219                 struct bio *bio;
1220
1221                 if (!bi) {
1222                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1223                         goto out_err;   /* EINVAL; ran out of bio's */
1224                 }
1225                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1226                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1227                 if (!bio)
1228                         goto out_err;   /* ENOMEM */
1229
1230                 *end = bio;
1231                 end = &bio->bi_next;
1232
1233                 off += bi_size;
1234                 if (off == bi->bi_size) {
1235                         bi = bi->bi_next;
1236                         off = 0;
1237                 }
1238                 len -= bi_size;
1239         }
1240         *bio_src = bi;
1241         *offset = off;
1242
1243         return chain;
1244 out_err:
1245         bio_chain_put(chain);
1246
1247         return NULL;
1248 }
1249
1250 /*
1251  * The default/initial value for all object request flags is 0.  For
1252  * each flag, once its value is set to 1 it is never reset to 0
1253  * again.
1254  */
1255 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1256 {
1257         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1258                 struct rbd_device *rbd_dev;
1259
1260                 rbd_dev = obj_request->img_request->rbd_dev;
1261                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1262                         obj_request);
1263         }
1264 }
1265
1266 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1267 {
1268         smp_mb();
1269         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1270 }
1271
1272 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1273 {
1274         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1275                 struct rbd_device *rbd_dev = NULL;
1276
1277                 if (obj_request_img_data_test(obj_request))
1278                         rbd_dev = obj_request->img_request->rbd_dev;
1279                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1280                         obj_request);
1281         }
1282 }
1283
1284 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1285 {
1286         smp_mb();
1287         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1288 }
1289
1290 /*
1291  * This sets the KNOWN flag after (possibly) setting the EXISTS
1292  * flag.  The latter is set based on the "exists" value provided.
1293  *
1294  * Note that for our purposes once an object exists it never goes
1295  * away again.  It's possible that the response from two existence
1296  * checks are separated by the creation of the target object, and
1297  * the first ("doesn't exist") response arrives *after* the second
1298  * ("does exist").  In that case we ignore the second one.
1299  */
1300 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1301                                 bool exists)
1302 {
1303         if (exists)
1304                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1305         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1306         smp_mb();
1307 }
1308
1309 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1310 {
1311         smp_mb();
1312         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1313 }
1314
1315 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1316 {
1317         smp_mb();
1318         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1319 }
1320
1321 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1322 {
1323         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1324                 atomic_read(&obj_request->kref.refcount));
1325         kref_get(&obj_request->kref);
1326 }
1327
1328 static void rbd_obj_request_destroy(struct kref *kref);
1329 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1330 {
1331         rbd_assert(obj_request != NULL);
1332         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1333                 atomic_read(&obj_request->kref.refcount));
1334         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1335 }
1336
1337 static void rbd_img_request_get(struct rbd_img_request *img_request)
1338 {
1339         dout("%s: img %p (was %d)\n", __func__, img_request,
1340                 atomic_read(&img_request->kref.refcount));
1341         kref_get(&img_request->kref);
1342 }
1343
1344 static void rbd_img_request_destroy(struct kref *kref);
1345 static void rbd_img_request_put(struct rbd_img_request *img_request)
1346 {
1347         rbd_assert(img_request != NULL);
1348         dout("%s: img %p (was %d)\n", __func__, img_request,
1349                 atomic_read(&img_request->kref.refcount));
1350         kref_put(&img_request->kref, rbd_img_request_destroy);
1351 }
1352
1353 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1354                                         struct rbd_obj_request *obj_request)
1355 {
1356         rbd_assert(obj_request->img_request == NULL);
1357
1358         /* Image request now owns object's original reference */
1359         obj_request->img_request = img_request;
1360         obj_request->which = img_request->obj_request_count;
1361         rbd_assert(!obj_request_img_data_test(obj_request));
1362         obj_request_img_data_set(obj_request);
1363         rbd_assert(obj_request->which != BAD_WHICH);
1364         img_request->obj_request_count++;
1365         list_add_tail(&obj_request->links, &img_request->obj_requests);
1366         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1367                 obj_request->which);
1368 }
1369
1370 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1371                                         struct rbd_obj_request *obj_request)
1372 {
1373         rbd_assert(obj_request->which != BAD_WHICH);
1374
1375         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1376                 obj_request->which);
1377         list_del(&obj_request->links);
1378         rbd_assert(img_request->obj_request_count > 0);
1379         img_request->obj_request_count--;
1380         rbd_assert(obj_request->which == img_request->obj_request_count);
1381         obj_request->which = BAD_WHICH;
1382         rbd_assert(obj_request_img_data_test(obj_request));
1383         rbd_assert(obj_request->img_request == img_request);
1384         obj_request->img_request = NULL;
1385         obj_request->callback = NULL;
1386         rbd_obj_request_put(obj_request);
1387 }
1388
1389 static bool obj_request_type_valid(enum obj_request_type type)
1390 {
1391         switch (type) {
1392         case OBJ_REQUEST_NODATA:
1393         case OBJ_REQUEST_BIO:
1394         case OBJ_REQUEST_PAGES:
1395                 return true;
1396         default:
1397                 return false;
1398         }
1399 }
1400
1401 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1402                                 struct rbd_obj_request *obj_request)
1403 {
1404         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1405
1406         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1407 }
1408
1409 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1410 {
1411
1412         dout("%s: img %p\n", __func__, img_request);
1413
1414         /*
1415          * If no error occurred, compute the aggregate transfer
1416          * count for the image request.  We could instead use
1417          * atomic64_cmpxchg() to update it as each object request
1418          * completes; not clear which way is better off hand.
1419          */
1420         if (!img_request->result) {
1421                 struct rbd_obj_request *obj_request;
1422                 u64 xferred = 0;
1423
1424                 for_each_obj_request(img_request, obj_request)
1425                         xferred += obj_request->xferred;
1426                 img_request->xferred = xferred;
1427         }
1428
1429         if (img_request->callback)
1430                 img_request->callback(img_request);
1431         else
1432                 rbd_img_request_put(img_request);
1433 }
1434
1435 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1436
1437 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1438 {
1439         dout("%s: obj %p\n", __func__, obj_request);
1440
1441         return wait_for_completion_interruptible(&obj_request->completion);
1442 }
1443
1444 /*
1445  * The default/initial value for all image request flags is 0.  Each
1446  * is conditionally set to 1 at image request initialization time
1447  * and currently never change thereafter.
1448  */
1449 static void img_request_write_set(struct rbd_img_request *img_request)
1450 {
1451         set_bit(IMG_REQ_WRITE, &img_request->flags);
1452         smp_mb();
1453 }
1454
1455 static bool img_request_write_test(struct rbd_img_request *img_request)
1456 {
1457         smp_mb();
1458         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1459 }
1460
1461 static void img_request_child_set(struct rbd_img_request *img_request)
1462 {
1463         set_bit(IMG_REQ_CHILD, &img_request->flags);
1464         smp_mb();
1465 }
1466
1467 static bool img_request_child_test(struct rbd_img_request *img_request)
1468 {
1469         smp_mb();
1470         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1471 }
1472
1473 static void img_request_layered_set(struct rbd_img_request *img_request)
1474 {
1475         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1476         smp_mb();
1477 }
1478
1479 static bool img_request_layered_test(struct rbd_img_request *img_request)
1480 {
1481         smp_mb();
1482         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1483 }
1484
1485 static void
1486 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1487 {
1488         u64 xferred = obj_request->xferred;
1489         u64 length = obj_request->length;
1490
1491         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1492                 obj_request, obj_request->img_request, obj_request->result,
1493                 xferred, length);
1494         /*
1495          * ENOENT means a hole in the image.  We zero-fill the
1496          * entire length of the request.  A short read also implies
1497          * zero-fill to the end of the request.  Either way we
1498          * update the xferred count to indicate the whole request
1499          * was satisfied.
1500          */
1501         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1502         if (obj_request->result == -ENOENT) {
1503                 if (obj_request->type == OBJ_REQUEST_BIO)
1504                         zero_bio_chain(obj_request->bio_list, 0);
1505                 else
1506                         zero_pages(obj_request->pages, 0, length);
1507                 obj_request->result = 0;
1508                 obj_request->xferred = length;
1509         } else if (xferred < length && !obj_request->result) {
1510                 if (obj_request->type == OBJ_REQUEST_BIO)
1511                         zero_bio_chain(obj_request->bio_list, xferred);
1512                 else
1513                         zero_pages(obj_request->pages, xferred, length);
1514                 obj_request->xferred = length;
1515         }
1516         obj_request_done_set(obj_request);
1517 }
1518
1519 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1520 {
1521         dout("%s: obj %p cb %p\n", __func__, obj_request,
1522                 obj_request->callback);
1523         if (obj_request->callback)
1524                 obj_request->callback(obj_request);
1525         else
1526                 complete_all(&obj_request->completion);
1527 }
1528
/* Completion handling for ops that need none: just mark the request done */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1534
1535 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1536 {
1537         struct rbd_img_request *img_request = NULL;
1538         struct rbd_device *rbd_dev = NULL;
1539         bool layered = false;
1540
1541         if (obj_request_img_data_test(obj_request)) {
1542                 img_request = obj_request->img_request;
1543                 layered = img_request && img_request_layered_test(img_request);
1544                 rbd_dev = img_request->rbd_dev;
1545         }
1546
1547         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1548                 obj_request, img_request, obj_request->result,
1549                 obj_request->xferred, obj_request->length);
1550         if (layered && obj_request->result == -ENOENT &&
1551                         obj_request->img_offset < rbd_dev->parent_overlap)
1552                 rbd_img_parent_read(obj_request);
1553         else if (img_request)
1554                 rbd_img_obj_request_read_callback(obj_request);
1555         else
1556                 obj_request_done_set(obj_request);
1557 }
1558
1559 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1560 {
1561         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1562                 obj_request->result, obj_request->length);
1563         /*
1564          * There is no such thing as a successful short write.  Set
1565          * it to our originally-requested length.
1566          */
1567         obj_request->xferred = obj_request->length;
1568         obj_request_done_set(obj_request);
1569 }
1570
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1580
1581 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1582                                 struct ceph_msg *msg)
1583 {
1584         struct rbd_obj_request *obj_request = osd_req->r_priv;
1585         u16 opcode;
1586
1587         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1588         rbd_assert(osd_req == obj_request->osd_req);
1589         if (obj_request_img_data_test(obj_request)) {
1590                 rbd_assert(obj_request->img_request);
1591                 rbd_assert(obj_request->which != BAD_WHICH);
1592         } else {
1593                 rbd_assert(obj_request->which == BAD_WHICH);
1594         }
1595
1596         if (osd_req->r_result < 0)
1597                 obj_request->result = osd_req->r_result;
1598
1599         BUG_ON(osd_req->r_num_ops > 2);
1600
1601         /*
1602          * We support a 64-bit length, but ultimately it has to be
1603          * passed to blk_end_request(), which takes an unsigned int.
1604          */
1605         obj_request->xferred = osd_req->r_reply_op_len[0];
1606         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1607         opcode = osd_req->r_ops[0].op;
1608         switch (opcode) {
1609         case CEPH_OSD_OP_READ:
1610                 rbd_osd_read_callback(obj_request);
1611                 break;
1612         case CEPH_OSD_OP_WRITE:
1613                 rbd_osd_write_callback(obj_request);
1614                 break;
1615         case CEPH_OSD_OP_STAT:
1616                 rbd_osd_stat_callback(obj_request);
1617                 break;
1618         case CEPH_OSD_OP_CALL:
1619         case CEPH_OSD_OP_NOTIFY_ACK:
1620         case CEPH_OSD_OP_WATCH:
1621                 rbd_osd_trivial_callback(obj_request);
1622                 break;
1623         default:
1624                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1625                         obj_request->object_name, (unsigned short) opcode);
1626                 break;
1627         }
1628
1629         if (obj_request_done_test(obj_request))
1630                 rbd_obj_request_complete(obj_request);
1631 }
1632
1633 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1634 {
1635         struct rbd_img_request *img_request = obj_request->img_request;
1636         struct ceph_osd_request *osd_req = obj_request->osd_req;
1637         u64 snap_id;
1638
1639         rbd_assert(osd_req != NULL);
1640
1641         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1642         ceph_osdc_build_request(osd_req, obj_request->offset,
1643                         NULL, snap_id, NULL);
1644 }
1645
1646 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1647 {
1648         struct rbd_img_request *img_request = obj_request->img_request;
1649         struct ceph_osd_request *osd_req = obj_request->osd_req;
1650         struct ceph_snap_context *snapc;
1651         struct timespec mtime = CURRENT_TIME;
1652
1653         rbd_assert(osd_req != NULL);
1654
1655         snapc = img_request ? img_request->snapc : NULL;
1656         ceph_osdc_build_request(osd_req, obj_request->offset,
1657                         snapc, CEPH_NOSNAP, &mtime);
1658 }
1659
1660 static struct ceph_osd_request *rbd_osd_req_create(
1661                                         struct rbd_device *rbd_dev,
1662                                         bool write_request,
1663                                         struct rbd_obj_request *obj_request)
1664 {
1665         struct ceph_snap_context *snapc = NULL;
1666         struct ceph_osd_client *osdc;
1667         struct ceph_osd_request *osd_req;
1668
1669         if (obj_request_img_data_test(obj_request)) {
1670                 struct rbd_img_request *img_request = obj_request->img_request;
1671
1672                 rbd_assert(write_request ==
1673                                 img_request_write_test(img_request));
1674                 if (write_request)
1675                         snapc = img_request->snapc;
1676         }
1677
1678         /* Allocate and initialize the request, for the single op */
1679
1680         osdc = &rbd_dev->rbd_client->client->osdc;
1681         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1682         if (!osd_req)
1683                 return NULL;    /* ENOMEM */
1684
1685         if (write_request)
1686                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1687         else
1688                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1689
1690         osd_req->r_callback = rbd_osd_req_callback;
1691         osd_req->r_priv = obj_request;
1692
1693         osd_req->r_oid_len = strlen(obj_request->object_name);
1694         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1695         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1696
1697         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1698
1699         return osd_req;
1700 }
1701
1702 /*
1703  * Create a copyup osd request based on the information in the
1704  * object request supplied.  A copyup request has two osd ops,
1705  * a copyup method call, and a "normal" write request.
1706  */
1707 static struct ceph_osd_request *
1708 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1709 {
1710         struct rbd_img_request *img_request;
1711         struct ceph_snap_context *snapc;
1712         struct rbd_device *rbd_dev;
1713         struct ceph_osd_client *osdc;
1714         struct ceph_osd_request *osd_req;
1715
1716         rbd_assert(obj_request_img_data_test(obj_request));
1717         img_request = obj_request->img_request;
1718         rbd_assert(img_request);
1719         rbd_assert(img_request_write_test(img_request));
1720
1721         /* Allocate and initialize the request, for the two ops */
1722
1723         snapc = img_request->snapc;
1724         rbd_dev = img_request->rbd_dev;
1725         osdc = &rbd_dev->rbd_client->client->osdc;
1726         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1727         if (!osd_req)
1728                 return NULL;    /* ENOMEM */
1729
1730         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1731         osd_req->r_callback = rbd_osd_req_callback;
1732         osd_req->r_priv = obj_request;
1733
1734         osd_req->r_oid_len = strlen(obj_request->object_name);
1735         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1736         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1737
1738         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1739
1740         return osd_req;
1741 }
1742
1743
/* Drop the reference rbd holds on an osd request */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}
1748
1749 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1750
1751 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1752                                                 u64 offset, u64 length,
1753                                                 enum obj_request_type type)
1754 {
1755         struct rbd_obj_request *obj_request;
1756         size_t size;
1757         char *name;
1758
1759         rbd_assert(obj_request_type_valid(type));
1760
1761         size = strlen(object_name) + 1;
1762         name = kmalloc(size, GFP_KERNEL);
1763         if (!name)
1764                 return NULL;
1765
1766         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1767         if (!obj_request) {
1768                 kfree(name);
1769                 return NULL;
1770         }
1771
1772         obj_request->object_name = memcpy(name, object_name, size);
1773         obj_request->offset = offset;
1774         obj_request->length = length;
1775         obj_request->flags = 0;
1776         obj_request->which = BAD_WHICH;
1777         obj_request->type = type;
1778         INIT_LIST_HEAD(&obj_request->links);
1779         init_completion(&obj_request->completion);
1780         kref_init(&obj_request->kref);
1781
1782         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1783                 offset, length, (int)type, obj_request);
1784
1785         return obj_request;
1786 }
1787
1788 static void rbd_obj_request_destroy(struct kref *kref)
1789 {
1790         struct rbd_obj_request *obj_request;
1791
1792         obj_request = container_of(kref, struct rbd_obj_request, kref);
1793
1794         dout("%s: obj %p\n", __func__, obj_request);
1795
1796         rbd_assert(obj_request->img_request == NULL);
1797         rbd_assert(obj_request->which == BAD_WHICH);
1798
1799         if (obj_request->osd_req)
1800                 rbd_osd_req_destroy(obj_request->osd_req);
1801
1802         rbd_assert(obj_request_type_valid(obj_request->type));
1803         switch (obj_request->type) {
1804         case OBJ_REQUEST_NODATA:
1805                 break;          /* Nothing to do */
1806         case OBJ_REQUEST_BIO:
1807                 if (obj_request->bio_list)
1808                         bio_chain_put(obj_request->bio_list);
1809                 break;
1810         case OBJ_REQUEST_PAGES:
1811                 if (obj_request->pages)
1812                         ceph_release_page_vector(obj_request->pages,
1813                                                 obj_request->page_count);
1814                 break;
1815         }
1816
1817         kfree(obj_request->object_name);
1818         obj_request->object_name = NULL;
1819         kmem_cache_free(rbd_obj_request_cache, obj_request);
1820 }
1821
1822 /*
1823  * Caller is responsible for filling in the list of object requests
1824  * that comprises the image request, and the Linux request pointer
1825  * (if there is one).
1826  */
1827 static struct rbd_img_request *rbd_img_request_create(
1828                                         struct rbd_device *rbd_dev,
1829                                         u64 offset, u64 length,
1830                                         bool write_request,
1831                                         bool child_request)
1832 {
1833         struct rbd_img_request *img_request;
1834
1835         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1836         if (!img_request)
1837                 return NULL;
1838
1839         if (write_request) {
1840                 down_read(&rbd_dev->header_rwsem);
1841                 ceph_get_snap_context(rbd_dev->header.snapc);
1842                 up_read(&rbd_dev->header_rwsem);
1843         }
1844
1845         img_request->rq = NULL;
1846         img_request->rbd_dev = rbd_dev;
1847         img_request->offset = offset;
1848         img_request->length = length;
1849         img_request->flags = 0;
1850         if (write_request) {
1851                 img_request_write_set(img_request);
1852                 img_request->snapc = rbd_dev->header.snapc;
1853         } else {
1854                 img_request->snap_id = rbd_dev->spec->snap_id;
1855         }
1856         if (child_request)
1857                 img_request_child_set(img_request);
1858         if (rbd_dev->parent_spec)
1859                 img_request_layered_set(img_request);
1860         spin_lock_init(&img_request->completion_lock);
1861         img_request->next_completion = 0;
1862         img_request->callback = NULL;
1863         img_request->result = 0;
1864         img_request->obj_request_count = 0;
1865         INIT_LIST_HEAD(&img_request->obj_requests);
1866         kref_init(&img_request->kref);
1867
1868         rbd_img_request_get(img_request);       /* Avoid a warning */
1869         rbd_img_request_put(img_request);       /* TEMPORARY */
1870
1871         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1872                 write_request ? "write" : "read", offset, length,
1873                 img_request);
1874
1875         return img_request;
1876 }
1877
1878 static void rbd_img_request_destroy(struct kref *kref)
1879 {
1880         struct rbd_img_request *img_request;
1881         struct rbd_obj_request *obj_request;
1882         struct rbd_obj_request *next_obj_request;
1883
1884         img_request = container_of(kref, struct rbd_img_request, kref);
1885
1886         dout("%s: img %p\n", __func__, img_request);
1887
1888         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1889                 rbd_img_obj_request_del(img_request, obj_request);
1890         rbd_assert(img_request->obj_request_count == 0);
1891
1892         if (img_request_write_test(img_request))
1893                 ceph_put_snap_context(img_request->snapc);
1894
1895         if (img_request_child_test(img_request))
1896                 rbd_obj_request_put(img_request->obj_request);
1897
1898         kmem_cache_free(rbd_img_request_cache, img_request);
1899 }
1900
1901 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1902 {
1903         struct rbd_img_request *img_request;
1904         unsigned int xferred;
1905         int result;
1906         bool more;
1907
1908         rbd_assert(obj_request_img_data_test(obj_request));
1909         img_request = obj_request->img_request;
1910
1911         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1912         xferred = (unsigned int)obj_request->xferred;
1913         result = obj_request->result;
1914         if (result) {
1915                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1916
1917                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1918                         img_request_write_test(img_request) ? "write" : "read",
1919                         obj_request->length, obj_request->img_offset,
1920                         obj_request->offset);
1921                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1922                         result, xferred);
1923                 if (!img_request->result)
1924                         img_request->result = result;
1925         }
1926
1927         /* Image object requests don't own their page array */
1928
1929         if (obj_request->type == OBJ_REQUEST_PAGES) {
1930                 obj_request->pages = NULL;
1931                 obj_request->page_count = 0;
1932         }
1933
1934         if (img_request_child_test(img_request)) {
1935                 rbd_assert(img_request->obj_request != NULL);
1936                 more = obj_request->which < img_request->obj_request_count - 1;
1937         } else {
1938                 rbd_assert(img_request->rq != NULL);
1939                 more = blk_end_request(img_request->rq, result, xferred);
1940         }
1941
1942         return more;
1943 }
1944
1945 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1946 {
1947         struct rbd_img_request *img_request;
1948         u32 which = obj_request->which;
1949         bool more = true;
1950
1951         rbd_assert(obj_request_img_data_test(obj_request));
1952         img_request = obj_request->img_request;
1953
1954         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1955         rbd_assert(img_request != NULL);
1956         rbd_assert(img_request->obj_request_count > 0);
1957         rbd_assert(which != BAD_WHICH);
1958         rbd_assert(which < img_request->obj_request_count);
1959         rbd_assert(which >= img_request->next_completion);
1960
1961         spin_lock_irq(&img_request->completion_lock);
1962         if (which != img_request->next_completion)
1963                 goto out;
1964
1965         for_each_obj_request_from(img_request, obj_request) {
1966                 rbd_assert(more);
1967                 rbd_assert(which < img_request->obj_request_count);
1968
1969                 if (!obj_request_done_test(obj_request))
1970                         break;
1971                 more = rbd_img_obj_end_request(obj_request);
1972                 which++;
1973         }
1974
1975         rbd_assert(more ^ (which == img_request->obj_request_count));
1976         img_request->next_completion = which;
1977 out:
1978         spin_unlock_irq(&img_request->completion_lock);
1979
1980         if (!more)
1981                 rbd_img_request_complete(img_request);
1982 }
1983
1984 /*
1985  * Split up an image request into one or more object requests, each
1986  * to a different object.  The "type" parameter indicates whether
1987  * "data_desc" is the pointer to the head of a list of bio
1988  * structures, or the base of a page array.  In either case this
1989  * function assumes data_desc describes memory sufficient to hold
1990  * all data described by the image request.
1991  */
1992 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1993                                         enum obj_request_type type,
1994                                         void *data_desc)
1995 {
1996         struct rbd_device *rbd_dev = img_request->rbd_dev;
1997         struct rbd_obj_request *obj_request = NULL;
1998         struct rbd_obj_request *next_obj_request;
1999         bool write_request = img_request_write_test(img_request);
2000         struct bio *bio_list;
2001         unsigned int bio_offset = 0;
2002         struct page **pages;
2003         u64 img_offset;
2004         u64 resid;
2005         u16 opcode;
2006
2007         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2008                 (int)type, data_desc);
2009
2010         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2011         img_offset = img_request->offset;
2012         resid = img_request->length;
2013         rbd_assert(resid > 0);
2014
2015         if (type == OBJ_REQUEST_BIO) {
2016                 bio_list = data_desc;
2017                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2018         } else {
2019                 rbd_assert(type == OBJ_REQUEST_PAGES);
2020                 pages = data_desc;
2021         }
2022
2023         while (resid) {
2024                 struct ceph_osd_request *osd_req;
2025                 const char *object_name;
2026                 u64 offset;
2027                 u64 length;
2028
2029                 object_name = rbd_segment_name(rbd_dev, img_offset);
2030                 if (!object_name)
2031                         goto out_unwind;
2032                 offset = rbd_segment_offset(rbd_dev, img_offset);
2033                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2034                 obj_request = rbd_obj_request_create(object_name,
2035                                                 offset, length, type);
2036                 kfree(object_name);     /* object request has its own copy */
2037                 if (!obj_request)
2038                         goto out_unwind;
2039
2040                 if (type == OBJ_REQUEST_BIO) {
2041                         unsigned int clone_size;
2042
2043                         rbd_assert(length <= (u64)UINT_MAX);
2044                         clone_size = (unsigned int)length;
2045                         obj_request->bio_list =
2046                                         bio_chain_clone_range(&bio_list,
2047                                                                 &bio_offset,
2048                                                                 clone_size,
2049                                                                 GFP_ATOMIC);
2050                         if (!obj_request->bio_list)
2051                                 goto out_partial;
2052                 } else {
2053                         unsigned int page_count;
2054
2055                         obj_request->pages = pages;
2056                         page_count = (u32)calc_pages_for(offset, length);
2057                         obj_request->page_count = page_count;
2058                         if ((offset + length) & ~PAGE_MASK)
2059                                 page_count--;   /* more on last page */
2060                         pages += page_count;
2061                 }
2062
2063                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2064                                                 obj_request);
2065                 if (!osd_req)
2066                         goto out_partial;
2067                 obj_request->osd_req = osd_req;
2068                 obj_request->callback = rbd_img_obj_callback;
2069
2070                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2071                                                 0, 0);
2072                 if (type == OBJ_REQUEST_BIO)
2073                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2074                                         obj_request->bio_list, length);
2075                 else
2076                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2077                                         obj_request->pages, length,
2078                                         offset & ~PAGE_MASK, false, false);
2079
2080                 if (write_request)
2081                         rbd_osd_req_format_write(obj_request);
2082                 else
2083                         rbd_osd_req_format_read(obj_request);
2084
2085                 obj_request->img_offset = img_offset;
2086                 rbd_img_obj_request_add(img_request, obj_request);
2087
2088                 img_offset += length;
2089                 resid -= length;
2090         }
2091
2092         return 0;
2093
2094 out_partial:
2095         rbd_obj_request_put(obj_request);
2096 out_unwind:
2097         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2098                 rbd_obj_request_put(obj_request);
2099
2100         return -ENOMEM;
2101 }
2102
2103 static void
2104 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2105 {
2106         struct rbd_img_request *img_request;
2107         struct rbd_device *rbd_dev;
2108         u64 length;
2109         u32 page_count;
2110
2111         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2112         rbd_assert(obj_request_img_data_test(obj_request));
2113         img_request = obj_request->img_request;
2114         rbd_assert(img_request);
2115
2116         rbd_dev = img_request->rbd_dev;
2117         rbd_assert(rbd_dev);
2118         length = (u64)1 << rbd_dev->header.obj_order;
2119         page_count = (u32)calc_pages_for(0, length);
2120
2121         rbd_assert(obj_request->copyup_pages);
2122         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2123         obj_request->copyup_pages = NULL;
2124
2125         /*
2126          * We want the transfer count to reflect the size of the
2127          * original write request.  There is no such thing as a
2128          * successful short write, so if the request was successful
2129          * we can just set it to the originally-requested length.
2130          */
2131         if (!obj_request->result)
2132                 obj_request->xferred = obj_request->length;
2133
2134         /* Finish up with the normal image object callback */
2135
2136         rbd_img_obj_callback(obj_request);
2137 }
2138
2139 static void
2140 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2141 {
2142         struct rbd_obj_request *orig_request;
2143         struct ceph_osd_request *osd_req;
2144         struct ceph_osd_client *osdc;
2145         struct rbd_device *rbd_dev;
2146         struct page **pages;
2147         int result;
2148         u64 obj_size;
2149         u64 xferred;
2150
2151         rbd_assert(img_request_child_test(img_request));
2152
2153         /* First get what we need from the image request */
2154
2155         pages = img_request->copyup_pages;
2156         rbd_assert(pages != NULL);
2157         img_request->copyup_pages = NULL;
2158
2159         orig_request = img_request->obj_request;
2160         rbd_assert(orig_request != NULL);
2161         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2162         result = img_request->result;
2163         obj_size = img_request->length;
2164         xferred = img_request->xferred;
2165
2166         rbd_dev = img_request->rbd_dev;
2167         rbd_assert(rbd_dev);
2168         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2169
2170         rbd_img_request_put(img_request);
2171
2172         if (result)
2173                 goto out_err;
2174
2175         /* Allocate the new copyup osd request for the original request */
2176
2177         result = -ENOMEM;
2178         rbd_assert(!orig_request->osd_req);
2179         osd_req = rbd_osd_req_create_copyup(orig_request);
2180         if (!osd_req)
2181                 goto out_err;
2182         orig_request->osd_req = osd_req;
2183         orig_request->copyup_pages = pages;
2184
2185         /* Initialize the copyup op */
2186
2187         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2188         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2189                                                 false, false);
2190
2191         /* Then the original write request op */
2192
2193         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2194                                         orig_request->offset,
2195                                         orig_request->length, 0, 0);
2196         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2197                                         orig_request->length);
2198
2199         rbd_osd_req_format_write(orig_request);
2200
2201         /* All set, send it off. */
2202
2203         orig_request->callback = rbd_img_obj_copyup_callback;
2204         osdc = &rbd_dev->rbd_client->client->osdc;
2205         result = rbd_obj_request_submit(osdc, orig_request);
2206         if (!result)
2207                 return;
2208 out_err:
2209         /* Record the error code and complete the request */
2210
2211         orig_request->result = result;
2212         orig_request->xferred = 0;
2213         obj_request_done_set(orig_request);
2214         rbd_obj_request_complete(orig_request);
2215 }
2216
2217 /*
2218  * Read from the parent image the range of data that covers the
2219  * entire target of the given object request.  This is used for
2220  * satisfying a layered image write request when the target of an
2221  * object request from the image request does not exist.
2222  *
2223  * A page array big enough to hold the returned data is allocated
2224  * and supplied to rbd_img_request_fill() as the "data descriptor."
2225  * When the read completes, this page array will be transferred to
2226  * the original object request for the copyup operation.
2227  *
2228  * If an error occurs, record it as the result of the original
2229  * object request and mark it done so it gets completed.
2230  */
2231 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2232 {
2233         struct rbd_img_request *img_request = NULL;
2234         struct rbd_img_request *parent_request = NULL;
2235         struct rbd_device *rbd_dev;
2236         u64 img_offset;
2237         u64 length;
2238         struct page **pages = NULL;
2239         u32 page_count;
2240         int result;
2241
2242         rbd_assert(obj_request_img_data_test(obj_request));
2243         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2244
2245         img_request = obj_request->img_request;
2246         rbd_assert(img_request != NULL);
2247         rbd_dev = img_request->rbd_dev;
2248         rbd_assert(rbd_dev->parent != NULL);
2249
2250         /*
2251          * First things first.  The original osd request is of no
2252          * use to use any more, we'll need a new one that can hold
2253          * the two ops in a copyup request.  We'll get that later,
2254          * but for now we can release the old one.
2255          */
2256         rbd_osd_req_destroy(obj_request->osd_req);
2257         obj_request->osd_req = NULL;
2258
2259         /*
2260          * Determine the byte range covered by the object in the
2261          * child image to which the original request was to be sent.
2262          */
2263         img_offset = obj_request->img_offset - obj_request->offset;
2264         length = (u64)1 << rbd_dev->header.obj_order;
2265
2266         /*
2267          * There is no defined parent data beyond the parent
2268          * overlap, so limit what we read at that boundary if
2269          * necessary.
2270          */
2271         if (img_offset + length > rbd_dev->parent_overlap) {
2272                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2273                 length = rbd_dev->parent_overlap - img_offset;
2274         }
2275
2276         /*
2277          * Allocate a page array big enough to receive the data read
2278          * from the parent.
2279          */
2280         page_count = (u32)calc_pages_for(0, length);
2281         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2282         if (IS_ERR(pages)) {
2283                 result = PTR_ERR(pages);
2284                 pages = NULL;
2285                 goto out_err;
2286         }
2287
2288         result = -ENOMEM;
2289         parent_request = rbd_img_request_create(rbd_dev->parent,
2290                                                 img_offset, length,
2291                                                 false, true);
2292         if (!parent_request)
2293                 goto out_err;
2294         rbd_obj_request_get(obj_request);
2295         parent_request->obj_request = obj_request;
2296
2297         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2298         if (result)
2299                 goto out_err;
2300         parent_request->copyup_pages = pages;
2301
2302         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2303         result = rbd_img_request_submit(parent_request);
2304         if (!result)
2305                 return 0;
2306
2307         parent_request->copyup_pages = NULL;
2308         parent_request->obj_request = NULL;
2309         rbd_obj_request_put(obj_request);
2310 out_err:
2311         if (pages)
2312                 ceph_release_page_vector(pages, page_count);
2313         if (parent_request)
2314                 rbd_img_request_put(parent_request);
2315         obj_request->result = result;
2316         obj_request->xferred = 0;
2317         obj_request_done_set(obj_request);
2318
2319         return result;
2320 }
2321
2322 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2323 {
2324         struct rbd_obj_request *orig_request;
2325         int result;
2326
2327         rbd_assert(!obj_request_img_data_test(obj_request));
2328
2329         /*
2330          * All we need from the object request is the original
2331          * request and the result of the STAT op.  Grab those, then
2332          * we're done with the request.
2333          */
2334         orig_request = obj_request->obj_request;
2335         obj_request->obj_request = NULL;
2336         rbd_assert(orig_request);
2337         rbd_assert(orig_request->img_request);
2338
2339         result = obj_request->result;
2340         obj_request->result = 0;
2341
2342         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2343                 obj_request, orig_request, result,
2344                 obj_request->xferred, obj_request->length);
2345         rbd_obj_request_put(obj_request);
2346
2347         rbd_assert(orig_request);
2348         rbd_assert(orig_request->img_request);
2349
2350         /*
2351          * Our only purpose here is to determine whether the object
2352          * exists, and we don't want to treat the non-existence as
2353          * an error.  If something else comes back, transfer the
2354          * error to the original request and complete it now.
2355          */
2356         if (!result) {
2357                 obj_request_existence_set(orig_request, true);
2358         } else if (result == -ENOENT) {
2359                 obj_request_existence_set(orig_request, false);
2360         } else if (result) {
2361                 orig_request->result = result;
2362                 goto out;
2363         }
2364
2365         /*
2366          * Resubmit the original request now that we have recorded
2367          * whether the target object exists.
2368          */
2369         orig_request->result = rbd_img_obj_request_submit(orig_request);
2370 out:
2371         if (orig_request->result)
2372                 rbd_obj_request_complete(orig_request);
2373         rbd_obj_request_put(orig_request);
2374 }
2375
2376 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2377 {
2378         struct rbd_obj_request *stat_request;
2379         struct rbd_device *rbd_dev;
2380         struct ceph_osd_client *osdc;
2381         struct page **pages = NULL;
2382         u32 page_count;
2383         size_t size;
2384         int ret;
2385
2386         /*
2387          * The response data for a STAT call consists of:
2388          *     le64 length;
2389          *     struct {
2390          *         le32 tv_sec;
2391          *         le32 tv_nsec;
2392          *     } mtime;
2393          */
2394         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2395         page_count = (u32)calc_pages_for(0, size);
2396         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2397         if (IS_ERR(pages))
2398                 return PTR_ERR(pages);
2399
2400         ret = -ENOMEM;
2401         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2402                                                         OBJ_REQUEST_PAGES);
2403         if (!stat_request)
2404                 goto out;
2405
2406         rbd_obj_request_get(obj_request);
2407         stat_request->obj_request = obj_request;
2408         stat_request->pages = pages;
2409         stat_request->page_count = page_count;
2410
2411         rbd_assert(obj_request->img_request);
2412         rbd_dev = obj_request->img_request->rbd_dev;
2413         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2414                                                 stat_request);
2415         if (!stat_request->osd_req)
2416                 goto out;
2417         stat_request->callback = rbd_img_obj_exists_callback;
2418
2419         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2420         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2421                                         false, false);
2422         rbd_osd_req_format_read(stat_request);
2423
2424         osdc = &rbd_dev->rbd_client->client->osdc;
2425         ret = rbd_obj_request_submit(osdc, stat_request);
2426 out:
2427         if (ret)
2428                 rbd_obj_request_put(obj_request);
2429
2430         return ret;
2431 }
2432
2433 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2434 {
2435         struct rbd_img_request *img_request;
2436         struct rbd_device *rbd_dev;
2437         bool known;
2438
2439         rbd_assert(obj_request_img_data_test(obj_request));
2440
2441         img_request = obj_request->img_request;
2442         rbd_assert(img_request);
2443         rbd_dev = img_request->rbd_dev;
2444
2445         /*
2446          * Only writes to layered images need special handling.
2447          * Reads and non-layered writes are simple object requests.
2448          * Layered writes that start beyond the end of the overlap
2449          * with the parent have no parent data, so they too are
2450          * simple object requests.  Finally, if the target object is
2451          * known to already exist, its parent data has already been
2452          * copied, so a write to the object can also be handled as a
2453          * simple object request.
2454          */
2455         if (!img_request_write_test(img_request) ||
2456                 !img_request_layered_test(img_request) ||
2457                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2458                 ((known = obj_request_known_test(obj_request)) &&
2459                         obj_request_exists_test(obj_request))) {
2460
2461                 struct rbd_device *rbd_dev;
2462                 struct ceph_osd_client *osdc;
2463
2464                 rbd_dev = obj_request->img_request->rbd_dev;
2465                 osdc = &rbd_dev->rbd_client->client->osdc;
2466
2467                 return rbd_obj_request_submit(osdc, obj_request);
2468         }
2469
2470         /*
2471          * It's a layered write.  The target object might exist but
2472          * we may not know that yet.  If we know it doesn't exist,
2473          * start by reading the data for the full target object from
2474          * the parent so we can use it for a copyup to the target.
2475          */
2476         if (known)
2477                 return rbd_img_obj_parent_read_full(obj_request);
2478
2479         /* We don't know whether the target exists.  Go find out. */
2480
2481         return rbd_img_obj_exists_submit(obj_request);
2482 }
2483
2484 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2485 {
2486         struct rbd_obj_request *obj_request;
2487         struct rbd_obj_request *next_obj_request;
2488
2489         dout("%s: img %p\n", __func__, img_request);
2490         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2491                 int ret;
2492
2493                 ret = rbd_img_obj_request_submit(obj_request);
2494                 if (ret)
2495                         return ret;
2496         }
2497
2498         return 0;
2499 }
2500
2501 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2502 {
2503         struct rbd_obj_request *obj_request;
2504         struct rbd_device *rbd_dev;
2505         u64 obj_end;
2506
2507         rbd_assert(img_request_child_test(img_request));
2508
2509         obj_request = img_request->obj_request;
2510         rbd_assert(obj_request);
2511         rbd_assert(obj_request->img_request);
2512
2513         obj_request->result = img_request->result;
2514         if (obj_request->result)
2515                 goto out;
2516
2517         /*
2518          * We need to zero anything beyond the parent overlap
2519          * boundary.  Since rbd_img_obj_request_read_callback()
2520          * will zero anything beyond the end of a short read, an
2521          * easy way to do this is to pretend the data from the
2522          * parent came up short--ending at the overlap boundary.
2523          */
2524         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2525         obj_end = obj_request->img_offset + obj_request->length;
2526         rbd_dev = obj_request->img_request->rbd_dev;
2527         if (obj_end > rbd_dev->parent_overlap) {
2528                 u64 xferred = 0;
2529
2530                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2531                         xferred = rbd_dev->parent_overlap -
2532                                         obj_request->img_offset;
2533
2534                 obj_request->xferred = min(img_request->xferred, xferred);
2535         } else {
2536                 obj_request->xferred = img_request->xferred;
2537         }
2538 out:
2539         rbd_img_obj_request_read_callback(obj_request);
2540         rbd_obj_request_complete(obj_request);
2541 }
2542
2543 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2544 {
2545         struct rbd_device *rbd_dev;
2546         struct rbd_img_request *img_request;
2547         int result;
2548
2549         rbd_assert(obj_request_img_data_test(obj_request));
2550         rbd_assert(obj_request->img_request != NULL);
2551         rbd_assert(obj_request->result == (s32) -ENOENT);
2552         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2553
2554         rbd_dev = obj_request->img_request->rbd_dev;
2555         rbd_assert(rbd_dev->parent != NULL);
2556         /* rbd_read_finish(obj_request, obj_request->length); */
2557         img_request = rbd_img_request_create(rbd_dev->parent,
2558                                                 obj_request->img_offset,
2559                                                 obj_request->length,
2560                                                 false, true);
2561         result = -ENOMEM;
2562         if (!img_request)
2563                 goto out_err;
2564
2565         rbd_obj_request_get(obj_request);
2566         img_request->obj_request = obj_request;
2567
2568         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2569                                         obj_request->bio_list);
2570         if (result)
2571                 goto out_err;
2572
2573         img_request->callback = rbd_img_parent_read_callback;
2574         result = rbd_img_request_submit(img_request);
2575         if (result)
2576                 goto out_err;
2577
2578         return;
2579 out_err:
2580         if (img_request)
2581                 rbd_img_request_put(img_request);
2582         obj_request->result = result;
2583         obj_request->xferred = 0;
2584         obj_request_done_set(obj_request);
2585 }
2586
2587 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2588 {
2589         struct rbd_obj_request *obj_request;
2590         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2591         int ret;
2592
2593         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2594                                                         OBJ_REQUEST_NODATA);
2595         if (!obj_request)
2596                 return -ENOMEM;
2597
2598         ret = -ENOMEM;
2599         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2600         if (!obj_request->osd_req)
2601                 goto out;
2602         obj_request->callback = rbd_obj_request_put;
2603
2604         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2605                                         notify_id, 0, 0);
2606         rbd_osd_req_format_read(obj_request);
2607
2608         ret = rbd_obj_request_submit(osdc, obj_request);
2609 out:
2610         if (ret)
2611                 rbd_obj_request_put(obj_request);
2612
2613         return ret;
2614 }
2615
2616 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2617 {
2618         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2619
2620         if (!rbd_dev)
2621                 return;
2622
2623         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2624                 rbd_dev->header_name, (unsigned long long)notify_id,
2625                 (unsigned int)opcode);
2626         (void)rbd_dev_refresh(rbd_dev);
2627
2628         rbd_obj_notify_ack(rbd_dev, notify_id);
2629 }
2630
2631 /*
2632  * Request sync osd watch/unwatch.  The value of "start" determines
2633  * whether a watch request is being initiated or torn down.
2634  */
2635 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2636 {
2637         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2638         struct rbd_obj_request *obj_request;
2639         int ret;
2640
2641         rbd_assert(start ^ !!rbd_dev->watch_event);
2642         rbd_assert(start ^ !!rbd_dev->watch_request);
2643
2644         if (start) {
2645                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2646                                                 &rbd_dev->watch_event);
2647                 if (ret < 0)
2648                         return ret;
2649                 rbd_assert(rbd_dev->watch_event != NULL);
2650         }
2651
2652         ret = -ENOMEM;
2653         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2654                                                         OBJ_REQUEST_NODATA);
2655         if (!obj_request)
2656                 goto out_cancel;
2657
2658         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2659         if (!obj_request->osd_req)
2660                 goto out_cancel;
2661
2662         if (start)
2663                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2664         else
2665                 ceph_osdc_unregister_linger_request(osdc,
2666                                         rbd_dev->watch_request->osd_req);
2667
2668         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2669                                 rbd_dev->watch_event->cookie, 0, start);
2670         rbd_osd_req_format_write(obj_request);
2671
2672         ret = rbd_obj_request_submit(osdc, obj_request);
2673         if (ret)
2674                 goto out_cancel;
2675         ret = rbd_obj_request_wait(obj_request);
2676         if (ret)
2677                 goto out_cancel;
2678         ret = obj_request->result;
2679         if (ret)
2680                 goto out_cancel;
2681
2682         /*
2683          * A watch request is set to linger, so the underlying osd
2684          * request won't go away until we unregister it.  We retain
2685          * a pointer to the object request during that time (in
2686          * rbd_dev->watch_request), so we'll keep a reference to
2687          * it.  We'll drop that reference (below) after we've
2688          * unregistered it.
2689          */
2690         if (start) {
2691                 rbd_dev->watch_request = obj_request;
2692
2693                 return 0;
2694         }
2695
2696         /* We have successfully torn down the watch request */
2697
2698         rbd_obj_request_put(rbd_dev->watch_request);
2699         rbd_dev->watch_request = NULL;
2700 out_cancel:
2701         /* Cancel the event if we're tearing down, or on error */
2702         ceph_osdc_cancel_event(rbd_dev->watch_event);
2703         rbd_dev->watch_event = NULL;
2704         if (obj_request)
2705                 rbd_obj_request_put(obj_request);
2706
2707         return ret;
2708 }
2709
2710 /*
2711  * Synchronous osd object method call.  Returns the number of bytes
2712  * returned in the outbound buffer, or a negative error code.
2713  */
2714 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2715                              const char *object_name,
2716                              const char *class_name,
2717                              const char *method_name,
2718                              const void *outbound,
2719                              size_t outbound_size,
2720                              void *inbound,
2721                              size_t inbound_size)
2722 {
2723         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2724         struct rbd_obj_request *obj_request;
2725         struct page **pages;
2726         u32 page_count;
2727         int ret;
2728
2729         /*
2730          * Method calls are ultimately read operations.  The result
2731          * should placed into the inbound buffer provided.  They
2732          * also supply outbound data--parameters for the object
2733          * method.  Currently if this is present it will be a
2734          * snapshot id.
2735          */
2736         page_count = (u32)calc_pages_for(0, inbound_size);
2737         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2738         if (IS_ERR(pages))
2739                 return PTR_ERR(pages);
2740
2741         ret = -ENOMEM;
2742         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2743                                                         OBJ_REQUEST_PAGES);
2744         if (!obj_request)
2745                 goto out;
2746
2747         obj_request->pages = pages;
2748         obj_request->page_count = page_count;
2749
2750         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2751         if (!obj_request->osd_req)
2752                 goto out;
2753
2754         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2755                                         class_name, method_name);
2756         if (outbound_size) {
2757                 struct ceph_pagelist *pagelist;
2758
2759                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2760                 if (!pagelist)
2761                         goto out;
2762
2763                 ceph_pagelist_init(pagelist);
2764                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2765                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2766                                                 pagelist);
2767         }
2768         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2769                                         obj_request->pages, inbound_size,
2770                                         0, false, false);
2771         rbd_osd_req_format_read(obj_request);
2772
2773         ret = rbd_obj_request_submit(osdc, obj_request);
2774         if (ret)
2775                 goto out;
2776         ret = rbd_obj_request_wait(obj_request);
2777         if (ret)
2778                 goto out;
2779
2780         ret = obj_request->result;
2781         if (ret < 0)
2782                 goto out;
2783
2784         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2785         ret = (int)obj_request->xferred;
2786         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2787 out:
2788         if (obj_request)
2789                 rbd_obj_request_put(obj_request);
2790         else
2791                 ceph_release_page_vector(pages, page_count);
2792
2793         return ret;
2794 }
2795
2796 static void rbd_request_fn(struct request_queue *q)
2797                 __releases(q->queue_lock) __acquires(q->queue_lock)
2798 {
2799         struct rbd_device *rbd_dev = q->queuedata;
2800         bool read_only = rbd_dev->mapping.read_only;
2801         struct request *rq;
2802         int result;
2803
2804         while ((rq = blk_fetch_request(q))) {
2805                 bool write_request = rq_data_dir(rq) == WRITE;
2806                 struct rbd_img_request *img_request;
2807                 u64 offset;
2808                 u64 length;
2809
2810                 /* Ignore any non-FS requests that filter through. */
2811
2812                 if (rq->cmd_type != REQ_TYPE_FS) {
2813                         dout("%s: non-fs request type %d\n", __func__,
2814                                 (int) rq->cmd_type);
2815                         __blk_end_request_all(rq, 0);
2816                         continue;
2817                 }
2818
2819                 /* Ignore/skip any zero-length requests */
2820
2821                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2822                 length = (u64) blk_rq_bytes(rq);
2823
2824                 if (!length) {
2825                         dout("%s: zero-length request\n", __func__);
2826                         __blk_end_request_all(rq, 0);
2827                         continue;
2828                 }
2829
2830                 spin_unlock_irq(q->queue_lock);
2831
2832                 /* Disallow writes to a read-only device */
2833
2834                 if (write_request) {
2835                         result = -EROFS;
2836                         if (read_only)
2837                                 goto end_request;
2838                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2839                 }
2840
2841                 /*
2842                  * Quit early if the mapped snapshot no longer
2843                  * exists.  It's still possible the snapshot will
2844                  * have disappeared by the time our request arrives
2845                  * at the osd, but there's no sense in sending it if
2846                  * we already know.
2847                  */
2848                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2849                         dout("request for non-existent snapshot");
2850                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2851                         result = -ENXIO;
2852                         goto end_request;
2853                 }
2854
2855                 result = -EINVAL;
2856                 if (offset && length > U64_MAX - offset + 1) {
2857                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2858                                 offset, length);
2859                         goto end_request;       /* Shouldn't happen */
2860                 }
2861
2862                 result = -ENOMEM;
2863                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2864                                                         write_request, false);
2865                 if (!img_request)
2866                         goto end_request;
2867
2868                 img_request->rq = rq;
2869
2870                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2871                                                 rq->bio);
2872                 if (!result)
2873                         result = rbd_img_request_submit(img_request);
2874                 if (result)
2875                         rbd_img_request_put(img_request);
2876 end_request:
2877                 spin_lock_irq(q->queue_lock);
2878                 if (result < 0) {
2879                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2880                                 write_request ? "write" : "read",
2881                                 length, offset, result);
2882
2883                         __blk_end_request_all(rq, result);
2884                 }
2885         }
2886 }
2887
2888 /*
2889  * a queue callback. Makes sure that we don't create a bio that spans across
2890  * multiple osd objects. One exception would be with a single page bios,
2891  * which we handle later at bio_chain_clone_range()
2892  */
2893 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2894                           struct bio_vec *bvec)
2895 {
2896         struct rbd_device *rbd_dev = q->queuedata;
2897         sector_t sector_offset;
2898         sector_t sectors_per_obj;
2899         sector_t obj_sector_offset;
2900         int ret;
2901
2902         /*
2903          * Find how far into its rbd object the partition-relative
2904          * bio start sector is to offset relative to the enclosing
2905          * device.
2906          */
2907         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2908         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2909         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2910
2911         /*
2912          * Compute the number of bytes from that offset to the end
2913          * of the object.  Account for what's already used by the bio.
2914          */
2915         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2916         if (ret > bmd->bi_size)
2917                 ret -= bmd->bi_size;
2918         else
2919                 ret = 0;
2920
2921         /*
2922          * Don't send back more than was asked for.  And if the bio
2923          * was empty, let the whole thing through because:  "Note
2924          * that a block device *must* allow a single page to be
2925          * added to an empty bio."
2926          */
2927         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2928         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2929                 ret = (int) bvec->bv_len;
2930
2931         return ret;
2932 }
2933
2934 static void rbd_free_disk(struct rbd_device *rbd_dev)
2935 {
2936         struct gendisk *disk = rbd_dev->disk;
2937
2938         if (!disk)
2939                 return;
2940
2941         rbd_dev->disk = NULL;
2942         if (disk->flags & GENHD_FL_UP) {
2943                 del_gendisk(disk);
2944                 if (disk->queue)
2945                         blk_cleanup_queue(disk->queue);
2946         }
2947         put_disk(disk);
2948 }
2949
2950 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2951                                 const char *object_name,
2952                                 u64 offset, u64 length, void *buf)
2953
2954 {
2955         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2956         struct rbd_obj_request *obj_request;
2957         struct page **pages = NULL;
2958         u32 page_count;
2959         size_t size;
2960         int ret;
2961
2962         page_count = (u32) calc_pages_for(offset, length);
2963         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2964         if (IS_ERR(pages))
2965                 ret = PTR_ERR(pages);
2966
2967         ret = -ENOMEM;
2968         obj_request = rbd_obj_request_create(object_name, offset, length,
2969                                                         OBJ_REQUEST_PAGES);
2970         if (!obj_request)
2971                 goto out;
2972
2973         obj_request->pages = pages;
2974         obj_request->page_count = page_count;
2975
2976         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2977         if (!obj_request->osd_req)
2978                 goto out;
2979
2980         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2981                                         offset, length, 0, 0);
2982         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2983                                         obj_request->pages,
2984                                         obj_request->length,
2985                                         obj_request->offset & ~PAGE_MASK,
2986                                         false, false);
2987         rbd_osd_req_format_read(obj_request);
2988
2989         ret = rbd_obj_request_submit(osdc, obj_request);
2990         if (ret)
2991                 goto out;
2992         ret = rbd_obj_request_wait(obj_request);
2993         if (ret)
2994                 goto out;
2995
2996         ret = obj_request->result;
2997         if (ret < 0)
2998                 goto out;
2999
3000         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3001         size = (size_t) obj_request->xferred;
3002         ceph_copy_from_page_vector(pages, buf, 0, size);
3003         rbd_assert(size <= (size_t)INT_MAX);
3004         ret = (int)size;
3005 out:
3006         if (obj_request)
3007                 rbd_obj_request_put(obj_request);
3008         else
3009                 ceph_release_page_vector(pages, page_count);
3010
3011         return ret;
3012 }
3013
3014 /*
3015  * Read the complete header for the given rbd device.
3016  *
3017  * Returns a pointer to a dynamically-allocated buffer containing
3018  * the complete and validated header.  Caller can pass the address
3019  * of a variable that will be filled in with the version of the
3020  * header object at the time it was read.
3021  *
3022  * Returns a pointer-coded errno if a failure occurs.
3023  */
3024 static struct rbd_image_header_ondisk *
3025 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3026 {
3027         struct rbd_image_header_ondisk *ondisk = NULL;
3028         u32 snap_count = 0;
3029         u64 names_size = 0;
3030         u32 want_count;
3031         int ret;
3032
3033         /*
3034          * The complete header will include an array of its 64-bit
3035          * snapshot ids, followed by the names of those snapshots as
3036          * a contiguous block of NUL-terminated strings.  Note that
3037          * the number of snapshots could change by the time we read
3038          * it in, in which case we re-read it.
3039          */
3040         do {
3041                 size_t size;
3042
3043                 kfree(ondisk);
3044
3045                 size = sizeof (*ondisk);
3046                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3047                 size += names_size;
3048                 ondisk = kmalloc(size, GFP_KERNEL);
3049                 if (!ondisk)
3050                         return ERR_PTR(-ENOMEM);
3051
3052                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3053                                        0, size, ondisk);
3054                 if (ret < 0)
3055                         goto out_err;
3056                 if ((size_t)ret < size) {
3057                         ret = -ENXIO;
3058                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3059                                 size, ret);
3060                         goto out_err;
3061                 }
3062                 if (!rbd_dev_ondisk_valid(ondisk)) {
3063                         ret = -ENXIO;
3064                         rbd_warn(rbd_dev, "invalid header");
3065                         goto out_err;
3066                 }
3067
3068                 names_size = le64_to_cpu(ondisk->snap_names_len);
3069                 want_count = snap_count;
3070                 snap_count = le32_to_cpu(ondisk->snap_count);
3071         } while (snap_count != want_count);
3072
3073         return ondisk;
3074
3075 out_err:
3076         kfree(ondisk);
3077
3078         return ERR_PTR(ret);
3079 }
3080
/*
 * Read the on-disk header and convert it into the in-memory form
 * pointed to by "header".
 *
 * Returns the result of rbd_header_from_disk(), or a negative errno
 * if the on-disk header could not be read.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *raw = rbd_dev_v1_header_read(rbd_dev);
	int ret;

	if (IS_ERR(raw))
		return PTR_ERR(raw);

	/* The raw buffer is only needed for the conversion */
	ret = rbd_header_from_disk(header, raw);
	kfree(raw);

	return ret;
}
3098
3099 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3100 {
3101         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3102                 return;
3103
3104         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3105                 sector_t size;
3106
3107                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3108                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3109                 dout("setting size to %llu sectors", (unsigned long long)size);
3110                 set_capacity(rbd_dev->disk, size);
3111         }
3112 }
3113
3114 /*
3115  * only read the first part of the ondisk header, without the snaps info
3116  */
3117 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3118 {
3119         int ret;
3120         struct rbd_image_header h;
3121
3122         ret = rbd_read_header(rbd_dev, &h);
3123         if (ret < 0)
3124                 return ret;
3125
3126         down_write(&rbd_dev->header_rwsem);
3127
3128         /* Update image size, and check for resize of mapped image */
3129         rbd_dev->header.image_size = h.image_size;
3130         rbd_update_mapping_size(rbd_dev);
3131
3132         /* rbd_dev->header.object_prefix shouldn't change */
3133         kfree(rbd_dev->header.snap_sizes);
3134         kfree(rbd_dev->header.snap_names);
3135         /* osd requests may still refer to snapc */
3136         ceph_put_snap_context(rbd_dev->header.snapc);
3137
3138         rbd_dev->header.image_size = h.image_size;
3139         rbd_dev->header.snapc = h.snapc;
3140         rbd_dev->header.snap_names = h.snap_names;
3141         rbd_dev->header.snap_sizes = h.snap_sizes;
3142         /* Free the extra copy of the object prefix */
3143         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3144                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3145         kfree(h.object_prefix);
3146
3147         up_write(&rbd_dev->header_rwsem);
3148
3149         return ret;
3150 }
3151
3152 /*
3153  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3154  * has disappeared from the (just updated) snapshot context.
3155  */
3156 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3157 {
3158         u64 snap_id;
3159
3160         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3161                 return;
3162
3163         snap_id = rbd_dev->spec->snap_id;
3164         if (snap_id == CEPH_NOSNAP)
3165                 return;
3166
3167         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3168                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3169 }
3170
3171 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3172 {
3173         u64 image_size;
3174         int ret;
3175
3176         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3177         image_size = rbd_dev->header.image_size;
3178         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3179         if (rbd_dev->image_format == 1)
3180                 ret = rbd_dev_v1_refresh(rbd_dev);
3181         else
3182                 ret = rbd_dev_v2_refresh(rbd_dev);
3183
3184         /* If it's a mapped snapshot, validate its EXISTS flag */
3185
3186         rbd_exists_validate(rbd_dev);
3187         mutex_unlock(&ctl_mutex);
3188         if (ret)
3189                 rbd_warn(rbd_dev, "got notification but failed to "
3190                            " update snaps: %d\n", ret);
3191         if (image_size != rbd_dev->header.image_size)
3192                 revalidate_disk(rbd_dev->disk);
3193
3194         return ret;
3195 }
3196
3197 static int rbd_init_disk(struct rbd_device *rbd_dev)
3198 {
3199         struct gendisk *disk;
3200         struct request_queue *q;
3201         u64 segment_size;
3202
3203         /* create gendisk info */
3204         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3205         if (!disk)
3206                 return -ENOMEM;
3207
3208         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3209                  rbd_dev->dev_id);
3210         disk->major = rbd_dev->major;
3211         disk->first_minor = 0;
3212         disk->fops = &rbd_bd_ops;
3213         disk->private_data = rbd_dev;
3214
3215         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3216         if (!q)
3217                 goto out_disk;
3218
3219         /* We use the default size, but let's be explicit about it. */
3220         blk_queue_physical_block_size(q, SECTOR_SIZE);
3221
3222         /* set io sizes to object size */
3223         segment_size = rbd_obj_bytes(&rbd_dev->header);
3224         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3225         blk_queue_max_segment_size(q, segment_size);
3226         blk_queue_io_min(q, segment_size);
3227         blk_queue_io_opt(q, segment_size);
3228
3229         blk_queue_merge_bvec(q, rbd_merge_bvec);
3230         disk->queue = q;
3231
3232         q->queuedata = rbd_dev;
3233
3234         rbd_dev->disk = disk;
3235
3236         return 0;
3237 out_disk:
3238         put_disk(disk);
3239
3240         return -ENOMEM;
3241 }
3242
3243 /*
3244   sysfs
3245 */
3246
3247 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3248 {
3249         return container_of(dev, struct rbd_device, dev);
3250 }
3251
3252 static ssize_t rbd_size_show(struct device *dev,
3253                              struct device_attribute *attr, char *buf)
3254 {
3255         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3256
3257         return sprintf(buf, "%llu\n",
3258                 (unsigned long long)rbd_dev->mapping.size);
3259 }
3260
3261 /*
3262  * Note this shows the features for whatever's mapped, which is not
3263  * necessarily the base image.
3264  */
3265 static ssize_t rbd_features_show(struct device *dev,
3266                              struct device_attribute *attr, char *buf)
3267 {
3268         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3269
3270         return sprintf(buf, "0x%016llx\n",
3271                         (unsigned long long)rbd_dev->mapping.features);
3272 }
3273
3274 static ssize_t rbd_major_show(struct device *dev,
3275                               struct device_attribute *attr, char *buf)
3276 {
3277         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3278
3279         if (rbd_dev->major)
3280                 return sprintf(buf, "%d\n", rbd_dev->major);
3281
3282         return sprintf(buf, "(none)\n");
3283
3284 }
3285
3286 static ssize_t rbd_client_id_show(struct device *dev,
3287                                   struct device_attribute *attr, char *buf)
3288 {
3289         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3290
3291         return sprintf(buf, "client%lld\n",
3292                         ceph_client_id(rbd_dev->rbd_client->client));
3293 }
3294
3295 static ssize_t rbd_pool_show(struct device *dev,
3296                              struct device_attribute *attr, char *buf)
3297 {
3298         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3299
3300         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3301 }
3302
3303 static ssize_t rbd_pool_id_show(struct device *dev,
3304                              struct device_attribute *attr, char *buf)
3305 {
3306         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3307
3308         return sprintf(buf, "%llu\n",
3309                         (unsigned long long) rbd_dev->spec->pool_id);
3310 }
3311
3312 static ssize_t rbd_name_show(struct device *dev,
3313                              struct device_attribute *attr, char *buf)
3314 {
3315         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3316
3317         if (rbd_dev->spec->image_name)
3318                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3319
3320         return sprintf(buf, "(unknown)\n");
3321 }
3322
3323 static ssize_t rbd_image_id_show(struct device *dev,
3324                              struct device_attribute *attr, char *buf)
3325 {
3326         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3327
3328         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3329 }
3330
3331 /*
3332  * Shows the name of the currently-mapped snapshot (or
3333  * RBD_SNAP_HEAD_NAME for the base image).
3334  */
3335 static ssize_t rbd_snap_show(struct device *dev,
3336                              struct device_attribute *attr,
3337                              char *buf)
3338 {
3339         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3340
3341         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3342 }
3343
3344 /*
3345  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3346  * for the parent image.  If there is no parent, simply shows
3347  * "(no parent image)".
3348  */
3349 static ssize_t rbd_parent_show(struct device *dev,
3350                              struct device_attribute *attr,
3351                              char *buf)
3352 {
3353         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3354         struct rbd_spec *spec = rbd_dev->parent_spec;
3355         int count;
3356         char *bufp = buf;
3357
3358         if (!spec)
3359                 return sprintf(buf, "(no parent image)\n");
3360
3361         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3362                         (unsigned long long) spec->pool_id, spec->pool_name);
3363         if (count < 0)
3364                 return count;
3365         bufp += count;
3366
3367         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3368                         spec->image_name ? spec->image_name : "(unknown)");
3369         if (count < 0)
3370                 return count;
3371         bufp += count;
3372
3373         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3374                         (unsigned long long) spec->snap_id, spec->snap_name);
3375         if (count < 0)
3376                 return count;
3377         bufp += count;
3378
3379         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3380         if (count < 0)
3381                 return count;
3382         bufp += count;
3383
3384         return (ssize_t) (bufp - buf);
3385 }
3386
3387 static ssize_t rbd_image_refresh(struct device *dev,
3388                                  struct device_attribute *attr,
3389                                  const char *buf,
3390                                  size_t size)
3391 {
3392         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3393         int ret;
3394
3395         ret = rbd_dev_refresh(rbd_dev);
3396
3397         return ret < 0 ? ret : size;
3398 }
3399
/* Read-only attributes describing the mapped image */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

/* Write-only trigger (owner only): any write refreshes the image */
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3411
3412 static struct attribute *rbd_attrs[] = {
3413         &dev_attr_size.attr,
3414         &dev_attr_features.attr,
3415         &dev_attr_major.attr,
3416         &dev_attr_client_id.attr,
3417         &dev_attr_pool.attr,
3418         &dev_attr_pool_id.attr,
3419         &dev_attr_name.attr,
3420         &dev_attr_image_id.attr,
3421         &dev_attr_current_snap.attr,
3422         &dev_attr_parent.attr,
3423         &dev_attr_refresh.attr,
3424         NULL
3425 };
3426
3427 static struct attribute_group rbd_attr_group = {
3428         .attrs = rbd_attrs,
3429 };
3430
3431 static const struct attribute_group *rbd_attr_groups[] = {
3432         &rbd_attr_group,
3433         NULL
3434 };
3435
/*
 * Release callback installed in rbd_device_type.  It deliberately does
 * nothing.  NOTE(review): rbd_bus_add_dev() also sets dev->release to
 * rbd_dev_device_release, which presumably does the real teardown --
 * confirm against the driver core's release precedence.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* nothing to free here */
}
3439
3440 static struct device_type rbd_device_type = {
3441         .name           = "rbd",
3442         .groups         = rbd_attr_groups,
3443         .release        = rbd_sysfs_dev_release,
3444 };
3445
3446 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3447 {
3448         kref_get(&spec->kref);
3449
3450         return spec;
3451 }
3452
3453 static void rbd_spec_free(struct kref *kref);
3454 static void rbd_spec_put(struct rbd_spec *spec)
3455 {
3456         if (spec)
3457                 kref_put(&spec->kref, rbd_spec_free);
3458 }
3459
3460 static struct rbd_spec *rbd_spec_alloc(void)
3461 {
3462         struct rbd_spec *spec;
3463
3464         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3465         if (!spec)
3466                 return NULL;
3467         kref_init(&spec->kref);
3468
3469         return spec;
3470 }
3471
3472 static void rbd_spec_free(struct kref *kref)
3473 {
3474         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3475
3476         kfree(spec->pool_name);
3477         kfree(spec->image_id);
3478         kfree(spec->image_name);
3479         kfree(spec->snap_name);
3480         kfree(spec);
3481 }
3482
3483 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3484                                 struct rbd_spec *spec)
3485 {
3486         struct rbd_device *rbd_dev;
3487
3488         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3489         if (!rbd_dev)
3490                 return NULL;
3491
3492         spin_lock_init(&rbd_dev->lock);
3493         rbd_dev->flags = 0;
3494         INIT_LIST_HEAD(&rbd_dev->node);
3495         init_rwsem(&rbd_dev->header_rwsem);
3496
3497         rbd_dev->spec = spec;
3498         rbd_dev->rbd_client = rbdc;
3499
3500         /* Initialize the layout used for all rbd requests */
3501
3502         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3503         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3504         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3505         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3506
3507         return rbd_dev;
3508 }
3509
3510 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3511 {
3512         rbd_put_client(rbd_dev->rbd_client);
3513         rbd_spec_put(rbd_dev->spec);
3514         kfree(rbd_dev);
3515 }
3516
3517 /*
3518  * Get the size and object order for an image snapshot, or if
3519  * snap_id is CEPH_NOSNAP, gets this information for the base
3520  * image.
3521  */
3522 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3523                                 u8 *order, u64 *snap_size)
3524 {
3525         __le64 snapid = cpu_to_le64(snap_id);
3526         int ret;
3527         struct {
3528                 u8 order;
3529                 __le64 size;
3530         } __attribute__ ((packed)) size_buf = { 0 };
3531
3532         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3533                                 "rbd", "get_size",
3534                                 &snapid, sizeof (snapid),
3535                                 &size_buf, sizeof (size_buf));
3536         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3537         if (ret < 0)
3538                 return ret;
3539         if (ret < sizeof (size_buf))
3540                 return -ERANGE;
3541
3542         if (order)
3543                 *order = size_buf.order;
3544         *snap_size = le64_to_cpu(size_buf.size);
3545
3546         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3547                 (unsigned long long)snap_id, (unsigned int)*order,
3548                 (unsigned long long)*snap_size);
3549
3550         return 0;
3551 }
3552
3553 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3554 {
3555         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3556                                         &rbd_dev->header.obj_order,
3557                                         &rbd_dev->header.image_size);
3558 }
3559
3560 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3561 {
3562         void *reply_buf;
3563         int ret;
3564         void *p;
3565
3566         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3567         if (!reply_buf)
3568                 return -ENOMEM;
3569
3570         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3571                                 "rbd", "get_object_prefix", NULL, 0,
3572                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3573         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3574         if (ret < 0)
3575                 goto out;
3576
3577         p = reply_buf;
3578         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3579                                                 p + ret, NULL, GFP_NOIO);
3580         ret = 0;
3581
3582         if (IS_ERR(rbd_dev->header.object_prefix)) {
3583                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3584                 rbd_dev->header.object_prefix = NULL;
3585         } else {
3586                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3587         }
3588 out:
3589         kfree(reply_buf);
3590
3591         return ret;
3592 }
3593
3594 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3595                 u64 *snap_features)
3596 {
3597         __le64 snapid = cpu_to_le64(snap_id);
3598         struct {
3599                 __le64 features;
3600                 __le64 incompat;
3601         } __attribute__ ((packed)) features_buf = { 0 };
3602         u64 incompat;
3603         int ret;
3604
3605         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3606                                 "rbd", "get_features",
3607                                 &snapid, sizeof (snapid),
3608                                 &features_buf, sizeof (features_buf));
3609         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3610         if (ret < 0)
3611                 return ret;
3612         if (ret < sizeof (features_buf))
3613                 return -ERANGE;
3614
3615         incompat = le64_to_cpu(features_buf.incompat);
3616         if (incompat & ~RBD_FEATURES_SUPPORTED)
3617                 return -ENXIO;
3618
3619         *snap_features = le64_to_cpu(features_buf.features);
3620
3621         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3622                 (unsigned long long)snap_id,
3623                 (unsigned long long)*snap_features,
3624                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3625
3626         return 0;
3627 }
3628
3629 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3630 {
3631         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3632                                                 &rbd_dev->header.features);
3633 }
3634
3635 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3636 {
3637         struct rbd_spec *parent_spec;
3638         size_t size;
3639         void *reply_buf = NULL;
3640         __le64 snapid;
3641         void *p;
3642         void *end;
3643         char *image_id;
3644         u64 overlap;
3645         int ret;
3646
3647         parent_spec = rbd_spec_alloc();
3648         if (!parent_spec)
3649                 return -ENOMEM;
3650
3651         size = sizeof (__le64) +                                /* pool_id */
3652                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3653                 sizeof (__le64) +                               /* snap_id */
3654                 sizeof (__le64);                                /* overlap */
3655         reply_buf = kmalloc(size, GFP_KERNEL);
3656         if (!reply_buf) {
3657                 ret = -ENOMEM;
3658                 goto out_err;
3659         }
3660
3661         snapid = cpu_to_le64(CEPH_NOSNAP);
3662         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3663                                 "rbd", "get_parent",
3664                                 &snapid, sizeof (snapid),
3665                                 reply_buf, size);
3666         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3667         if (ret < 0)
3668                 goto out_err;
3669
3670         p = reply_buf;
3671         end = reply_buf + ret;
3672         ret = -ERANGE;
3673         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3674         if (parent_spec->pool_id == CEPH_NOPOOL)
3675                 goto out;       /* No parent?  No problem. */
3676
3677         /* The ceph file layout needs to fit pool id in 32 bits */
3678
3679         ret = -EIO;
3680         if (parent_spec->pool_id > (u64)U32_MAX) {
3681                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3682                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3683                 goto out_err;
3684         }
3685
3686         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3687         if (IS_ERR(image_id)) {
3688                 ret = PTR_ERR(image_id);
3689                 goto out_err;
3690         }
3691         parent_spec->image_id = image_id;
3692         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3693         ceph_decode_64_safe(&p, end, overlap, out_err);
3694
3695         rbd_dev->parent_overlap = overlap;
3696         rbd_dev->parent_spec = parent_spec;
3697         parent_spec = NULL;     /* rbd_dev now owns this */
3698 out:
3699         ret = 0;
3700 out_err:
3701         kfree(reply_buf);
3702         rbd_spec_put(parent_spec);
3703
3704         return ret;
3705 }
3706
3707 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3708 {
3709         struct {
3710                 __le64 stripe_unit;
3711                 __le64 stripe_count;
3712         } __attribute__ ((packed)) striping_info_buf = { 0 };
3713         size_t size = sizeof (striping_info_buf);
3714         void *p;
3715         u64 obj_size;
3716         u64 stripe_unit;
3717         u64 stripe_count;
3718         int ret;
3719
3720         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3721                                 "rbd", "get_stripe_unit_count", NULL, 0,
3722                                 (char *)&striping_info_buf, size);
3723         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3724         if (ret < 0)
3725                 return ret;
3726         if (ret < size)
3727                 return -ERANGE;
3728
3729         /*
3730          * We don't actually support the "fancy striping" feature
3731          * (STRIPINGV2) yet, but if the striping sizes are the
3732          * defaults the behavior is the same as before.  So find
3733          * out, and only fail if the image has non-default values.
3734          */
3735         ret = -EINVAL;
3736         obj_size = (u64)1 << rbd_dev->header.obj_order;
3737         p = &striping_info_buf;
3738         stripe_unit = ceph_decode_64(&p);
3739         if (stripe_unit != obj_size) {
3740                 rbd_warn(rbd_dev, "unsupported stripe unit "
3741                                 "(got %llu want %llu)",
3742                                 stripe_unit, obj_size);
3743                 return -EINVAL;
3744         }
3745         stripe_count = ceph_decode_64(&p);
3746         if (stripe_count != 1) {
3747                 rbd_warn(rbd_dev, "unsupported stripe count "
3748                                 "(got %llu want 1)", stripe_count);
3749                 return -EINVAL;
3750         }
3751         rbd_dev->header.stripe_unit = stripe_unit;
3752         rbd_dev->header.stripe_count = stripe_count;
3753
3754         return 0;
3755 }
3756
3757 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3758 {
3759         size_t image_id_size;
3760         char *image_id;
3761         void *p;
3762         void *end;
3763         size_t size;
3764         void *reply_buf = NULL;
3765         size_t len = 0;
3766         char *image_name = NULL;
3767         int ret;
3768
3769         rbd_assert(!rbd_dev->spec->image_name);
3770
3771         len = strlen(rbd_dev->spec->image_id);
3772         image_id_size = sizeof (__le32) + len;
3773         image_id = kmalloc(image_id_size, GFP_KERNEL);
3774         if (!image_id)
3775                 return NULL;
3776
3777         p = image_id;
3778         end = image_id + image_id_size;
3779         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3780
3781         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3782         reply_buf = kmalloc(size, GFP_KERNEL);
3783         if (!reply_buf)
3784                 goto out;
3785
3786         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3787                                 "rbd", "dir_get_name",
3788                                 image_id, image_id_size,
3789                                 reply_buf, size);
3790         if (ret < 0)
3791                 goto out;
3792         p = reply_buf;
3793         end = reply_buf + ret;
3794
3795         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3796         if (IS_ERR(image_name))
3797                 image_name = NULL;
3798         else
3799                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3800 out:
3801         kfree(reply_buf);
3802         kfree(image_id);
3803
3804         return image_name;
3805 }
3806
3807 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3808 {
3809         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3810         const char *snap_name;
3811         u32 which = 0;
3812
3813         /* Skip over names until we find the one we are looking for */
3814
3815         snap_name = rbd_dev->header.snap_names;
3816         while (which < snapc->num_snaps) {
3817                 if (!strcmp(name, snap_name))
3818                         return snapc->snaps[which];
3819                 snap_name += strlen(snap_name) + 1;
3820                 which++;
3821         }
3822         return CEPH_NOSNAP;
3823 }
3824
3825 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3826 {
3827         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3828         u32 which;
3829         bool found = false;
3830         u64 snap_id;
3831
3832         for (which = 0; !found && which < snapc->num_snaps; which++) {
3833                 const char *snap_name;
3834
3835                 snap_id = snapc->snaps[which];
3836                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3837                 if (IS_ERR(snap_name))
3838                         break;
3839                 found = !strcmp(name, snap_name);
3840                 kfree(snap_name);
3841         }
3842         return found ? snap_id : CEPH_NOSNAP;
3843 }
3844
3845 /*
3846  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3847  * no snapshot by that name is found, or if an error occurs.
3848  */
3849 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3850 {
3851         if (rbd_dev->image_format == 1)
3852                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3853
3854         return rbd_v2_snap_id_by_name(rbd_dev, name);
3855 }
3856
3857 /*
3858  * When an rbd image has a parent image, it is identified by the
3859  * pool, image, and snapshot ids (not names).  This function fills
3860  * in the names for those ids.  (It's OK if we can't figure out the
3861  * name for an image id, but the pool and snapshot ids should always
3862  * exist and have names.)  All names in an rbd spec are dynamically
3863  * allocated.
3864  *
3865  * When an image being mapped (not a parent) is probed, we have the
3866  * pool name and pool id, image name and image id, and the snapshot
3867  * name.  The only thing we're missing is the snapshot id.
3868  */
3869 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3870 {
3871         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3872         struct rbd_spec *spec = rbd_dev->spec;
3873         const char *pool_name;
3874         const char *image_name;
3875         const char *snap_name;
3876         int ret;
3877
3878         /*
3879          * An image being mapped will have the pool name (etc.), but
3880          * we need to look up the snapshot id.
3881          */
3882         if (spec->pool_name) {
3883                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3884                         u64 snap_id;
3885
3886                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3887                         if (snap_id == CEPH_NOSNAP)
3888                                 return -ENOENT;
3889                         spec->snap_id = snap_id;
3890                 } else {
3891                         spec->snap_id = CEPH_NOSNAP;
3892                 }
3893
3894                 return 0;
3895         }
3896
3897         /* Get the pool name; we have to make our own copy of this */
3898
3899         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3900         if (!pool_name) {
3901                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3902                 return -EIO;
3903         }
3904         pool_name = kstrdup(pool_name, GFP_KERNEL);
3905         if (!pool_name)
3906                 return -ENOMEM;
3907
3908         /* Fetch the image name; tolerate failure here */
3909
3910         image_name = rbd_dev_image_name(rbd_dev);
3911         if (!image_name)
3912                 rbd_warn(rbd_dev, "unable to get image name");
3913
3914         /* Look up the snapshot name, and make a copy */
3915
3916         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3917         if (!snap_name) {
3918                 ret = -ENOMEM;
3919                 goto out_err;
3920         }
3921
3922         spec->pool_name = pool_name;
3923         spec->image_name = image_name;
3924         spec->snap_name = snap_name;
3925
3926         return 0;
3927 out_err:
3928         kfree(image_name);
3929         kfree(pool_name);
3930
3931         return ret;
3932 }
3933
3934 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3935 {
3936         size_t size;
3937         int ret;
3938         void *reply_buf;
3939         void *p;
3940         void *end;
3941         u64 seq;
3942         u32 snap_count;
3943         struct ceph_snap_context *snapc;
3944         u32 i;
3945
3946         /*
3947          * We'll need room for the seq value (maximum snapshot id),
3948          * snapshot count, and array of that many snapshot ids.
3949          * For now we have a fixed upper limit on the number we're
3950          * prepared to receive.
3951          */
3952         size = sizeof (__le64) + sizeof (__le32) +
3953                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3954         reply_buf = kzalloc(size, GFP_KERNEL);
3955         if (!reply_buf)
3956                 return -ENOMEM;
3957
3958         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3959                                 "rbd", "get_snapcontext", NULL, 0,
3960                                 reply_buf, size);
3961         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3962         if (ret < 0)
3963                 goto out;
3964
3965         p = reply_buf;
3966         end = reply_buf + ret;
3967         ret = -ERANGE;
3968         ceph_decode_64_safe(&p, end, seq, out);
3969         ceph_decode_32_safe(&p, end, snap_count, out);
3970
3971         /*
3972          * Make sure the reported number of snapshot ids wouldn't go
3973          * beyond the end of our buffer.  But before checking that,
3974          * make sure the computed size of the snapshot context we
3975          * allocate is representable in a size_t.
3976          */
3977         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3978                                  / sizeof (u64)) {
3979                 ret = -EINVAL;
3980                 goto out;
3981         }
3982         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3983                 goto out;
3984         ret = 0;
3985
3986         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3987         if (!snapc) {
3988                 ret = -ENOMEM;
3989                 goto out;
3990         }
3991         snapc->seq = seq;
3992         for (i = 0; i < snap_count; i++)
3993                 snapc->snaps[i] = ceph_decode_64(&p);
3994
3995         rbd_dev->header.snapc = snapc;
3996
3997         dout("  snap context seq = %llu, snap_count = %u\n",
3998                 (unsigned long long)seq, (unsigned int)snap_count);
3999 out:
4000         kfree(reply_buf);
4001
4002         return ret;
4003 }
4004
4005 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4006                                         u64 snap_id)
4007 {
4008         size_t size;
4009         void *reply_buf;
4010         __le64 snapid;
4011         int ret;
4012         void *p;
4013         void *end;
4014         char *snap_name;
4015
4016         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4017         reply_buf = kmalloc(size, GFP_KERNEL);
4018         if (!reply_buf)
4019                 return ERR_PTR(-ENOMEM);
4020
4021         snapid = cpu_to_le64(snap_id);
4022         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4023                                 "rbd", "get_snapshot_name",
4024                                 &snapid, sizeof (snapid),
4025                                 reply_buf, size);
4026         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4027         if (ret < 0) {
4028                 snap_name = ERR_PTR(ret);
4029                 goto out;
4030         }
4031
4032         p = reply_buf;
4033         end = reply_buf + ret;
4034         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4035         if (IS_ERR(snap_name))
4036                 goto out;
4037
4038         dout("  snap_id 0x%016llx snap_name = %s\n",
4039                 (unsigned long long)snap_id, snap_name);
4040 out:
4041         kfree(reply_buf);
4042
4043         return snap_name;
4044 }
4045
4046 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4047 {
4048         int ret;
4049
4050         down_write(&rbd_dev->header_rwsem);
4051
4052         ret = rbd_dev_v2_image_size(rbd_dev);
4053         if (ret)
4054                 goto out;
4055         rbd_update_mapping_size(rbd_dev);
4056
4057         ret = rbd_dev_v2_snap_context(rbd_dev);
4058         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4059         if (ret)
4060                 goto out;
4061 out:
4062         up_write(&rbd_dev->header_rwsem);
4063
4064         return ret;
4065 }
4066
4067 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4068 {
4069         struct device *dev;
4070         int ret;
4071
4072         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4073
4074         dev = &rbd_dev->dev;
4075         dev->bus = &rbd_bus_type;
4076         dev->type = &rbd_device_type;
4077         dev->parent = &rbd_root_dev;
4078         dev->release = rbd_dev_device_release;
4079         dev_set_name(dev, "%d", rbd_dev->dev_id);
4080         ret = device_register(dev);
4081
4082         mutex_unlock(&ctl_mutex);
4083
4084         return ret;
4085 }
4086
4087 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4088 {
4089         device_unregister(&rbd_dev->dev);
4090 }
4091
4092 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4093
4094 /*
4095  * Get a unique rbd identifier for the given new rbd_dev, and add
4096  * the rbd_dev to the global list.  The minimum rbd id is 1.
4097  */
4098 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4099 {
4100         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4101
4102         spin_lock(&rbd_dev_list_lock);
4103         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4104         spin_unlock(&rbd_dev_list_lock);
4105         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4106                 (unsigned long long) rbd_dev->dev_id);
4107 }
4108
4109 /*
4110  * Remove an rbd_dev from the global list, and record that its
4111  * identifier is no longer in use.
4112  */
4113 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4114 {
4115         struct list_head *tmp;
4116         int rbd_id = rbd_dev->dev_id;
4117         int max_id;
4118
4119         rbd_assert(rbd_id > 0);
4120
4121         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4122                 (unsigned long long) rbd_dev->dev_id);
4123         spin_lock(&rbd_dev_list_lock);
4124         list_del_init(&rbd_dev->node);
4125
4126         /*
4127          * If the id being "put" is not the current maximum, there
4128          * is nothing special we need to do.
4129          */
4130         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4131                 spin_unlock(&rbd_dev_list_lock);
4132                 return;
4133         }
4134
4135         /*
4136          * We need to update the current maximum id.  Search the
4137          * list to find out what it is.  We're more likely to find
4138          * the maximum at the end, so search the list backward.
4139          */
4140         max_id = 0;
4141         list_for_each_prev(tmp, &rbd_dev_list) {
4142                 struct rbd_device *rbd_dev;
4143
4144                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4145                 if (rbd_dev->dev_id > max_id)
4146                         max_id = rbd_dev->dev_id;
4147         }
4148         spin_unlock(&rbd_dev_list_lock);
4149
4150         /*
4151          * The max id could have been updated by rbd_dev_id_get(), in
4152          * which case it now accurately reflects the new maximum.
4153          * Be careful not to overwrite the maximum value in that
4154          * case.
4155          */
4156         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4157         dout("  max dev id has been reset\n");
4158 }
4159
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if there
 * is none).  Returns the length of the token -- the run of non-white
 * space characters -- that starts there; 0 means no token was found.
 * The string at *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	const char *const whitespace = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, whitespace);	/* skip to token start */
	*buf = start;

	return strcspn(start, whitespace);	/* token length */
}
4178
/*
 * Locates the next token in *buf and, provided it fits together with
 * a terminating '\0' in the token buffer, copies it there.  The
 * string at *buf must be terminated with '\0' on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means there was no token; a return >= token_size
 * means the token did not fit and the token buffer was left
 * untouched.
 *
 * In every case *buf is advanced to just past the end of the token.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *start = *buf;

	*buf = start + len;	/* consume the token even if it won't fit */

	if (len < token_size) {
		memcpy(token, start, len);
		token[len] = '\0';
	}

	return len;
}
4208
4209 /*
4210  * Finds the next token in *buf, dynamically allocates a buffer big
4211  * enough to hold a copy of it, and copies the token into the new
4212  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4213  * that a duplicate buffer is created even for a zero-length token.
4214  *
4215  * Returns a pointer to the newly-allocated duplicate, or a null
4216  * pointer if memory for the duplicate was not available.  If
4217  * the lenp argument is a non-null pointer, the length of the token
4218  * (not including the '\0') is returned in *lenp.
4219  *
4220  * If successful, the *buf pointer will be updated to point beyond
4221  * the end of the found token.
4222  *
4223  * Note: uses GFP_KERNEL for allocation.
4224  */
4225 static inline char *dup_token(const char **buf, size_t *lenp)
4226 {
4227         char *dup;
4228         size_t len;
4229
4230         len = next_token(buf);
4231         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4232         if (!dup)
4233                 return NULL;
4234         *(dup + len) = '\0';
4235         *buf += len;
4236
4237         if (lenp)
4238                 *lenp = len;
4239
4240         return dup;
4241 }
4242
4243 /*
4244  * Parse the options provided for an "rbd add" (i.e., rbd image
4245  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4246  * and the data written is passed here via a NUL-terminated buffer.
4247  * Returns 0 if successful or an error code otherwise.
4248  *
4249  * The information extracted from these options is recorded in
4250  * the other parameters which return dynamically-allocated
4251  * structures:
4252  *  ceph_opts
4253  *      The address of a pointer that will refer to a ceph options
4254  *      structure.  Caller must release the returned pointer using
4255  *      ceph_destroy_options() when it is no longer needed.
4256  *  rbd_opts
4257  *      Address of an rbd options pointer.  Fully initialized by
4258  *      this function; caller must release with kfree().
4259  *  spec
4260  *      Address of an rbd image specification pointer.  Fully
4261  *      initialized by this function based on parsed options.
4262  *      Caller must release with rbd_spec_put().
4263  *
4264  * The options passed take this form:
4265  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4266  * where:
4267  *  <mon_addrs>
4268  *      A comma-separated list of one or more monitor addresses.
4269  *      A monitor address is an ip address, optionally followed
4270  *      by a port number (separated by a colon).
4271  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4272  *  <options>
4273  *      A comma-separated list of ceph and/or rbd options.
4274  *  <pool_name>
4275  *      The name of the rados pool containing the rbd image.
4276  *  <image_name>
4277  *      The name of the image in that pool to map.
4278  *  <snap_id>
4279  *      An optional snapshot id.  If provided, the mapping will
4280  *      present data from the image at the time that snapshot was
4281  *      created.  The image head is used if no snapshot id is
4282  *      provided.  Snapshot mappings are always read-only.
4283  */
4284 static int rbd_add_parse_args(const char *buf,
4285                                 struct ceph_options **ceph_opts,
4286                                 struct rbd_options **opts,
4287                                 struct rbd_spec **rbd_spec)
4288 {
4289         size_t len;
4290         char *options;
4291         const char *mon_addrs;
4292         char *snap_name;
4293         size_t mon_addrs_size;
4294         struct rbd_spec *spec = NULL;
4295         struct rbd_options *rbd_opts = NULL;
4296         struct ceph_options *copts;
4297         int ret;
4298
4299         /* The first four tokens are required */
4300
4301         len = next_token(&buf);
4302         if (!len) {
4303                 rbd_warn(NULL, "no monitor address(es) provided");
4304                 return -EINVAL;
4305         }
4306         mon_addrs = buf;
4307         mon_addrs_size = len + 1;
4308         buf += len;
4309
4310         ret = -EINVAL;
4311         options = dup_token(&buf, NULL);
4312         if (!options)
4313                 return -ENOMEM;
4314         if (!*options) {
4315                 rbd_warn(NULL, "no options provided");
4316                 goto out_err;
4317         }
4318
4319         spec = rbd_spec_alloc();
4320         if (!spec)
4321                 goto out_mem;
4322
4323         spec->pool_name = dup_token(&buf, NULL);
4324         if (!spec->pool_name)
4325                 goto out_mem;
4326         if (!*spec->pool_name) {
4327                 rbd_warn(NULL, "no pool name provided");
4328                 goto out_err;
4329         }
4330
4331         spec->image_name = dup_token(&buf, NULL);
4332         if (!spec->image_name)
4333                 goto out_mem;
4334         if (!*spec->image_name) {
4335                 rbd_warn(NULL, "no image name provided");
4336                 goto out_err;
4337         }
4338
4339         /*
4340          * Snapshot name is optional; default is to use "-"
4341          * (indicating the head/no snapshot).
4342          */
4343         len = next_token(&buf);
4344         if (!len) {
4345                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4346                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4347         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4348                 ret = -ENAMETOOLONG;
4349                 goto out_err;
4350         }
4351         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4352         if (!snap_name)
4353                 goto out_mem;
4354         *(snap_name + len) = '\0';
4355         spec->snap_name = snap_name;
4356
4357         /* Initialize all rbd options to the defaults */
4358
4359         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4360         if (!rbd_opts)
4361                 goto out_mem;
4362
4363         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4364
4365         copts = ceph_parse_options(options, mon_addrs,
4366                                         mon_addrs + mon_addrs_size - 1,
4367                                         parse_rbd_opts_token, rbd_opts);
4368         if (IS_ERR(copts)) {
4369                 ret = PTR_ERR(copts);
4370                 goto out_err;
4371         }
4372         kfree(options);
4373
4374         *ceph_opts = copts;
4375         *opts = rbd_opts;
4376         *rbd_spec = spec;
4377
4378         return 0;
4379 out_mem:
4380         ret = -ENOMEM;
4381 out_err:
4382         kfree(rbd_opts);
4383         rbd_spec_put(spec);
4384         kfree(options);
4385
4386         return ret;
4387 }
4388
4389 /*
4390  * An rbd format 2 image has a unique identifier, distinct from the
4391  * name given to it by the user.  Internally, that identifier is
4392  * what's used to specify the names of objects related to the image.
4393  *
4394  * A special "rbd id" object is used to map an rbd image name to its
4395  * id.  If that object doesn't exist, then there is no v2 rbd image
4396  * with the supplied name.
4397  *
4398  * This function will record the given rbd_dev's image_id field if
4399  * it can be determined, and in that case will return 0.  If any
4400  * errors occur a negative errno will be returned and the rbd_dev's
4401  * image_id field will be unchanged (and should be NULL).
4402  */
4403 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4404 {
4405         int ret;
4406         size_t size;
4407         char *object_name;
4408         void *response;
4409         char *image_id;
4410
4411         /*
4412          * When probing a parent image, the image id is already
4413          * known (and the image name likely is not).  There's no
4414          * need to fetch the image id again in this case.  We
4415          * do still need to set the image format though.
4416          */
4417         if (rbd_dev->spec->image_id) {
4418                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4419
4420                 return 0;
4421         }
4422
4423         /*
4424          * First, see if the format 2 image id file exists, and if
4425          * so, get the image's persistent id from it.
4426          */
4427         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4428         object_name = kmalloc(size, GFP_NOIO);
4429         if (!object_name)
4430                 return -ENOMEM;
4431         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4432         dout("rbd id object name is %s\n", object_name);
4433
4434         /* Response will be an encoded string, which includes a length */
4435
4436         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4437         response = kzalloc(size, GFP_NOIO);
4438         if (!response) {
4439                 ret = -ENOMEM;
4440                 goto out;
4441         }
4442
4443         /* If it doesn't exist we'll assume it's a format 1 image */
4444
4445         ret = rbd_obj_method_sync(rbd_dev, object_name,
4446                                 "rbd", "get_id", NULL, 0,
4447                                 response, RBD_IMAGE_ID_LEN_MAX);
4448         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4449         if (ret == -ENOENT) {
4450                 image_id = kstrdup("", GFP_KERNEL);
4451                 ret = image_id ? 0 : -ENOMEM;
4452                 if (!ret)
4453                         rbd_dev->image_format = 1;
4454         } else if (ret > sizeof (__le32)) {
4455                 void *p = response;
4456
4457                 image_id = ceph_extract_encoded_string(&p, p + ret,
4458                                                 NULL, GFP_NOIO);
4459                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4460                 if (!ret)
4461                         rbd_dev->image_format = 2;
4462         } else {
4463                 ret = -EINVAL;
4464         }
4465
4466         if (!ret) {
4467                 rbd_dev->spec->image_id = image_id;
4468                 dout("image_id is %s\n", image_id);
4469         }
4470 out:
4471         kfree(response);
4472         kfree(object_name);
4473
4474         return ret;
4475 }
4476
4477 /* Undo whatever state changes are made by v1 or v2 image probe */
4478
4479 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4480 {
4481         struct rbd_image_header *header;
4482
4483         rbd_dev_remove_parent(rbd_dev);
4484         rbd_spec_put(rbd_dev->parent_spec);
4485         rbd_dev->parent_spec = NULL;
4486         rbd_dev->parent_overlap = 0;
4487
4488         /* Free dynamic fields from the header, then zero it out */
4489
4490         header = &rbd_dev->header;
4491         ceph_put_snap_context(header->snapc);
4492         kfree(header->snap_sizes);
4493         kfree(header->snap_names);
4494         kfree(header->object_prefix);
4495         memset(header, 0, sizeof (*header));
4496 }
4497
4498 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4499 {
4500         int ret;
4501
4502         /* Populate rbd image metadata */
4503
4504         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4505         if (ret < 0)
4506                 goto out_err;
4507
4508         /* Version 1 images have no parent (no layering) */
4509
4510         rbd_dev->parent_spec = NULL;
4511         rbd_dev->parent_overlap = 0;
4512
4513         dout("discovered version 1 image, header name is %s\n",
4514                 rbd_dev->header_name);
4515
4516         return 0;
4517
4518 out_err:
4519         kfree(rbd_dev->header_name);
4520         rbd_dev->header_name = NULL;
4521         kfree(rbd_dev->spec->image_id);
4522         rbd_dev->spec->image_id = NULL;
4523
4524         return ret;
4525 }
4526
4527 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4528 {
4529         int ret;
4530
4531         ret = rbd_dev_v2_image_size(rbd_dev);
4532         if (ret)
4533                 goto out_err;
4534
4535         /* Get the object prefix (a.k.a. block_name) for the image */
4536
4537         ret = rbd_dev_v2_object_prefix(rbd_dev);
4538         if (ret)
4539                 goto out_err;
4540
4541         /* Get the and check features for the image */
4542
4543         ret = rbd_dev_v2_features(rbd_dev);
4544         if (ret)
4545                 goto out_err;
4546
4547         /* If the image supports layering, get the parent info */
4548
4549         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4550                 ret = rbd_dev_v2_parent_info(rbd_dev);
4551                 if (ret)
4552                         goto out_err;
4553
4554                 /*
4555                  * Don't print a warning for parent images.  We can
4556                  * tell this point because we won't know its pool
4557                  * name yet (just its pool id).
4558                  */
4559                 if (rbd_dev->spec->pool_name)
4560                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4561                                         "is EXPERIMENTAL!");
4562         }
4563
4564         /* If the image supports fancy striping, get its parameters */
4565
4566         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4567                 ret = rbd_dev_v2_striping_info(rbd_dev);
4568                 if (ret < 0)
4569                         goto out_err;
4570         }
4571
4572         /* crypto and compression type aren't (yet) supported for v2 images */
4573
4574         rbd_dev->header.crypt_type = 0;
4575         rbd_dev->header.comp_type = 0;
4576
4577         /* Get the snapshot context, plus the header version */
4578
4579         ret = rbd_dev_v2_snap_context(rbd_dev);
4580         if (ret)
4581                 goto out_err;
4582
4583         dout("discovered version 2 image, header name is %s\n",
4584                 rbd_dev->header_name);
4585
4586         return 0;
4587 out_err:
4588         rbd_dev->parent_overlap = 0;
4589         rbd_spec_put(rbd_dev->parent_spec);
4590         rbd_dev->parent_spec = NULL;
4591         kfree(rbd_dev->header_name);
4592         rbd_dev->header_name = NULL;
4593         kfree(rbd_dev->header.object_prefix);
4594         rbd_dev->header.object_prefix = NULL;
4595
4596         return ret;
4597 }
4598
4599 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4600 {
4601         struct rbd_device *parent = NULL;
4602         struct rbd_spec *parent_spec;
4603         struct rbd_client *rbdc;
4604         int ret;
4605
4606         if (!rbd_dev->parent_spec)
4607                 return 0;
4608         /*
4609          * We need to pass a reference to the client and the parent
4610          * spec when creating the parent rbd_dev.  Images related by
4611          * parent/child relationships always share both.
4612          */
4613         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4614         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4615
4616         ret = -ENOMEM;
4617         parent = rbd_dev_create(rbdc, parent_spec);
4618         if (!parent)
4619                 goto out_err;
4620
4621         ret = rbd_dev_image_probe(parent);
4622         if (ret < 0)
4623                 goto out_err;
4624         rbd_dev->parent = parent;
4625
4626         return 0;
4627 out_err:
4628         if (parent) {
4629                 rbd_spec_put(rbd_dev->parent_spec);
4630                 kfree(rbd_dev->header_name);
4631                 rbd_dev_destroy(parent);
4632         } else {
4633                 rbd_put_client(rbdc);
4634                 rbd_spec_put(parent_spec);
4635         }
4636
4637         return ret;
4638 }
4639
4640 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4641 {
4642         int ret;
4643
4644         ret = rbd_dev_mapping_set(rbd_dev);
4645         if (ret)
4646                 return ret;
4647
4648         /* generate unique id: find highest unique id, add one */
4649         rbd_dev_id_get(rbd_dev);
4650
4651         /* Fill in the device name, now that we have its id. */
4652         BUILD_BUG_ON(DEV_NAME_LEN
4653                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4654         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4655
4656         /* Get our block major device number. */
4657
4658         ret = register_blkdev(0, rbd_dev->name);
4659         if (ret < 0)
4660                 goto err_out_id;
4661         rbd_dev->major = ret;
4662
4663         /* Set up the blkdev mapping. */
4664
4665         ret = rbd_init_disk(rbd_dev);
4666         if (ret)
4667                 goto err_out_blkdev;
4668
4669         ret = rbd_bus_add_dev(rbd_dev);
4670         if (ret)
4671                 goto err_out_disk;
4672
4673         /* Everything's ready.  Announce the disk to the world. */
4674
4675         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4676         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4677         add_disk(rbd_dev->disk);
4678
4679         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4680                 (unsigned long long) rbd_dev->mapping.size);
4681
4682         return ret;
4683
4684 err_out_disk:
4685         rbd_free_disk(rbd_dev);
4686 err_out_blkdev:
4687         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4688 err_out_id:
4689         rbd_dev_id_put(rbd_dev);
4690         rbd_dev_mapping_clear(rbd_dev);
4691
4692         return ret;
4693 }
4694
4695 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4696 {
4697         struct rbd_spec *spec = rbd_dev->spec;
4698         size_t size;
4699
4700         /* Record the header object name for this rbd image. */
4701
4702         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4703
4704         if (rbd_dev->image_format == 1)
4705                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4706         else
4707                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4708
4709         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4710         if (!rbd_dev->header_name)
4711                 return -ENOMEM;
4712
4713         if (rbd_dev->image_format == 1)
4714                 sprintf(rbd_dev->header_name, "%s%s",
4715                         spec->image_name, RBD_SUFFIX);
4716         else
4717                 sprintf(rbd_dev->header_name, "%s%s",
4718                         RBD_HEADER_PREFIX, spec->image_id);
4719         return 0;
4720 }
4721
4722 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4723 {
4724         int ret;
4725
4726         rbd_dev_unprobe(rbd_dev);
4727         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4728         if (ret)
4729                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4730         kfree(rbd_dev->header_name);
4731         rbd_dev->header_name = NULL;
4732         rbd_dev->image_format = 0;
4733         kfree(rbd_dev->spec->image_id);
4734         rbd_dev->spec->image_id = NULL;
4735
4736         rbd_dev_destroy(rbd_dev);
4737 }
4738
4739 /*
4740  * Probe for the existence of the header object for the given rbd
4741  * device.  For format 2 images this includes determining the image
4742  * id.
4743  */
4744 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4745 {
4746         int ret;
4747         int tmp;
4748
4749         /*
4750          * Get the id from the image id object.  If it's not a
4751          * format 2 image, we'll get ENOENT back, and we'll assume
4752          * it's a format 1 image.
4753          */
4754         ret = rbd_dev_image_id(rbd_dev);
4755         if (ret)
4756                 return ret;
4757         rbd_assert(rbd_dev->spec->image_id);
4758         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4759
4760         ret = rbd_dev_header_name(rbd_dev);
4761         if (ret)
4762                 goto err_out_format;
4763
4764         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4765         if (ret)
4766                 goto out_header_name;
4767
4768         if (rbd_dev->image_format == 1)
4769                 ret = rbd_dev_v1_probe(rbd_dev);
4770         else
4771                 ret = rbd_dev_v2_probe(rbd_dev);
4772         if (ret)
4773                 goto err_out_watch;
4774
4775         ret = rbd_dev_spec_update(rbd_dev);
4776         if (ret)
4777                 goto err_out_probe;
4778
4779         ret = rbd_dev_probe_parent(rbd_dev);
4780         if (!ret)
4781                 return 0;
4782
4783 err_out_probe:
4784         rbd_dev_unprobe(rbd_dev);
4785 err_out_watch:
4786         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4787         if (tmp)
4788                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4789 out_header_name:
4790         kfree(rbd_dev->header_name);
4791         rbd_dev->header_name = NULL;
4792 err_out_format:
4793         rbd_dev->image_format = 0;
4794         kfree(rbd_dev->spec->image_id);
4795         rbd_dev->spec->image_id = NULL;
4796
4797         dout("probe failed, returning %d\n", ret);
4798
4799         return ret;
4800 }
4801
4802 static ssize_t rbd_add(struct bus_type *bus,
4803                        const char *buf,
4804                        size_t count)
4805 {
4806         struct rbd_device *rbd_dev = NULL;
4807         struct ceph_options *ceph_opts = NULL;
4808         struct rbd_options *rbd_opts = NULL;
4809         struct rbd_spec *spec = NULL;
4810         struct rbd_client *rbdc;
4811         struct ceph_osd_client *osdc;
4812         int rc = -ENOMEM;
4813
4814         if (!try_module_get(THIS_MODULE))
4815                 return -ENODEV;
4816
4817         /* parse add command */
4818         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4819         if (rc < 0)
4820                 goto err_out_module;
4821
4822         rbdc = rbd_get_client(ceph_opts);
4823         if (IS_ERR(rbdc)) {
4824                 rc = PTR_ERR(rbdc);
4825                 goto err_out_args;
4826         }
4827         ceph_opts = NULL;       /* rbd_dev client now owns this */
4828
4829         /* pick the pool */
4830         osdc = &rbdc->client->osdc;
4831         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4832         if (rc < 0)
4833                 goto err_out_client;
4834         spec->pool_id = (u64)rc;
4835
4836         /* The ceph file layout needs to fit pool id in 32 bits */
4837
4838         if (spec->pool_id > (u64)U32_MAX) {
4839                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4840                                 (unsigned long long)spec->pool_id, U32_MAX);
4841                 rc = -EIO;
4842                 goto err_out_client;
4843         }
4844
4845         rbd_dev = rbd_dev_create(rbdc, spec);
4846         if (!rbd_dev)
4847                 goto err_out_client;
4848         rbdc = NULL;            /* rbd_dev now owns this */
4849         spec = NULL;            /* rbd_dev now owns this */
4850
4851         rbd_dev->mapping.read_only = rbd_opts->read_only;
4852         kfree(rbd_opts);
4853         rbd_opts = NULL;        /* done with this */
4854
4855         rc = rbd_dev_image_probe(rbd_dev);
4856         if (rc < 0)
4857                 goto err_out_rbd_dev;
4858
4859         rc = rbd_dev_device_setup(rbd_dev);
4860         if (!rc)
4861                 return count;
4862
4863         rbd_dev_image_release(rbd_dev);
4864 err_out_rbd_dev:
4865         rbd_dev_destroy(rbd_dev);
4866 err_out_client:
4867         rbd_put_client(rbdc);
4868 err_out_args:
4869         if (ceph_opts)
4870                 ceph_destroy_options(ceph_opts);
4871         kfree(rbd_opts);
4872         rbd_spec_put(spec);
4873 err_out_module:
4874         module_put(THIS_MODULE);
4875
4876         dout("Error adding device %s\n", buf);
4877
4878         return (ssize_t)rc;
4879 }
4880
4881 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4882 {
4883         struct list_head *tmp;
4884         struct rbd_device *rbd_dev;
4885
4886         spin_lock(&rbd_dev_list_lock);
4887         list_for_each(tmp, &rbd_dev_list) {
4888                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4889                 if (rbd_dev->dev_id == dev_id) {
4890                         spin_unlock(&rbd_dev_list_lock);
4891                         return rbd_dev;
4892                 }
4893         }
4894         spin_unlock(&rbd_dev_list_lock);
4895         return NULL;
4896 }
4897
4898 static void rbd_dev_device_release(struct device *dev)
4899 {
4900         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4901
4902         rbd_free_disk(rbd_dev);
4903         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4904         rbd_dev_clear_mapping(rbd_dev);
4905         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4906         rbd_dev->major = 0;
4907         rbd_dev_id_put(rbd_dev);
4908         rbd_dev_mapping_clear(rbd_dev);
4909 }
4910
4911 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4912 {
4913         while (rbd_dev->parent) {
4914                 struct rbd_device *first = rbd_dev;
4915                 struct rbd_device *second = first->parent;
4916                 struct rbd_device *third;
4917
4918                 /*
4919                  * Follow to the parent with no grandparent and
4920                  * remove it.
4921                  */
4922                 while (second && (third = second->parent)) {
4923                         first = second;
4924                         second = third;
4925                 }
4926                 rbd_assert(second);
4927                 rbd_dev_image_release(second);
4928                 first->parent = NULL;
4929                 first->parent_overlap = 0;
4930
4931                 rbd_assert(first->parent_spec);
4932                 rbd_spec_put(first->parent_spec);
4933                 first->parent_spec = NULL;
4934         }
4935 }
4936
4937 static ssize_t rbd_remove(struct bus_type *bus,
4938                           const char *buf,
4939                           size_t count)
4940 {
4941         struct rbd_device *rbd_dev = NULL;
4942         int target_id;
4943         unsigned long ul;
4944         int ret;
4945
4946         ret = strict_strtoul(buf, 10, &ul);
4947         if (ret)
4948                 return ret;
4949
4950         /* convert to int; abort if we lost anything in the conversion */
4951         target_id = (int) ul;
4952         if (target_id != ul)
4953                 return -EINVAL;
4954
4955         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4956
4957         rbd_dev = __rbd_get_dev(target_id);
4958         if (!rbd_dev) {
4959                 ret = -ENOENT;
4960                 goto done;
4961         }
4962
4963         spin_lock_irq(&rbd_dev->lock);
4964         if (rbd_dev->open_count)
4965                 ret = -EBUSY;
4966         else
4967                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4968         spin_unlock_irq(&rbd_dev->lock);
4969         if (ret < 0)
4970                 goto done;
4971         ret = count;
4972         rbd_bus_del_dev(rbd_dev);
4973         rbd_dev_image_release(rbd_dev);
4974         module_put(THIS_MODULE);
4975 done:
4976         mutex_unlock(&ctl_mutex);
4977
4978         return ret;
4979 }
4980
4981 /*
4982  * create control files in sysfs
4983  * /sys/bus/rbd/...
4984  */
4985 static int rbd_sysfs_init(void)
4986 {
4987         int ret;
4988
4989         ret = device_register(&rbd_root_dev);
4990         if (ret < 0)
4991                 return ret;
4992
4993         ret = bus_register(&rbd_bus_type);
4994         if (ret < 0)
4995                 device_unregister(&rbd_root_dev);
4996
4997         return ret;
4998 }
4999
5000 static void rbd_sysfs_cleanup(void)
5001 {
5002         bus_unregister(&rbd_bus_type);
5003         device_unregister(&rbd_root_dev);
5004 }
5005
5006 static int rbd_slab_init(void)
5007 {
5008         rbd_assert(!rbd_img_request_cache);
5009         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5010                                         sizeof (struct rbd_img_request),
5011                                         __alignof__(struct rbd_img_request),
5012                                         0, NULL);
5013         if (!rbd_img_request_cache)
5014                 return -ENOMEM;
5015
5016         rbd_assert(!rbd_obj_request_cache);
5017         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5018                                         sizeof (struct rbd_obj_request),
5019                                         __alignof__(struct rbd_obj_request),
5020                                         0, NULL);
5021         if (rbd_obj_request_cache)
5022                 return 0;
5023
5024         kmem_cache_destroy(rbd_img_request_cache);
5025         rbd_img_request_cache = NULL;
5026
5027         return -ENOMEM;
5028 }
5029
5030 static void rbd_slab_exit(void)
5031 {
5032         rbd_assert(rbd_obj_request_cache);
5033         kmem_cache_destroy(rbd_obj_request_cache);
5034         rbd_obj_request_cache = NULL;
5035
5036         rbd_assert(rbd_img_request_cache);
5037         kmem_cache_destroy(rbd_img_request_cache);
5038         rbd_img_request_cache = NULL;
5039 }
5040
5041 static int __init rbd_init(void)
5042 {
5043         int rc;
5044
5045         if (!libceph_compatible(NULL)) {
5046                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5047
5048                 return -EINVAL;
5049         }
5050         rc = rbd_slab_init();
5051         if (rc)
5052                 return rc;
5053         rc = rbd_sysfs_init();
5054         if (rc)
5055                 rbd_slab_exit();
5056         else
5057                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5058
5059         return rc;
5060 }
5061
5062 static void __exit rbd_exit(void)
5063 {
5064         rbd_sysfs_cleanup();
5065         rbd_slab_exit();
5066 }
5067
5068 module_init(rbd_init);	/* rbd_init() is the module's init entry point */
5069 module_exit(rbd_exit);	/* rbd_exit() is the module's exit entry point */
5070
5071 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5072 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5073 MODULE_DESCRIPTION("rados block device");
5074
5075 /* The following authorship is retained from the original osdblk.c */
5076 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5077
5078 MODULE_LICENSE("GPL");