]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: always set read-only flag in rbd_add()
[~andy/linux] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44
45 #include "rbd_types.h"
46
/* When defined, rbd_assert() expands to a real check */
#define RBD_DEBUG

/*
 * Block I/O is expressed in sectors.  Several Linux layers (blk,
 * bio, genhd) interpret the term, but the default size is 512 bytes
 * everywhere.  Naming the shift and the size is only slightly more
 * readable than the bare numbers, but it is more readable.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME		"rbd"
#define RBD_DRV_NAME_LONG	"rbd (rados block device)"

/* Maximum number of minors per blkdev major */
#define RBD_MINORS_PER_MAJOR	256

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

/* With this many snapshots the largest snapc still fits in 4KB */
#define RBD_MAX_SNAP_COUNT	510

#define RBD_SNAP_HEAD_NAME	"-"

/* Marks "no such entry" when indexing the snapshot array */
#define BAD_SNAP_INDEX	U32_MAX

/* Sized so an image name sent by the OSD fits in a single page */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Image feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* The subset of features this client implementation supports */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * Device names have the form "rbd#": RBD_DRV_NAME followed by a
 * unique integer id.  MAX_INT_FORMAT_WIDTH is used to make sure
 * DEV_NAME_LEN is large enough for every possible name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
98
99 /*
100  * block device image metadata (in-memory version)
101  */
102 struct rbd_image_header {
103         /* These four fields never change for a given rbd image */
104         char *object_prefix;
105         u64 features;
106         __u8 obj_order;
107         __u8 crypt_type;
108         __u8 comp_type;
109
110         /* The remaining fields need to be updated occasionally */
111         u64 image_size;
112         struct ceph_snap_context *snapc;
113         char *snap_names;
114         u64 *snap_sizes;
115
116         u64 stripe_unit;
117         u64 stripe_count;
118 };
119
120 /*
121  * An rbd image specification.
122  *
123  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
124  * identify an image.  Each rbd_dev structure includes a pointer to
125  * an rbd_spec structure that encapsulates this identity.
126  *
127  * Each of the id's in an rbd_spec has an associated name.  For a
128  * user-mapped image, the names are supplied and the id's associated
129  * with them are looked up.  For a layered image, a parent image is
130  * defined by the tuple, and the names are looked up.
131  *
132  * An rbd_dev structure contains a parent_spec pointer which is
133  * non-null if the image it represents is a child in a layered
134  * image.  This pointer will refer to the rbd_spec structure used
135  * by the parent rbd_dev for its own identity (i.e., the structure
136  * is shared between the parent and child).
137  *
138  * Since these structures are populated once, during the discovery
139  * phase of image construction, they are effectively immutable so
140  * we make no effort to synchronize access to them.
141  *
142  * Note that code herein does not assume the image name is known (it
143  * could be a null pointer).
144  */
145 struct rbd_spec {
146         u64             pool_id;
147         const char      *pool_name;
148
149         const char      *image_id;
150         const char      *image_name;
151
152         u64             snap_id;
153         const char      *snap_name;
154
155         struct kref     kref;
156 };
157
158 /*
159  * an instance of the client.  multiple devices may share an rbd client.
160  */
161 struct rbd_client {
162         struct ceph_client      *client;
163         struct kref             kref;
164         struct list_head        node;
165 };
166
/* Image request and the callback type invoked with one */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

/* Object request and the callback type invoked with one */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* Kind of data carried by an object request */
enum obj_request_type {
	OBJ_REQUEST_NODATA,
	OBJ_REQUEST_BIO,
	OBJ_REQUEST_PAGES
};

/* Bit positions within rbd_obj_request->flags */
enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion: 0 = not done, 1 = done */
	OBJ_REQ_IMG_DATA,	/* usage: 0 = standalone, 1 = image */
	OBJ_REQ_KNOWN,		/* is the EXISTS flag valid: 0 = no, 1 = yes */
	OBJ_REQ_EXISTS,		/* does the target exist: 0 = no, 1 = yes */
};
185
186 struct rbd_obj_request {
187         const char              *object_name;
188         u64                     offset;         /* object start byte */
189         u64                     length;         /* bytes from offset */
190         unsigned long           flags;
191
192         /*
193          * An object request associated with an image will have its
194          * img_data flag set; a standalone object request will not.
195          *
196          * A standalone object request will have which == BAD_WHICH
197          * and a null obj_request pointer.
198          *
199          * An object request initiated in support of a layered image
200          * object (to check for its existence before a write) will
201          * have which == BAD_WHICH and a non-null obj_request pointer.
202          *
203          * Finally, an object request for rbd image data will have
204          * which != BAD_WHICH, and will have a non-null img_request
205          * pointer.  The value of which will be in the range
206          * 0..(img_request->obj_request_count-1).
207          */
208         union {
209                 struct rbd_obj_request  *obj_request;   /* STAT op */
210                 struct {
211                         struct rbd_img_request  *img_request;
212                         u64                     img_offset;
213                         /* links for img_request->obj_requests list */
214                         struct list_head        links;
215                 };
216         };
217         u32                     which;          /* posn image request list */
218
219         enum obj_request_type   type;
220         union {
221                 struct bio      *bio_list;
222                 struct {
223                         struct page     **pages;
224                         u32             page_count;
225                 };
226         };
227         struct page             **copyup_pages;
228
229         struct ceph_osd_request *osd_req;
230
231         u64                     xferred;        /* bytes transferred */
232         int                     result;
233
234         rbd_obj_callback_t      callback;
235         struct completion       completion;
236
237         struct kref             kref;
238 };
239
/* Bit positions within rbd_img_request->flags */
enum img_req_flags {
	IMG_REQ_WRITE,		/* direction: 0 = read, 1 = write */
	IMG_REQ_CHILD,		/* initiated by: 0 = block layer, 1 = child image */
	IMG_REQ_LAYERED,	/* ENOENT handling: 0 = normal, 1 = layered */
};
245
246 struct rbd_img_request {
247         struct rbd_device       *rbd_dev;
248         u64                     offset; /* starting image byte offset */
249         u64                     length; /* byte count from offset */
250         unsigned long           flags;
251         union {
252                 u64                     snap_id;        /* for reads */
253                 struct ceph_snap_context *snapc;        /* for writes */
254         };
255         union {
256                 struct request          *rq;            /* block request */
257                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
258         };
259         struct page             **copyup_pages;
260         spinlock_t              completion_lock;/* protects next_completion */
261         u32                     next_completion;
262         rbd_img_callback_t      callback;
263         u64                     xferred;/* aggregate bytes transferred */
264         int                     result; /* first nonzero obj_request result */
265
266         u32                     obj_request_count;
267         struct list_head        obj_requests;   /* rbd_obj_request structs */
268
269         struct kref             kref;
270 };
271
/*
 * Iterators over the object requests linked on an image request.
 * Note that the "safe" variant walks the list in reverse order.
 */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
278
279 struct rbd_mapping {
280         u64                     size;
281         u64                     features;
282         bool                    read_only;
283 };
284
285 /*
286  * a single device
287  */
288 struct rbd_device {
289         int                     dev_id;         /* blkdev unique id */
290
291         int                     major;          /* blkdev assigned major */
292         struct gendisk          *disk;          /* blkdev's gendisk and rq */
293
294         u32                     image_format;   /* Either 1 or 2 */
295         struct rbd_client       *rbd_client;
296
297         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
298
299         spinlock_t              lock;           /* queue, flags, open_count */
300
301         struct rbd_image_header header;
302         unsigned long           flags;          /* possibly lock protected */
303         struct rbd_spec         *spec;
304
305         char                    *header_name;
306
307         struct ceph_file_layout layout;
308
309         struct ceph_osd_event   *watch_event;
310         struct rbd_obj_request  *watch_request;
311
312         struct rbd_spec         *parent_spec;
313         u64                     parent_overlap;
314         struct rbd_device       *parent;
315
316         /* protects updating the header */
317         struct rw_semaphore     header_rwsem;
318
319         struct rbd_mapping      mapping;
320
321         struct list_head        node;
322
323         /* sysfs related */
324         struct device           dev;
325         unsigned long           open_count;     /* protected by lock */
326 };
327
/*
 * Bit positions within rbd_dev->flags.  Where atomic access is
 * needed, rbd_dev->lock provides it.
 *
 * At present only the "removing" flag needs atomic access; it is
 * coupled with the "open_count" field.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* the mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* the mapping is in the middle of removal */
};
339
/* Serializes open, close, setup and teardown */
static DEFINE_MUTEX(ctl_mutex);

/* All rbd devices, and the lock for that list */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All rbd clients, and the lock for that list */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches backing the structures allocated most often */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;
353
354 static int rbd_img_request_submit(struct rbd_img_request *img_request);
355
356 static void rbd_dev_device_release(struct device *dev);
357
358 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
359                        size_t count);
360 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
361                           size_t count);
362 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);
363
364 static struct bus_attribute rbd_bus_attrs[] = {
365         __ATTR(add, S_IWUSR, NULL, rbd_add),
366         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
367         __ATTR_NULL
368 };
369
370 static struct bus_type rbd_bus_type = {
371         .name           = "rbd",
372         .bus_attrs      = rbd_bus_attrs,
373 };
374
375 static void rbd_root_dev_release(struct device *dev)
376 {
377 }
378
379 static struct device rbd_root_dev = {
380         .init_name =    "rbd",
381         .release =      rbd_root_dev_release,
382 };
383
384 static __printf(2, 3)
385 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
386 {
387         struct va_format vaf;
388         va_list args;
389
390         va_start(args, fmt);
391         vaf.fmt = fmt;
392         vaf.va = &args;
393
394         if (!rbd_dev)
395                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
396         else if (rbd_dev->disk)
397                 printk(KERN_WARNING "%s: %s: %pV\n",
398                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
399         else if (rbd_dev->spec && rbd_dev->spec->image_name)
400                 printk(KERN_WARNING "%s: image %s: %pV\n",
401                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
402         else if (rbd_dev->spec && rbd_dev->spec->image_id)
403                 printk(KERN_WARNING "%s: id %s: %pV\n",
404                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
405         else    /* punt */
406                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
407                         RBD_DRV_NAME, rbd_dev, &vaf);
408         va_end(args);
409 }
410
/*
 * rbd_assert(expr): with RBD_DEBUG defined, report a failed
 * assertion (function, line and the expression text) and BUG();
 * without it, expand to nothing.
 *
 * The debug form is wrapped in do { } while (0) so that it is a
 * single statement: "if (x) rbd_assert(y); else ..." compiles and
 * the else cannot pair with the macro's internal if.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
423
424 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
425 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
426 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
427
428 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
429 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
430 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
431                                         u64 snap_id);
432 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
433                                 u8 *order, u64 *snap_size);
434 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
435                 u64 *snap_features);
436 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
437
438 static int rbd_open(struct block_device *bdev, fmode_t mode)
439 {
440         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
441         bool removing = false;
442
443         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
444                 return -EROFS;
445
446         spin_lock_irq(&rbd_dev->lock);
447         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
448                 removing = true;
449         else
450                 rbd_dev->open_count++;
451         spin_unlock_irq(&rbd_dev->lock);
452         if (removing)
453                 return -ENOENT;
454
455         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
456         (void) get_device(&rbd_dev->dev);
457         set_device_ro(bdev, rbd_dev->mapping.read_only);
458         mutex_unlock(&ctl_mutex);
459
460         return 0;
461 }
462
463 static int rbd_release(struct gendisk *disk, fmode_t mode)
464 {
465         struct rbd_device *rbd_dev = disk->private_data;
466         unsigned long open_count_before;
467
468         spin_lock_irq(&rbd_dev->lock);
469         open_count_before = rbd_dev->open_count--;
470         spin_unlock_irq(&rbd_dev->lock);
471         rbd_assert(open_count_before > 0);
472
473         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
474         put_device(&rbd_dev->dev);
475         mutex_unlock(&ctl_mutex);
476
477         return 0;
478 }
479
480 static const struct block_device_operations rbd_bd_ops = {
481         .owner                  = THIS_MODULE,
482         .open                   = rbd_open,
483         .release                = rbd_release,
484 };
485
486 /*
487  * Initialize an rbd client instance.
488  * We own *ceph_opts.
489  */
490 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
491 {
492         struct rbd_client *rbdc;
493         int ret = -ENOMEM;
494
495         dout("%s:\n", __func__);
496         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
497         if (!rbdc)
498                 goto out_opt;
499
500         kref_init(&rbdc->kref);
501         INIT_LIST_HEAD(&rbdc->node);
502
503         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
504
505         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
506         if (IS_ERR(rbdc->client))
507                 goto out_mutex;
508         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
509
510         ret = ceph_open_session(rbdc->client);
511         if (ret < 0)
512                 goto out_err;
513
514         spin_lock(&rbd_client_list_lock);
515         list_add_tail(&rbdc->node, &rbd_client_list);
516         spin_unlock(&rbd_client_list_lock);
517
518         mutex_unlock(&ctl_mutex);
519         dout("%s: rbdc %p\n", __func__, rbdc);
520
521         return rbdc;
522
523 out_err:
524         ceph_destroy_client(rbdc->client);
525 out_mutex:
526         mutex_unlock(&ctl_mutex);
527         kfree(rbdc);
528 out_opt:
529         if (ceph_opts)
530                 ceph_destroy_options(ceph_opts);
531         dout("%s: error %d\n", __func__, ret);
532
533         return ERR_PTR(ret);
534 }
535
536 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
537 {
538         kref_get(&rbdc->kref);
539
540         return rbdc;
541 }
542
543 /*
544  * Find a ceph client with specific addr and configuration.  If
545  * found, bump its reference count.
546  */
547 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
548 {
549         struct rbd_client *client_node;
550         bool found = false;
551
552         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
553                 return NULL;
554
555         spin_lock(&rbd_client_list_lock);
556         list_for_each_entry(client_node, &rbd_client_list, node) {
557                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
558                         __rbd_get_client(client_node);
559
560                         found = true;
561                         break;
562                 }
563         }
564         spin_unlock(&rbd_client_list_lock);
565
566         return found ? client_node : NULL;
567 }
568
569 /*
570  * mount options
571  */
572 enum {
573         Opt_last_int,
574         /* int args above */
575         Opt_last_string,
576         /* string args above */
577         Opt_read_only,
578         Opt_read_write,
579         /* Boolean args above */
580         Opt_last_bool,
581 };
582
583 static match_table_t rbd_opts_tokens = {
584         /* int args above */
585         /* string args above */
586         {Opt_read_only, "read_only"},
587         {Opt_read_only, "ro"},          /* Alternate spelling */
588         {Opt_read_write, "read_write"},
589         {Opt_read_write, "rw"},         /* Alternate spelling */
590         /* Boolean args above */
591         {-1, NULL}
592 };
593
594 struct rbd_options {
595         bool    read_only;
596 };
597
598 #define RBD_READ_ONLY_DEFAULT   false
599
600 static int parse_rbd_opts_token(char *c, void *private)
601 {
602         struct rbd_options *rbd_opts = private;
603         substring_t argstr[MAX_OPT_ARGS];
604         int token, intval, ret;
605
606         token = match_token(c, rbd_opts_tokens, argstr);
607         if (token < 0)
608                 return -EINVAL;
609
610         if (token < Opt_last_int) {
611                 ret = match_int(&argstr[0], &intval);
612                 if (ret < 0) {
613                         pr_err("bad mount option arg (not int) "
614                                "at '%s'\n", c);
615                         return ret;
616                 }
617                 dout("got int token %d val %d\n", token, intval);
618         } else if (token > Opt_last_int && token < Opt_last_string) {
619                 dout("got string token %d val %s\n", token,
620                      argstr[0].from);
621         } else if (token > Opt_last_string && token < Opt_last_bool) {
622                 dout("got Boolean token %d\n", token);
623         } else {
624                 dout("got token %d\n", token);
625         }
626
627         switch (token) {
628         case Opt_read_only:
629                 rbd_opts->read_only = true;
630                 break;
631         case Opt_read_write:
632                 rbd_opts->read_only = false;
633                 break;
634         default:
635                 rbd_assert(false);
636                 break;
637         }
638         return 0;
639 }
640
/*
 * Return a client matching @ceph_opts: an existing one if
 * rbd_client_find() has a match, otherwise a newly created one.
 * When an existing client is reused the options are destroyed here;
 * otherwise they are passed on to rbd_client_create().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (!rbdc)
		return rbd_client_create(ceph_opts);

	/* reusing an existing client */
	ceph_destroy_options(ceph_opts);

	return rbdc;
}
657
658 /*
659  * Destroy ceph client
660  *
661  * Caller must hold rbd_client_list_lock.
662  */
663 static void rbd_client_release(struct kref *kref)
664 {
665         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
666
667         dout("%s: rbdc %p\n", __func__, rbdc);
668         spin_lock(&rbd_client_list_lock);
669         list_del(&rbdc->node);
670         spin_unlock(&rbd_client_list_lock);
671
672         ceph_destroy_client(rbdc->client);
673         kfree(rbdc);
674 }
675
676 /*
677  * Drop reference to ceph client node. If it's not referenced anymore, release
678  * it.
679  */
680 static void rbd_put_client(struct rbd_client *rbdc)
681 {
682         if (rbdc)
683                 kref_put(&rbdc->kref, rbd_client_release);
684 }
685
/* Only image formats 1 and 2 are recognized */
static bool rbd_image_format_valid(u32 image_format)
{
	switch (image_format) {
	case 1:
	case 2:
		return true;
	default:
		return false;
	}
}
690
691 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
692 {
693         size_t size;
694         u32 snap_count;
695
696         /* The header has to start with the magic rbd header text */
697         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
698                 return false;
699
700         /* The bio layer requires at least sector-sized I/O */
701
702         if (ondisk->options.order < SECTOR_SHIFT)
703                 return false;
704
705         /* If we use u64 in a few spots we may be able to loosen this */
706
707         if (ondisk->options.order > 8 * sizeof (int) - 1)
708                 return false;
709
710         /*
711          * The size of a snapshot header has to fit in a size_t, and
712          * that limits the number of snapshots.
713          */
714         snap_count = le32_to_cpu(ondisk->snap_count);
715         size = SIZE_MAX - sizeof (struct ceph_snap_context);
716         if (snap_count > size / sizeof (__le64))
717                 return false;
718
719         /*
720          * Not only that, but the size of the entire the snapshot
721          * header must also be representable in a size_t.
722          */
723         size -= snap_count * sizeof (__le64);
724         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
725                 return false;
726
727         return true;
728 }
729
730 /*
731  * Create a new header structure, translate header format from the on-disk
732  * header.
733  */
734 static int rbd_header_from_disk(struct rbd_image_header *header,
735                                  struct rbd_image_header_ondisk *ondisk)
736 {
737         u32 snap_count;
738         size_t len;
739         size_t size;
740         u32 i;
741
742         memset(header, 0, sizeof (*header));
743
744         snap_count = le32_to_cpu(ondisk->snap_count);
745
746         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
747         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
748         if (!header->object_prefix)
749                 return -ENOMEM;
750         memcpy(header->object_prefix, ondisk->object_prefix, len);
751         header->object_prefix[len] = '\0';
752
753         if (snap_count) {
754                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
755
756                 /* Save a copy of the snapshot names */
757
758                 if (snap_names_len > (u64) SIZE_MAX)
759                         return -EIO;
760                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
761                 if (!header->snap_names)
762                         goto out_err;
763                 /*
764                  * Note that rbd_dev_v1_header_read() guarantees
765                  * the ondisk buffer we're working with has
766                  * snap_names_len bytes beyond the end of the
767                  * snapshot id array, this memcpy() is safe.
768                  */
769                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
770                         snap_names_len);
771
772                 /* Record each snapshot's size */
773
774                 size = snap_count * sizeof (*header->snap_sizes);
775                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
776                 if (!header->snap_sizes)
777                         goto out_err;
778                 for (i = 0; i < snap_count; i++)
779                         header->snap_sizes[i] =
780                                 le64_to_cpu(ondisk->snaps[i].image_size);
781         } else {
782                 header->snap_names = NULL;
783                 header->snap_sizes = NULL;
784         }
785
786         header->features = 0;   /* No features support in v1 images */
787         header->obj_order = ondisk->options.order;
788         header->crypt_type = ondisk->options.crypt_type;
789         header->comp_type = ondisk->options.comp_type;
790
791         /* Allocate and fill in the snapshot context */
792
793         header->image_size = le64_to_cpu(ondisk->image_size);
794
795         header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
796         if (!header->snapc)
797                 goto out_err;
798         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
799         for (i = 0; i < snap_count; i++)
800                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
801
802         return 0;
803
804 out_err:
805         kfree(header->snap_sizes);
806         header->snap_sizes = NULL;
807         kfree(header->snap_names);
808         header->snap_names = NULL;
809         kfree(header->object_prefix);
810         header->object_prefix = NULL;
811
812         return -ENOMEM;
813 }
814
815 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
816 {
817         const char *snap_name;
818
819         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
820
821         /* Skip over names until we find the one we are looking for */
822
823         snap_name = rbd_dev->header.snap_names;
824         while (which--)
825                 snap_name += strlen(snap_name) + 1;
826
827         return kstrdup(snap_name, GFP_KERNEL);
828 }
829
/*
 * Comparison callback over snapshot ids, for qsort()/bsearch().
 * The ordering is *descending*: a smaller first id yields a
 * positive result.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	const u64 id1 = *(const u64 *)s1;
	const u64 id2 = *(const u64 *)s2;

	if (id1 == id2)
		return 0;

	return id1 < id2 ? 1 : -1;
}
843
844 /*
845  * Search a snapshot context to see if the given snapshot id is
846  * present.
847  *
848  * Returns the position of the snapshot id in the array if it's found,
849  * or BAD_SNAP_INDEX otherwise.
850  *
851  * Note: The snapshot array is in kept sorted (by the osd) in
852  * reverse order, highest snapshot id first.
853  */
854 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
855 {
856         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
857         u64 *found;
858
859         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
860                                 sizeof (snap_id), snapid_compare_reverse);
861
862         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
863 }
864
865 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
866                                         u64 snap_id)
867 {
868         u32 which;
869
870         which = rbd_dev_snap_index(rbd_dev, snap_id);
871         if (which == BAD_SNAP_INDEX)
872                 return NULL;
873
874         return _rbd_dev_v1_snap_name(rbd_dev, which);
875 }
876
877 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
878 {
879         if (snap_id == CEPH_NOSNAP)
880                 return RBD_SNAP_HEAD_NAME;
881
882         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
883         if (rbd_dev->image_format == 1)
884                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
885
886         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
887 }
888
889 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
890                                 u64 *snap_size)
891 {
892         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
893         if (snap_id == CEPH_NOSNAP) {
894                 *snap_size = rbd_dev->header.image_size;
895         } else if (rbd_dev->image_format == 1) {
896                 u32 which;
897
898                 which = rbd_dev_snap_index(rbd_dev, snap_id);
899                 if (which == BAD_SNAP_INDEX)
900                         return -ENOENT;
901
902                 *snap_size = rbd_dev->header.snap_sizes[which];
903         } else {
904                 u64 size = 0;
905                 int ret;
906
907                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
908                 if (ret)
909                         return ret;
910
911                 *snap_size = size;
912         }
913         return 0;
914 }
915
916 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
917                         u64 *snap_features)
918 {
919         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
920         if (snap_id == CEPH_NOSNAP) {
921                 *snap_features = rbd_dev->header.features;
922         } else if (rbd_dev->image_format == 1) {
923                 *snap_features = 0;     /* No features for format 1 */
924         } else {
925                 u64 features = 0;
926                 int ret;
927
928                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
929                 if (ret)
930                         return ret;
931
932                 *snap_features = features;
933         }
934         return 0;
935 }
936
937 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
938 {
939         u64 snap_id = rbd_dev->spec->snap_id;
940         u64 size = 0;
941         u64 features = 0;
942         int ret;
943
944         ret = rbd_snap_size(rbd_dev, snap_id, &size);
945         if (ret)
946                 return ret;
947         ret = rbd_snap_features(rbd_dev, snap_id, &features);
948         if (ret)
949                 return ret;
950
951         rbd_dev->mapping.size = size;
952         rbd_dev->mapping.features = features;
953
954         return 0;
955 }
956
957 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
958 {
959         rbd_dev->mapping.size = 0;
960         rbd_dev->mapping.features = 0;
961 }
962
963 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
964 {
965         char *name;
966         u64 segment;
967         int ret;
968
969         name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
970         if (!name)
971                 return NULL;
972         segment = offset >> rbd_dev->header.obj_order;
973         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
974                         rbd_dev->header.object_prefix, segment);
975         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
976                 pr_err("error formatting segment name for #%llu (%d)\n",
977                         segment, ret);
978                 kfree(name);
979                 name = NULL;
980         }
981
982         return name;
983 }
984
985 static void rbd_segment_name_free(const char *name)
986 {
987         /* The explicit cast here is needed to drop the const qualifier */
988
989         kmem_cache_free(rbd_segment_name_cache, (void *)name);
990 }
991
992 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
993 {
994         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
995
996         return offset & (segment_size - 1);
997 }
998
999 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1000                                 u64 offset, u64 length)
1001 {
1002         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1003
1004         offset &= segment_size - 1;
1005
1006         rbd_assert(length <= U64_MAX - offset);
1007         if (offset + length > segment_size)
1008                 length = segment_size - offset;
1009
1010         return length;
1011 }
1012
1013 /*
1014  * returns the size of an object in the image
1015  */
1016 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1017 {
1018         return 1 << header->obj_order;
1019 }
1020
1021 /*
1022  * bio helpers
1023  */
1024
1025 static void bio_chain_put(struct bio *chain)
1026 {
1027         struct bio *tmp;
1028
1029         while (chain) {
1030                 tmp = chain;
1031                 chain = chain->bi_next;
1032                 bio_put(tmp);
1033         }
1034 }
1035
1036 /*
1037  * zeros a bio chain, starting at specific offset
1038  */
1039 static void zero_bio_chain(struct bio *chain, int start_ofs)
1040 {
1041         struct bio_vec *bv;
1042         unsigned long flags;
1043         void *buf;
1044         int i;
1045         int pos = 0;
1046
1047         while (chain) {
1048                 bio_for_each_segment(bv, chain, i) {
1049                         if (pos + bv->bv_len > start_ofs) {
1050                                 int remainder = max(start_ofs - pos, 0);
1051                                 buf = bvec_kmap_irq(bv, &flags);
1052                                 memset(buf + remainder, 0,
1053                                        bv->bv_len - remainder);
1054                                 bvec_kunmap_irq(buf, &flags);
1055                         }
1056                         pos += bv->bv_len;
1057                 }
1058
1059                 chain = chain->bi_next;
1060         }
1061 }
1062
1063 /*
1064  * similar to zero_bio_chain(), zeros data defined by a page array,
1065  * starting at the given byte offset from the start of the array and
1066  * continuing up to the given end offset.  The pages array is
1067  * assumed to be big enough to hold all bytes up to the end.
1068  */
1069 static void zero_pages(struct page **pages, u64 offset, u64 end)
1070 {
1071         struct page **page = &pages[offset >> PAGE_SHIFT];
1072
1073         rbd_assert(end > offset);
1074         rbd_assert(end - offset <= (u64)SIZE_MAX);
1075         while (offset < end) {
1076                 size_t page_offset;
1077                 size_t length;
1078                 unsigned long flags;
1079                 void *kaddr;
1080
1081                 page_offset = (size_t)(offset & ~PAGE_MASK);
1082                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1083                 local_irq_save(flags);
1084                 kaddr = kmap_atomic(*page);
1085                 memset(kaddr + page_offset, 0, length);
1086                 kunmap_atomic(kaddr);
1087                 local_irq_restore(flags);
1088
1089                 offset += length;
1090                 page++;
1091         }
1092 }
1093
1094 /*
1095  * Clone a portion of a bio, starting at the given byte offset
1096  * and continuing for the number of bytes indicated.
1097  */
1098 static struct bio *bio_clone_range(struct bio *bio_src,
1099                                         unsigned int offset,
1100                                         unsigned int len,
1101                                         gfp_t gfpmask)
1102 {
1103         struct bio_vec *bv;
1104         unsigned int resid;
1105         unsigned short idx;
1106         unsigned int voff;
1107         unsigned short end_idx;
1108         unsigned short vcnt;
1109         struct bio *bio;
1110
1111         /* Handle the easy case for the caller */
1112
1113         if (!offset && len == bio_src->bi_size)
1114                 return bio_clone(bio_src, gfpmask);
1115
1116         if (WARN_ON_ONCE(!len))
1117                 return NULL;
1118         if (WARN_ON_ONCE(len > bio_src->bi_size))
1119                 return NULL;
1120         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1121                 return NULL;
1122
1123         /* Find first affected segment... */
1124
1125         resid = offset;
1126         __bio_for_each_segment(bv, bio_src, idx, 0) {
1127                 if (resid < bv->bv_len)
1128                         break;
1129                 resid -= bv->bv_len;
1130         }
1131         voff = resid;
1132
1133         /* ...and the last affected segment */
1134
1135         resid += len;
1136         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1137                 if (resid <= bv->bv_len)
1138                         break;
1139                 resid -= bv->bv_len;
1140         }
1141         vcnt = end_idx - idx + 1;
1142
1143         /* Build the clone */
1144
1145         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1146         if (!bio)
1147                 return NULL;    /* ENOMEM */
1148
1149         bio->bi_bdev = bio_src->bi_bdev;
1150         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1151         bio->bi_rw = bio_src->bi_rw;
1152         bio->bi_flags |= 1 << BIO_CLONED;
1153
1154         /*
1155          * Copy over our part of the bio_vec, then update the first
1156          * and last (or only) entries.
1157          */
1158         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1159                         vcnt * sizeof (struct bio_vec));
1160         bio->bi_io_vec[0].bv_offset += voff;
1161         if (vcnt > 1) {
1162                 bio->bi_io_vec[0].bv_len -= voff;
1163                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1164         } else {
1165                 bio->bi_io_vec[0].bv_len = len;
1166         }
1167
1168         bio->bi_vcnt = vcnt;
1169         bio->bi_size = len;
1170         bio->bi_idx = 0;
1171
1172         return bio;
1173 }
1174
1175 /*
1176  * Clone a portion of a bio chain, starting at the given byte offset
1177  * into the first bio in the source chain and continuing for the
1178  * number of bytes indicated.  The result is another bio chain of
1179  * exactly the given length, or a null pointer on error.
1180  *
1181  * The bio_src and offset parameters are both in-out.  On entry they
1182  * refer to the first source bio and the offset into that bio where
1183  * the start of data to be cloned is located.
1184  *
1185  * On return, bio_src is updated to refer to the bio in the source
1186  * chain that contains first un-cloned byte, and *offset will
1187  * contain the offset of that byte within that bio.
1188  */
1189 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1190                                         unsigned int *offset,
1191                                         unsigned int len,
1192                                         gfp_t gfpmask)
1193 {
1194         struct bio *bi = *bio_src;
1195         unsigned int off = *offset;
1196         struct bio *chain = NULL;
1197         struct bio **end;
1198
1199         /* Build up a chain of clone bios up to the limit */
1200
1201         if (!bi || off >= bi->bi_size || !len)
1202                 return NULL;            /* Nothing to clone */
1203
1204         end = &chain;
1205         while (len) {
1206                 unsigned int bi_size;
1207                 struct bio *bio;
1208
1209                 if (!bi) {
1210                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1211                         goto out_err;   /* EINVAL; ran out of bio's */
1212                 }
1213                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1214                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1215                 if (!bio)
1216                         goto out_err;   /* ENOMEM */
1217
1218                 *end = bio;
1219                 end = &bio->bi_next;
1220
1221                 off += bi_size;
1222                 if (off == bi->bi_size) {
1223                         bi = bi->bi_next;
1224                         off = 0;
1225                 }
1226                 len -= bi_size;
1227         }
1228         *bio_src = bi;
1229         *offset = off;
1230
1231         return chain;
1232 out_err:
1233         bio_chain_put(chain);
1234
1235         return NULL;
1236 }
1237
1238 /*
1239  * The default/initial value for all object request flags is 0.  For
1240  * each flag, once its value is set to 1 it is never reset to 0
1241  * again.
1242  */
1243 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1244 {
1245         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1246                 struct rbd_device *rbd_dev;
1247
1248                 rbd_dev = obj_request->img_request->rbd_dev;
1249                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1250                         obj_request);
1251         }
1252 }
1253
1254 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1255 {
1256         smp_mb();
1257         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1258 }
1259
1260 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1261 {
1262         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1263                 struct rbd_device *rbd_dev = NULL;
1264
1265                 if (obj_request_img_data_test(obj_request))
1266                         rbd_dev = obj_request->img_request->rbd_dev;
1267                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1268                         obj_request);
1269         }
1270 }
1271
1272 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1273 {
1274         smp_mb();
1275         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1276 }
1277
1278 /*
1279  * This sets the KNOWN flag after (possibly) setting the EXISTS
1280  * flag.  The latter is set based on the "exists" value provided.
1281  *
1282  * Note that for our purposes once an object exists it never goes
1283  * away again.  It's possible that the response from two existence
1284  * checks are separated by the creation of the target object, and
1285  * the first ("doesn't exist") response arrives *after* the second
1286  * ("does exist").  In that case we ignore the second one.
1287  */
1288 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1289                                 bool exists)
1290 {
1291         if (exists)
1292                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1293         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1294         smp_mb();
1295 }
1296
1297 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1298 {
1299         smp_mb();
1300         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1301 }
1302
1303 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1304 {
1305         smp_mb();
1306         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1307 }
1308
1309 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1310 {
1311         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1312                 atomic_read(&obj_request->kref.refcount));
1313         kref_get(&obj_request->kref);
1314 }
1315
1316 static void rbd_obj_request_destroy(struct kref *kref);
1317 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1318 {
1319         rbd_assert(obj_request != NULL);
1320         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1321                 atomic_read(&obj_request->kref.refcount));
1322         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1323 }
1324
1325 static void rbd_img_request_get(struct rbd_img_request *img_request)
1326 {
1327         dout("%s: img %p (was %d)\n", __func__, img_request,
1328                 atomic_read(&img_request->kref.refcount));
1329         kref_get(&img_request->kref);
1330 }
1331
1332 static void rbd_img_request_destroy(struct kref *kref);
1333 static void rbd_img_request_put(struct rbd_img_request *img_request)
1334 {
1335         rbd_assert(img_request != NULL);
1336         dout("%s: img %p (was %d)\n", __func__, img_request,
1337                 atomic_read(&img_request->kref.refcount));
1338         kref_put(&img_request->kref, rbd_img_request_destroy);
1339 }
1340
1341 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1342                                         struct rbd_obj_request *obj_request)
1343 {
1344         rbd_assert(obj_request->img_request == NULL);
1345
1346         /* Image request now owns object's original reference */
1347         obj_request->img_request = img_request;
1348         obj_request->which = img_request->obj_request_count;
1349         rbd_assert(!obj_request_img_data_test(obj_request));
1350         obj_request_img_data_set(obj_request);
1351         rbd_assert(obj_request->which != BAD_WHICH);
1352         img_request->obj_request_count++;
1353         list_add_tail(&obj_request->links, &img_request->obj_requests);
1354         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1355                 obj_request->which);
1356 }
1357
1358 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1359                                         struct rbd_obj_request *obj_request)
1360 {
1361         rbd_assert(obj_request->which != BAD_WHICH);
1362
1363         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1364                 obj_request->which);
1365         list_del(&obj_request->links);
1366         rbd_assert(img_request->obj_request_count > 0);
1367         img_request->obj_request_count--;
1368         rbd_assert(obj_request->which == img_request->obj_request_count);
1369         obj_request->which = BAD_WHICH;
1370         rbd_assert(obj_request_img_data_test(obj_request));
1371         rbd_assert(obj_request->img_request == img_request);
1372         obj_request->img_request = NULL;
1373         obj_request->callback = NULL;
1374         rbd_obj_request_put(obj_request);
1375 }
1376
1377 static bool obj_request_type_valid(enum obj_request_type type)
1378 {
1379         switch (type) {
1380         case OBJ_REQUEST_NODATA:
1381         case OBJ_REQUEST_BIO:
1382         case OBJ_REQUEST_PAGES:
1383                 return true;
1384         default:
1385                 return false;
1386         }
1387 }
1388
1389 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1390                                 struct rbd_obj_request *obj_request)
1391 {
1392         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1393
1394         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1395 }
1396
1397 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1398 {
1399
1400         dout("%s: img %p\n", __func__, img_request);
1401
1402         /*
1403          * If no error occurred, compute the aggregate transfer
1404          * count for the image request.  We could instead use
1405          * atomic64_cmpxchg() to update it as each object request
1406          * completes; not clear which way is better off hand.
1407          */
1408         if (!img_request->result) {
1409                 struct rbd_obj_request *obj_request;
1410                 u64 xferred = 0;
1411
1412                 for_each_obj_request(img_request, obj_request)
1413                         xferred += obj_request->xferred;
1414                 img_request->xferred = xferred;
1415         }
1416
1417         if (img_request->callback)
1418                 img_request->callback(img_request);
1419         else
1420                 rbd_img_request_put(img_request);
1421 }
1422
1423 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1424
1425 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1426 {
1427         dout("%s: obj %p\n", __func__, obj_request);
1428
1429         return wait_for_completion_interruptible(&obj_request->completion);
1430 }
1431
1432 /*
1433  * The default/initial value for all image request flags is 0.  Each
1434  * is conditionally set to 1 at image request initialization time
1435  * and currently never change thereafter.
1436  */
1437 static void img_request_write_set(struct rbd_img_request *img_request)
1438 {
1439         set_bit(IMG_REQ_WRITE, &img_request->flags);
1440         smp_mb();
1441 }
1442
1443 static bool img_request_write_test(struct rbd_img_request *img_request)
1444 {
1445         smp_mb();
1446         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1447 }
1448
1449 static void img_request_child_set(struct rbd_img_request *img_request)
1450 {
1451         set_bit(IMG_REQ_CHILD, &img_request->flags);
1452         smp_mb();
1453 }
1454
1455 static bool img_request_child_test(struct rbd_img_request *img_request)
1456 {
1457         smp_mb();
1458         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1459 }
1460
1461 static void img_request_layered_set(struct rbd_img_request *img_request)
1462 {
1463         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1464         smp_mb();
1465 }
1466
1467 static bool img_request_layered_test(struct rbd_img_request *img_request)
1468 {
1469         smp_mb();
1470         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1471 }
1472
1473 static void
1474 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1475 {
1476         u64 xferred = obj_request->xferred;
1477         u64 length = obj_request->length;
1478
1479         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1480                 obj_request, obj_request->img_request, obj_request->result,
1481                 xferred, length);
1482         /*
1483          * ENOENT means a hole in the image.  We zero-fill the
1484          * entire length of the request.  A short read also implies
1485          * zero-fill to the end of the request.  Either way we
1486          * update the xferred count to indicate the whole request
1487          * was satisfied.
1488          */
1489         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1490         if (obj_request->result == -ENOENT) {
1491                 if (obj_request->type == OBJ_REQUEST_BIO)
1492                         zero_bio_chain(obj_request->bio_list, 0);
1493                 else
1494                         zero_pages(obj_request->pages, 0, length);
1495                 obj_request->result = 0;
1496                 obj_request->xferred = length;
1497         } else if (xferred < length && !obj_request->result) {
1498                 if (obj_request->type == OBJ_REQUEST_BIO)
1499                         zero_bio_chain(obj_request->bio_list, xferred);
1500                 else
1501                         zero_pages(obj_request->pages, xferred, length);
1502                 obj_request->xferred = length;
1503         }
1504         obj_request_done_set(obj_request);
1505 }
1506
1507 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1508 {
1509         dout("%s: obj %p cb %p\n", __func__, obj_request,
1510                 obj_request->callback);
1511         if (obj_request->callback)
1512                 obj_request->callback(obj_request);
1513         else
1514                 complete_all(&obj_request->completion);
1515 }
1516
/* Nothing to post-process for these ops: just mark the request done */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1522
1523 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1524 {
1525         struct rbd_img_request *img_request = NULL;
1526         struct rbd_device *rbd_dev = NULL;
1527         bool layered = false;
1528
1529         if (obj_request_img_data_test(obj_request)) {
1530                 img_request = obj_request->img_request;
1531                 layered = img_request && img_request_layered_test(img_request);
1532                 rbd_dev = img_request->rbd_dev;
1533         }
1534
1535         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1536                 obj_request, img_request, obj_request->result,
1537                 obj_request->xferred, obj_request->length);
1538         if (layered && obj_request->result == -ENOENT &&
1539                         obj_request->img_offset < rbd_dev->parent_overlap)
1540                 rbd_img_parent_read(obj_request);
1541         else if (img_request)
1542                 rbd_img_obj_request_read_callback(obj_request);
1543         else
1544                 obj_request_done_set(obj_request);
1545 }
1546
1547 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1548 {
1549         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1550                 obj_request->result, obj_request->length);
1551         /*
1552          * There is no such thing as a successful short write.  Set
1553          * it to our originally-requested length.
1554          */
1555         obj_request->xferred = obj_request->length;
1556         obj_request_done_set(obj_request);
1557 }
1558
/*
 * A plain stat call needs no post-processing, so the request is
 * simply marked done.  We'll do more if this is part of a write
 * sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1568
1569 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1570                                 struct ceph_msg *msg)
1571 {
1572         struct rbd_obj_request *obj_request = osd_req->r_priv;
1573         u16 opcode;
1574
1575         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1576         rbd_assert(osd_req == obj_request->osd_req);
1577         if (obj_request_img_data_test(obj_request)) {
1578                 rbd_assert(obj_request->img_request);
1579                 rbd_assert(obj_request->which != BAD_WHICH);
1580         } else {
1581                 rbd_assert(obj_request->which == BAD_WHICH);
1582         }
1583
1584         if (osd_req->r_result < 0)
1585                 obj_request->result = osd_req->r_result;
1586
1587         BUG_ON(osd_req->r_num_ops > 2);
1588
1589         /*
1590          * We support a 64-bit length, but ultimately it has to be
1591          * passed to blk_end_request(), which takes an unsigned int.
1592          */
1593         obj_request->xferred = osd_req->r_reply_op_len[0];
1594         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1595         opcode = osd_req->r_ops[0].op;
1596         switch (opcode) {
1597         case CEPH_OSD_OP_READ:
1598                 rbd_osd_read_callback(obj_request);
1599                 break;
1600         case CEPH_OSD_OP_WRITE:
1601                 rbd_osd_write_callback(obj_request);
1602                 break;
1603         case CEPH_OSD_OP_STAT:
1604                 rbd_osd_stat_callback(obj_request);
1605                 break;
1606         case CEPH_OSD_OP_CALL:
1607         case CEPH_OSD_OP_NOTIFY_ACK:
1608         case CEPH_OSD_OP_WATCH:
1609                 rbd_osd_trivial_callback(obj_request);
1610                 break;
1611         default:
1612                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1613                         obj_request->object_name, (unsigned short) opcode);
1614                 break;
1615         }
1616
1617         if (obj_request_done_test(obj_request))
1618                 rbd_obj_request_complete(obj_request);
1619 }
1620
1621 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1622 {
1623         struct rbd_img_request *img_request = obj_request->img_request;
1624         struct ceph_osd_request *osd_req = obj_request->osd_req;
1625         u64 snap_id;
1626
1627         rbd_assert(osd_req != NULL);
1628
1629         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1630         ceph_osdc_build_request(osd_req, obj_request->offset,
1631                         NULL, snap_id, NULL);
1632 }
1633
1634 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1635 {
1636         struct rbd_img_request *img_request = obj_request->img_request;
1637         struct ceph_osd_request *osd_req = obj_request->osd_req;
1638         struct ceph_snap_context *snapc;
1639         struct timespec mtime = CURRENT_TIME;
1640
1641         rbd_assert(osd_req != NULL);
1642
1643         snapc = img_request ? img_request->snapc : NULL;
1644         ceph_osdc_build_request(osd_req, obj_request->offset,
1645                         snapc, CEPH_NOSNAP, &mtime);
1646 }
1647
1648 static struct ceph_osd_request *rbd_osd_req_create(
1649                                         struct rbd_device *rbd_dev,
1650                                         bool write_request,
1651                                         struct rbd_obj_request *obj_request)
1652 {
1653         struct ceph_snap_context *snapc = NULL;
1654         struct ceph_osd_client *osdc;
1655         struct ceph_osd_request *osd_req;
1656
1657         if (obj_request_img_data_test(obj_request)) {
1658                 struct rbd_img_request *img_request = obj_request->img_request;
1659
1660                 rbd_assert(write_request ==
1661                                 img_request_write_test(img_request));
1662                 if (write_request)
1663                         snapc = img_request->snapc;
1664         }
1665
1666         /* Allocate and initialize the request, for the single op */
1667
1668         osdc = &rbd_dev->rbd_client->client->osdc;
1669         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1670         if (!osd_req)
1671                 return NULL;    /* ENOMEM */
1672
1673         if (write_request)
1674                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1675         else
1676                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1677
1678         osd_req->r_callback = rbd_osd_req_callback;
1679         osd_req->r_priv = obj_request;
1680
1681         osd_req->r_oid_len = strlen(obj_request->object_name);
1682         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1683         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1684
1685         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1686
1687         return osd_req;
1688 }
1689
1690 /*
1691  * Create a copyup osd request based on the information in the
1692  * object request supplied.  A copyup request has two osd ops,
1693  * a copyup method call, and a "normal" write request.
1694  */
1695 static struct ceph_osd_request *
1696 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1697 {
1698         struct rbd_img_request *img_request;
1699         struct ceph_snap_context *snapc;
1700         struct rbd_device *rbd_dev;
1701         struct ceph_osd_client *osdc;
1702         struct ceph_osd_request *osd_req;
1703
1704         rbd_assert(obj_request_img_data_test(obj_request));
1705         img_request = obj_request->img_request;
1706         rbd_assert(img_request);
1707         rbd_assert(img_request_write_test(img_request));
1708
1709         /* Allocate and initialize the request, for the two ops */
1710
1711         snapc = img_request->snapc;
1712         rbd_dev = img_request->rbd_dev;
1713         osdc = &rbd_dev->rbd_client->client->osdc;
1714         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1715         if (!osd_req)
1716                 return NULL;    /* ENOMEM */
1717
1718         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1719         osd_req->r_callback = rbd_osd_req_callback;
1720         osd_req->r_priv = obj_request;
1721
1722         osd_req->r_oid_len = strlen(obj_request->object_name);
1723         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1724         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1725
1726         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1727
1728         return osd_req;
1729 }
1730
1731
/* Release an osd request by dropping a reference on it */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1736
1737 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1738
1739 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1740                                                 u64 offset, u64 length,
1741                                                 enum obj_request_type type)
1742 {
1743         struct rbd_obj_request *obj_request;
1744         size_t size;
1745         char *name;
1746
1747         rbd_assert(obj_request_type_valid(type));
1748
1749         size = strlen(object_name) + 1;
1750         name = kmalloc(size, GFP_KERNEL);
1751         if (!name)
1752                 return NULL;
1753
1754         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1755         if (!obj_request) {
1756                 kfree(name);
1757                 return NULL;
1758         }
1759
1760         obj_request->object_name = memcpy(name, object_name, size);
1761         obj_request->offset = offset;
1762         obj_request->length = length;
1763         obj_request->flags = 0;
1764         obj_request->which = BAD_WHICH;
1765         obj_request->type = type;
1766         INIT_LIST_HEAD(&obj_request->links);
1767         init_completion(&obj_request->completion);
1768         kref_init(&obj_request->kref);
1769
1770         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1771                 offset, length, (int)type, obj_request);
1772
1773         return obj_request;
1774 }
1775
1776 static void rbd_obj_request_destroy(struct kref *kref)
1777 {
1778         struct rbd_obj_request *obj_request;
1779
1780         obj_request = container_of(kref, struct rbd_obj_request, kref);
1781
1782         dout("%s: obj %p\n", __func__, obj_request);
1783
1784         rbd_assert(obj_request->img_request == NULL);
1785         rbd_assert(obj_request->which == BAD_WHICH);
1786
1787         if (obj_request->osd_req)
1788                 rbd_osd_req_destroy(obj_request->osd_req);
1789
1790         rbd_assert(obj_request_type_valid(obj_request->type));
1791         switch (obj_request->type) {
1792         case OBJ_REQUEST_NODATA:
1793                 break;          /* Nothing to do */
1794         case OBJ_REQUEST_BIO:
1795                 if (obj_request->bio_list)
1796                         bio_chain_put(obj_request->bio_list);
1797                 break;
1798         case OBJ_REQUEST_PAGES:
1799                 if (obj_request->pages)
1800                         ceph_release_page_vector(obj_request->pages,
1801                                                 obj_request->page_count);
1802                 break;
1803         }
1804
1805         kfree(obj_request->object_name);
1806         obj_request->object_name = NULL;
1807         kmem_cache_free(rbd_obj_request_cache, obj_request);
1808 }
1809
1810 /*
1811  * Caller is responsible for filling in the list of object requests
1812  * that comprises the image request, and the Linux request pointer
1813  * (if there is one).
1814  */
1815 static struct rbd_img_request *rbd_img_request_create(
1816                                         struct rbd_device *rbd_dev,
1817                                         u64 offset, u64 length,
1818                                         bool write_request,
1819                                         bool child_request)
1820 {
1821         struct rbd_img_request *img_request;
1822
1823         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1824         if (!img_request)
1825                 return NULL;
1826
1827         if (write_request) {
1828                 down_read(&rbd_dev->header_rwsem);
1829                 ceph_get_snap_context(rbd_dev->header.snapc);
1830                 up_read(&rbd_dev->header_rwsem);
1831         }
1832
1833         img_request->rq = NULL;
1834         img_request->rbd_dev = rbd_dev;
1835         img_request->offset = offset;
1836         img_request->length = length;
1837         img_request->flags = 0;
1838         if (write_request) {
1839                 img_request_write_set(img_request);
1840                 img_request->snapc = rbd_dev->header.snapc;
1841         } else {
1842                 img_request->snap_id = rbd_dev->spec->snap_id;
1843         }
1844         if (child_request)
1845                 img_request_child_set(img_request);
1846         if (rbd_dev->parent_spec)
1847                 img_request_layered_set(img_request);
1848         spin_lock_init(&img_request->completion_lock);
1849         img_request->next_completion = 0;
1850         img_request->callback = NULL;
1851         img_request->result = 0;
1852         img_request->obj_request_count = 0;
1853         INIT_LIST_HEAD(&img_request->obj_requests);
1854         kref_init(&img_request->kref);
1855
1856         rbd_img_request_get(img_request);       /* Avoid a warning */
1857         rbd_img_request_put(img_request);       /* TEMPORARY */
1858
1859         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1860                 write_request ? "write" : "read", offset, length,
1861                 img_request);
1862
1863         return img_request;
1864 }
1865
1866 static void rbd_img_request_destroy(struct kref *kref)
1867 {
1868         struct rbd_img_request *img_request;
1869         struct rbd_obj_request *obj_request;
1870         struct rbd_obj_request *next_obj_request;
1871
1872         img_request = container_of(kref, struct rbd_img_request, kref);
1873
1874         dout("%s: img %p\n", __func__, img_request);
1875
1876         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1877                 rbd_img_obj_request_del(img_request, obj_request);
1878         rbd_assert(img_request->obj_request_count == 0);
1879
1880         if (img_request_write_test(img_request))
1881                 ceph_put_snap_context(img_request->snapc);
1882
1883         if (img_request_child_test(img_request))
1884                 rbd_obj_request_put(img_request->obj_request);
1885
1886         kmem_cache_free(rbd_img_request_cache, img_request);
1887 }
1888
1889 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1890 {
1891         struct rbd_img_request *img_request;
1892         unsigned int xferred;
1893         int result;
1894         bool more;
1895
1896         rbd_assert(obj_request_img_data_test(obj_request));
1897         img_request = obj_request->img_request;
1898
1899         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1900         xferred = (unsigned int)obj_request->xferred;
1901         result = obj_request->result;
1902         if (result) {
1903                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1904
1905                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1906                         img_request_write_test(img_request) ? "write" : "read",
1907                         obj_request->length, obj_request->img_offset,
1908                         obj_request->offset);
1909                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1910                         result, xferred);
1911                 if (!img_request->result)
1912                         img_request->result = result;
1913         }
1914
1915         /* Image object requests don't own their page array */
1916
1917         if (obj_request->type == OBJ_REQUEST_PAGES) {
1918                 obj_request->pages = NULL;
1919                 obj_request->page_count = 0;
1920         }
1921
1922         if (img_request_child_test(img_request)) {
1923                 rbd_assert(img_request->obj_request != NULL);
1924                 more = obj_request->which < img_request->obj_request_count - 1;
1925         } else {
1926                 rbd_assert(img_request->rq != NULL);
1927                 more = blk_end_request(img_request->rq, result, xferred);
1928         }
1929
1930         return more;
1931 }
1932
1933 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1934 {
1935         struct rbd_img_request *img_request;
1936         u32 which = obj_request->which;
1937         bool more = true;
1938
1939         rbd_assert(obj_request_img_data_test(obj_request));
1940         img_request = obj_request->img_request;
1941
1942         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1943         rbd_assert(img_request != NULL);
1944         rbd_assert(img_request->obj_request_count > 0);
1945         rbd_assert(which != BAD_WHICH);
1946         rbd_assert(which < img_request->obj_request_count);
1947         rbd_assert(which >= img_request->next_completion);
1948
1949         spin_lock_irq(&img_request->completion_lock);
1950         if (which != img_request->next_completion)
1951                 goto out;
1952
1953         for_each_obj_request_from(img_request, obj_request) {
1954                 rbd_assert(more);
1955                 rbd_assert(which < img_request->obj_request_count);
1956
1957                 if (!obj_request_done_test(obj_request))
1958                         break;
1959                 more = rbd_img_obj_end_request(obj_request);
1960                 which++;
1961         }
1962
1963         rbd_assert(more ^ (which == img_request->obj_request_count));
1964         img_request->next_completion = which;
1965 out:
1966         spin_unlock_irq(&img_request->completion_lock);
1967
1968         if (!more)
1969                 rbd_img_request_complete(img_request);
1970 }
1971
1972 /*
1973  * Split up an image request into one or more object requests, each
1974  * to a different object.  The "type" parameter indicates whether
1975  * "data_desc" is the pointer to the head of a list of bio
1976  * structures, or the base of a page array.  In either case this
1977  * function assumes data_desc describes memory sufficient to hold
1978  * all data described by the image request.
1979  */
1980 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1981                                         enum obj_request_type type,
1982                                         void *data_desc)
1983 {
1984         struct rbd_device *rbd_dev = img_request->rbd_dev;
1985         struct rbd_obj_request *obj_request = NULL;
1986         struct rbd_obj_request *next_obj_request;
1987         bool write_request = img_request_write_test(img_request);
1988         struct bio *bio_list;
1989         unsigned int bio_offset = 0;
1990         struct page **pages;
1991         u64 img_offset;
1992         u64 resid;
1993         u16 opcode;
1994
1995         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1996                 (int)type, data_desc);
1997
1998         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1999         img_offset = img_request->offset;
2000         resid = img_request->length;
2001         rbd_assert(resid > 0);
2002
2003         if (type == OBJ_REQUEST_BIO) {
2004                 bio_list = data_desc;
2005                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2006         } else {
2007                 rbd_assert(type == OBJ_REQUEST_PAGES);
2008                 pages = data_desc;
2009         }
2010
2011         while (resid) {
2012                 struct ceph_osd_request *osd_req;
2013                 const char *object_name;
2014                 u64 offset;
2015                 u64 length;
2016
2017                 object_name = rbd_segment_name(rbd_dev, img_offset);
2018                 if (!object_name)
2019                         goto out_unwind;
2020                 offset = rbd_segment_offset(rbd_dev, img_offset);
2021                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2022                 obj_request = rbd_obj_request_create(object_name,
2023                                                 offset, length, type);
2024                 /* object request has its own copy of the object name */
2025                 rbd_segment_name_free(object_name);
2026                 if (!obj_request)
2027                         goto out_unwind;
2028
2029                 if (type == OBJ_REQUEST_BIO) {
2030                         unsigned int clone_size;
2031
2032                         rbd_assert(length <= (u64)UINT_MAX);
2033                         clone_size = (unsigned int)length;
2034                         obj_request->bio_list =
2035                                         bio_chain_clone_range(&bio_list,
2036                                                                 &bio_offset,
2037                                                                 clone_size,
2038                                                                 GFP_ATOMIC);
2039                         if (!obj_request->bio_list)
2040                                 goto out_partial;
2041                 } else {
2042                         unsigned int page_count;
2043
2044                         obj_request->pages = pages;
2045                         page_count = (u32)calc_pages_for(offset, length);
2046                         obj_request->page_count = page_count;
2047                         if ((offset + length) & ~PAGE_MASK)
2048                                 page_count--;   /* more on last page */
2049                         pages += page_count;
2050                 }
2051
2052                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2053                                                 obj_request);
2054                 if (!osd_req)
2055                         goto out_partial;
2056                 obj_request->osd_req = osd_req;
2057                 obj_request->callback = rbd_img_obj_callback;
2058
2059                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2060                                                 0, 0);
2061                 if (type == OBJ_REQUEST_BIO)
2062                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2063                                         obj_request->bio_list, length);
2064                 else
2065                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2066                                         obj_request->pages, length,
2067                                         offset & ~PAGE_MASK, false, false);
2068
2069                 if (write_request)
2070                         rbd_osd_req_format_write(obj_request);
2071                 else
2072                         rbd_osd_req_format_read(obj_request);
2073
2074                 obj_request->img_offset = img_offset;
2075                 rbd_img_obj_request_add(img_request, obj_request);
2076
2077                 img_offset += length;
2078                 resid -= length;
2079         }
2080
2081         return 0;
2082
2083 out_partial:
2084         rbd_obj_request_put(obj_request);
2085 out_unwind:
2086         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2087                 rbd_obj_request_put(obj_request);
2088
2089         return -ENOMEM;
2090 }
2091
2092 static void
2093 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2094 {
2095         struct rbd_img_request *img_request;
2096         struct rbd_device *rbd_dev;
2097         u64 length;
2098         u32 page_count;
2099
2100         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2101         rbd_assert(obj_request_img_data_test(obj_request));
2102         img_request = obj_request->img_request;
2103         rbd_assert(img_request);
2104
2105         rbd_dev = img_request->rbd_dev;
2106         rbd_assert(rbd_dev);
2107         length = (u64)1 << rbd_dev->header.obj_order;
2108         page_count = (u32)calc_pages_for(0, length);
2109
2110         rbd_assert(obj_request->copyup_pages);
2111         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2112         obj_request->copyup_pages = NULL;
2113
2114         /*
2115          * We want the transfer count to reflect the size of the
2116          * original write request.  There is no such thing as a
2117          * successful short write, so if the request was successful
2118          * we can just set it to the originally-requested length.
2119          */
2120         if (!obj_request->result)
2121                 obj_request->xferred = obj_request->length;
2122
2123         /* Finish up with the normal image object callback */
2124
2125         rbd_img_obj_callback(obj_request);
2126 }
2127
2128 static void
2129 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2130 {
2131         struct rbd_obj_request *orig_request;
2132         struct ceph_osd_request *osd_req;
2133         struct ceph_osd_client *osdc;
2134         struct rbd_device *rbd_dev;
2135         struct page **pages;
2136         int result;
2137         u64 obj_size;
2138         u64 xferred;
2139
2140         rbd_assert(img_request_child_test(img_request));
2141
2142         /* First get what we need from the image request */
2143
2144         pages = img_request->copyup_pages;
2145         rbd_assert(pages != NULL);
2146         img_request->copyup_pages = NULL;
2147
2148         orig_request = img_request->obj_request;
2149         rbd_assert(orig_request != NULL);
2150         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2151         result = img_request->result;
2152         obj_size = img_request->length;
2153         xferred = img_request->xferred;
2154
2155         rbd_dev = img_request->rbd_dev;
2156         rbd_assert(rbd_dev);
2157         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2158
2159         rbd_img_request_put(img_request);
2160
2161         if (result)
2162                 goto out_err;
2163
2164         /* Allocate the new copyup osd request for the original request */
2165
2166         result = -ENOMEM;
2167         rbd_assert(!orig_request->osd_req);
2168         osd_req = rbd_osd_req_create_copyup(orig_request);
2169         if (!osd_req)
2170                 goto out_err;
2171         orig_request->osd_req = osd_req;
2172         orig_request->copyup_pages = pages;
2173
2174         /* Initialize the copyup op */
2175
2176         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2177         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2178                                                 false, false);
2179
2180         /* Then the original write request op */
2181
2182         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2183                                         orig_request->offset,
2184                                         orig_request->length, 0, 0);
2185         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2186                                         orig_request->length);
2187
2188         rbd_osd_req_format_write(orig_request);
2189
2190         /* All set, send it off. */
2191
2192         orig_request->callback = rbd_img_obj_copyup_callback;
2193         osdc = &rbd_dev->rbd_client->client->osdc;
2194         result = rbd_obj_request_submit(osdc, orig_request);
2195         if (!result)
2196                 return;
2197 out_err:
2198         /* Record the error code and complete the request */
2199
2200         orig_request->result = result;
2201         orig_request->xferred = 0;
2202         obj_request_done_set(orig_request);
2203         rbd_obj_request_complete(orig_request);
2204 }
2205
2206 /*
2207  * Read from the parent image the range of data that covers the
2208  * entire target of the given object request.  This is used for
2209  * satisfying a layered image write request when the target of an
2210  * object request from the image request does not exist.
2211  *
2212  * A page array big enough to hold the returned data is allocated
2213  * and supplied to rbd_img_request_fill() as the "data descriptor."
2214  * When the read completes, this page array will be transferred to
2215  * the original object request for the copyup operation.
2216  *
2217  * If an error occurs, record it as the result of the original
2218  * object request and mark it done so it gets completed.
2219  */
2220 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2221 {
2222         struct rbd_img_request *img_request = NULL;
2223         struct rbd_img_request *parent_request = NULL;
2224         struct rbd_device *rbd_dev;
2225         u64 img_offset;
2226         u64 length;
2227         struct page **pages = NULL;
2228         u32 page_count;
2229         int result;
2230
2231         rbd_assert(obj_request_img_data_test(obj_request));
2232         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2233
2234         img_request = obj_request->img_request;
2235         rbd_assert(img_request != NULL);
2236         rbd_dev = img_request->rbd_dev;
2237         rbd_assert(rbd_dev->parent != NULL);
2238
2239         /*
2240          * First things first.  The original osd request is of no
2241          * use to use any more, we'll need a new one that can hold
2242          * the two ops in a copyup request.  We'll get that later,
2243          * but for now we can release the old one.
2244          */
2245         rbd_osd_req_destroy(obj_request->osd_req);
2246         obj_request->osd_req = NULL;
2247
2248         /*
2249          * Determine the byte range covered by the object in the
2250          * child image to which the original request was to be sent.
2251          */
2252         img_offset = obj_request->img_offset - obj_request->offset;
2253         length = (u64)1 << rbd_dev->header.obj_order;
2254
2255         /*
2256          * There is no defined parent data beyond the parent
2257          * overlap, so limit what we read at that boundary if
2258          * necessary.
2259          */
2260         if (img_offset + length > rbd_dev->parent_overlap) {
2261                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2262                 length = rbd_dev->parent_overlap - img_offset;
2263         }
2264
2265         /*
2266          * Allocate a page array big enough to receive the data read
2267          * from the parent.
2268          */
2269         page_count = (u32)calc_pages_for(0, length);
2270         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2271         if (IS_ERR(pages)) {
2272                 result = PTR_ERR(pages);
2273                 pages = NULL;
2274                 goto out_err;
2275         }
2276
2277         result = -ENOMEM;
2278         parent_request = rbd_img_request_create(rbd_dev->parent,
2279                                                 img_offset, length,
2280                                                 false, true);
2281         if (!parent_request)
2282                 goto out_err;
2283         rbd_obj_request_get(obj_request);
2284         parent_request->obj_request = obj_request;
2285
2286         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2287         if (result)
2288                 goto out_err;
2289         parent_request->copyup_pages = pages;
2290
2291         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2292         result = rbd_img_request_submit(parent_request);
2293         if (!result)
2294                 return 0;
2295
2296         parent_request->copyup_pages = NULL;
2297         parent_request->obj_request = NULL;
2298         rbd_obj_request_put(obj_request);
2299 out_err:
2300         if (pages)
2301                 ceph_release_page_vector(pages, page_count);
2302         if (parent_request)
2303                 rbd_img_request_put(parent_request);
2304         obj_request->result = result;
2305         obj_request->xferred = 0;
2306         obj_request_done_set(obj_request);
2307
2308         return result;
2309 }
2310
2311 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2312 {
2313         struct rbd_obj_request *orig_request;
2314         int result;
2315
2316         rbd_assert(!obj_request_img_data_test(obj_request));
2317
2318         /*
2319          * All we need from the object request is the original
2320          * request and the result of the STAT op.  Grab those, then
2321          * we're done with the request.
2322          */
2323         orig_request = obj_request->obj_request;
2324         obj_request->obj_request = NULL;
2325         rbd_assert(orig_request);
2326         rbd_assert(orig_request->img_request);
2327
2328         result = obj_request->result;
2329         obj_request->result = 0;
2330
2331         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2332                 obj_request, orig_request, result,
2333                 obj_request->xferred, obj_request->length);
2334         rbd_obj_request_put(obj_request);
2335
2336         rbd_assert(orig_request);
2337         rbd_assert(orig_request->img_request);
2338
2339         /*
2340          * Our only purpose here is to determine whether the object
2341          * exists, and we don't want to treat the non-existence as
2342          * an error.  If something else comes back, transfer the
2343          * error to the original request and complete it now.
2344          */
2345         if (!result) {
2346                 obj_request_existence_set(orig_request, true);
2347         } else if (result == -ENOENT) {
2348                 obj_request_existence_set(orig_request, false);
2349         } else if (result) {
2350                 orig_request->result = result;
2351                 goto out;
2352         }
2353
2354         /*
2355          * Resubmit the original request now that we have recorded
2356          * whether the target object exists.
2357          */
2358         orig_request->result = rbd_img_obj_request_submit(orig_request);
2359 out:
2360         if (orig_request->result)
2361                 rbd_obj_request_complete(orig_request);
2362         rbd_obj_request_put(orig_request);
2363 }
2364
2365 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2366 {
2367         struct rbd_obj_request *stat_request;
2368         struct rbd_device *rbd_dev;
2369         struct ceph_osd_client *osdc;
2370         struct page **pages = NULL;
2371         u32 page_count;
2372         size_t size;
2373         int ret;
2374
2375         /*
2376          * The response data for a STAT call consists of:
2377          *     le64 length;
2378          *     struct {
2379          *         le32 tv_sec;
2380          *         le32 tv_nsec;
2381          *     } mtime;
2382          */
2383         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2384         page_count = (u32)calc_pages_for(0, size);
2385         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2386         if (IS_ERR(pages))
2387                 return PTR_ERR(pages);
2388
2389         ret = -ENOMEM;
2390         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2391                                                         OBJ_REQUEST_PAGES);
2392         if (!stat_request)
2393                 goto out;
2394
2395         rbd_obj_request_get(obj_request);
2396         stat_request->obj_request = obj_request;
2397         stat_request->pages = pages;
2398         stat_request->page_count = page_count;
2399
2400         rbd_assert(obj_request->img_request);
2401         rbd_dev = obj_request->img_request->rbd_dev;
2402         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2403                                                 stat_request);
2404         if (!stat_request->osd_req)
2405                 goto out;
2406         stat_request->callback = rbd_img_obj_exists_callback;
2407
2408         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2409         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2410                                         false, false);
2411         rbd_osd_req_format_read(stat_request);
2412
2413         osdc = &rbd_dev->rbd_client->client->osdc;
2414         ret = rbd_obj_request_submit(osdc, stat_request);
2415 out:
2416         if (ret)
2417                 rbd_obj_request_put(obj_request);
2418
2419         return ret;
2420 }
2421
2422 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2423 {
2424         struct rbd_img_request *img_request;
2425         struct rbd_device *rbd_dev;
2426         bool known;
2427
2428         rbd_assert(obj_request_img_data_test(obj_request));
2429
2430         img_request = obj_request->img_request;
2431         rbd_assert(img_request);
2432         rbd_dev = img_request->rbd_dev;
2433
2434         /*
2435          * Only writes to layered images need special handling.
2436          * Reads and non-layered writes are simple object requests.
2437          * Layered writes that start beyond the end of the overlap
2438          * with the parent have no parent data, so they too are
2439          * simple object requests.  Finally, if the target object is
2440          * known to already exist, its parent data has already been
2441          * copied, so a write to the object can also be handled as a
2442          * simple object request.
2443          */
2444         if (!img_request_write_test(img_request) ||
2445                 !img_request_layered_test(img_request) ||
2446                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2447                 ((known = obj_request_known_test(obj_request)) &&
2448                         obj_request_exists_test(obj_request))) {
2449
2450                 struct rbd_device *rbd_dev;
2451                 struct ceph_osd_client *osdc;
2452
2453                 rbd_dev = obj_request->img_request->rbd_dev;
2454                 osdc = &rbd_dev->rbd_client->client->osdc;
2455
2456                 return rbd_obj_request_submit(osdc, obj_request);
2457         }
2458
2459         /*
2460          * It's a layered write.  The target object might exist but
2461          * we may not know that yet.  If we know it doesn't exist,
2462          * start by reading the data for the full target object from
2463          * the parent so we can use it for a copyup to the target.
2464          */
2465         if (known)
2466                 return rbd_img_obj_parent_read_full(obj_request);
2467
2468         /* We don't know whether the target exists.  Go find out. */
2469
2470         return rbd_img_obj_exists_submit(obj_request);
2471 }
2472
2473 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2474 {
2475         struct rbd_obj_request *obj_request;
2476         struct rbd_obj_request *next_obj_request;
2477
2478         dout("%s: img %p\n", __func__, img_request);
2479         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2480                 int ret;
2481
2482                 ret = rbd_img_obj_request_submit(obj_request);
2483                 if (ret)
2484                         return ret;
2485         }
2486
2487         return 0;
2488 }
2489
2490 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2491 {
2492         struct rbd_obj_request *obj_request;
2493         struct rbd_device *rbd_dev;
2494         u64 obj_end;
2495
2496         rbd_assert(img_request_child_test(img_request));
2497
2498         obj_request = img_request->obj_request;
2499         rbd_assert(obj_request);
2500         rbd_assert(obj_request->img_request);
2501
2502         obj_request->result = img_request->result;
2503         if (obj_request->result)
2504                 goto out;
2505
2506         /*
2507          * We need to zero anything beyond the parent overlap
2508          * boundary.  Since rbd_img_obj_request_read_callback()
2509          * will zero anything beyond the end of a short read, an
2510          * easy way to do this is to pretend the data from the
2511          * parent came up short--ending at the overlap boundary.
2512          */
2513         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2514         obj_end = obj_request->img_offset + obj_request->length;
2515         rbd_dev = obj_request->img_request->rbd_dev;
2516         if (obj_end > rbd_dev->parent_overlap) {
2517                 u64 xferred = 0;
2518
2519                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2520                         xferred = rbd_dev->parent_overlap -
2521                                         obj_request->img_offset;
2522
2523                 obj_request->xferred = min(img_request->xferred, xferred);
2524         } else {
2525                 obj_request->xferred = img_request->xferred;
2526         }
2527 out:
2528         rbd_img_request_put(img_request);
2529         rbd_img_obj_request_read_callback(obj_request);
2530         rbd_obj_request_complete(obj_request);
2531 }
2532
2533 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2534 {
2535         struct rbd_device *rbd_dev;
2536         struct rbd_img_request *img_request;
2537         int result;
2538
2539         rbd_assert(obj_request_img_data_test(obj_request));
2540         rbd_assert(obj_request->img_request != NULL);
2541         rbd_assert(obj_request->result == (s32) -ENOENT);
2542         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2543
2544         rbd_dev = obj_request->img_request->rbd_dev;
2545         rbd_assert(rbd_dev->parent != NULL);
2546         /* rbd_read_finish(obj_request, obj_request->length); */
2547         img_request = rbd_img_request_create(rbd_dev->parent,
2548                                                 obj_request->img_offset,
2549                                                 obj_request->length,
2550                                                 false, true);
2551         result = -ENOMEM;
2552         if (!img_request)
2553                 goto out_err;
2554
2555         rbd_obj_request_get(obj_request);
2556         img_request->obj_request = obj_request;
2557
2558         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2559                                         obj_request->bio_list);
2560         if (result)
2561                 goto out_err;
2562
2563         img_request->callback = rbd_img_parent_read_callback;
2564         result = rbd_img_request_submit(img_request);
2565         if (result)
2566                 goto out_err;
2567
2568         return;
2569 out_err:
2570         if (img_request)
2571                 rbd_img_request_put(img_request);
2572         obj_request->result = result;
2573         obj_request->xferred = 0;
2574         obj_request_done_set(obj_request);
2575 }
2576
2577 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2578 {
2579         struct rbd_obj_request *obj_request;
2580         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2581         int ret;
2582
2583         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2584                                                         OBJ_REQUEST_NODATA);
2585         if (!obj_request)
2586                 return -ENOMEM;
2587
2588         ret = -ENOMEM;
2589         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2590         if (!obj_request->osd_req)
2591                 goto out;
2592         obj_request->callback = rbd_obj_request_put;
2593
2594         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2595                                         notify_id, 0, 0);
2596         rbd_osd_req_format_read(obj_request);
2597
2598         ret = rbd_obj_request_submit(osdc, obj_request);
2599 out:
2600         if (ret)
2601                 rbd_obj_request_put(obj_request);
2602
2603         return ret;
2604 }
2605
2606 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2607 {
2608         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2609         int ret;
2610
2611         if (!rbd_dev)
2612                 return;
2613
2614         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2615                 rbd_dev->header_name, (unsigned long long)notify_id,
2616                 (unsigned int)opcode);
2617         ret = rbd_dev_refresh(rbd_dev);
2618         if (ret)
2619                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2620
2621         rbd_obj_notify_ack(rbd_dev, notify_id);
2622 }
2623
2624 /*
2625  * Request sync osd watch/unwatch.  The value of "start" determines
2626  * whether a watch request is being initiated or torn down.
2627  */
2628 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2629 {
2630         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2631         struct rbd_obj_request *obj_request;
2632         int ret;
2633
2634         rbd_assert(start ^ !!rbd_dev->watch_event);
2635         rbd_assert(start ^ !!rbd_dev->watch_request);
2636
2637         if (start) {
2638                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2639                                                 &rbd_dev->watch_event);
2640                 if (ret < 0)
2641                         return ret;
2642                 rbd_assert(rbd_dev->watch_event != NULL);
2643         }
2644
2645         ret = -ENOMEM;
2646         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2647                                                         OBJ_REQUEST_NODATA);
2648         if (!obj_request)
2649                 goto out_cancel;
2650
2651         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2652         if (!obj_request->osd_req)
2653                 goto out_cancel;
2654
2655         if (start)
2656                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2657         else
2658                 ceph_osdc_unregister_linger_request(osdc,
2659                                         rbd_dev->watch_request->osd_req);
2660
2661         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2662                                 rbd_dev->watch_event->cookie, 0, start);
2663         rbd_osd_req_format_write(obj_request);
2664
2665         ret = rbd_obj_request_submit(osdc, obj_request);
2666         if (ret)
2667                 goto out_cancel;
2668         ret = rbd_obj_request_wait(obj_request);
2669         if (ret)
2670                 goto out_cancel;
2671         ret = obj_request->result;
2672         if (ret)
2673                 goto out_cancel;
2674
2675         /*
2676          * A watch request is set to linger, so the underlying osd
2677          * request won't go away until we unregister it.  We retain
2678          * a pointer to the object request during that time (in
2679          * rbd_dev->watch_request), so we'll keep a reference to
2680          * it.  We'll drop that reference (below) after we've
2681          * unregistered it.
2682          */
2683         if (start) {
2684                 rbd_dev->watch_request = obj_request;
2685
2686                 return 0;
2687         }
2688
2689         /* We have successfully torn down the watch request */
2690
2691         rbd_obj_request_put(rbd_dev->watch_request);
2692         rbd_dev->watch_request = NULL;
2693 out_cancel:
2694         /* Cancel the event if we're tearing down, or on error */
2695         ceph_osdc_cancel_event(rbd_dev->watch_event);
2696         rbd_dev->watch_event = NULL;
2697         if (obj_request)
2698                 rbd_obj_request_put(obj_request);
2699
2700         return ret;
2701 }
2702
2703 /*
2704  * Synchronous osd object method call.  Returns the number of bytes
2705  * returned in the outbound buffer, or a negative error code.
2706  */
2707 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2708                              const char *object_name,
2709                              const char *class_name,
2710                              const char *method_name,
2711                              const void *outbound,
2712                              size_t outbound_size,
2713                              void *inbound,
2714                              size_t inbound_size)
2715 {
2716         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2717         struct rbd_obj_request *obj_request;
2718         struct page **pages;
2719         u32 page_count;
2720         int ret;
2721
2722         /*
2723          * Method calls are ultimately read operations.  The result
2724          * should placed into the inbound buffer provided.  They
2725          * also supply outbound data--parameters for the object
2726          * method.  Currently if this is present it will be a
2727          * snapshot id.
2728          */
2729         page_count = (u32)calc_pages_for(0, inbound_size);
2730         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2731         if (IS_ERR(pages))
2732                 return PTR_ERR(pages);
2733
2734         ret = -ENOMEM;
2735         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2736                                                         OBJ_REQUEST_PAGES);
2737         if (!obj_request)
2738                 goto out;
2739
2740         obj_request->pages = pages;
2741         obj_request->page_count = page_count;
2742
2743         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2744         if (!obj_request->osd_req)
2745                 goto out;
2746
2747         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2748                                         class_name, method_name);
2749         if (outbound_size) {
2750                 struct ceph_pagelist *pagelist;
2751
2752                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2753                 if (!pagelist)
2754                         goto out;
2755
2756                 ceph_pagelist_init(pagelist);
2757                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2758                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2759                                                 pagelist);
2760         }
2761         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2762                                         obj_request->pages, inbound_size,
2763                                         0, false, false);
2764         rbd_osd_req_format_read(obj_request);
2765
2766         ret = rbd_obj_request_submit(osdc, obj_request);
2767         if (ret)
2768                 goto out;
2769         ret = rbd_obj_request_wait(obj_request);
2770         if (ret)
2771                 goto out;
2772
2773         ret = obj_request->result;
2774         if (ret < 0)
2775                 goto out;
2776
2777         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2778         ret = (int)obj_request->xferred;
2779         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2780 out:
2781         if (obj_request)
2782                 rbd_obj_request_put(obj_request);
2783         else
2784                 ceph_release_page_vector(pages, page_count);
2785
2786         return ret;
2787 }
2788
2789 static void rbd_request_fn(struct request_queue *q)
2790                 __releases(q->queue_lock) __acquires(q->queue_lock)
2791 {
2792         struct rbd_device *rbd_dev = q->queuedata;
2793         bool read_only = rbd_dev->mapping.read_only;
2794         struct request *rq;
2795         int result;
2796
2797         while ((rq = blk_fetch_request(q))) {
2798                 bool write_request = rq_data_dir(rq) == WRITE;
2799                 struct rbd_img_request *img_request;
2800                 u64 offset;
2801                 u64 length;
2802
2803                 /* Ignore any non-FS requests that filter through. */
2804
2805                 if (rq->cmd_type != REQ_TYPE_FS) {
2806                         dout("%s: non-fs request type %d\n", __func__,
2807                                 (int) rq->cmd_type);
2808                         __blk_end_request_all(rq, 0);
2809                         continue;
2810                 }
2811
2812                 /* Ignore/skip any zero-length requests */
2813
2814                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2815                 length = (u64) blk_rq_bytes(rq);
2816
2817                 if (!length) {
2818                         dout("%s: zero-length request\n", __func__);
2819                         __blk_end_request_all(rq, 0);
2820                         continue;
2821                 }
2822
2823                 spin_unlock_irq(q->queue_lock);
2824
2825                 /* Disallow writes to a read-only device */
2826
2827                 if (write_request) {
2828                         result = -EROFS;
2829                         if (read_only)
2830                                 goto end_request;
2831                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2832                 }
2833
2834                 /*
2835                  * Quit early if the mapped snapshot no longer
2836                  * exists.  It's still possible the snapshot will
2837                  * have disappeared by the time our request arrives
2838                  * at the osd, but there's no sense in sending it if
2839                  * we already know.
2840                  */
2841                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2842                         dout("request for non-existent snapshot");
2843                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2844                         result = -ENXIO;
2845                         goto end_request;
2846                 }
2847
2848                 result = -EINVAL;
2849                 if (offset && length > U64_MAX - offset + 1) {
2850                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2851                                 offset, length);
2852                         goto end_request;       /* Shouldn't happen */
2853                 }
2854
2855                 result = -EIO;
2856                 if (offset + length > rbd_dev->mapping.size) {
2857                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2858                                 offset, length, rbd_dev->mapping.size);
2859                         goto end_request;
2860                 }
2861
2862                 result = -ENOMEM;
2863                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2864                                                         write_request, false);
2865                 if (!img_request)
2866                         goto end_request;
2867
2868                 img_request->rq = rq;
2869
2870                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2871                                                 rq->bio);
2872                 if (!result)
2873                         result = rbd_img_request_submit(img_request);
2874                 if (result)
2875                         rbd_img_request_put(img_request);
2876 end_request:
2877                 spin_lock_irq(q->queue_lock);
2878                 if (result < 0) {
2879                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2880                                 write_request ? "write" : "read",
2881                                 length, offset, result);
2882
2883                         __blk_end_request_all(rq, result);
2884                 }
2885         }
2886 }
2887
2888 /*
2889  * a queue callback. Makes sure that we don't create a bio that spans across
2890  * multiple osd objects. One exception would be with a single page bios,
2891  * which we handle later at bio_chain_clone_range()
2892  */
2893 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2894                           struct bio_vec *bvec)
2895 {
2896         struct rbd_device *rbd_dev = q->queuedata;
2897         sector_t sector_offset;
2898         sector_t sectors_per_obj;
2899         sector_t obj_sector_offset;
2900         int ret;
2901
2902         /*
2903          * Find how far into its rbd object the partition-relative
2904          * bio start sector is to offset relative to the enclosing
2905          * device.
2906          */
2907         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2908         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2909         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2910
2911         /*
2912          * Compute the number of bytes from that offset to the end
2913          * of the object.  Account for what's already used by the bio.
2914          */
2915         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2916         if (ret > bmd->bi_size)
2917                 ret -= bmd->bi_size;
2918         else
2919                 ret = 0;
2920
2921         /*
2922          * Don't send back more than was asked for.  And if the bio
2923          * was empty, let the whole thing through because:  "Note
2924          * that a block device *must* allow a single page to be
2925          * added to an empty bio."
2926          */
2927         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2928         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2929                 ret = (int) bvec->bv_len;
2930
2931         return ret;
2932 }
2933
2934 static void rbd_free_disk(struct rbd_device *rbd_dev)
2935 {
2936         struct gendisk *disk = rbd_dev->disk;
2937
2938         if (!disk)
2939                 return;
2940
2941         rbd_dev->disk = NULL;
2942         if (disk->flags & GENHD_FL_UP) {
2943                 del_gendisk(disk);
2944                 if (disk->queue)
2945                         blk_cleanup_queue(disk->queue);
2946         }
2947         put_disk(disk);
2948 }
2949
2950 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2951                                 const char *object_name,
2952                                 u64 offset, u64 length, void *buf)
2953
2954 {
2955         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2956         struct rbd_obj_request *obj_request;
2957         struct page **pages = NULL;
2958         u32 page_count;
2959         size_t size;
2960         int ret;
2961
2962         page_count = (u32) calc_pages_for(offset, length);
2963         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2964         if (IS_ERR(pages))
2965                 ret = PTR_ERR(pages);
2966
2967         ret = -ENOMEM;
2968         obj_request = rbd_obj_request_create(object_name, offset, length,
2969                                                         OBJ_REQUEST_PAGES);
2970         if (!obj_request)
2971                 goto out;
2972
2973         obj_request->pages = pages;
2974         obj_request->page_count = page_count;
2975
2976         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2977         if (!obj_request->osd_req)
2978                 goto out;
2979
2980         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2981                                         offset, length, 0, 0);
2982         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2983                                         obj_request->pages,
2984                                         obj_request->length,
2985                                         obj_request->offset & ~PAGE_MASK,
2986                                         false, false);
2987         rbd_osd_req_format_read(obj_request);
2988
2989         ret = rbd_obj_request_submit(osdc, obj_request);
2990         if (ret)
2991                 goto out;
2992         ret = rbd_obj_request_wait(obj_request);
2993         if (ret)
2994                 goto out;
2995
2996         ret = obj_request->result;
2997         if (ret < 0)
2998                 goto out;
2999
3000         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3001         size = (size_t) obj_request->xferred;
3002         ceph_copy_from_page_vector(pages, buf, 0, size);
3003         rbd_assert(size <= (size_t)INT_MAX);
3004         ret = (int)size;
3005 out:
3006         if (obj_request)
3007                 rbd_obj_request_put(obj_request);
3008         else
3009                 ceph_release_page_vector(pages, page_count);
3010
3011         return ret;
3012 }
3013
3014 /*
3015  * Read the complete header for the given rbd device.
3016  *
3017  * Returns a pointer to a dynamically-allocated buffer containing
3018  * the complete and validated header.  Caller can pass the address
3019  * of a variable that will be filled in with the version of the
3020  * header object at the time it was read.
3021  *
3022  * Returns a pointer-coded errno if a failure occurs.
3023  */
3024 static struct rbd_image_header_ondisk *
3025 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3026 {
3027         struct rbd_image_header_ondisk *ondisk = NULL;
3028         u32 snap_count = 0;
3029         u64 names_size = 0;
3030         u32 want_count;
3031         int ret;
3032
3033         /*
3034          * The complete header will include an array of its 64-bit
3035          * snapshot ids, followed by the names of those snapshots as
3036          * a contiguous block of NUL-terminated strings.  Note that
3037          * the number of snapshots could change by the time we read
3038          * it in, in which case we re-read it.
3039          */
3040         do {
3041                 size_t size;
3042
3043                 kfree(ondisk);
3044
3045                 size = sizeof (*ondisk);
3046                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3047                 size += names_size;
3048                 ondisk = kmalloc(size, GFP_KERNEL);
3049                 if (!ondisk)
3050                         return ERR_PTR(-ENOMEM);
3051
3052                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3053                                        0, size, ondisk);
3054                 if (ret < 0)
3055                         goto out_err;
3056                 if ((size_t)ret < size) {
3057                         ret = -ENXIO;
3058                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3059                                 size, ret);
3060                         goto out_err;
3061                 }
3062                 if (!rbd_dev_ondisk_valid(ondisk)) {
3063                         ret = -ENXIO;
3064                         rbd_warn(rbd_dev, "invalid header");
3065                         goto out_err;
3066                 }
3067
3068                 names_size = le64_to_cpu(ondisk->snap_names_len);
3069                 want_count = snap_count;
3070                 snap_count = le32_to_cpu(ondisk->snap_count);
3071         } while (snap_count != want_count);
3072
3073         return ondisk;
3074
3075 out_err:
3076         kfree(ondisk);
3077
3078         return ERR_PTR(ret);
3079 }
3080
/*
 * Read the on-disk (format 1) header and convert it into the in-memory
 * form pointed to by "header".
 *
 * Returns the result of rbd_header_from_disk(), or the negative errno
 * from the header read if that failed.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
                           struct rbd_image_header *header)
{
        struct rbd_image_header_ondisk *raw;
        int rc;

        raw = rbd_dev_v1_header_read(rbd_dev);
        if (!IS_ERR(raw)) {
                rc = rbd_header_from_disk(header, raw);
                /* The raw copy is no longer needed once converted */
                kfree(raw);
        } else {
                rc = PTR_ERR(raw);
        }

        return rc;
}
3098
3099 /*
3100  * only read the first part of the ondisk header, without the snaps info
3101  */
3102 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3103 {
3104         int ret;
3105         struct rbd_image_header h;
3106
3107         ret = rbd_read_header(rbd_dev, &h);
3108         if (ret < 0)
3109                 return ret;
3110
3111         down_write(&rbd_dev->header_rwsem);
3112
3113         /* Update image size, and check for resize of mapped image */
3114         rbd_dev->header.image_size = h.image_size;
3115         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3116                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3117                         rbd_dev->mapping.size = rbd_dev->header.image_size;
3118
3119         /* rbd_dev->header.object_prefix shouldn't change */
3120         kfree(rbd_dev->header.snap_sizes);
3121         kfree(rbd_dev->header.snap_names);
3122         /* osd requests may still refer to snapc */
3123         ceph_put_snap_context(rbd_dev->header.snapc);
3124
3125         rbd_dev->header.image_size = h.image_size;
3126         rbd_dev->header.snapc = h.snapc;
3127         rbd_dev->header.snap_names = h.snap_names;
3128         rbd_dev->header.snap_sizes = h.snap_sizes;
3129         /* Free the extra copy of the object prefix */
3130         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3131                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3132         kfree(h.object_prefix);
3133
3134         up_write(&rbd_dev->header_rwsem);
3135
3136         return ret;
3137 }
3138
3139 /*
3140  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3141  * has disappeared from the (just updated) snapshot context.
3142  */
3143 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3144 {
3145         u64 snap_id;
3146
3147         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3148                 return;
3149
3150         snap_id = rbd_dev->spec->snap_id;
3151         if (snap_id == CEPH_NOSNAP)
3152                 return;
3153
3154         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3155                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3156 }
3157
3158 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3159 {
3160         u64 mapping_size;
3161         int ret;
3162
3163         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3164         mapping_size = rbd_dev->mapping.size;
3165         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3166         if (rbd_dev->image_format == 1)
3167                 ret = rbd_dev_v1_refresh(rbd_dev);
3168         else
3169                 ret = rbd_dev_v2_refresh(rbd_dev);
3170
3171         /* If it's a mapped snapshot, validate its EXISTS flag */
3172
3173         rbd_exists_validate(rbd_dev);
3174         mutex_unlock(&ctl_mutex);
3175         if (mapping_size != rbd_dev->mapping.size) {
3176                 sector_t size;
3177
3178                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3179                 dout("setting size to %llu sectors", (unsigned long long)size);
3180                 set_capacity(rbd_dev->disk, size);
3181                 revalidate_disk(rbd_dev->disk);
3182         }
3183
3184         return ret;
3185 }
3186
3187 static int rbd_init_disk(struct rbd_device *rbd_dev)
3188 {
3189         struct gendisk *disk;
3190         struct request_queue *q;
3191         u64 segment_size;
3192
3193         /* create gendisk info */
3194         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3195         if (!disk)
3196                 return -ENOMEM;
3197
3198         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3199                  rbd_dev->dev_id);
3200         disk->major = rbd_dev->major;
3201         disk->first_minor = 0;
3202         disk->fops = &rbd_bd_ops;
3203         disk->private_data = rbd_dev;
3204
3205         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3206         if (!q)
3207                 goto out_disk;
3208
3209         /* We use the default size, but let's be explicit about it. */
3210         blk_queue_physical_block_size(q, SECTOR_SIZE);
3211
3212         /* set io sizes to object size */
3213         segment_size = rbd_obj_bytes(&rbd_dev->header);
3214         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3215         blk_queue_max_segment_size(q, segment_size);
3216         blk_queue_io_min(q, segment_size);
3217         blk_queue_io_opt(q, segment_size);
3218
3219         blk_queue_merge_bvec(q, rbd_merge_bvec);
3220         disk->queue = q;
3221
3222         q->queuedata = rbd_dev;
3223
3224         rbd_dev->disk = disk;
3225
3226         return 0;
3227 out_disk:
3228         put_disk(disk);
3229
3230         return -ENOMEM;
3231 }
3232
3233 /*
3234   sysfs
3235 */
3236
3237 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3238 {
3239         return container_of(dev, struct rbd_device, dev);
3240 }
3241
3242 static ssize_t rbd_size_show(struct device *dev,
3243                              struct device_attribute *attr, char *buf)
3244 {
3245         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3246
3247         return sprintf(buf, "%llu\n",
3248                 (unsigned long long)rbd_dev->mapping.size);
3249 }
3250
3251 /*
3252  * Note this shows the features for whatever's mapped, which is not
3253  * necessarily the base image.
3254  */
3255 static ssize_t rbd_features_show(struct device *dev,
3256                              struct device_attribute *attr, char *buf)
3257 {
3258         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3259
3260         return sprintf(buf, "0x%016llx\n",
3261                         (unsigned long long)rbd_dev->mapping.features);
3262 }
3263
3264 static ssize_t rbd_major_show(struct device *dev,
3265                               struct device_attribute *attr, char *buf)
3266 {
3267         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3268
3269         if (rbd_dev->major)
3270                 return sprintf(buf, "%d\n", rbd_dev->major);
3271
3272         return sprintf(buf, "(none)\n");
3273
3274 }
3275
3276 static ssize_t rbd_client_id_show(struct device *dev,
3277                                   struct device_attribute *attr, char *buf)
3278 {
3279         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3280
3281         return sprintf(buf, "client%lld\n",
3282                         ceph_client_id(rbd_dev->rbd_client->client));
3283 }
3284
3285 static ssize_t rbd_pool_show(struct device *dev,
3286                              struct device_attribute *attr, char *buf)
3287 {
3288         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3289
3290         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3291 }
3292
3293 static ssize_t rbd_pool_id_show(struct device *dev,
3294                              struct device_attribute *attr, char *buf)
3295 {
3296         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3297
3298         return sprintf(buf, "%llu\n",
3299                         (unsigned long long) rbd_dev->spec->pool_id);
3300 }
3301
3302 static ssize_t rbd_name_show(struct device *dev,
3303                              struct device_attribute *attr, char *buf)
3304 {
3305         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3306
3307         if (rbd_dev->spec->image_name)
3308                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3309
3310         return sprintf(buf, "(unknown)\n");
3311 }
3312
3313 static ssize_t rbd_image_id_show(struct device *dev,
3314                              struct device_attribute *attr, char *buf)
3315 {
3316         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3317
3318         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3319 }
3320
3321 /*
3322  * Shows the name of the currently-mapped snapshot (or
3323  * RBD_SNAP_HEAD_NAME for the base image).
3324  */
3325 static ssize_t rbd_snap_show(struct device *dev,
3326                              struct device_attribute *attr,
3327                              char *buf)
3328 {
3329         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3330
3331         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3332 }
3333
3334 /*
3335  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3336  * for the parent image.  If there is no parent, simply shows
3337  * "(no parent image)".
3338  */
3339 static ssize_t rbd_parent_show(struct device *dev,
3340                              struct device_attribute *attr,
3341                              char *buf)
3342 {
3343         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3344         struct rbd_spec *spec = rbd_dev->parent_spec;
3345         int count;
3346         char *bufp = buf;
3347
3348         if (!spec)
3349                 return sprintf(buf, "(no parent image)\n");
3350
3351         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3352                         (unsigned long long) spec->pool_id, spec->pool_name);
3353         if (count < 0)
3354                 return count;
3355         bufp += count;
3356
3357         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3358                         spec->image_name ? spec->image_name : "(unknown)");
3359         if (count < 0)
3360                 return count;
3361         bufp += count;
3362
3363         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3364                         (unsigned long long) spec->snap_id, spec->snap_name);
3365         if (count < 0)
3366                 return count;
3367         bufp += count;
3368
3369         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3370         if (count < 0)
3371                 return count;
3372         bufp += count;
3373
3374         return (ssize_t) (bufp - buf);
3375 }
3376
3377 static ssize_t rbd_image_refresh(struct device *dev,
3378                                  struct device_attribute *attr,
3379                                  const char *buf,
3380                                  size_t size)
3381 {
3382         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3383         int ret;
3384
3385         ret = rbd_dev_refresh(rbd_dev);
3386         if (ret)
3387                 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3388
3389         return ret < 0 ? ret : size;
3390 }
3391
3392 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3393 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3394 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3395 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3396 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3397 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3398 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3399 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3400 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3401 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3402 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3403
3404 static struct attribute *rbd_attrs[] = {
3405         &dev_attr_size.attr,
3406         &dev_attr_features.attr,
3407         &dev_attr_major.attr,
3408         &dev_attr_client_id.attr,
3409         &dev_attr_pool.attr,
3410         &dev_attr_pool_id.attr,
3411         &dev_attr_name.attr,
3412         &dev_attr_image_id.attr,
3413         &dev_attr_current_snap.attr,
3414         &dev_attr_parent.attr,
3415         &dev_attr_refresh.attr,
3416         NULL
3417 };
3418
3419 static struct attribute_group rbd_attr_group = {
3420         .attrs = rbd_attrs,
3421 };
3422
3423 static const struct attribute_group *rbd_attr_groups[] = {
3424         &rbd_attr_group,
3425         NULL
3426 };
3427
/*
 * Release callback installed in rbd_device_type.  It does nothing;
 * rbd_bus_add_dev() separately sets the per-device release method
 * to rbd_dev_device_release().
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
        /* nothing to release here */
}
3431
3432 static struct device_type rbd_device_type = {
3433         .name           = "rbd",
3434         .groups         = rbd_attr_groups,
3435         .release        = rbd_sysfs_dev_release,
3436 };
3437
3438 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3439 {
3440         kref_get(&spec->kref);
3441
3442         return spec;
3443 }
3444
3445 static void rbd_spec_free(struct kref *kref);
3446 static void rbd_spec_put(struct rbd_spec *spec)
3447 {
3448         if (spec)
3449                 kref_put(&spec->kref, rbd_spec_free);
3450 }
3451
3452 static struct rbd_spec *rbd_spec_alloc(void)
3453 {
3454         struct rbd_spec *spec;
3455
3456         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3457         if (!spec)
3458                 return NULL;
3459         kref_init(&spec->kref);
3460
3461         return spec;
3462 }
3463
3464 static void rbd_spec_free(struct kref *kref)
3465 {
3466         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3467
3468         kfree(spec->pool_name);
3469         kfree(spec->image_id);
3470         kfree(spec->image_name);
3471         kfree(spec->snap_name);
3472         kfree(spec);
3473 }
3474
3475 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3476                                 struct rbd_spec *spec)
3477 {
3478         struct rbd_device *rbd_dev;
3479
3480         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3481         if (!rbd_dev)
3482                 return NULL;
3483
3484         spin_lock_init(&rbd_dev->lock);
3485         rbd_dev->flags = 0;
3486         INIT_LIST_HEAD(&rbd_dev->node);
3487         init_rwsem(&rbd_dev->header_rwsem);
3488
3489         rbd_dev->spec = spec;
3490         rbd_dev->rbd_client = rbdc;
3491
3492         /* Initialize the layout used for all rbd requests */
3493
3494         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3495         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3496         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3497         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3498
3499         return rbd_dev;
3500 }
3501
3502 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3503 {
3504         rbd_put_client(rbd_dev->rbd_client);
3505         rbd_spec_put(rbd_dev->spec);
3506         kfree(rbd_dev);
3507 }
3508
3509 /*
3510  * Get the size and object order for an image snapshot, or if
3511  * snap_id is CEPH_NOSNAP, gets this information for the base
3512  * image.
3513  */
3514 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3515                                 u8 *order, u64 *snap_size)
3516 {
3517         __le64 snapid = cpu_to_le64(snap_id);
3518         int ret;
3519         struct {
3520                 u8 order;
3521                 __le64 size;
3522         } __attribute__ ((packed)) size_buf = { 0 };
3523
3524         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3525                                 "rbd", "get_size",
3526                                 &snapid, sizeof (snapid),
3527                                 &size_buf, sizeof (size_buf));
3528         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3529         if (ret < 0)
3530                 return ret;
3531         if (ret < sizeof (size_buf))
3532                 return -ERANGE;
3533
3534         if (order)
3535                 *order = size_buf.order;
3536         *snap_size = le64_to_cpu(size_buf.size);
3537
3538         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3539                 (unsigned long long)snap_id, (unsigned int)*order,
3540                 (unsigned long long)*snap_size);
3541
3542         return 0;
3543 }
3544
3545 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3546 {
3547         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3548                                         &rbd_dev->header.obj_order,
3549                                         &rbd_dev->header.image_size);
3550 }
3551
3552 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3553 {
3554         void *reply_buf;
3555         int ret;
3556         void *p;
3557
3558         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3559         if (!reply_buf)
3560                 return -ENOMEM;
3561
3562         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3563                                 "rbd", "get_object_prefix", NULL, 0,
3564                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3565         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3566         if (ret < 0)
3567                 goto out;
3568
3569         p = reply_buf;
3570         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3571                                                 p + ret, NULL, GFP_NOIO);
3572         ret = 0;
3573
3574         if (IS_ERR(rbd_dev->header.object_prefix)) {
3575                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3576                 rbd_dev->header.object_prefix = NULL;
3577         } else {
3578                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3579         }
3580 out:
3581         kfree(reply_buf);
3582
3583         return ret;
3584 }
3585
3586 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3587                 u64 *snap_features)
3588 {
3589         __le64 snapid = cpu_to_le64(snap_id);
3590         struct {
3591                 __le64 features;
3592                 __le64 incompat;
3593         } __attribute__ ((packed)) features_buf = { 0 };
3594         u64 incompat;
3595         int ret;
3596
3597         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3598                                 "rbd", "get_features",
3599                                 &snapid, sizeof (snapid),
3600                                 &features_buf, sizeof (features_buf));
3601         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3602         if (ret < 0)
3603                 return ret;
3604         if (ret < sizeof (features_buf))
3605                 return -ERANGE;
3606
3607         incompat = le64_to_cpu(features_buf.incompat);
3608         if (incompat & ~RBD_FEATURES_SUPPORTED)
3609                 return -ENXIO;
3610
3611         *snap_features = le64_to_cpu(features_buf.features);
3612
3613         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3614                 (unsigned long long)snap_id,
3615                 (unsigned long long)*snap_features,
3616                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3617
3618         return 0;
3619 }
3620
3621 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3622 {
3623         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3624                                                 &rbd_dev->header.features);
3625 }
3626
3627 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3628 {
3629         struct rbd_spec *parent_spec;
3630         size_t size;
3631         void *reply_buf = NULL;
3632         __le64 snapid;
3633         void *p;
3634         void *end;
3635         char *image_id;
3636         u64 overlap;
3637         int ret;
3638
3639         parent_spec = rbd_spec_alloc();
3640         if (!parent_spec)
3641                 return -ENOMEM;
3642
3643         size = sizeof (__le64) +                                /* pool_id */
3644                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3645                 sizeof (__le64) +                               /* snap_id */
3646                 sizeof (__le64);                                /* overlap */
3647         reply_buf = kmalloc(size, GFP_KERNEL);
3648         if (!reply_buf) {
3649                 ret = -ENOMEM;
3650                 goto out_err;
3651         }
3652
3653         snapid = cpu_to_le64(CEPH_NOSNAP);
3654         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3655                                 "rbd", "get_parent",
3656                                 &snapid, sizeof (snapid),
3657                                 reply_buf, size);
3658         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3659         if (ret < 0)
3660                 goto out_err;
3661
3662         p = reply_buf;
3663         end = reply_buf + ret;
3664         ret = -ERANGE;
3665         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3666         if (parent_spec->pool_id == CEPH_NOPOOL)
3667                 goto out;       /* No parent?  No problem. */
3668
3669         /* The ceph file layout needs to fit pool id in 32 bits */
3670
3671         ret = -EIO;
3672         if (parent_spec->pool_id > (u64)U32_MAX) {
3673                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3674                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3675                 goto out_err;
3676         }
3677
3678         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3679         if (IS_ERR(image_id)) {
3680                 ret = PTR_ERR(image_id);
3681                 goto out_err;
3682         }
3683         parent_spec->image_id = image_id;
3684         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3685         ceph_decode_64_safe(&p, end, overlap, out_err);
3686
3687         rbd_dev->parent_overlap = overlap;
3688         rbd_dev->parent_spec = parent_spec;
3689         parent_spec = NULL;     /* rbd_dev now owns this */
3690 out:
3691         ret = 0;
3692 out_err:
3693         kfree(reply_buf);
3694         rbd_spec_put(parent_spec);
3695
3696         return ret;
3697 }
3698
3699 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3700 {
3701         struct {
3702                 __le64 stripe_unit;
3703                 __le64 stripe_count;
3704         } __attribute__ ((packed)) striping_info_buf = { 0 };
3705         size_t size = sizeof (striping_info_buf);
3706         void *p;
3707         u64 obj_size;
3708         u64 stripe_unit;
3709         u64 stripe_count;
3710         int ret;
3711
3712         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3713                                 "rbd", "get_stripe_unit_count", NULL, 0,
3714                                 (char *)&striping_info_buf, size);
3715         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3716         if (ret < 0)
3717                 return ret;
3718         if (ret < size)
3719                 return -ERANGE;
3720
3721         /*
3722          * We don't actually support the "fancy striping" feature
3723          * (STRIPINGV2) yet, but if the striping sizes are the
3724          * defaults the behavior is the same as before.  So find
3725          * out, and only fail if the image has non-default values.
3726          */
3727         ret = -EINVAL;
3728         obj_size = (u64)1 << rbd_dev->header.obj_order;
3729         p = &striping_info_buf;
3730         stripe_unit = ceph_decode_64(&p);
3731         if (stripe_unit != obj_size) {
3732                 rbd_warn(rbd_dev, "unsupported stripe unit "
3733                                 "(got %llu want %llu)",
3734                                 stripe_unit, obj_size);
3735                 return -EINVAL;
3736         }
3737         stripe_count = ceph_decode_64(&p);
3738         if (stripe_count != 1) {
3739                 rbd_warn(rbd_dev, "unsupported stripe count "
3740                                 "(got %llu want 1)", stripe_count);
3741                 return -EINVAL;
3742         }
3743         rbd_dev->header.stripe_unit = stripe_unit;
3744         rbd_dev->header.stripe_count = stripe_count;
3745
3746         return 0;
3747 }
3748
3749 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3750 {
3751         size_t image_id_size;
3752         char *image_id;
3753         void *p;
3754         void *end;
3755         size_t size;
3756         void *reply_buf = NULL;
3757         size_t len = 0;
3758         char *image_name = NULL;
3759         int ret;
3760
3761         rbd_assert(!rbd_dev->spec->image_name);
3762
3763         len = strlen(rbd_dev->spec->image_id);
3764         image_id_size = sizeof (__le32) + len;
3765         image_id = kmalloc(image_id_size, GFP_KERNEL);
3766         if (!image_id)
3767                 return NULL;
3768
3769         p = image_id;
3770         end = image_id + image_id_size;
3771         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3772
3773         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3774         reply_buf = kmalloc(size, GFP_KERNEL);
3775         if (!reply_buf)
3776                 goto out;
3777
3778         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3779                                 "rbd", "dir_get_name",
3780                                 image_id, image_id_size,
3781                                 reply_buf, size);
3782         if (ret < 0)
3783                 goto out;
3784         p = reply_buf;
3785         end = reply_buf + ret;
3786
3787         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3788         if (IS_ERR(image_name))
3789                 image_name = NULL;
3790         else
3791                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3792 out:
3793         kfree(reply_buf);
3794         kfree(image_id);
3795
3796         return image_name;
3797 }
3798
3799 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3800 {
3801         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3802         const char *snap_name;
3803         u32 which = 0;
3804
3805         /* Skip over names until we find the one we are looking for */
3806
3807         snap_name = rbd_dev->header.snap_names;
3808         while (which < snapc->num_snaps) {
3809                 if (!strcmp(name, snap_name))
3810                         return snapc->snaps[which];
3811                 snap_name += strlen(snap_name) + 1;
3812                 which++;
3813         }
3814         return CEPH_NOSNAP;
3815 }
3816
3817 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3818 {
3819         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3820         u32 which;
3821         bool found = false;
3822         u64 snap_id;
3823
3824         for (which = 0; !found && which < snapc->num_snaps; which++) {
3825                 const char *snap_name;
3826
3827                 snap_id = snapc->snaps[which];
3828                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3829                 if (IS_ERR(snap_name))
3830                         break;
3831                 found = !strcmp(name, snap_name);
3832                 kfree(snap_name);
3833         }
3834         return found ? snap_id : CEPH_NOSNAP;
3835 }
3836
3837 /*
3838  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3839  * no snapshot by that name is found, or if an error occurs.
3840  */
3841 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3842 {
3843         if (rbd_dev->image_format == 1)
3844                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3845
3846         return rbd_v2_snap_id_by_name(rbd_dev, name);
3847 }
3848
3849 /*
3850  * When an rbd image has a parent image, it is identified by the
3851  * pool, image, and snapshot ids (not names).  This function fills
3852  * in the names for those ids.  (It's OK if we can't figure out the
3853  * name for an image id, but the pool and snapshot ids should always
3854  * exist and have names.)  All names in an rbd spec are dynamically
3855  * allocated.
3856  *
3857  * When an image being mapped (not a parent) is probed, we have the
3858  * pool name and pool id, image name and image id, and the snapshot
3859  * name.  The only thing we're missing is the snapshot id.
3860  */
3861 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3862 {
3863         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3864         struct rbd_spec *spec = rbd_dev->spec;
3865         const char *pool_name;
3866         const char *image_name;
3867         const char *snap_name;
3868         int ret;
3869
3870         /*
3871          * An image being mapped will have the pool name (etc.), but
3872          * we need to look up the snapshot id.
3873          */
3874         if (spec->pool_name) {
3875                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3876                         u64 snap_id;
3877
3878                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3879                         if (snap_id == CEPH_NOSNAP)
3880                                 return -ENOENT;
3881                         spec->snap_id = snap_id;
3882                 } else {
3883                         spec->snap_id = CEPH_NOSNAP;
3884                 }
3885
3886                 return 0;
3887         }
3888
3889         /* Get the pool name; we have to make our own copy of this */
3890
3891         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3892         if (!pool_name) {
3893                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3894                 return -EIO;
3895         }
3896         pool_name = kstrdup(pool_name, GFP_KERNEL);
3897         if (!pool_name)
3898                 return -ENOMEM;
3899
3900         /* Fetch the image name; tolerate failure here */
3901
3902         image_name = rbd_dev_image_name(rbd_dev);
3903         if (!image_name)
3904                 rbd_warn(rbd_dev, "unable to get image name");
3905
3906         /* Look up the snapshot name, and make a copy */
3907
3908         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3909         if (!snap_name) {
3910                 ret = -ENOMEM;
3911                 goto out_err;
3912         }
3913
3914         spec->pool_name = pool_name;
3915         spec->image_name = image_name;
3916         spec->snap_name = snap_name;
3917
3918         return 0;
3919 out_err:
3920         kfree(image_name);
3921         kfree(pool_name);
3922
3923         return ret;
3924 }
3925
3926 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3927 {
3928         size_t size;
3929         int ret;
3930         void *reply_buf;
3931         void *p;
3932         void *end;
3933         u64 seq;
3934         u32 snap_count;
3935         struct ceph_snap_context *snapc;
3936         u32 i;
3937
3938         /*
3939          * We'll need room for the seq value (maximum snapshot id),
3940          * snapshot count, and array of that many snapshot ids.
3941          * For now we have a fixed upper limit on the number we're
3942          * prepared to receive.
3943          */
3944         size = sizeof (__le64) + sizeof (__le32) +
3945                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3946         reply_buf = kzalloc(size, GFP_KERNEL);
3947         if (!reply_buf)
3948                 return -ENOMEM;
3949
3950         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3951                                 "rbd", "get_snapcontext", NULL, 0,
3952                                 reply_buf, size);
3953         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3954         if (ret < 0)
3955                 goto out;
3956
3957         p = reply_buf;
3958         end = reply_buf + ret;
3959         ret = -ERANGE;
3960         ceph_decode_64_safe(&p, end, seq, out);
3961         ceph_decode_32_safe(&p, end, snap_count, out);
3962
3963         /*
3964          * Make sure the reported number of snapshot ids wouldn't go
3965          * beyond the end of our buffer.  But before checking that,
3966          * make sure the computed size of the snapshot context we
3967          * allocate is representable in a size_t.
3968          */
3969         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3970                                  / sizeof (u64)) {
3971                 ret = -EINVAL;
3972                 goto out;
3973         }
3974         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3975                 goto out;
3976         ret = 0;
3977
3978         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3979         if (!snapc) {
3980                 ret = -ENOMEM;
3981                 goto out;
3982         }
3983         snapc->seq = seq;
3984         for (i = 0; i < snap_count; i++)
3985                 snapc->snaps[i] = ceph_decode_64(&p);
3986
3987         ceph_put_snap_context(rbd_dev->header.snapc);
3988         rbd_dev->header.snapc = snapc;
3989
3990         dout("  snap context seq = %llu, snap_count = %u\n",
3991                 (unsigned long long)seq, (unsigned int)snap_count);
3992 out:
3993         kfree(reply_buf);
3994
3995         return ret;
3996 }
3997
3998 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3999                                         u64 snap_id)
4000 {
4001         size_t size;
4002         void *reply_buf;
4003         __le64 snapid;
4004         int ret;
4005         void *p;
4006         void *end;
4007         char *snap_name;
4008
4009         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4010         reply_buf = kmalloc(size, GFP_KERNEL);
4011         if (!reply_buf)
4012                 return ERR_PTR(-ENOMEM);
4013
4014         snapid = cpu_to_le64(snap_id);
4015         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4016                                 "rbd", "get_snapshot_name",
4017                                 &snapid, sizeof (snapid),
4018                                 reply_buf, size);
4019         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4020         if (ret < 0) {
4021                 snap_name = ERR_PTR(ret);
4022                 goto out;
4023         }
4024
4025         p = reply_buf;
4026         end = reply_buf + ret;
4027         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4028         if (IS_ERR(snap_name))
4029                 goto out;
4030
4031         dout("  snap_id 0x%016llx snap_name = %s\n",
4032                 (unsigned long long)snap_id, snap_name);
4033 out:
4034         kfree(reply_buf);
4035
4036         return snap_name;
4037 }
4038
4039 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4040 {
4041         int ret;
4042
4043         down_write(&rbd_dev->header_rwsem);
4044
4045         ret = rbd_dev_v2_image_size(rbd_dev);
4046         if (ret)
4047                 goto out;
4048         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4049                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4050                         rbd_dev->mapping.size = rbd_dev->header.image_size;
4051
4052         ret = rbd_dev_v2_snap_context(rbd_dev);
4053         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4054         if (ret)
4055                 goto out;
4056 out:
4057         up_write(&rbd_dev->header_rwsem);
4058
4059         return ret;
4060 }
4061
4062 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4063 {
4064         struct device *dev;
4065         int ret;
4066
4067         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4068
4069         dev = &rbd_dev->dev;
4070         dev->bus = &rbd_bus_type;
4071         dev->type = &rbd_device_type;
4072         dev->parent = &rbd_root_dev;
4073         dev->release = rbd_dev_device_release;
4074         dev_set_name(dev, "%d", rbd_dev->dev_id);
4075         ret = device_register(dev);
4076
4077         mutex_unlock(&ctl_mutex);
4078
4079         return ret;
4080 }
4081
4082 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4083 {
4084         device_unregister(&rbd_dev->dev);
4085 }
4086
4087 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4088
4089 /*
4090  * Get a unique rbd identifier for the given new rbd_dev, and add
4091  * the rbd_dev to the global list.  The minimum rbd id is 1.
4092  */
4093 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4094 {
4095         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4096
4097         spin_lock(&rbd_dev_list_lock);
4098         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4099         spin_unlock(&rbd_dev_list_lock);
4100         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4101                 (unsigned long long) rbd_dev->dev_id);
4102 }
4103
4104 /*
4105  * Remove an rbd_dev from the global list, and record that its
4106  * identifier is no longer in use.
4107  */
4108 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4109 {
4110         struct list_head *tmp;
4111         int rbd_id = rbd_dev->dev_id;
4112         int max_id;
4113
4114         rbd_assert(rbd_id > 0);
4115
4116         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4117                 (unsigned long long) rbd_dev->dev_id);
4118         spin_lock(&rbd_dev_list_lock);
4119         list_del_init(&rbd_dev->node);
4120
4121         /*
4122          * If the id being "put" is not the current maximum, there
4123          * is nothing special we need to do.
4124          */
4125         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4126                 spin_unlock(&rbd_dev_list_lock);
4127                 return;
4128         }
4129
4130         /*
4131          * We need to update the current maximum id.  Search the
4132          * list to find out what it is.  We're more likely to find
4133          * the maximum at the end, so search the list backward.
4134          */
4135         max_id = 0;
4136         list_for_each_prev(tmp, &rbd_dev_list) {
4137                 struct rbd_device *rbd_dev;
4138
4139                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4140                 if (rbd_dev->dev_id > max_id)
4141                         max_id = rbd_dev->dev_id;
4142         }
4143         spin_unlock(&rbd_dev_list_lock);
4144
4145         /*
4146          * The max id could have been updated by rbd_dev_id_get(), in
4147          * which case it now accurately reflects the new maximum.
4148          * Be careful not to overwrite the maximum value in that
4149          * case.
4150          */
4151         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4152         dout("  max dev id has been reset\n");
4153 }
4154
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if
 * there is none).  Returns the length of the token found there,
 * i.e. the number of characters up to the next white space or the
 * end of the string; 0 means no token.  *buf must be terminated
 * with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
        static const char whitespace[] = " \f\n\r\t\v";
        const char *start = *buf + strspn(*buf, whitespace);

        *buf = start;

        return strcspn(start, whitespace);
}
4173
/*
 * Finds the next token in *buf and, if it fits in the provided
 * token buffer along with a terminating '\0', copies it there and
 * terminates it.  A token that does not fit is not copied at all.
 * *buf must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * The result is 0 if no token is found, and >= token_size if the
 * token would not fit.
 *
 * In every case *buf is advanced to just beyond the found token,
 * even when the token was too big to copy.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len = next_token(buf);
        const char *start = *buf;

        *buf = start + len;

        if (len >= token_size)
                return len;     /* Too big; nothing copied */

        memcpy(token, start, len);
        token[len] = '\0';

        return len;
}
4203
4204 /*
4205  * Finds the next token in *buf, dynamically allocates a buffer big
4206  * enough to hold a copy of it, and copies the token into the new
4207  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4208  * that a duplicate buffer is created even for a zero-length token.
4209  *
4210  * Returns a pointer to the newly-allocated duplicate, or a null
4211  * pointer if memory for the duplicate was not available.  If
4212  * the lenp argument is a non-null pointer, the length of the token
4213  * (not including the '\0') is returned in *lenp.
4214  *
4215  * If successful, the *buf pointer will be updated to point beyond
4216  * the end of the found token.
4217  *
4218  * Note: uses GFP_KERNEL for allocation.
4219  */
4220 static inline char *dup_token(const char **buf, size_t *lenp)
4221 {
4222         char *dup;
4223         size_t len;
4224
4225         len = next_token(buf);
4226         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4227         if (!dup)
4228                 return NULL;
4229         *(dup + len) = '\0';
4230         *buf += len;
4231
4232         if (lenp)
4233                 *lenp = len;
4234
4235         return dup;
4236 }
4237
4238 /*
4239  * Parse the options provided for an "rbd add" (i.e., rbd image
4240  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4241  * and the data written is passed here via a NUL-terminated buffer.
4242  * Returns 0 if successful or an error code otherwise.
4243  *
4244  * The information extracted from these options is recorded in
4245  * the other parameters which return dynamically-allocated
4246  * structures:
4247  *  ceph_opts
4248  *      The address of a pointer that will refer to a ceph options
4249  *      structure.  Caller must release the returned pointer using
4250  *      ceph_destroy_options() when it is no longer needed.
4251  *  rbd_opts
4252  *      Address of an rbd options pointer.  Fully initialized by
4253  *      this function; caller must release with kfree().
4254  *  spec
4255  *      Address of an rbd image specification pointer.  Fully
4256  *      initialized by this function based on parsed options.
4257  *      Caller must release with rbd_spec_put().
4258  *
4259  * The options passed take this form:
4260  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4261  * where:
4262  *  <mon_addrs>
4263  *      A comma-separated list of one or more monitor addresses.
4264  *      A monitor address is an ip address, optionally followed
4265  *      by a port number (separated by a colon).
4266  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4267  *  <options>
4268  *      A comma-separated list of ceph and/or rbd options.
4269  *  <pool_name>
4270  *      The name of the rados pool containing the rbd image.
4271  *  <image_name>
4272  *      The name of the image in that pool to map.
4273  *  <snap_id>
4274  *      An optional snapshot id.  If provided, the mapping will
4275  *      present data from the image at the time that snapshot was
4276  *      created.  The image head is used if no snapshot id is
4277  *      provided.  Snapshot mappings are always read-only.
4278  */
4279 static int rbd_add_parse_args(const char *buf,
4280                                 struct ceph_options **ceph_opts,
4281                                 struct rbd_options **opts,
4282                                 struct rbd_spec **rbd_spec)
4283 {
4284         size_t len;
4285         char *options;
4286         const char *mon_addrs;
4287         char *snap_name;
4288         size_t mon_addrs_size;
4289         struct rbd_spec *spec = NULL;
4290         struct rbd_options *rbd_opts = NULL;
4291         struct ceph_options *copts;
4292         int ret;
4293
4294         /* The first four tokens are required */
4295
4296         len = next_token(&buf);
4297         if (!len) {
4298                 rbd_warn(NULL, "no monitor address(es) provided");
4299                 return -EINVAL;
4300         }
4301         mon_addrs = buf;
4302         mon_addrs_size = len + 1;
4303         buf += len;
4304
4305         ret = -EINVAL;
4306         options = dup_token(&buf, NULL);
4307         if (!options)
4308                 return -ENOMEM;
4309         if (!*options) {
4310                 rbd_warn(NULL, "no options provided");
4311                 goto out_err;
4312         }
4313
4314         spec = rbd_spec_alloc();
4315         if (!spec)
4316                 goto out_mem;
4317
4318         spec->pool_name = dup_token(&buf, NULL);
4319         if (!spec->pool_name)
4320                 goto out_mem;
4321         if (!*spec->pool_name) {
4322                 rbd_warn(NULL, "no pool name provided");
4323                 goto out_err;
4324         }
4325
4326         spec->image_name = dup_token(&buf, NULL);
4327         if (!spec->image_name)
4328                 goto out_mem;
4329         if (!*spec->image_name) {
4330                 rbd_warn(NULL, "no image name provided");
4331                 goto out_err;
4332         }
4333
4334         /*
4335          * Snapshot name is optional; default is to use "-"
4336          * (indicating the head/no snapshot).
4337          */
4338         len = next_token(&buf);
4339         if (!len) {
4340                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4341                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4342         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4343                 ret = -ENAMETOOLONG;
4344                 goto out_err;
4345         }
4346         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4347         if (!snap_name)
4348                 goto out_mem;
4349         *(snap_name + len) = '\0';
4350         spec->snap_name = snap_name;
4351
4352         /* Initialize all rbd options to the defaults */
4353
4354         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4355         if (!rbd_opts)
4356                 goto out_mem;
4357
4358         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4359
4360         copts = ceph_parse_options(options, mon_addrs,
4361                                         mon_addrs + mon_addrs_size - 1,
4362                                         parse_rbd_opts_token, rbd_opts);
4363         if (IS_ERR(copts)) {
4364                 ret = PTR_ERR(copts);
4365                 goto out_err;
4366         }
4367         kfree(options);
4368
4369         *ceph_opts = copts;
4370         *opts = rbd_opts;
4371         *rbd_spec = spec;
4372
4373         return 0;
4374 out_mem:
4375         ret = -ENOMEM;
4376 out_err:
4377         kfree(rbd_opts);
4378         rbd_spec_put(spec);
4379         kfree(options);
4380
4381         return ret;
4382 }
4383
4384 /*
4385  * An rbd format 2 image has a unique identifier, distinct from the
4386  * name given to it by the user.  Internally, that identifier is
4387  * what's used to specify the names of objects related to the image.
4388  *
4389  * A special "rbd id" object is used to map an rbd image name to its
4390  * id.  If that object doesn't exist, then there is no v2 rbd image
4391  * with the supplied name.
4392  *
4393  * This function will record the given rbd_dev's image_id field if
4394  * it can be determined, and in that case will return 0.  If any
4395  * errors occur a negative errno will be returned and the rbd_dev's
4396  * image_id field will be unchanged (and should be NULL).
4397  */
4398 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4399 {
4400         int ret;
4401         size_t size;
4402         char *object_name;
4403         void *response;
4404         char *image_id;
4405
4406         /*
4407          * When probing a parent image, the image id is already
4408          * known (and the image name likely is not).  There's no
4409          * need to fetch the image id again in this case.  We
4410          * do still need to set the image format though.
4411          */
4412         if (rbd_dev->spec->image_id) {
4413                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4414
4415                 return 0;
4416         }
4417
4418         /*
4419          * First, see if the format 2 image id file exists, and if
4420          * so, get the image's persistent id from it.
4421          */
4422         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4423         object_name = kmalloc(size, GFP_NOIO);
4424         if (!object_name)
4425                 return -ENOMEM;
4426         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4427         dout("rbd id object name is %s\n", object_name);
4428
4429         /* Response will be an encoded string, which includes a length */
4430
4431         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4432         response = kzalloc(size, GFP_NOIO);
4433         if (!response) {
4434                 ret = -ENOMEM;
4435                 goto out;
4436         }
4437
4438         /* If it doesn't exist we'll assume it's a format 1 image */
4439
4440         ret = rbd_obj_method_sync(rbd_dev, object_name,
4441                                 "rbd", "get_id", NULL, 0,
4442                                 response, RBD_IMAGE_ID_LEN_MAX);
4443         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4444         if (ret == -ENOENT) {
4445                 image_id = kstrdup("", GFP_KERNEL);
4446                 ret = image_id ? 0 : -ENOMEM;
4447                 if (!ret)
4448                         rbd_dev->image_format = 1;
4449         } else if (ret > sizeof (__le32)) {
4450                 void *p = response;
4451
4452                 image_id = ceph_extract_encoded_string(&p, p + ret,
4453                                                 NULL, GFP_NOIO);
4454                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4455                 if (!ret)
4456                         rbd_dev->image_format = 2;
4457         } else {
4458                 ret = -EINVAL;
4459         }
4460
4461         if (!ret) {
4462                 rbd_dev->spec->image_id = image_id;
4463                 dout("image_id is %s\n", image_id);
4464         }
4465 out:
4466         kfree(response);
4467         kfree(object_name);
4468
4469         return ret;
4470 }
4471
4472 /* Undo whatever state changes are made by v1 or v2 image probe */
4473
4474 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4475 {
4476         struct rbd_image_header *header;
4477
4478         rbd_dev_remove_parent(rbd_dev);
4479         rbd_spec_put(rbd_dev->parent_spec);
4480         rbd_dev->parent_spec = NULL;
4481         rbd_dev->parent_overlap = 0;
4482
4483         /* Free dynamic fields from the header, then zero it out */
4484
4485         header = &rbd_dev->header;
4486         ceph_put_snap_context(header->snapc);
4487         kfree(header->snap_sizes);
4488         kfree(header->snap_names);
4489         kfree(header->object_prefix);
4490         memset(header, 0, sizeof (*header));
4491 }
4492
4493 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4494 {
4495         int ret;
4496
4497         /* Populate rbd image metadata */
4498
4499         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4500         if (ret < 0)
4501                 goto out_err;
4502
4503         /* Version 1 images have no parent (no layering) */
4504
4505         rbd_dev->parent_spec = NULL;
4506         rbd_dev->parent_overlap = 0;
4507
4508         dout("discovered version 1 image, header name is %s\n",
4509                 rbd_dev->header_name);
4510
4511         return 0;
4512
4513 out_err:
4514         kfree(rbd_dev->header_name);
4515         rbd_dev->header_name = NULL;
4516         kfree(rbd_dev->spec->image_id);
4517         rbd_dev->spec->image_id = NULL;
4518
4519         return ret;
4520 }
4521
4522 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4523 {
4524         int ret;
4525
4526         ret = rbd_dev_v2_image_size(rbd_dev);
4527         if (ret)
4528                 goto out_err;
4529
4530         /* Get the object prefix (a.k.a. block_name) for the image */
4531
4532         ret = rbd_dev_v2_object_prefix(rbd_dev);
4533         if (ret)
4534                 goto out_err;
4535
4536         /* Get the and check features for the image */
4537
4538         ret = rbd_dev_v2_features(rbd_dev);
4539         if (ret)
4540                 goto out_err;
4541
4542         /* If the image supports layering, get the parent info */
4543
4544         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4545                 ret = rbd_dev_v2_parent_info(rbd_dev);
4546                 if (ret)
4547                         goto out_err;
4548                 /*
4549                  * Print a warning if this image has a parent.
4550                  * Don't print it if the image now being probed
4551                  * is itself a parent.  We can tell at this point
4552                  * because we won't know its pool name yet (just its
4553                  * pool id).
4554                  */
4555                 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4556                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4557                                         "is EXPERIMENTAL!");
4558         }
4559
4560         /* If the image supports fancy striping, get its parameters */
4561
4562         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4563                 ret = rbd_dev_v2_striping_info(rbd_dev);
4564                 if (ret < 0)
4565                         goto out_err;
4566         }
4567
4568         /* crypto and compression type aren't (yet) supported for v2 images */
4569
4570         rbd_dev->header.crypt_type = 0;
4571         rbd_dev->header.comp_type = 0;
4572
4573         /* Get the snapshot context, plus the header version */
4574
4575         ret = rbd_dev_v2_snap_context(rbd_dev);
4576         if (ret)
4577                 goto out_err;
4578
4579         dout("discovered version 2 image, header name is %s\n",
4580                 rbd_dev->header_name);
4581
4582         return 0;
4583 out_err:
4584         rbd_dev->parent_overlap = 0;
4585         rbd_spec_put(rbd_dev->parent_spec);
4586         rbd_dev->parent_spec = NULL;
4587         kfree(rbd_dev->header_name);
4588         rbd_dev->header_name = NULL;
4589         kfree(rbd_dev->header.object_prefix);
4590         rbd_dev->header.object_prefix = NULL;
4591
4592         return ret;
4593 }
4594
4595 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4596 {
4597         struct rbd_device *parent = NULL;
4598         struct rbd_spec *parent_spec;
4599         struct rbd_client *rbdc;
4600         int ret;
4601
4602         if (!rbd_dev->parent_spec)
4603                 return 0;
4604         /*
4605          * We need to pass a reference to the client and the parent
4606          * spec when creating the parent rbd_dev.  Images related by
4607          * parent/child relationships always share both.
4608          */
4609         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4610         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4611
4612         ret = -ENOMEM;
4613         parent = rbd_dev_create(rbdc, parent_spec);
4614         if (!parent)
4615                 goto out_err;
4616
4617         ret = rbd_dev_image_probe(parent, true);
4618         if (ret < 0)
4619                 goto out_err;
4620         rbd_dev->parent = parent;
4621
4622         return 0;
4623 out_err:
4624         if (parent) {
4625                 rbd_spec_put(rbd_dev->parent_spec);
4626                 kfree(rbd_dev->header_name);
4627                 rbd_dev_destroy(parent);
4628         } else {
4629                 rbd_put_client(rbdc);
4630                 rbd_spec_put(parent_spec);
4631         }
4632
4633         return ret;
4634 }
4635
4636 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4637 {
4638         int ret;
4639
4640         ret = rbd_dev_mapping_set(rbd_dev);
4641         if (ret)
4642                 return ret;
4643
4644         /* generate unique id: find highest unique id, add one */
4645         rbd_dev_id_get(rbd_dev);
4646
4647         /* Fill in the device name, now that we have its id. */
4648         BUILD_BUG_ON(DEV_NAME_LEN
4649                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4650         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4651
4652         /* Get our block major device number. */
4653
4654         ret = register_blkdev(0, rbd_dev->name);
4655         if (ret < 0)
4656                 goto err_out_id;
4657         rbd_dev->major = ret;
4658
4659         /* Set up the blkdev mapping. */
4660
4661         ret = rbd_init_disk(rbd_dev);
4662         if (ret)
4663                 goto err_out_blkdev;
4664
4665         ret = rbd_bus_add_dev(rbd_dev);
4666         if (ret)
4667                 goto err_out_disk;
4668
4669         /* Everything's ready.  Announce the disk to the world. */
4670
4671         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4672         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4673         add_disk(rbd_dev->disk);
4674
4675         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4676                 (unsigned long long) rbd_dev->mapping.size);
4677
4678         return ret;
4679
4680 err_out_disk:
4681         rbd_free_disk(rbd_dev);
4682 err_out_blkdev:
4683         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4684 err_out_id:
4685         rbd_dev_id_put(rbd_dev);
4686         rbd_dev_mapping_clear(rbd_dev);
4687
4688         return ret;
4689 }
4690
4691 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4692 {
4693         struct rbd_spec *spec = rbd_dev->spec;
4694         size_t size;
4695
4696         /* Record the header object name for this rbd image. */
4697
4698         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4699
4700         if (rbd_dev->image_format == 1)
4701                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4702         else
4703                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4704
4705         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4706         if (!rbd_dev->header_name)
4707                 return -ENOMEM;
4708
4709         if (rbd_dev->image_format == 1)
4710                 sprintf(rbd_dev->header_name, "%s%s",
4711                         spec->image_name, RBD_SUFFIX);
4712         else
4713                 sprintf(rbd_dev->header_name, "%s%s",
4714                         RBD_HEADER_PREFIX, spec->image_id);
4715         return 0;
4716 }
4717
4718 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4719 {
4720         int ret;
4721
4722         rbd_dev_unprobe(rbd_dev);
4723         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4724         if (ret)
4725                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4726         kfree(rbd_dev->header_name);
4727         rbd_dev->header_name = NULL;
4728         rbd_dev->image_format = 0;
4729         kfree(rbd_dev->spec->image_id);
4730         rbd_dev->spec->image_id = NULL;
4731
4732         rbd_dev_destroy(rbd_dev);
4733 }
4734
4735 /*
4736  * Probe for the existence of the header object for the given rbd
4737  * device.  For format 2 images this includes determining the image
4738  * id.
4739  */
4740 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
4741 {
4742         int ret;
4743         int tmp;
4744
4745         /*
4746          * Get the id from the image id object.  If it's not a
4747          * format 2 image, we'll get ENOENT back, and we'll assume
4748          * it's a format 1 image.
4749          */
4750         ret = rbd_dev_image_id(rbd_dev);
4751         if (ret)
4752                 return ret;
4753         rbd_assert(rbd_dev->spec->image_id);
4754         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4755
4756         ret = rbd_dev_header_name(rbd_dev);
4757         if (ret)
4758                 goto err_out_format;
4759
4760         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4761         if (ret)
4762                 goto out_header_name;
4763
4764         if (rbd_dev->image_format == 1)
4765                 ret = rbd_dev_v1_probe(rbd_dev);
4766         else
4767                 ret = rbd_dev_v2_probe(rbd_dev);
4768         if (ret)
4769                 goto err_out_watch;
4770
4771         ret = rbd_dev_spec_update(rbd_dev);
4772         if (ret)
4773                 goto err_out_probe;
4774
4775         /* If we are mapping a snapshot it must be marked read-only */
4776
4777         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4778                 read_only = true;
4779         rbd_dev->mapping.read_only = read_only;
4780
4781         ret = rbd_dev_probe_parent(rbd_dev);
4782         if (!ret)
4783                 return 0;
4784
4785 err_out_probe:
4786         rbd_dev_unprobe(rbd_dev);
4787 err_out_watch:
4788         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4789         if (tmp)
4790                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4791 out_header_name:
4792         kfree(rbd_dev->header_name);
4793         rbd_dev->header_name = NULL;
4794 err_out_format:
4795         rbd_dev->image_format = 0;
4796         kfree(rbd_dev->spec->image_id);
4797         rbd_dev->spec->image_id = NULL;
4798
4799         dout("probe failed, returning %d\n", ret);
4800
4801         return ret;
4802 }
4803
4804 static ssize_t rbd_add(struct bus_type *bus,
4805                        const char *buf,
4806                        size_t count)
4807 {
4808         struct rbd_device *rbd_dev = NULL;
4809         struct ceph_options *ceph_opts = NULL;
4810         struct rbd_options *rbd_opts = NULL;
4811         struct rbd_spec *spec = NULL;
4812         struct rbd_client *rbdc;
4813         struct ceph_osd_client *osdc;
4814         bool read_only;
4815         int rc = -ENOMEM;
4816
4817         if (!try_module_get(THIS_MODULE))
4818                 return -ENODEV;
4819
4820         /* parse add command */
4821         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4822         if (rc < 0)
4823                 goto err_out_module;
4824         read_only = rbd_opts->read_only;
4825         kfree(rbd_opts);
4826         rbd_opts = NULL;        /* done with this */
4827
4828         rbdc = rbd_get_client(ceph_opts);
4829         if (IS_ERR(rbdc)) {
4830                 rc = PTR_ERR(rbdc);
4831                 goto err_out_args;
4832         }
4833         ceph_opts = NULL;       /* rbd_dev client now owns this */
4834
4835         /* pick the pool */
4836         osdc = &rbdc->client->osdc;
4837         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4838         if (rc < 0)
4839                 goto err_out_client;
4840         spec->pool_id = (u64)rc;
4841
4842         /* The ceph file layout needs to fit pool id in 32 bits */
4843
4844         if (spec->pool_id > (u64)U32_MAX) {
4845                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4846                                 (unsigned long long)spec->pool_id, U32_MAX);
4847                 rc = -EIO;
4848                 goto err_out_client;
4849         }
4850
4851         rbd_dev = rbd_dev_create(rbdc, spec);
4852         if (!rbd_dev)
4853                 goto err_out_client;
4854         rbdc = NULL;            /* rbd_dev now owns this */
4855         spec = NULL;            /* rbd_dev now owns this */
4856
4857         rc = rbd_dev_image_probe(rbd_dev, read_only);
4858         if (rc < 0)
4859                 goto err_out_rbd_dev;
4860
4861         rc = rbd_dev_device_setup(rbd_dev);
4862         if (!rc)
4863                 return count;
4864
4865         rbd_dev_image_release(rbd_dev);
4866 err_out_rbd_dev:
4867         rbd_dev_destroy(rbd_dev);
4868 err_out_client:
4869         rbd_put_client(rbdc);
4870 err_out_args:
4871         if (ceph_opts)
4872                 ceph_destroy_options(ceph_opts);
4873         kfree(rbd_opts);
4874         rbd_spec_put(spec);
4875 err_out_module:
4876         module_put(THIS_MODULE);
4877
4878         dout("Error adding device %s\n", buf);
4879
4880         return (ssize_t)rc;
4881 }
4882
4883 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4884 {
4885         struct list_head *tmp;
4886         struct rbd_device *rbd_dev;
4887
4888         spin_lock(&rbd_dev_list_lock);
4889         list_for_each(tmp, &rbd_dev_list) {
4890                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4891                 if (rbd_dev->dev_id == dev_id) {
4892                         spin_unlock(&rbd_dev_list_lock);
4893                         return rbd_dev;
4894                 }
4895         }
4896         spin_unlock(&rbd_dev_list_lock);
4897         return NULL;
4898 }
4899
4900 static void rbd_dev_device_release(struct device *dev)
4901 {
4902         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4903
4904         rbd_free_disk(rbd_dev);
4905         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4906         rbd_dev_mapping_clear(rbd_dev);
4907         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4908         rbd_dev->major = 0;
4909         rbd_dev_id_put(rbd_dev);
4910         rbd_dev_mapping_clear(rbd_dev);
4911 }
4912
4913 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4914 {
4915         while (rbd_dev->parent) {
4916                 struct rbd_device *first = rbd_dev;
4917                 struct rbd_device *second = first->parent;
4918                 struct rbd_device *third;
4919
4920                 /*
4921                  * Follow to the parent with no grandparent and
4922                  * remove it.
4923                  */
4924                 while (second && (third = second->parent)) {
4925                         first = second;
4926                         second = third;
4927                 }
4928                 rbd_assert(second);
4929                 rbd_dev_image_release(second);
4930                 first->parent = NULL;
4931                 first->parent_overlap = 0;
4932
4933                 rbd_assert(first->parent_spec);
4934                 rbd_spec_put(first->parent_spec);
4935                 first->parent_spec = NULL;
4936         }
4937 }
4938
4939 static ssize_t rbd_remove(struct bus_type *bus,
4940                           const char *buf,
4941                           size_t count)
4942 {
4943         struct rbd_device *rbd_dev = NULL;
4944         int target_id;
4945         unsigned long ul;
4946         int ret;
4947
4948         ret = strict_strtoul(buf, 10, &ul);
4949         if (ret)
4950                 return ret;
4951
4952         /* convert to int; abort if we lost anything in the conversion */
4953         target_id = (int) ul;
4954         if (target_id != ul)
4955                 return -EINVAL;
4956
4957         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4958
4959         rbd_dev = __rbd_get_dev(target_id);
4960         if (!rbd_dev) {
4961                 ret = -ENOENT;
4962                 goto done;
4963         }
4964
4965         spin_lock_irq(&rbd_dev->lock);
4966         if (rbd_dev->open_count)
4967                 ret = -EBUSY;
4968         else
4969                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4970         spin_unlock_irq(&rbd_dev->lock);
4971         if (ret < 0)
4972                 goto done;
4973         ret = count;
4974         rbd_bus_del_dev(rbd_dev);
4975         rbd_dev_image_release(rbd_dev);
4976         module_put(THIS_MODULE);
4977 done:
4978         mutex_unlock(&ctl_mutex);
4979
4980         return ret;
4981 }
4982
4983 /*
4984  * create control files in sysfs
4985  * /sys/bus/rbd/...
4986  */
4987 static int rbd_sysfs_init(void)
4988 {
4989         int ret;
4990
4991         ret = device_register(&rbd_root_dev);
4992         if (ret < 0)
4993                 return ret;
4994
4995         ret = bus_register(&rbd_bus_type);
4996         if (ret < 0)
4997                 device_unregister(&rbd_root_dev);
4998
4999         return ret;
5000 }
5001
5002 static void rbd_sysfs_cleanup(void)
5003 {
5004         bus_unregister(&rbd_bus_type);
5005         device_unregister(&rbd_root_dev);
5006 }
5007
5008 static int rbd_slab_init(void)
5009 {
5010         rbd_assert(!rbd_img_request_cache);
5011         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5012                                         sizeof (struct rbd_img_request),
5013                                         __alignof__(struct rbd_img_request),
5014                                         0, NULL);
5015         if (!rbd_img_request_cache)
5016                 return -ENOMEM;
5017
5018         rbd_assert(!rbd_obj_request_cache);
5019         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5020                                         sizeof (struct rbd_obj_request),
5021                                         __alignof__(struct rbd_obj_request),
5022                                         0, NULL);
5023         if (!rbd_obj_request_cache)
5024                 goto out_err;
5025
5026         rbd_assert(!rbd_segment_name_cache);
5027         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5028                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5029         if (rbd_segment_name_cache)
5030                 return 0;
5031 out_err:
5032         if (rbd_obj_request_cache) {
5033                 kmem_cache_destroy(rbd_obj_request_cache);
5034                 rbd_obj_request_cache = NULL;
5035         }
5036
5037         kmem_cache_destroy(rbd_img_request_cache);
5038         rbd_img_request_cache = NULL;
5039
5040         return -ENOMEM;
5041 }
5042
5043 static void rbd_slab_exit(void)
5044 {
5045         rbd_assert(rbd_segment_name_cache);
5046         kmem_cache_destroy(rbd_segment_name_cache);
5047         rbd_segment_name_cache = NULL;
5048
5049         rbd_assert(rbd_obj_request_cache);
5050         kmem_cache_destroy(rbd_obj_request_cache);
5051         rbd_obj_request_cache = NULL;
5052
5053         rbd_assert(rbd_img_request_cache);
5054         kmem_cache_destroy(rbd_img_request_cache);
5055         rbd_img_request_cache = NULL;
5056 }
5057
5058 static int __init rbd_init(void)
5059 {
5060         int rc;
5061
5062         if (!libceph_compatible(NULL)) {
5063                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5064
5065                 return -EINVAL;
5066         }
5067         rc = rbd_slab_init();
5068         if (rc)
5069                 return rc;
5070         rc = rbd_sysfs_init();
5071         if (rc)
5072                 rbd_slab_exit();
5073         else
5074                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5075
5076         return rc;
5077 }
5078
5079 static void __exit rbd_exit(void)
5080 {
5081         rbd_sysfs_cleanup();
5082         rbd_slab_exit();
5083 }
5084
5085 module_init(rbd_init);
5086 module_exit(rbd_exit);
5087
5088 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5089 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5090 MODULE_DESCRIPTION("rados block device");
5091
5092 /* following authorship retained from original osdblk.c */
5093 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5094
5095 MODULE_LICENSE("GPL");