
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
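/*
 * Rough sketch of why 510 fits, assuming 8-byte snapshot ids and
 * the ceph_snap_context layout in libceph: 510 * sizeof (__le64) =
 * 4080 bytes of snapshot ids, leaving 16 bytes for the fixed part
 * of the snapshot context within a single 4KB page.
 */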

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
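/*
 * Worked example, assuming a 4-byte int: (5 * 4) / 2 + 1 = 11
 * characters, exactly enough for the widest value, "-2147483648".
 * The 5/2 factor over-approximates the log10(256) = 2.408 decimal
 * digits needed per byte, and the +1 covers the sign.
 */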

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
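/*
 * Illustrative (hypothetical) spec for the HEAD of an image "foo"
 * in pool "rbd": pool_id = 2, pool_name = "rbd", image_id =
 * "10056b8b4567", image_name = "foo", snap_id = CEPH_NOSNAP,
 * snap_name = "-" (RBD_SNAP_HEAD_NAME).
 */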

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
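/*
 * Typical use of these iterators (see rbd_img_request_complete()
 * below for the first one in action):
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 xferred = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 */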

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
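/*
 * Example output, for a hypothetical mapped device rbd0:
 * rbd_warn(rbd_dev, "capacity changed") prints
 * "rbd: rbd0: capacity changed" once the gendisk exists; with less
 * information available it falls back to the image name, then the
 * image id, then the raw rbd_dev pointer.
 */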

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with a specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
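/*
 * For example, parse_rbd_opts_token("ro", &opts) matches the
 * Opt_read_only token and sets opts.read_only = true, while an
 * unrecognized string hits the {-1, NULL} terminator in the match
 * table and makes the function fail with -EINVAL.
 */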

/*
 * Get a ceph client with a specific addr and configuration; create
 * a new one if it doesn't already exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself, so the
 * caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop a reference to a ceph client node.  If it's not referenced
 * anymore, release it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more; fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that the result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
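/*
 * Worked example: with snapc->snaps = { 12, 7, 3 } (descending, as
 * the osd keeps it), looking up snap_id 7 has bsearch() return
 * &snaps[1], so the function yields index 1; looking up 5 finds no
 * match and yields BAD_SNAP_INDEX.
 */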

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* Allocated from the slab cache above, so don't kfree() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
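/*
 * Worked example, assuming the common obj_order of 22 (4MB
 * objects, segment_size = 0x400000): image offset 0x500000 lands
 * in segment 1 at in-segment offset 0x100000, so a 0x400000-byte
 * request starting there is clipped by rbd_segment_length() to
 * 0x300000, the bytes remaining in that segment.
 */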

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1ULL << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * Zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * Similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
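/*
 * Worked example, assuming 4KB pages: zero_pages(pages, 6144, 10240)
 * starts at pages[1] (6144 >> PAGE_SHIFT), zeroes bytes 2048..4095
 * of that page, then bytes 0..2047 of pages[2], 4096 bytes in all.
 */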

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
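/*
 * Worked example: for a source bio with three 4096-byte segments,
 * bio_clone_range(bio_src, 1024, 8192, ...) selects segments 0..2
 * (vcnt = 3), advances the first bv_offset by voff = 1024 and trims
 * its bv_len to 3072, and trims the last bv_len to resid = 1024;
 * 3072 + 4096 + 1024 = 8192 bytes, as requested.
 */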

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bios */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
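/*
 * Worked example of the in-out semantics: cloning 6144 bytes from
 * offset 1024 of a chain of two 4096-byte bios consumes 3072 bytes
 * of the first bio and 3072 of the second; on return *bio_src
 * points to the second bio and *offset is 3072, where the next
 * clone would pick up.
 */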

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
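/*
 * Note on the memory barriers above: the intent is that a thread
 * seeing a flag set via one of the lockless tests also sees the
 * stores its setter made beforehand; test_and_set_bit() already
 * implies a full barrier, and the plain set_bit() calls are
 * followed by an explicit smp_mb() for the same reason.
 */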

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}
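/*
 * For example, an 8192-byte read that succeeds but transfers only
 * 4096 bytes takes the short-read branch above: bytes 4096..8191
 * of the bio chain (or page array) are zeroed and xferred is
 * bumped to the full 8192 before the request is marked done.
 */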

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * the xferred count to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
1641                         obj_request->object_name, (unsigned short) opcode);
1642                 break;
1643         }
1644
1645         if (obj_request_done_test(obj_request))
1646                 rbd_obj_request_complete(obj_request);
1647 }
1648
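/*
 * Note that the callback above dispatches on r_ops[0] only.  For
 * the two-op copyup requests built below, the first op is the
 * CEPH_OSD_OP_CALL, so they are handled by rbd_osd_trivial_callback();
 * the transfer count for the write (op 1) is fixed up afterward in
 * rbd_img_obj_copyup_callback() rather than taken from
 * r_reply_op_len[0].
 */
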
1649 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1650 {
1651         struct rbd_img_request *img_request = obj_request->img_request;
1652         struct ceph_osd_request *osd_req = obj_request->osd_req;
1653         u64 snap_id;
1654
1655         rbd_assert(osd_req != NULL);
1656
1657         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1658         ceph_osdc_build_request(osd_req, obj_request->offset,
1659                         NULL, snap_id, NULL);
1660 }
1661
1662 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1663 {
1664         struct rbd_img_request *img_request = obj_request->img_request;
1665         struct ceph_osd_request *osd_req = obj_request->osd_req;
1666         struct ceph_snap_context *snapc;
1667         struct timespec mtime = CURRENT_TIME;
1668
1669         rbd_assert(osd_req != NULL);
1670
1671         snapc = img_request ? img_request->snapc : NULL;
1672         ceph_osdc_build_request(osd_req, obj_request->offset,
1673                         snapc, CEPH_NOSNAP, &mtime);
1674 }
1675
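/*
 * The two helpers above capture the read/write asymmetry: a read
 * is directed at a single snapshot (the mapped snap_id, or
 * CEPH_NOSNAP for the head), while a write always goes to the head
 * and instead carries the full snapshot context plus a timestamp,
 * which the osd needs to do snapshot bookkeeping on the object.
 */
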
1676 static struct ceph_osd_request *rbd_osd_req_create(
1677                                         struct rbd_device *rbd_dev,
1678                                         bool write_request,
1679                                         struct rbd_obj_request *obj_request)
1680 {
1681         struct ceph_snap_context *snapc = NULL;
1682         struct ceph_osd_client *osdc;
1683         struct ceph_osd_request *osd_req;
1684
1685         if (obj_request_img_data_test(obj_request)) {
1686                 struct rbd_img_request *img_request = obj_request->img_request;
1687
1688                 rbd_assert(write_request ==
1689                                 img_request_write_test(img_request));
1690                 if (write_request)
1691                         snapc = img_request->snapc;
1692         }
1693
1694         /* Allocate and initialize the request, for the single op */
1695
1696         osdc = &rbd_dev->rbd_client->client->osdc;
1697         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1698         if (!osd_req)
1699                 return NULL;    /* ENOMEM */
1700
1701         if (write_request)
1702                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1703         else
1704                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1705
1706         osd_req->r_callback = rbd_osd_req_callback;
1707         osd_req->r_priv = obj_request;
1708
1709         osd_req->r_oid_len = strlen(obj_request->object_name);
1710         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1711         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1712
1713         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1714
1715         return osd_req;
1716 }
1717
1718 /*
1719  * Create a copyup osd request based on the information in the
1720  * object request supplied.  A copyup request has two osd ops,
1721  * a copyup method call, and a "normal" write request.
1722  */
1723 static struct ceph_osd_request *
1724 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1725 {
1726         struct rbd_img_request *img_request;
1727         struct ceph_snap_context *snapc;
1728         struct rbd_device *rbd_dev;
1729         struct ceph_osd_client *osdc;
1730         struct ceph_osd_request *osd_req;
1731
1732         rbd_assert(obj_request_img_data_test(obj_request));
1733         img_request = obj_request->img_request;
1734         rbd_assert(img_request);
1735         rbd_assert(img_request_write_test(img_request));
1736
1737         /* Allocate and initialize the request, for the two ops */
1738
1739         snapc = img_request->snapc;
1740         rbd_dev = img_request->rbd_dev;
1741         osdc = &rbd_dev->rbd_client->client->osdc;
1742         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1743         if (!osd_req)
1744                 return NULL;    /* ENOMEM */
1745
1746         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1747         osd_req->r_callback = rbd_osd_req_callback;
1748         osd_req->r_priv = obj_request;
1749
1750         osd_req->r_oid_len = strlen(obj_request->object_name);
1751         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1752         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1753
1754         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1755
1756         return osd_req;
1757 }
1758
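/*
 * Layout of the request built above, as filled in later by
 * rbd_img_obj_parent_read_full_callback():
 *
 *	op 0: CEPH_OSD_OP_CALL "rbd" "copyup", with the full object's
 *	      worth of parent data (a page vector) as request data
 *	op 1: CEPH_OSD_OP_WRITE of the original extent, with the
 *	      original bio chain as osd data
 *
 * The copyup method is expected to write the parent data only if
 * the target object has no data of its own yet.
 */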
1759
1760 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1761 {
1762         ceph_osdc_put_request(osd_req);
1763 }
1764
1765 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1766
1767 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1768                                                 u64 offset, u64 length,
1769                                                 enum obj_request_type type)
1770 {
1771         struct rbd_obj_request *obj_request;
1772         size_t size;
1773         char *name;
1774
1775         rbd_assert(obj_request_type_valid(type));
1776
1777         size = strlen(object_name) + 1;
1778         name = kmalloc(size, GFP_KERNEL);
1779         if (!name)
1780                 return NULL;
1781
1782         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1783         if (!obj_request) {
1784                 kfree(name);
1785                 return NULL;
1786         }
1787
1788         obj_request->object_name = memcpy(name, object_name, size);
1789         obj_request->offset = offset;
1790         obj_request->length = length;
1791         obj_request->flags = 0;
1792         obj_request->which = BAD_WHICH;
1793         obj_request->type = type;
1794         INIT_LIST_HEAD(&obj_request->links);
1795         init_completion(&obj_request->completion);
1796         kref_init(&obj_request->kref);
1797
1798         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1799                 offset, length, (int)type, obj_request);
1800
1801         return obj_request;
1802 }
1803
1804 static void rbd_obj_request_destroy(struct kref *kref)
1805 {
1806         struct rbd_obj_request *obj_request;
1807
1808         obj_request = container_of(kref, struct rbd_obj_request, kref);
1809
1810         dout("%s: obj %p\n", __func__, obj_request);
1811
1812         rbd_assert(obj_request->img_request == NULL);
1813         rbd_assert(obj_request->which == BAD_WHICH);
1814
1815         if (obj_request->osd_req)
1816                 rbd_osd_req_destroy(obj_request->osd_req);
1817
1818         rbd_assert(obj_request_type_valid(obj_request->type));
1819         switch (obj_request->type) {
1820         case OBJ_REQUEST_NODATA:
1821                 break;          /* Nothing to do */
1822         case OBJ_REQUEST_BIO:
1823                 if (obj_request->bio_list)
1824                         bio_chain_put(obj_request->bio_list);
1825                 break;
1826         case OBJ_REQUEST_PAGES:
1827                 if (obj_request->pages)
1828                         ceph_release_page_vector(obj_request->pages,
1829                                                 obj_request->page_count);
1830                 break;
1831         }
1832
1833         kfree(obj_request->object_name);
1834         obj_request->object_name = NULL;
1835         kmem_cache_free(rbd_obj_request_cache, obj_request);
1836 }
1837
1838 /*
1839  * Caller is responsible for filling in the list of object requests
1840  * that comprises the image request, and the Linux request pointer
1841  * (if there is one).
1842  */
1843 static struct rbd_img_request *rbd_img_request_create(
1844                                         struct rbd_device *rbd_dev,
1845                                         u64 offset, u64 length,
1846                                         bool write_request,
1847                                         bool child_request)
1848 {
1849         struct rbd_img_request *img_request;
             struct ceph_snap_context *snapc = NULL;
1850
1851         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1852         if (!img_request)
1853                 return NULL;
1854
1855         if (write_request) {
1856                 down_read(&rbd_dev->header_rwsem);
1857                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1858                 up_read(&rbd_dev->header_rwsem);
1859         }
1860
1861         img_request->rq = NULL;
1862         img_request->rbd_dev = rbd_dev;
1863         img_request->offset = offset;
1864         img_request->length = length;
1865         img_request->flags = 0;
1866         if (write_request) {
1867                 img_request_write_set(img_request);
1868                 img_request->snapc = snapc;
1869         } else {
1870                 img_request->snap_id = rbd_dev->spec->snap_id;
1871         }
1872         if (child_request)
1873                 img_request_child_set(img_request);
1874         if (rbd_dev->parent_spec)
1875                 img_request_layered_set(img_request);
1876         spin_lock_init(&img_request->completion_lock);
1877         img_request->next_completion = 0;
1878         img_request->callback = NULL;
1879         img_request->result = 0;
1880         img_request->obj_request_count = 0;
1881         INIT_LIST_HEAD(&img_request->obj_requests);
1882         kref_init(&img_request->kref);
1883
1884         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1885                 write_request ? "write" : "read", offset, length,
1886                 img_request);
1887
1888         return img_request;
1889 }
1890
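/*
 * Note that for a write request the snapshot context pointer and
 * its reference are captured together while header_rwsem is held,
 * so a concurrent header refresh cannot swap in a new context
 * between the get and the store.  The matching put happens in
 * rbd_img_request_destroy().
 */
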
1891 static void rbd_img_request_destroy(struct kref *kref)
1892 {
1893         struct rbd_img_request *img_request;
1894         struct rbd_obj_request *obj_request;
1895         struct rbd_obj_request *next_obj_request;
1896
1897         img_request = container_of(kref, struct rbd_img_request, kref);
1898
1899         dout("%s: img %p\n", __func__, img_request);
1900
1901         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1902                 rbd_img_obj_request_del(img_request, obj_request);
1903         rbd_assert(img_request->obj_request_count == 0);
1904
1905         if (img_request_write_test(img_request))
1906                 ceph_put_snap_context(img_request->snapc);
1907
1908         if (img_request_child_test(img_request))
1909                 rbd_obj_request_put(img_request->obj_request);
1910
1911         kmem_cache_free(rbd_img_request_cache, img_request);
1912 }
1913
1914 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1915 {
1916         struct rbd_img_request *img_request;
1917         unsigned int xferred;
1918         int result;
1919         bool more;
1920
1921         rbd_assert(obj_request_img_data_test(obj_request));
1922         img_request = obj_request->img_request;
1923
1924         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1925         xferred = (unsigned int)obj_request->xferred;
1926         result = obj_request->result;
1927         if (result) {
1928                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1929
1930                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1931                         img_request_write_test(img_request) ? "write" : "read",
1932                         obj_request->length, obj_request->img_offset,
1933                         obj_request->offset);
1934                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1935                         result, xferred);
1936                 if (!img_request->result)
1937                         img_request->result = result;
1938         }
1939
1940         /* Image object requests don't own their page array */
1941
1942         if (obj_request->type == OBJ_REQUEST_PAGES) {
1943                 obj_request->pages = NULL;
1944                 obj_request->page_count = 0;
1945         }
1946
1947         if (img_request_child_test(img_request)) {
1948                 rbd_assert(img_request->obj_request != NULL);
1949                 more = obj_request->which < img_request->obj_request_count - 1;
1950         } else {
1951                 rbd_assert(img_request->rq != NULL);
1952                 more = blk_end_request(img_request->rq, result, xferred);
1953         }
1954
1955         return more;
1956 }
1957
1958 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1959 {
1960         struct rbd_img_request *img_request;
1961         u32 which = obj_request->which;
1962         bool more = true;
1963
1964         rbd_assert(obj_request_img_data_test(obj_request));
1965         img_request = obj_request->img_request;
1966
1967         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1968         rbd_assert(img_request != NULL);
1969         rbd_assert(img_request->obj_request_count > 0);
1970         rbd_assert(which != BAD_WHICH);
1971         rbd_assert(which < img_request->obj_request_count);
1972         rbd_assert(which >= img_request->next_completion);
1973
1974         spin_lock_irq(&img_request->completion_lock);
1975         if (which != img_request->next_completion)
1976                 goto out;
1977
1978         for_each_obj_request_from(img_request, obj_request) {
1979                 rbd_assert(more);
1980                 rbd_assert(which < img_request->obj_request_count);
1981
1982                 if (!obj_request_done_test(obj_request))
1983                         break;
1984                 more = rbd_img_obj_end_request(obj_request);
1985                 which++;
1986         }
1987
1988         rbd_assert(more ^ (which == img_request->obj_request_count));
1989         img_request->next_completion = which;
1990 out:
1991         spin_unlock_irq(&img_request->completion_lock);
1992
1993         if (!more)
1994                 rbd_img_request_complete(img_request);
1995 }
1996
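/*
 * For example, with three object requests 0..2: if 2 completes
 * first, its "which" doesn't match next_completion (0) and the
 * callback above just returns.  When 0 later completes, the loop
 * ends request 0, finds 1 not done, and stops with next_completion
 * set to 1.  When 1 finally completes, both 1 and the already-done
 * 2 are ended and the image request as a whole completes.
 */
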
1997 /*
1998  * Split up an image request into one or more object requests, each
1999  * to a different object.  The "type" parameter indicates whether
2000  * "data_desc" is the pointer to the head of a list of bio
2001  * structures, or the base of a page array.  In either case this
2002  * function assumes data_desc describes memory sufficient to hold
2003  * all data described by the image request.
2004  */
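/*
 * For example, with the default 4 MiB objects (obj_order 22), an
 * image request covering offset 6 MiB, length 4 MiB becomes two
 * object requests: 2 MiB at offset 2 MiB within one object, then
 * 2 MiB at offset 0 within the next.  The bio chain or page array
 * is consumed incrementally as each piece is set up.
 */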
2005 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2006                                         enum obj_request_type type,
2007                                         void *data_desc)
2008 {
2009         struct rbd_device *rbd_dev = img_request->rbd_dev;
2010         struct rbd_obj_request *obj_request = NULL;
2011         struct rbd_obj_request *next_obj_request;
2012         bool write_request = img_request_write_test(img_request);
2013         struct bio *bio_list;
2014         unsigned int bio_offset = 0;
2015         struct page **pages;
2016         u64 img_offset;
2017         u64 resid;
2018         u16 opcode;
2019
2020         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2021                 (int)type, data_desc);
2022
2023         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2024         img_offset = img_request->offset;
2025         resid = img_request->length;
2026         rbd_assert(resid > 0);
2027
2028         if (type == OBJ_REQUEST_BIO) {
2029                 bio_list = data_desc;
2030                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2031         } else {
2032                 rbd_assert(type == OBJ_REQUEST_PAGES);
2033                 pages = data_desc;
2034         }
2035
2036         while (resid) {
2037                 struct ceph_osd_request *osd_req;
2038                 const char *object_name;
2039                 u64 offset;
2040                 u64 length;
2041
2042                 object_name = rbd_segment_name(rbd_dev, img_offset);
2043                 if (!object_name)
2044                         goto out_unwind;
2045                 offset = rbd_segment_offset(rbd_dev, img_offset);
2046                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2047                 obj_request = rbd_obj_request_create(object_name,
2048                                                 offset, length, type);
2049                 /* object request has its own copy of the object name */
2050                 rbd_segment_name_free(object_name);
2051                 if (!obj_request)
2052                         goto out_unwind;
2053
2054                 if (type == OBJ_REQUEST_BIO) {
2055                         unsigned int clone_size;
2056
2057                         rbd_assert(length <= (u64)UINT_MAX);
2058                         clone_size = (unsigned int)length;
2059                         obj_request->bio_list =
2060                                         bio_chain_clone_range(&bio_list,
2061                                                                 &bio_offset,
2062                                                                 clone_size,
2063                                                                 GFP_ATOMIC);
2064                         if (!obj_request->bio_list)
2065                                 goto out_partial;
2066                 } else {
2067                         unsigned int page_count;
2068
2069                         obj_request->pages = pages;
2070                         page_count = (u32)calc_pages_for(offset, length);
2071                         obj_request->page_count = page_count;
2072                         if ((offset + length) & ~PAGE_MASK)
2073                                 page_count--;   /* more on last page */
2074                         pages += page_count;
2075                 }
2076
2077                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2078                                                 obj_request);
2079                 if (!osd_req)
2080                         goto out_partial;
2081                 obj_request->osd_req = osd_req;
2082                 obj_request->callback = rbd_img_obj_callback;
2083
2084                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2085                                                 0, 0);
2086                 if (type == OBJ_REQUEST_BIO)
2087                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2088                                         obj_request->bio_list, length);
2089                 else
2090                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2091                                         obj_request->pages, length,
2092                                         offset & ~PAGE_MASK, false, false);
2093
2094                 if (write_request)
2095                         rbd_osd_req_format_write(obj_request);
2096                 else
2097                         rbd_osd_req_format_read(obj_request);
2098
2099                 obj_request->img_offset = img_offset;
2100                 rbd_img_obj_request_add(img_request, obj_request);
2101
2102                 img_offset += length;
2103                 resid -= length;
2104         }
2105
2106         return 0;
2107
2108 out_partial:
2109         rbd_obj_request_put(obj_request);
2110 out_unwind:
2111         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2112                 rbd_obj_request_put(obj_request);
2113
2114         return -ENOMEM;
2115 }
2116
2117 static void
2118 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2119 {
2120         struct rbd_img_request *img_request;
2121         struct rbd_device *rbd_dev;
2122         u64 length;
2123         u32 page_count;
2124
2125         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2126         rbd_assert(obj_request_img_data_test(obj_request));
2127         img_request = obj_request->img_request;
2128         rbd_assert(img_request);
2129
2130         rbd_dev = img_request->rbd_dev;
2131         rbd_assert(rbd_dev);
2132         length = (u64)1 << rbd_dev->header.obj_order;
2133         page_count = (u32)calc_pages_for(0, length);
2134
2135         rbd_assert(obj_request->copyup_pages);
2136         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2137         obj_request->copyup_pages = NULL;
2138
2139         /*
2140          * We want the transfer count to reflect the size of the
2141          * original write request.  There is no such thing as a
2142          * successful short write, so if the request was successful
2143          * we can just set it to the originally-requested length.
2144          */
2145         if (!obj_request->result)
2146                 obj_request->xferred = obj_request->length;
2147
2148         /* Finish up with the normal image object callback */
2149
2150         rbd_img_obj_callback(obj_request);
2151 }
2152
2153 static void
2154 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2155 {
2156         struct rbd_obj_request *orig_request;
2157         struct ceph_osd_request *osd_req;
2158         struct ceph_osd_client *osdc;
2159         struct rbd_device *rbd_dev;
2160         struct page **pages;
2161         int result;
2162         u64 obj_size;
2163         u64 xferred;
2164
2165         rbd_assert(img_request_child_test(img_request));
2166
2167         /* First get what we need from the image request */
2168
2169         pages = img_request->copyup_pages;
2170         rbd_assert(pages != NULL);
2171         img_request->copyup_pages = NULL;
2172
2173         orig_request = img_request->obj_request;
2174         rbd_assert(orig_request != NULL);
2175         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2176         result = img_request->result;
2177         obj_size = img_request->length;
2178         xferred = img_request->xferred;
2179         rbd_img_request_put(img_request);
2180
2181         rbd_assert(orig_request->img_request);
2182         rbd_dev = orig_request->img_request->rbd_dev;
2183         rbd_assert(rbd_dev);
2184         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2185
2186         if (result)
2187                 goto out_err;
2188
2189         /* Allocate the new copyup osd request for the original request */
2190
2191         result = -ENOMEM;
2192         rbd_assert(!orig_request->osd_req);
2193         osd_req = rbd_osd_req_create_copyup(orig_request);
2194         if (!osd_req)
2195                 goto out_err;
2196         orig_request->osd_req = osd_req;
2197         orig_request->copyup_pages = pages;
2198
2199         /* Initialize the copyup op */
2200
2201         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2202         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2203                                                 false, false);
2204
2205         /* Then the original write request op */
2206
2207         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2208                                         orig_request->offset,
2209                                         orig_request->length, 0, 0);
2210         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2211                                         orig_request->length);
2212
2213         rbd_osd_req_format_write(orig_request);
2214
2215         /* All set, send it off. */
2216
2217         orig_request->callback = rbd_img_obj_copyup_callback;
2218         osdc = &rbd_dev->rbd_client->client->osdc;
2219         result = rbd_obj_request_submit(osdc, orig_request);
2220         if (!result)
2221                 return;
2222 out_err:
2223         /* Record the error code and complete the request */
2224
2225         orig_request->result = result;
2226         orig_request->xferred = 0;
2227         obj_request_done_set(orig_request);
2228         rbd_obj_request_complete(orig_request);
2229 }
2230
2231 /*
2232  * Read from the parent image the range of data that covers the
2233  * entire target of the given object request.  This is used for
2234  * satisfying a layered image write request when the target of an
2235  * object request from the image request does not exist.
2236  *
2237  * A page array big enough to hold the returned data is allocated
2238  * and supplied to rbd_img_request_fill() as the "data descriptor."
2239  * When the read completes, this page array will be transferred to
2240  * the original object request for the copyup operation.
2241  *
2242  * If an error occurs, record it as the result of the original
2243  * object request and mark it done so it gets completed.
2244  */
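/*
 * In outline, the copyup sequence for a layered write to a missing
 * object is:
 *
 *	1. drop the original single-op osd request
 *	2. compute the parent range covering the whole object,
 *	   clamped to the parent overlap
 *	3. read that range from the parent into a page vector via a
 *	   child image request
 *	4. in the completion callback, build the two-op copyup osd
 *	   request and submit it on behalf of the original object
 *	   request
 */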
2245 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2246 {
2247         struct rbd_img_request *img_request = NULL;
2248         struct rbd_img_request *parent_request = NULL;
2249         struct rbd_device *rbd_dev;
2250         u64 img_offset;
2251         u64 length;
2252         struct page **pages = NULL;
2253         u32 page_count;
2254         int result;
2255
2256         rbd_assert(obj_request_img_data_test(obj_request));
2257         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2258
2259         img_request = obj_request->img_request;
2260         rbd_assert(img_request != NULL);
2261         rbd_dev = img_request->rbd_dev;
2262         rbd_assert(rbd_dev->parent != NULL);
2263
2264         /*
2265          * First things first.  The original osd request is of no
2266          * use to us any more; we'll need a new one that can hold
2267          * the two ops in a copyup request.  We'll get that later,
2268          * but for now we can release the old one.
2269          */
2270         rbd_osd_req_destroy(obj_request->osd_req);
2271         obj_request->osd_req = NULL;
2272
2273         /*
2274          * Determine the byte range covered by the object in the
2275          * child image to which the original request was to be sent.
2276          */
2277         img_offset = obj_request->img_offset - obj_request->offset;
2278         length = (u64)1 << rbd_dev->header.obj_order;
2279
2280         /*
2281          * There is no defined parent data beyond the parent
2282          * overlap, so limit what we read at that boundary if
2283          * necessary.
2284          */
2285         if (img_offset + length > rbd_dev->parent_overlap) {
2286                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2287                 length = rbd_dev->parent_overlap - img_offset;
2288         }
2289
2290         /*
2291          * Allocate a page array big enough to receive the data read
2292          * from the parent.
2293          */
2294         page_count = (u32)calc_pages_for(0, length);
2295         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2296         if (IS_ERR(pages)) {
2297                 result = PTR_ERR(pages);
2298                 pages = NULL;
2299                 goto out_err;
2300         }
2301
2302         result = -ENOMEM;
2303         parent_request = rbd_img_request_create(rbd_dev->parent,
2304                                                 img_offset, length,
2305                                                 false, true);
2306         if (!parent_request)
2307                 goto out_err;
2308         rbd_obj_request_get(obj_request);
2309         parent_request->obj_request = obj_request;
2310
2311         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2312         if (result)
2313                 goto out_err;
2314         parent_request->copyup_pages = pages;
2315
2316         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2317         result = rbd_img_request_submit(parent_request);
2318         if (!result)
2319                 return 0;
2320
2321         parent_request->copyup_pages = NULL;
2322         parent_request->obj_request = NULL;
2323         rbd_obj_request_put(obj_request);
2324 out_err:
2325         if (pages)
2326                 ceph_release_page_vector(pages, page_count);
2327         if (parent_request)
2328                 rbd_img_request_put(parent_request);
2329         obj_request->result = result;
2330         obj_request->xferred = 0;
2331         obj_request_done_set(obj_request);
2332
2333         return result;
2334 }
2335
2336 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2337 {
2338         struct rbd_obj_request *orig_request;
2339         int result;
2340
2341         rbd_assert(!obj_request_img_data_test(obj_request));
2342
2343         /*
2344          * All we need from the object request is the original
2345          * request and the result of the STAT op.  Grab those, then
2346          * we're done with the request.
2347          */
2348         orig_request = obj_request->obj_request;
2349         obj_request->obj_request = NULL;
2350         rbd_assert(orig_request);
2351         rbd_assert(orig_request->img_request);
2352
2353         result = obj_request->result;
2354         obj_request->result = 0;
2355
2356         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2357                 obj_request, orig_request, result,
2358                 obj_request->xferred, obj_request->length);
2359         rbd_obj_request_put(obj_request);
2360
2361         rbd_assert(orig_request);
2362         rbd_assert(orig_request->img_request);
2363
2364         /*
2365          * Our only purpose here is to determine whether the object
2366          * exists, and we don't want to treat the non-existence as
2367          * an error.  If something else comes back, transfer the
2368          * error to the original request and complete it now.
2369          */
2370         if (!result) {
2371                 obj_request_existence_set(orig_request, true);
2372         } else if (result == -ENOENT) {
2373                 obj_request_existence_set(orig_request, false);
2374         } else if (result) {
2375                 orig_request->result = result;
2376                 goto out;
2377         }
2378
2379         /*
2380          * Resubmit the original request now that we have recorded
2381          * whether the target object exists.
2382          */
2383         orig_request->result = rbd_img_obj_request_submit(orig_request);
2384 out:
2385         if (orig_request->result)
2386                 rbd_obj_request_complete(orig_request);
2387         rbd_obj_request_put(orig_request);
2388 }
2389
2390 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2391 {
2392         struct rbd_obj_request *stat_request;
2393         struct rbd_device *rbd_dev;
2394         struct ceph_osd_client *osdc;
2395         struct page **pages = NULL;
2396         u32 page_count;
2397         size_t size;
2398         int ret;
2399
2400         /*
2401          * The response data for a STAT call consists of:
2402          *     le64 length;
2403          *     struct {
2404          *         le32 tv_sec;
2405          *         le32 tv_nsec;
2406          *     } mtime;
2407          */
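        /*
         * That is 8 + 4 + 4 = 16 bytes in all, so the
         * calc_pages_for() call below always comes to a single page.
         */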
2408         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2409         page_count = (u32)calc_pages_for(0, size);
2410         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2411         if (IS_ERR(pages))
2412                 return PTR_ERR(pages);
2413
2414         ret = -ENOMEM;
2415         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2416                                                         OBJ_REQUEST_PAGES);
2417         if (!stat_request) {
2418                 ceph_release_page_vector(pages, page_count);
                     return -ENOMEM;
             }
2419
2420         rbd_obj_request_get(obj_request);
2421         stat_request->obj_request = obj_request;
2422         stat_request->pages = pages;
2423         stat_request->page_count = page_count;
2424
2425         rbd_assert(obj_request->img_request);
2426         rbd_dev = obj_request->img_request->rbd_dev;
2427         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2428                                                 stat_request);
2429         if (!stat_request->osd_req)
2430                 goto out;
2431         stat_request->callback = rbd_img_obj_exists_callback;
2432
2433         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2434         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2435                                         false, false);
2436         rbd_osd_req_format_read(stat_request);
2437
2438         osdc = &rbd_dev->rbd_client->client->osdc;
2439         ret = rbd_obj_request_submit(osdc, stat_request);
2440 out:
2441         if (ret)
2442                 rbd_obj_request_put(obj_request);
2443
2444         return ret;
2445 }
2446
2447 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2448 {
2449         struct rbd_img_request *img_request;
2450         struct rbd_device *rbd_dev;
2451         bool known;
2452
2453         rbd_assert(obj_request_img_data_test(obj_request));
2454
2455         img_request = obj_request->img_request;
2456         rbd_assert(img_request);
2457         rbd_dev = img_request->rbd_dev;
2458
2459         /*
2460          * Only writes to layered images need special handling.
2461          * Reads and non-layered writes are simple object requests.
2462          * Layered writes that start beyond the end of the overlap
2463          * with the parent have no parent data, so they too are
2464          * simple object requests.  Finally, if the target object is
2465          * known to already exist, its parent data has already been
2466          * copied, so a write to the object can also be handled as a
2467          * simple object request.
2468          */
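        /*
         * Roughly, the cases handled below:
         *
         *    read                              -> submit as-is
         *    write, image not layered          -> submit as-is
         *    write at/past the parent overlap  -> submit as-is
         *    write, target known to exist      -> submit as-is
         *    write, target known to be missing -> read parent for copyup
         *    write, target existence unknown   -> issue a STAT first
         */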
2469         if (!img_request_write_test(img_request) ||
2470                 !img_request_layered_test(img_request) ||
2471                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2472                 ((known = obj_request_known_test(obj_request)) &&
2473                         obj_request_exists_test(obj_request))) {
2474
2475                 struct rbd_device *rbd_dev;
2476                 struct ceph_osd_client *osdc;
2477
2478                 rbd_dev = obj_request->img_request->rbd_dev;
2479                 osdc = &rbd_dev->rbd_client->client->osdc;
2480
2481                 return rbd_obj_request_submit(osdc, obj_request);
2482         }
2483
2484         /*
2485          * It's a layered write.  The target object might exist but
2486          * we may not know that yet.  If we know it doesn't exist,
2487          * start by reading the data for the full target object from
2488          * the parent so we can use it for a copyup to the target.
2489          */
2490         if (known)
2491                 return rbd_img_obj_parent_read_full(obj_request);
2492
2493         /* We don't know whether the target exists.  Go find out. */
2494
2495         return rbd_img_obj_exists_submit(obj_request);
2496 }
2497
2498 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2499 {
2500         struct rbd_obj_request *obj_request;
2501         struct rbd_obj_request *next_obj_request;
2502
2503         dout("%s: img %p\n", __func__, img_request);
2504         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2505                 int ret;
2506
2507                 ret = rbd_img_obj_request_submit(obj_request);
2508                 if (ret)
2509                         return ret;
2510         }
2511
2512         return 0;
2513 }
2514
2515 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2516 {
2517         struct rbd_obj_request *obj_request;
2518         struct rbd_device *rbd_dev;
2519         u64 obj_end;
2520
2521         rbd_assert(img_request_child_test(img_request));
2522
2523         obj_request = img_request->obj_request;
2524         rbd_assert(obj_request);
2525         rbd_assert(obj_request->img_request);
2526
2527         obj_request->result = img_request->result;
2528         if (obj_request->result)
2529                 goto out;
2530
2531         /*
2532          * We need to zero anything beyond the parent overlap
2533          * boundary.  Since rbd_img_obj_request_read_callback()
2534          * will zero anything beyond the end of a short read, an
2535          * easy way to do this is to pretend the data from the
2536          * parent came up short--ending at the overlap boundary.
2537          */
2538         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2539         obj_end = obj_request->img_offset + obj_request->length;
2540         rbd_dev = obj_request->img_request->rbd_dev;
2541         if (obj_end > rbd_dev->parent_overlap) {
2542                 u64 xferred = 0;
2543
2544                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2545                         xferred = rbd_dev->parent_overlap -
2546                                         obj_request->img_offset;
2547
2548                 obj_request->xferred = min(img_request->xferred, xferred);
2549         } else {
2550                 obj_request->xferred = img_request->xferred;
2551         }
2552 out:
2553         rbd_img_request_put(img_request);
2554         rbd_img_obj_request_read_callback(obj_request);
2555         rbd_obj_request_complete(obj_request);
2556 }
2557
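/*
 * For example, with parent_overlap at 8 MiB, a child read covering
 * image offsets 7 MiB..9 MiB is clamped above to xferred = 1 MiB;
 * rbd_img_obj_request_read_callback() then treats it as a short
 * read and zero-fills the last 1 MiB, which lies beyond the
 * overlap.
 */
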
2558 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2559 {
2560         struct rbd_device *rbd_dev;
2561         struct rbd_img_request *img_request;
2562         int result;
2563
2564         rbd_assert(obj_request_img_data_test(obj_request));
2565         rbd_assert(obj_request->img_request != NULL);
2566         rbd_assert(obj_request->result == (s32) -ENOENT);
2567         rbd_assert(obj_request_type_valid(obj_request->type));
2568
2569         rbd_dev = obj_request->img_request->rbd_dev;
2570         rbd_assert(rbd_dev->parent != NULL);
2571         /* rbd_read_finish(obj_request, obj_request->length); */
2572         img_request = rbd_img_request_create(rbd_dev->parent,
2573                                                 obj_request->img_offset,
2574                                                 obj_request->length,
2575                                                 false, true);
2576         result = -ENOMEM;
2577         if (!img_request)
2578                 goto out_err;
2579
2580         rbd_obj_request_get(obj_request);
2581         img_request->obj_request = obj_request;
2582
2583         if (obj_request->type == OBJ_REQUEST_BIO)
2584                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2585                                                 obj_request->bio_list);
2586         else
2587                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2588                                                 obj_request->pages);
2589         if (result)
2590                 goto out_err;
2591
2592         img_request->callback = rbd_img_parent_read_callback;
2593         result = rbd_img_request_submit(img_request);
2594         if (result)
2595                 goto out_err;
2596
2597         return;
2598 out_err:
2599         if (img_request)
2600                 rbd_img_request_put(img_request);
2601         obj_request->result = result;
2602         obj_request->xferred = 0;
2603         obj_request_done_set(obj_request);
2604 }
2605
2606 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2607 {
2608         struct rbd_obj_request *obj_request;
2609         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2610         int ret;
2611
2612         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2613                                                         OBJ_REQUEST_NODATA);
2614         if (!obj_request)
2615                 return -ENOMEM;
2616
2617         ret = -ENOMEM;
2618         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2619         if (!obj_request->osd_req)
2620                 goto out;
2621         obj_request->callback = rbd_obj_request_put;
2622
2623         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2624                                         notify_id, 0, 0);
2625         rbd_osd_req_format_read(obj_request);
2626
2627         ret = rbd_obj_request_submit(osdc, obj_request);
2628 out:
2629         if (ret)
2630                 rbd_obj_request_put(obj_request);
2631
2632         return ret;
2633 }
2634
2635 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2636 {
2637         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2638         int ret;
2639
2640         if (!rbd_dev)
2641                 return;
2642
2643         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2644                 rbd_dev->header_name, (unsigned long long)notify_id,
2645                 (unsigned int)opcode);
2646         ret = rbd_dev_refresh(rbd_dev);
2647         if (ret)
2648                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2649
2650         rbd_obj_notify_ack(rbd_dev, notify_id);
2651 }
2652
2653 /*
2654  * Request sync osd watch/unwatch.  The value of "start" determines
2655  * whether a watch request is being initiated or torn down.
2656  */
2657 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2658 {
2659         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2660         struct rbd_obj_request *obj_request;
2661         int ret;
2662
2663         rbd_assert(start ^ !!rbd_dev->watch_event);
2664         rbd_assert(start ^ !!rbd_dev->watch_request);
2665
2666         if (start) {
2667                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2668                                                 &rbd_dev->watch_event);
2669                 if (ret < 0)
2670                         return ret;
2671                 rbd_assert(rbd_dev->watch_event != NULL);
2672         }
2673
2674         ret = -ENOMEM;
2675         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2676                                                         OBJ_REQUEST_NODATA);
2677         if (!obj_request)
2678                 goto out_cancel;
2679
2680         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2681         if (!obj_request->osd_req)
2682                 goto out_cancel;
2683
2684         if (start)
2685                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2686         else
2687                 ceph_osdc_unregister_linger_request(osdc,
2688                                         rbd_dev->watch_request->osd_req);
2689
2690         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2691                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2692         rbd_osd_req_format_write(obj_request);
2693
2694         ret = rbd_obj_request_submit(osdc, obj_request);
2695         if (ret)
2696                 goto out_cancel;
2697         ret = rbd_obj_request_wait(obj_request);
2698         if (ret)
2699                 goto out_cancel;
2700         ret = obj_request->result;
2701         if (ret)
2702                 goto out_cancel;
2703
2704         /*
2705          * A watch request is set to linger, so the underlying osd
2706          * request won't go away until we unregister it.  We retain
2707          * a pointer to the object request during that time (in
2708          * rbd_dev->watch_request), so we'll keep a reference to
2709          * it.  We'll drop that reference (below) after we've
2710          * unregistered it.
2711          */
2712         if (start) {
2713                 rbd_dev->watch_request = obj_request;
2714
2715                 return 0;
2716         }
2717
2718         /* We have successfully torn down the watch request */
2719
2720         rbd_obj_request_put(rbd_dev->watch_request);
2721         rbd_dev->watch_request = NULL;
2722 out_cancel:
2723         /* Cancel the event if we're tearing down, or on error */
2724         ceph_osdc_cancel_event(rbd_dev->watch_event);
2725         rbd_dev->watch_event = NULL;
2726         if (obj_request)
2727                 rbd_obj_request_put(obj_request);
2728
2729         return ret;
2730 }
2731
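/*
 * A typical caller sequence (sketch): this is called with start
 * set to true when the device is being mapped, so header updates
 * trigger rbd_watch_cb(), and with start set to false again when
 * the device is torn down.
 */
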
2732 /*
2733  * Synchronous osd object method call.  Returns the number of bytes
2734  * returned in the outbound buffer, or a negative error code.
2735  */
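/*
 * For example, reading the image size from the header object looks
 * roughly like this (a sketch of one caller pattern):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 */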
2736 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2737                              const char *object_name,
2738                              const char *class_name,
2739                              const char *method_name,
2740                              const void *outbound,
2741                              size_t outbound_size,
2742                              void *inbound,
2743                              size_t inbound_size)
2744 {
2745         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2746         struct rbd_obj_request *obj_request;
2747         struct page **pages;
2748         u32 page_count;
2749         int ret;
2750
2751         /*
2752          * Method calls are ultimately read operations.  The result
2753          * should be placed into the inbound buffer provided.  They
2754          * also supply outbound data--parameters for the object
2755          * method.  Currently if this is present it will be a
2756          * snapshot id.
2757          */
2758         page_count = (u32)calc_pages_for(0, inbound_size);
2759         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2760         if (IS_ERR(pages))
2761                 return PTR_ERR(pages);
2762
2763         ret = -ENOMEM;
2764         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2765                                                         OBJ_REQUEST_PAGES);
2766         if (!obj_request)
2767                 goto out;
2768
2769         obj_request->pages = pages;
2770         obj_request->page_count = page_count;
2771
2772         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2773         if (!obj_request->osd_req)
2774                 goto out;
2775
2776         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2777                                         class_name, method_name);
2778         if (outbound_size) {
2779                 struct ceph_pagelist *pagelist;
2780
2781                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2782                 if (!pagelist)
2783                         goto out;
2784
2785                 ceph_pagelist_init(pagelist);
2786                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2787                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2788                                                 pagelist);
2789         }
2790         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2791                                         obj_request->pages, inbound_size,
2792                                         0, false, false);
2793         rbd_osd_req_format_read(obj_request);
2794
2795         ret = rbd_obj_request_submit(osdc, obj_request);
2796         if (ret)
2797                 goto out;
2798         ret = rbd_obj_request_wait(obj_request);
2799         if (ret)
2800                 goto out;
2801
2802         ret = obj_request->result;
2803         if (ret < 0)
2804                 goto out;
2805
2806         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2807         ret = (int)obj_request->xferred;
2808         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2809 out:
2810         if (obj_request)
2811                 rbd_obj_request_put(obj_request);
2812         else
2813                 ceph_release_page_vector(pages, page_count);
2814
2815         return ret;
2816 }
2817
2818 static void rbd_request_fn(struct request_queue *q)
2819                 __releases(q->queue_lock) __acquires(q->queue_lock)
2820 {
2821         struct rbd_device *rbd_dev = q->queuedata;
2822         bool read_only = rbd_dev->mapping.read_only;
2823         struct request *rq;
2824         int result;
2825
2826         while ((rq = blk_fetch_request(q))) {
2827                 bool write_request = rq_data_dir(rq) == WRITE;
2828                 struct rbd_img_request *img_request;
2829                 u64 offset;
2830                 u64 length;
2831
2832                 /* Ignore any non-FS requests that filter through. */
2833
2834                 if (rq->cmd_type != REQ_TYPE_FS) {
2835                         dout("%s: non-fs request type %d\n", __func__,
2836                                 (int) rq->cmd_type);
2837                         __blk_end_request_all(rq, 0);
2838                         continue;
2839                 }
2840
2841                 /* Ignore/skip any zero-length requests */
2842
2843                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2844                 length = (u64) blk_rq_bytes(rq);
2845
2846                 if (!length) {
2847                         dout("%s: zero-length request\n", __func__);
2848                         __blk_end_request_all(rq, 0);
2849                         continue;
2850                 }
2851
2852                 spin_unlock_irq(q->queue_lock);
2853
2854                 /* Disallow writes to a read-only device */
2855
2856                 if (write_request) {
2857                         result = -EROFS;
2858                         if (read_only)
2859                                 goto end_request;
2860                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2861                 }
2862
2863                 /*
2864                  * Quit early if the mapped snapshot no longer
2865                  * exists.  It's still possible the snapshot will
2866                  * have disappeared by the time our request arrives
2867                  * at the osd, but there's no sense in sending it if
2868                  * we already know.
2869                  */
2870                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2871                         dout("request for non-existent snapshot");
2872                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2873                         result = -ENXIO;
2874                         goto end_request;
2875                 }
2876
2877                 result = -EINVAL;
2878                 if (offset && length > U64_MAX - offset + 1) {
2879                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2880                                 offset, length);
2881                         goto end_request;       /* Shouldn't happen */
2882                 }
2883
2884                 result = -EIO;
2885                 if (offset + length > rbd_dev->mapping.size) {
2886                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2887                                 offset, length, rbd_dev->mapping.size);
2888                         goto end_request;
2889                 }
2890
2891                 result = -ENOMEM;
2892                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2893                                                         write_request, false);
2894                 if (!img_request)
2895                         goto end_request;
2896
2897                 img_request->rq = rq;
2898
2899                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2900                                                 rq->bio);
2901                 if (!result)
2902                         result = rbd_img_request_submit(img_request);
2903                 if (result)
2904                         rbd_img_request_put(img_request);
2905 end_request:
2906                 spin_lock_irq(q->queue_lock);
2907                 if (result < 0) {
2908                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2909                                 write_request ? "write" : "read",
2910                                 length, offset, result);
2911
2912                         __blk_end_request_all(rq, result);
2913                 }
2914         }
2915 }
2916
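/*
 * Note that rbd_request_fn() above is entered with q->queue_lock
 * held (hence the __releases/__acquires annotations).  The lock is
 * dropped while an image request is built and submitted, since that
 * path takes header_rwsem and may block, and is re-taken before
 * ending a failed request or fetching the next one.
 */
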
2917 /*
2918  * a queue callback. Makes sure that we don't create a bio that spans across
2919  * multiple osd objects. One exception would be with a single page bios,
2920  * which we handle later at bio_chain_clone_range()
2921  */
2922 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2923                           struct bio_vec *bvec)
2924 {
2925         struct rbd_device *rbd_dev = q->queuedata;
2926         sector_t sector_offset;
2927         sector_t sectors_per_obj;
2928         sector_t obj_sector_offset;
2929         int ret;
2930
2931         /*
2932          * Convert the partition-relative bio start sector into a
2933          * sector offset relative to the enclosing device, then find
2934          * how far into its rbd object that offset falls.
2935          */
2936         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2937         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2938         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2939
2940         /*
2941          * Compute the number of bytes from that offset to the end
2942          * of the object.  Account for what's already used by the bio.
2943          */
2944         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2945         if (ret > bmd->bi_size)
2946                 ret -= bmd->bi_size;
2947         else
2948                 ret = 0;
2949
2950         /*
2951          * Don't send back more than was asked for.  And if the bio
2952          * was empty, let the whole thing through because:  "Note
2953          * that a block device *must* allow a single page to be
2954          * added to an empty bio."
2955          */
2956         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2957         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2958                 ret = (int) bvec->bv_len;
2959
2960         return ret;
2961 }
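
/*
 * A worked example of the computation above (illustrative numbers
 * only, assuming the common 4 MiB object size, i.e. obj_order 22):
 */
#if 0
	sectors_per_obj = 1 << (22 - SECTOR_SHIFT);		/* 8192 */
	/*
	 * A bio starting at device sector 8000 sits 8000 sectors into
	 * its object, 192 sectors shy of the boundary, so at most
	 * 192 << SECTOR_SHIFT == 98304 bytes may be merged before the
	 * bio would span two objects.
	 */
	obj_sector_offset = 8000 & (sectors_per_obj - 1);	/* 8000 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
#endif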
2962
2963 static void rbd_free_disk(struct rbd_device *rbd_dev)
2964 {
2965         struct gendisk *disk = rbd_dev->disk;
2966
2967         if (!disk)
2968                 return;
2969
2970         rbd_dev->disk = NULL;
2971         if (disk->flags & GENHD_FL_UP) {
2972                 del_gendisk(disk);
2973                 if (disk->queue)
2974                         blk_cleanup_queue(disk->queue);
2975         }
2976         put_disk(disk);
2977 }
2978
2979 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2980                                 const char *object_name,
2981                                 u64 offset, u64 length, void *buf)
2983 {
2984         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2985         struct rbd_obj_request *obj_request;
2986         struct page **pages = NULL;
2987         u32 page_count;
2988         size_t size;
2989         int ret;
2990
2991         page_count = (u32) calc_pages_for(offset, length);
2992         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2993         if (IS_ERR(pages))
2994                 return PTR_ERR(pages);
2995
2996         ret = -ENOMEM;
2997         obj_request = rbd_obj_request_create(object_name, offset, length,
2998                                                         OBJ_REQUEST_PAGES);
2999         if (!obj_request)
3000                 goto out;
3001
3002         obj_request->pages = pages;
3003         obj_request->page_count = page_count;
3004
3005         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3006         if (!obj_request->osd_req)
3007                 goto out;
3008
3009         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3010                                         offset, length, 0, 0);
3011         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3012                                         obj_request->pages,
3013                                         obj_request->length,
3014                                         obj_request->offset & ~PAGE_MASK,
3015                                         false, false);
3016         rbd_osd_req_format_read(obj_request);
3017
3018         ret = rbd_obj_request_submit(osdc, obj_request);
3019         if (ret)
3020                 goto out;
3021         ret = rbd_obj_request_wait(obj_request);
3022         if (ret)
3023                 goto out;
3024
3025         ret = obj_request->result;
3026         if (ret < 0)
3027                 goto out;
3028
3029         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3030         size = (size_t) obj_request->xferred;
3031         ceph_copy_from_page_vector(pages, buf, 0, size);
3032         rbd_assert(size <= (size_t)INT_MAX);
3033         ret = (int)size;
3034 out:
3035         if (obj_request)
3036                 rbd_obj_request_put(obj_request);
3037         else
3038                 ceph_release_page_vector(pages, page_count);
3039
3040         return ret;
3041 }
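
/*
 * An illustrative sketch (not compiled) of how a caller might use
 * rbd_obj_read_sync(); the object name and buffer size are made up
 * for the example.
 */
#if 0
	char data[128];
	int ret;

	ret = rbd_obj_read_sync(rbd_dev, "some_object", 0, sizeof (data), data);
	if (ret < 0)
		return ret;	/* allocation, submission, or OSD error */
	/* on success, ret is the number of bytes actually transferred */
#endif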
3042
3043 /*
3044  * Read the complete header for the given rbd device.  On successful
3045  * return, the rbd_dev->header field will contain up-to-date
3046  * information about the image.
3047  */
3048 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3049 {
3050         struct rbd_image_header_ondisk *ondisk = NULL;
3051         u32 snap_count = 0;
3052         u64 names_size = 0;
3053         u32 want_count;
3054         int ret;
3055
3056         /*
3057          * The complete header will include an array of its 64-bit
3058          * snapshot ids, followed by the names of those snapshots as
3059          * a contiguous block of NUL-terminated strings.  Note that
3060          * the number of snapshots could change by the time we read
3061          * it in, in which case we re-read it.
3062          */
3063         do {
3064                 size_t size;
3065
3066                 kfree(ondisk);
3067
3068                 size = sizeof (*ondisk);
3069                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3070                 size += names_size;
3071                 ondisk = kmalloc(size, GFP_KERNEL);
3072                 if (!ondisk)
3073                         return -ENOMEM;
3074
3075                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3076                                        0, size, ondisk);
3077                 if (ret < 0)
3078                         goto out;
3079                 if ((size_t)ret < size) {
3080                         ret = -ENXIO;
3081                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3082                                 size, ret);
3083                         goto out;
3084                 }
3085                 if (!rbd_dev_ondisk_valid(ondisk)) {
3086                         ret = -ENXIO;
3087                         rbd_warn(rbd_dev, "invalid header");
3088                         goto out;
3089                 }
3090
3091                 names_size = le64_to_cpu(ondisk->snap_names_len);
3092                 want_count = snap_count;
3093                 snap_count = le32_to_cpu(ondisk->snap_count);
3094         } while (snap_count != want_count);
3095
3096         ret = rbd_header_from_disk(rbd_dev, ondisk);
3097 out:
3098         kfree(ondisk);
3099
3100         return ret;
3101 }
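
/*
 * For example (illustrative numbers): with two snapshots whose names
 * total 16 bytes including their NULs, the loop above allocates
 * sizeof (*ondisk) + 2 * sizeof (struct rbd_image_snap_ondisk) + 16
 * bytes, and it only repeats if the on-disk snapshot count changed
 * between reads.
 */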
3102
3103 /*
3104  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3105  * has disappeared from the (just updated) snapshot context.
3106  */
3107 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3108 {
3109         u64 snap_id;
3110
3111         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3112                 return;
3113
3114         snap_id = rbd_dev->spec->snap_id;
3115         if (snap_id == CEPH_NOSNAP)
3116                 return;
3117
3118         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3119                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3120 }
3121
3122 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3123 {
3124         u64 mapping_size;
3125         int ret;
3126
3127         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3128         mapping_size = rbd_dev->mapping.size;
3129         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3130         if (rbd_dev->image_format == 1)
3131                 ret = rbd_dev_v1_header_info(rbd_dev);
3132         else
3133                 ret = rbd_dev_v2_header_info(rbd_dev);
3134
3135         /* If it's a mapped snapshot, validate its EXISTS flag */
3136
3137         rbd_exists_validate(rbd_dev);
3138         mutex_unlock(&ctl_mutex);
3139         if (mapping_size != rbd_dev->mapping.size) {
3140                 sector_t size;
3141
3142                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3143                 dout("setting size to %llu sectors", (unsigned long long)size);
3144                 set_capacity(rbd_dev->disk, size);
3145                 revalidate_disk(rbd_dev->disk);
3146         }
3147
3148         return ret;
3149 }
3150
3151 static int rbd_init_disk(struct rbd_device *rbd_dev)
3152 {
3153         struct gendisk *disk;
3154         struct request_queue *q;
3155         u64 segment_size;
3156
3157         /* create gendisk info */
3158         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3159         if (!disk)
3160                 return -ENOMEM;
3161
3162         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3163                  rbd_dev->dev_id);
3164         disk->major = rbd_dev->major;
3165         disk->first_minor = 0;
3166         disk->fops = &rbd_bd_ops;
3167         disk->private_data = rbd_dev;
3168
3169         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3170         if (!q)
3171                 goto out_disk;
3172
3173         /* We use the default size, but let's be explicit about it. */
3174         blk_queue_physical_block_size(q, SECTOR_SIZE);
3175
3176         /* set io sizes to object size */
3177         segment_size = rbd_obj_bytes(&rbd_dev->header);
3178         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3179         blk_queue_max_segment_size(q, segment_size);
3180         blk_queue_io_min(q, segment_size);
3181         blk_queue_io_opt(q, segment_size);
3182
3183         blk_queue_merge_bvec(q, rbd_merge_bvec);
3184         disk->queue = q;
3185
3186         q->queuedata = rbd_dev;
3187
3188         rbd_dev->disk = disk;
3189
3190         return 0;
3191 out_disk:
3192         put_disk(disk);
3193
3194         return -ENOMEM;
3195 }
3196
3197 /*
3198  * sysfs
3199  */
3200
3201 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3202 {
3203         return container_of(dev, struct rbd_device, dev);
3204 }
3205
3206 static ssize_t rbd_size_show(struct device *dev,
3207                              struct device_attribute *attr, char *buf)
3208 {
3209         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3210
3211         return sprintf(buf, "%llu\n",
3212                 (unsigned long long)rbd_dev->mapping.size);
3213 }
3214
3215 /*
3216  * Note this shows the features for whatever's mapped, which is not
3217  * necessarily the base image.
3218  */
3219 static ssize_t rbd_features_show(struct device *dev,
3220                              struct device_attribute *attr, char *buf)
3221 {
3222         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3223
3224         return sprintf(buf, "0x%016llx\n",
3225                         (unsigned long long)rbd_dev->mapping.features);
3226 }
3227
3228 static ssize_t rbd_major_show(struct device *dev,
3229                               struct device_attribute *attr, char *buf)
3230 {
3231         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3232
3233         if (rbd_dev->major)
3234                 return sprintf(buf, "%d\n", rbd_dev->major);
3235
3236         return sprintf(buf, "(none)\n");
3238 }
3239
3240 static ssize_t rbd_client_id_show(struct device *dev,
3241                                   struct device_attribute *attr, char *buf)
3242 {
3243         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3244
3245         return sprintf(buf, "client%lld\n",
3246                         ceph_client_id(rbd_dev->rbd_client->client));
3247 }
3248
3249 static ssize_t rbd_pool_show(struct device *dev,
3250                              struct device_attribute *attr, char *buf)
3251 {
3252         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3253
3254         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3255 }
3256
3257 static ssize_t rbd_pool_id_show(struct device *dev,
3258                              struct device_attribute *attr, char *buf)
3259 {
3260         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3261
3262         return sprintf(buf, "%llu\n",
3263                         (unsigned long long) rbd_dev->spec->pool_id);
3264 }
3265
3266 static ssize_t rbd_name_show(struct device *dev,
3267                              struct device_attribute *attr, char *buf)
3268 {
3269         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270
3271         if (rbd_dev->spec->image_name)
3272                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3273
3274         return sprintf(buf, "(unknown)\n");
3275 }
3276
3277 static ssize_t rbd_image_id_show(struct device *dev,
3278                              struct device_attribute *attr, char *buf)
3279 {
3280         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3281
3282         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3283 }
3284
3285 /*
3286  * Shows the name of the currently-mapped snapshot (or
3287  * RBD_SNAP_HEAD_NAME for the base image).
3288  */
3289 static ssize_t rbd_snap_show(struct device *dev,
3290                              struct device_attribute *attr,
3291                              char *buf)
3292 {
3293         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3294
3295         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3296 }
3297
3298 /*
3299  * For an rbd v2 image, shows the pool, image, and snapshot ids and
3300  * names of the parent image, along with the parent overlap.  If
3301  * there is no parent, simply shows "(no parent image)".
3302  */
3303 static ssize_t rbd_parent_show(struct device *dev,
3304                              struct device_attribute *attr,
3305                              char *buf)
3306 {
3307         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3308         struct rbd_spec *spec = rbd_dev->parent_spec;
3309         int count;
3310         char *bufp = buf;
3311
3312         if (!spec)
3313                 return sprintf(buf, "(no parent image)\n");
3314
3315         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3316                         (unsigned long long) spec->pool_id, spec->pool_name);
3317         if (count < 0)
3318                 return count;
3319         bufp += count;
3320
3321         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3322                         spec->image_name ? spec->image_name : "(unknown)");
3323         if (count < 0)
3324                 return count;
3325         bufp += count;
3326
3327         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3328                         (unsigned long long) spec->snap_id, spec->snap_name);
3329         if (count < 0)
3330                 return count;
3331         bufp += count;
3332
3333         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3334         if (count < 0)
3335                 return count;
3336         bufp += count;
3337
3338         return (ssize_t) (bufp - buf);
3339 }
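
/*
 * Reading the "parent" attribute produces output of this shape
 * (illustrative values only):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b76b8b4567
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */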
3340
3341 static ssize_t rbd_image_refresh(struct device *dev,
3342                                  struct device_attribute *attr,
3343                                  const char *buf,
3344                                  size_t size)
3345 {
3346         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3347         int ret;
3348
3349         ret = rbd_dev_refresh(rbd_dev);
3350         if (ret)
3351                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3352
3353         return ret < 0 ? ret : size;
3354 }
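
/*
 * From user space a refresh can be triggered by writing anything to
 * the device's "refresh" attribute, e.g. (illustrative device id):
 *
 *	echo 1 > /sys/bus/rbd/devices/0/refresh
 */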
3355
3356 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3357 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3358 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3359 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3360 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3361 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3362 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3363 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3364 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3365 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3366 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3367
3368 static struct attribute *rbd_attrs[] = {
3369         &dev_attr_size.attr,
3370         &dev_attr_features.attr,
3371         &dev_attr_major.attr,
3372         &dev_attr_client_id.attr,
3373         &dev_attr_pool.attr,
3374         &dev_attr_pool_id.attr,
3375         &dev_attr_name.attr,
3376         &dev_attr_image_id.attr,
3377         &dev_attr_current_snap.attr,
3378         &dev_attr_parent.attr,
3379         &dev_attr_refresh.attr,
3380         NULL
3381 };
3382
3383 static struct attribute_group rbd_attr_group = {
3384         .attrs = rbd_attrs,
3385 };
3386
3387 static const struct attribute_group *rbd_attr_groups[] = {
3388         &rbd_attr_group,
3389         NULL
3390 };
3391
3392 static void rbd_sysfs_dev_release(struct device *dev)
3393 {
3394 }
3395
3396 static struct device_type rbd_device_type = {
3397         .name           = "rbd",
3398         .groups         = rbd_attr_groups,
3399         .release        = rbd_sysfs_dev_release,
3400 };
3401
3402 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3403 {
3404         kref_get(&spec->kref);
3405
3406         return spec;
3407 }
3408
3409 static void rbd_spec_free(struct kref *kref);
3410 static void rbd_spec_put(struct rbd_spec *spec)
3411 {
3412         if (spec)
3413                 kref_put(&spec->kref, rbd_spec_free);
3414 }
3415
3416 static struct rbd_spec *rbd_spec_alloc(void)
3417 {
3418         struct rbd_spec *spec;
3419
3420         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3421         if (!spec)
3422                 return NULL;
3423         kref_init(&spec->kref);
3424
3425         return spec;
3426 }
3427
3428 static void rbd_spec_free(struct kref *kref)
3429 {
3430         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3431
3432         kfree(spec->pool_name);
3433         kfree(spec->image_id);
3434         kfree(spec->image_name);
3435         kfree(spec->snap_name);
3436         kfree(spec);
3437 }
3438
3439 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3440                                 struct rbd_spec *spec)
3441 {
3442         struct rbd_device *rbd_dev;
3443
3444         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3445         if (!rbd_dev)
3446                 return NULL;
3447
3448         spin_lock_init(&rbd_dev->lock);
3449         rbd_dev->flags = 0;
3450         INIT_LIST_HEAD(&rbd_dev->node);
3451         init_rwsem(&rbd_dev->header_rwsem);
3452
3453         rbd_dev->spec = spec;
3454         rbd_dev->rbd_client = rbdc;
3455
3456         /* Initialize the layout used for all rbd requests */
3457
3458         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3459         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3460         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3461         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3462
3463         return rbd_dev;
3464 }
3465
3466 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3467 {
3468         rbd_put_client(rbd_dev->rbd_client);
3469         rbd_spec_put(rbd_dev->spec);
3470         kfree(rbd_dev);
3471 }
3472
3473 /*
3474  * Get the size and object order for an image snapshot, or if
3475  * snap_id is CEPH_NOSNAP, gets this information for the base
3476  * image.
3477  */
3478 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3479                                 u8 *order, u64 *snap_size)
3480 {
3481         __le64 snapid = cpu_to_le64(snap_id);
3482         int ret;
3483         struct {
3484                 u8 order;
3485                 __le64 size;
3486         } __attribute__ ((packed)) size_buf = { 0 };
3487
3488         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3489                                 "rbd", "get_size",
3490                                 &snapid, sizeof (snapid),
3491                                 &size_buf, sizeof (size_buf));
3492         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3493         if (ret < 0)
3494                 return ret;
3495         if (ret < sizeof (size_buf))
3496                 return -ERANGE;
3497
3498         if (order)
3499                 *order = size_buf.order;
3500         *snap_size = le64_to_cpu(size_buf.size);
3501
3502         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3503                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3504                 (unsigned long long)*snap_size);
3505
3506         return 0;
3507 }
3508
3509 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3510 {
3511         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3512                                         &rbd_dev->header.obj_order,
3513                                         &rbd_dev->header.image_size);
3514 }
3515
3516 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3517 {
3518         void *reply_buf;
3519         int ret;
3520         void *p;
3521
3522         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3523         if (!reply_buf)
3524                 return -ENOMEM;
3525
3526         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3527                                 "rbd", "get_object_prefix", NULL, 0,
3528                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3529         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3530         if (ret < 0)
3531                 goto out;
3532
3533         p = reply_buf;
3534         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3535                                                 p + ret, NULL, GFP_NOIO);
3536         ret = 0;
3537
3538         if (IS_ERR(rbd_dev->header.object_prefix)) {
3539                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3540                 rbd_dev->header.object_prefix = NULL;
3541         } else {
3542                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3543         }
3544 out:
3545         kfree(reply_buf);
3546
3547         return ret;
3548 }
3549
3550 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3551                 u64 *snap_features)
3552 {
3553         __le64 snapid = cpu_to_le64(snap_id);
3554         struct {
3555                 __le64 features;
3556                 __le64 incompat;
3557         } __attribute__ ((packed)) features_buf = { 0 };
3558         u64 incompat;
3559         int ret;
3560
3561         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3562                                 "rbd", "get_features",
3563                                 &snapid, sizeof (snapid),
3564                                 &features_buf, sizeof (features_buf));
3565         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3566         if (ret < 0)
3567                 return ret;
3568         if (ret < sizeof (features_buf))
3569                 return -ERANGE;
3570
3571         incompat = le64_to_cpu(features_buf.incompat);
3572         if (incompat & ~RBD_FEATURES_SUPPORTED)
3573                 return -ENXIO;
3574
3575         *snap_features = le64_to_cpu(features_buf.features);
3576
3577         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3578                 (unsigned long long)snap_id,
3579                 (unsigned long long)*snap_features,
3580                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3581
3582         return 0;
3583 }
3584
3585 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3586 {
3587         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3588                                                 &rbd_dev->header.features);
3589 }
3590
3591 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3592 {
3593         struct rbd_spec *parent_spec;
3594         size_t size;
3595         void *reply_buf = NULL;
3596         __le64 snapid;
3597         void *p;
3598         void *end;
3599         char *image_id;
3600         u64 overlap;
3601         int ret;
3602
3603         parent_spec = rbd_spec_alloc();
3604         if (!parent_spec)
3605                 return -ENOMEM;
3606
3607         size = sizeof (__le64) +                                /* pool_id */
3608                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3609                 sizeof (__le64) +                               /* snap_id */
3610                 sizeof (__le64);                                /* overlap */
3611         reply_buf = kmalloc(size, GFP_KERNEL);
3612         if (!reply_buf) {
3613                 ret = -ENOMEM;
3614                 goto out_err;
3615         }
3616
3617         snapid = cpu_to_le64(CEPH_NOSNAP);
3618         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3619                                 "rbd", "get_parent",
3620                                 &snapid, sizeof (snapid),
3621                                 reply_buf, size);
3622         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3623         if (ret < 0)
3624                 goto out_err;
3625
3626         p = reply_buf;
3627         end = reply_buf + ret;
3628         ret = -ERANGE;
3629         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3630         if (parent_spec->pool_id == CEPH_NOPOOL)
3631                 goto out;       /* No parent?  No problem. */
3632
3633         /* The ceph file layout needs to fit pool id in 32 bits */
3634
3635         ret = -EIO;
3636         if (parent_spec->pool_id > (u64)U32_MAX) {
3637                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3638                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3639                 goto out_err;
3640         }
3641
3642         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3643         if (IS_ERR(image_id)) {
3644                 ret = PTR_ERR(image_id);
3645                 goto out_err;
3646         }
3647         parent_spec->image_id = image_id;
3648         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3649         ceph_decode_64_safe(&p, end, overlap, out_err);
3650
3651         rbd_dev->parent_overlap = overlap;
3652         rbd_dev->parent_spec = parent_spec;
3653         parent_spec = NULL;     /* rbd_dev now owns this */
3654 out:
3655         ret = 0;
3656 out_err:
3657         kfree(reply_buf);
3658         rbd_spec_put(parent_spec);
3659
3660         return ret;
3661 }
3662
3663 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3664 {
3665         struct {
3666                 __le64 stripe_unit;
3667                 __le64 stripe_count;
3668         } __attribute__ ((packed)) striping_info_buf = { 0 };
3669         size_t size = sizeof (striping_info_buf);
3670         void *p;
3671         u64 obj_size;
3672         u64 stripe_unit;
3673         u64 stripe_count;
3674         int ret;
3675
3676         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3677                                 "rbd", "get_stripe_unit_count", NULL, 0,
3678                                 (char *)&striping_info_buf, size);
3679         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3680         if (ret < 0)
3681                 return ret;
3682         if (ret < size)
3683                 return -ERANGE;
3684
3685         /*
3686          * We don't actually support the "fancy striping" feature
3687          * (STRIPINGV2) yet, but if the striping sizes are the
3688          * defaults the behavior is the same as before.  So find
3689          * out, and only fail if the image has non-default values.
3690          */
3692         obj_size = (u64)1 << rbd_dev->header.obj_order;
3693         p = &striping_info_buf;
3694         stripe_unit = ceph_decode_64(&p);
3695         if (stripe_unit != obj_size) {
3696                 rbd_warn(rbd_dev, "unsupported stripe unit "
3697                                 "(got %llu want %llu)",
3698                                 stripe_unit, obj_size);
3699                 return -EINVAL;
3700         }
3701         stripe_count = ceph_decode_64(&p);
3702         if (stripe_count != 1) {
3703                 rbd_warn(rbd_dev, "unsupported stripe count "
3704                                 "(got %llu want 1)", stripe_count);
3705                 return -EINVAL;
3706         }
3707         rbd_dev->header.stripe_unit = stripe_unit;
3708         rbd_dev->header.stripe_count = stripe_count;
3709
3710         return 0;
3711 }
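
/*
 * For example, an image with the default 4 MiB objects (obj_order 22)
 * passes the checks above only when get_stripe_unit_count reports
 * stripe_unit == 4194304 and stripe_count == 1; any other combination
 * is rejected with -EINVAL.
 */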
3712
3713 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3714 {
3715         size_t image_id_size;
3716         char *image_id;
3717         void *p;
3718         void *end;
3719         size_t size;
3720         void *reply_buf = NULL;
3721         size_t len = 0;
3722         char *image_name = NULL;
3723         int ret;
3724
3725         rbd_assert(!rbd_dev->spec->image_name);
3726
3727         len = strlen(rbd_dev->spec->image_id);
3728         image_id_size = sizeof (__le32) + len;
3729         image_id = kmalloc(image_id_size, GFP_KERNEL);
3730         if (!image_id)
3731                 return NULL;
3732
3733         p = image_id;
3734         end = image_id + image_id_size;
3735         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3736
3737         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3738         reply_buf = kmalloc(size, GFP_KERNEL);
3739         if (!reply_buf)
3740                 goto out;
3741
3742         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3743                                 "rbd", "dir_get_name",
3744                                 image_id, image_id_size,
3745                                 reply_buf, size);
3746         if (ret < 0)
3747                 goto out;
3748         p = reply_buf;
3749         end = reply_buf + ret;
3750
3751         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3752         if (IS_ERR(image_name))
3753                 image_name = NULL;
3754         else
3755                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3756 out:
3757         kfree(reply_buf);
3758         kfree(image_id);
3759
3760         return image_name;
3761 }
3762
3763 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3764 {
3765         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3766         const char *snap_name;
3767         u32 which = 0;
3768
3769         /* Skip over names until we find the one we are looking for */
3770
3771         snap_name = rbd_dev->header.snap_names;
3772         while (which < snapc->num_snaps) {
3773                 if (!strcmp(name, snap_name))
3774                         return snapc->snaps[which];
3775                 snap_name += strlen(snap_name) + 1;
3776                 which++;
3777         }
3778         return CEPH_NOSNAP;
3779 }
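
/*
 * For example (illustrative layout): with snapshots "one" and "two",
 * header.snap_names holds the bytes "one\0two\0" and snapc->snaps[]
 * holds the corresponding ids in the same order, so the walk above
 * pairs each name with its id.
 */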
3780
3781 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3782 {
3783         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3784         u32 which;
3785         bool found = false;
3786         u64 snap_id;
3787
3788         for (which = 0; !found && which < snapc->num_snaps; which++) {
3789                 const char *snap_name;
3790
3791                 snap_id = snapc->snaps[which];
3792                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3793                 if (IS_ERR(snap_name))
3794                         break;
3795                 found = !strcmp(name, snap_name);
3796                 kfree(snap_name);
3797         }
3798         return found ? snap_id : CEPH_NOSNAP;
3799 }
3800
3801 /*
3802  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3803  * no snapshot by that name is found, or if an error occurs.
3804  */
3805 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3806 {
3807         if (rbd_dev->image_format == 1)
3808                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3809
3810         return rbd_v2_snap_id_by_name(rbd_dev, name);
3811 }
3812
3813 /*
3814  * When an rbd image has a parent image, it is identified by the
3815  * pool, image, and snapshot ids (not names).  This function fills
3816  * in the names for those ids.  (It's OK if we can't figure out the
3817  * name for an image id, but the pool and snapshot ids should always
3818  * exist and have names.)  All names in an rbd spec are dynamically
3819  * allocated.
3820  *
3821  * When an image being mapped (not a parent) is probed, we have the
3822  * pool name and pool id, image name and image id, and the snapshot
3823  * name.  The only thing we're missing is the snapshot id.
3824  */
3825 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3826 {
3827         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3828         struct rbd_spec *spec = rbd_dev->spec;
3829         const char *pool_name;
3830         const char *image_name;
3831         const char *snap_name;
3832         int ret;
3833
3834         /*
3835          * An image being mapped will have the pool name (etc.), but
3836          * we need to look up the snapshot id.
3837          */
3838         if (spec->pool_name) {
3839                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3840                         u64 snap_id;
3841
3842                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3843                         if (snap_id == CEPH_NOSNAP)
3844                                 return -ENOENT;
3845                         spec->snap_id = snap_id;
3846                 } else {
3847                         spec->snap_id = CEPH_NOSNAP;
3848                 }
3849
3850                 return 0;
3851         }
3852
3853         /* Get the pool name; we have to make our own copy of this */
3854
3855         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3856         if (!pool_name) {
3857                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3858                 return -EIO;
3859         }
3860         pool_name = kstrdup(pool_name, GFP_KERNEL);
3861         if (!pool_name)
3862                 return -ENOMEM;
3863
3864         /* Fetch the image name; tolerate failure here */
3865
3866         image_name = rbd_dev_image_name(rbd_dev);
3867         if (!image_name)
3868                 rbd_warn(rbd_dev, "unable to get image name");
3869
3870         /* Look up the snapshot name, and make a copy */
3871
3872         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3873         if (!snap_name) {
3874                 ret = -ENOMEM;
3875                 goto out_err;
3876         }
3877
3878         spec->pool_name = pool_name;
3879         spec->image_name = image_name;
3880         spec->snap_name = snap_name;
3881
3882         return 0;
3883 out_err:
3884         kfree(image_name);
3885         kfree(pool_name);
3886
3887         return ret;
3888 }
3889
3890 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3891 {
3892         size_t size;
3893         int ret;
3894         void *reply_buf;
3895         void *p;
3896         void *end;
3897         u64 seq;
3898         u32 snap_count;
3899         struct ceph_snap_context *snapc;
3900         u32 i;
3901
3902         /*
3903          * We'll need room for the seq value (maximum snapshot id),
3904          * snapshot count, and array of that many snapshot ids.
3905          * For now we have a fixed upper limit on the number we're
3906          * prepared to receive.
3907          */
3908         size = sizeof (__le64) + sizeof (__le32) +
3909                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3910         reply_buf = kzalloc(size, GFP_KERNEL);
3911         if (!reply_buf)
3912                 return -ENOMEM;
3913
3914         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3915                                 "rbd", "get_snapcontext", NULL, 0,
3916                                 reply_buf, size);
3917         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3918         if (ret < 0)
3919                 goto out;
3920
3921         p = reply_buf;
3922         end = reply_buf + ret;
3923         ret = -ERANGE;
3924         ceph_decode_64_safe(&p, end, seq, out);
3925         ceph_decode_32_safe(&p, end, snap_count, out);
3926
3927         /*
3928          * Make sure the reported number of snapshot ids wouldn't go
3929          * beyond the end of our buffer.  But before checking that,
3930          * make sure the computed size of the snapshot context we
3931          * allocate is representable in a size_t.
3932          */
3933         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3934                                  / sizeof (u64)) {
3935                 ret = -EINVAL;
3936                 goto out;
3937         }
3938         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3939                 goto out;
3940         ret = 0;
3941
3942         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3943         if (!snapc) {
3944                 ret = -ENOMEM;
3945                 goto out;
3946         }
3947         snapc->seq = seq;
3948         for (i = 0; i < snap_count; i++)
3949                 snapc->snaps[i] = ceph_decode_64(&p);
3950
3951         ceph_put_snap_context(rbd_dev->header.snapc);
3952         rbd_dev->header.snapc = snapc;
3953
3954         dout("  snap context seq = %llu, snap_count = %u\n",
3955                 (unsigned long long)seq, (unsigned int)snap_count);
3956 out:
3957         kfree(reply_buf);
3958
3959         return ret;
3960 }
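
/*
 * The get_snapcontext reply decoded above is laid out like this
 * (a sketch of the encoding, not a struct used by this code):
 *
 *	__le64 seq;                     highest snapshot id
 *	__le32 snap_count;              number of ids that follow
 *	__le64 snaps[snap_count];       the snapshot ids
 */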
3961
3962 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3963                                         u64 snap_id)
3964 {
3965         size_t size;
3966         void *reply_buf;
3967         __le64 snapid;
3968         int ret;
3969         void *p;
3970         void *end;
3971         char *snap_name;
3972
3973         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3974         reply_buf = kmalloc(size, GFP_KERNEL);
3975         if (!reply_buf)
3976                 return ERR_PTR(-ENOMEM);
3977
3978         snapid = cpu_to_le64(snap_id);
3979         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3980                                 "rbd", "get_snapshot_name",
3981                                 &snapid, sizeof (snapid),
3982                                 reply_buf, size);
3983         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3984         if (ret < 0) {
3985                 snap_name = ERR_PTR(ret);
3986                 goto out;
3987         }
3988
3989         p = reply_buf;
3990         end = reply_buf + ret;
3991         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3992         if (IS_ERR(snap_name))
3993                 goto out;
3994
3995         dout("  snap_id 0x%016llx snap_name = %s\n",
3996                 (unsigned long long)snap_id, snap_name);
3997 out:
3998         kfree(reply_buf);
3999
4000         return snap_name;
4001 }
4002
4003 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4004 {
4005         bool first_time = rbd_dev->header.object_prefix == NULL;
4006         int ret;
4007
4008         down_write(&rbd_dev->header_rwsem);
4009
4010         if (first_time) {
4011                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4012                 if (ret)
4013                         goto out;
4014         }
4015
4016         ret = rbd_dev_v2_image_size(rbd_dev);
4017         if (ret)
4018                 goto out;
4019         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4020             rbd_dev->mapping.size != rbd_dev->header.image_size)
4021                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4022
4023         ret = rbd_dev_v2_snap_context(rbd_dev);
4024         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4027 out:
4028         up_write(&rbd_dev->header_rwsem);
4029
4030         return ret;
4031 }
4032
4033 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4034 {
4035         struct device *dev;
4036         int ret;
4037
4038         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4039
4040         dev = &rbd_dev->dev;
4041         dev->bus = &rbd_bus_type;
4042         dev->type = &rbd_device_type;
4043         dev->parent = &rbd_root_dev;
4044         dev->release = rbd_dev_device_release;
4045         dev_set_name(dev, "%d", rbd_dev->dev_id);
4046         ret = device_register(dev);
4047
4048         mutex_unlock(&ctl_mutex);
4049
4050         return ret;
4051 }
4052
4053 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4054 {
4055         device_unregister(&rbd_dev->dev);
4056 }
4057
4058 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4059
4060 /*
4061  * Get a unique rbd identifier for the given new rbd_dev, and add
4062  * the rbd_dev to the global list.  The minimum rbd id is 1.
4063  */
4064 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4065 {
4066         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4067
4068         spin_lock(&rbd_dev_list_lock);
4069         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4070         spin_unlock(&rbd_dev_list_lock);
4071         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4072                 (unsigned long long) rbd_dev->dev_id);
4073 }
4074
4075 /*
4076  * Remove an rbd_dev from the global list, and record that its
4077  * identifier is no longer in use.
4078  */
4079 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4080 {
4081         struct list_head *tmp;
4082         int rbd_id = rbd_dev->dev_id;
4083         int max_id;
4084
4085         rbd_assert(rbd_id > 0);
4086
4087         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4088                 (unsigned long long) rbd_dev->dev_id);
4089         spin_lock(&rbd_dev_list_lock);
4090         list_del_init(&rbd_dev->node);
4091
4092         /*
4093          * If the id being "put" is not the current maximum, there
4094          * is nothing special we need to do.
4095          */
4096         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4097                 spin_unlock(&rbd_dev_list_lock);
4098                 return;
4099         }
4100
4101         /*
4102          * We need to update the current maximum id.  Search the
4103          * list to find out what it is.  We're more likely to find
4104          * the maximum at the end, so search the list backward.
4105          */
4106         max_id = 0;
4107         list_for_each_prev(tmp, &rbd_dev_list) {
4108                 struct rbd_device *rbd_dev;
4109
4110                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4111                 if (rbd_dev->dev_id > max_id)
4112                         max_id = rbd_dev->dev_id;
4113         }
4114         spin_unlock(&rbd_dev_list_lock);
4115
4116         /*
4117          * The max id could have been updated by rbd_dev_id_get(), in
4118          * which case it now accurately reflects the new maximum.
4119          * Be careful not to overwrite the maximum value in that
4120          * case.
4121          */
4122         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4123         dout("  max dev id has been reset\n");
4124 }
4125
4126 /*
4127  * Skips over white space at *buf, and updates *buf to point to the
4128  * first found non-space character (if any). Returns the length of
4129  * the token (string of non-white space characters) found.  Note
4130  * that *buf must be terminated with '\0'.
4131  */
4132 static inline size_t next_token(const char **buf)
4133 {
4134         /*
4135          * These are the characters that produce nonzero for
4136          * isspace() in the "C" and "POSIX" locales.
4137          */
4138         const char *spaces = " \f\n\r\t\v";
4139
4140         *buf += strspn(*buf, spaces);   /* Find start of token */
4141
4142         return strcspn(*buf, spaces);   /* Return token length */
4143 }
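
/*
 * For example, with *buf pointing at "  pool image", next_token()
 * advances *buf past the two leading spaces and returns 4, the
 * length of "pool".
 */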
4144
4145 /*
4146  * Finds the next token in *buf, and if the provided token buffer is
4147  * big enough, copies the found token into it.  The result, if
4148  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4149  * must be terminated with '\0' on entry.
4150  *
4151  * Returns the length of the token found (not including the '\0').
4152  * Return value will be 0 if no token is found, and it will be >=
4153  * token_size if the token would not fit.
4154  *
4155  * The *buf pointer will be updated to point beyond the end of the
4156  * found token.  Note that this occurs even if the token buffer is
4157  * too small to hold it.
4158  */
4159 static inline size_t copy_token(const char **buf,
4160                                 char *token,
4161                                 size_t token_size)
4162 {
4163         size_t len;
4164
4165         len = next_token(buf);
4166         if (len < token_size) {
4167                 memcpy(token, *buf, len);
4168                 *(token + len) = '\0';
4169         }
4170         *buf += len;
4171
4172         return len;
4173 }
4174
4175 /*
4176  * Finds the next token in *buf, dynamically allocates a buffer big
4177  * enough to hold a copy of it, and copies the token into the new
4178  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4179  * that a duplicate buffer is created even for a zero-length token.
4180  *
4181  * Returns a pointer to the newly-allocated duplicate, or a null
4182  * pointer if memory for the duplicate was not available.  If
4183  * the lenp argument is a non-null pointer, the length of the token
4184  * (not including the '\0') is returned in *lenp.
4185  *
4186  * If successful, the *buf pointer will be updated to point beyond
4187  * the end of the found token.
4188  *
4189  * Note: uses GFP_KERNEL for allocation.
4190  */
4191 static inline char *dup_token(const char **buf, size_t *lenp)
4192 {
4193         char *dup;
4194         size_t len;
4195
4196         len = next_token(buf);
4197         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4198         if (!dup)
4199                 return NULL;
4200         *(dup + len) = '\0';
4201         *buf += len;
4202
4203         if (lenp)
4204                 *lenp = len;
4205
4206         return dup;
4207 }
4208
4209 /*
4210  * Parse the options provided for an "rbd add" (i.e., rbd image
4211  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4212  * and the data written is passed here via a NUL-terminated buffer.
4213  * Returns 0 if successful or an error code otherwise.
4214  *
4215  * The information extracted from these options is recorded in
4216  * the other parameters which return dynamically-allocated
4217  * structures:
4218  *  ceph_opts
4219  *      The address of a pointer that will refer to a ceph options
4220  *      structure.  Caller must release the returned pointer using
4221  *      ceph_destroy_options() when it is no longer needed.
4222  *  rbd_opts
4223  *      Address of an rbd options pointer.  Fully initialized by
4224  *      this function; caller must release with kfree().
4225  *  spec
4226  *      Address of an rbd image specification pointer.  Fully
4227  *      initialized by this function based on parsed options.
4228  *      Caller must release with rbd_spec_put().
4229  *
4230  * The options passed take this form:
4231  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4232  * where:
4233  *  <mon_addrs>
4234  *      A comma-separated list of one or more monitor addresses.
4235  *      A monitor address is an ip address, optionally followed
4236  *      by a port number (separated by a colon).
4237  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4238  *  <options>
4239  *      A comma-separated list of ceph and/or rbd options.
4240  *  <pool_name>
4241  *      The name of the rados pool containing the rbd image.
4242  *  <image_name>
4243  *      The name of the image in that pool to map.
4244  *  <snap_name>
4245  *      An optional snapshot name.  If provided, the mapping will
4246  *      present data from the image at the time that snapshot was
4247  *      created.  The image head is used if no snapshot name is
4248  *      provided.  Snapshot mappings are always read-only.
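 *
 * For example (illustrative values only), writing
 *
 *   1.2.3.4:6789 name=admin rbd myimage
 *
 * to /sys/bus/rbd/add maps the head of the image "myimage" from the
 * "rbd" pool.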
4249  */
4250 static int rbd_add_parse_args(const char *buf,
4251                                 struct ceph_options **ceph_opts,
4252                                 struct rbd_options **opts,
4253                                 struct rbd_spec **rbd_spec)
4254 {
4255         size_t len;
4256         char *options;
4257         const char *mon_addrs;
4258         char *snap_name;
4259         size_t mon_addrs_size;
4260         struct rbd_spec *spec = NULL;
4261         struct rbd_options *rbd_opts = NULL;
4262         struct ceph_options *copts;
4263         int ret;
4264
4265         /* The first four tokens are required */
4266
4267         len = next_token(&buf);
4268         if (!len) {
4269                 rbd_warn(NULL, "no monitor address(es) provided");
4270                 return -EINVAL;
4271         }
4272         mon_addrs = buf;
4273         mon_addrs_size = len + 1;
4274         buf += len;
4275
4276         ret = -EINVAL;
4277         options = dup_token(&buf, NULL);
4278         if (!options)
4279                 return -ENOMEM;
4280         if (!*options) {
4281                 rbd_warn(NULL, "no options provided");
4282                 goto out_err;
4283         }
4284
4285         spec = rbd_spec_alloc();
4286         if (!spec)
4287                 goto out_mem;
4288
4289         spec->pool_name = dup_token(&buf, NULL);
4290         if (!spec->pool_name)
4291                 goto out_mem;
4292         if (!*spec->pool_name) {
4293                 rbd_warn(NULL, "no pool name provided");
4294                 goto out_err;
4295         }
4296
4297         spec->image_name = dup_token(&buf, NULL);
4298         if (!spec->image_name)
4299                 goto out_mem;
4300         if (!*spec->image_name) {
4301                 rbd_warn(NULL, "no image name provided");
4302                 goto out_err;
4303         }
4304
4305         /*
4306          * Snapshot name is optional; default is to use "-"
4307          * (indicating the head/no snapshot).
4308          */
4309         len = next_token(&buf);
4310         if (!len) {
4311                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4312                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4313         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4314                 ret = -ENAMETOOLONG;
4315                 goto out_err;
4316         }
4317         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4318         if (!snap_name)
4319                 goto out_mem;
4320         *(snap_name + len) = '\0';
4321         spec->snap_name = snap_name;
4322
4323         /* Initialize all rbd options to the defaults */
4324
4325         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4326         if (!rbd_opts)
4327                 goto out_mem;
4328
4329         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4330
4331         copts = ceph_parse_options(options, mon_addrs,
4332                                         mon_addrs + mon_addrs_size - 1,
4333                                         parse_rbd_opts_token, rbd_opts);
4334         if (IS_ERR(copts)) {
4335                 ret = PTR_ERR(copts);
4336                 goto out_err;
4337         }
4338         kfree(options);
4339
4340         *ceph_opts = copts;
4341         *opts = rbd_opts;
4342         *rbd_spec = spec;
4343
4344         return 0;
4345 out_mem:
4346         ret = -ENOMEM;
4347 out_err:
4348         kfree(rbd_opts);
4349         rbd_spec_put(spec);
4350         kfree(options);
4351
4352         return ret;
4353 }
4354
4355 /*
4356  * An rbd format 2 image has a unique identifier, distinct from the
4357  * name given to it by the user.  Internally, that identifier is
4358  * what's used to specify the names of objects related to the image.
4359  *
4360  * A special "rbd id" object is used to map an rbd image name to its
4361  * id.  If that object doesn't exist, then there is no v2 rbd image
4362  * with the supplied name.
4363  *
4364  * This function will record the given rbd_dev's image_id field if
4365  * it can be determined, and in that case will return 0.  If any
4366  * errors occur a negative errno will be returned and the rbd_dev's
4367  * image_id field will be unchanged (and should be NULL).
4368  */
4369 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4370 {
4371         int ret;
4372         size_t size;
4373         char *object_name;
4374         void *response;
4375         char *image_id;
4376
4377         /*
4378          * When probing a parent image, the image id is already
4379          * known (and the image name likely is not).  There's no
4380          * need to fetch the image id again in this case.  We
4381          * do still need to set the image format though.
4382          */
4383         if (rbd_dev->spec->image_id) {
4384                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4385
4386                 return 0;
4387         }
4388
4389         /*
4390          * First, see if the format 2 image id file exists, and if
4391          * so, get the image's persistent id from it.
4392          */
4393         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4394         object_name = kmalloc(size, GFP_NOIO);
4395         if (!object_name)
4396                 return -ENOMEM;
4397         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4398         dout("rbd id object name is %s\n", object_name);
4399
4400         /* Response will be an encoded string, which includes a length */
4401
4402         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4403         response = kzalloc(size, GFP_NOIO);
4404         if (!response) {
4405                 ret = -ENOMEM;
4406                 goto out;
4407         }
4408
4409         /* If it doesn't exist we'll assume it's a format 1 image */
4410
4411         ret = rbd_obj_method_sync(rbd_dev, object_name,
4412                                 "rbd", "get_id", NULL, 0,
4413                                 response, RBD_IMAGE_ID_LEN_MAX);
4414         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4415         if (ret == -ENOENT) {
4416                 image_id = kstrdup("", GFP_KERNEL);
4417                 ret = image_id ? 0 : -ENOMEM;
4418                 if (!ret)
4419                         rbd_dev->image_format = 1;
4420         } else if (ret > sizeof (__le32)) {
4421                 void *p = response;
4422
4423                 image_id = ceph_extract_encoded_string(&p, p + ret,
4424                                                 NULL, GFP_NOIO);
4425                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4426                 if (!ret)
4427                         rbd_dev->image_format = 2;
4428         } else {
4429                 ret = -EINVAL;
4430         }
4431
4432         if (!ret) {
4433                 rbd_dev->spec->image_id = image_id;
4434                 dout("image_id is %s\n", image_id);
4435         }
4436 out:
4437         kfree(response);
4438         kfree(object_name);
4439
4440         return ret;
4441 }
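
/*
 * For example, probing an image named "myimage" (an illustrative
 * name) looks up the id object named RBD_ID_PREFIX "myimage" built
 * above; if that object doesn't exist the image is treated as
 * format 1, otherwise the encoded string it returns becomes the
 * format 2 image id.
 */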
4442
4443 /* Undo whatever state changes are made by v1 or v2 image probe */
4444
4445 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4446 {
4447         struct rbd_image_header *header;
4448
4449         rbd_dev_remove_parent(rbd_dev);
4450         rbd_spec_put(rbd_dev->parent_spec);
4451         rbd_dev->parent_spec = NULL;
4452         rbd_dev->parent_overlap = 0;
4453
4454         /* Free dynamic fields from the header, then zero it out */
4455
4456         header = &rbd_dev->header;
4457         ceph_put_snap_context(header->snapc);
4458         kfree(header->snap_sizes);
4459         kfree(header->snap_names);
4460         kfree(header->object_prefix);
4461         memset(header, 0, sizeof (*header));
4462 }
4463
4464 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4465 {
4466         int ret;
4467
4468         ret = rbd_dev_v2_object_prefix(rbd_dev);
4469         if (ret)
4470                 goto out_err;
4471
4472         /*
4473          * Get and check the features for the image.  Currently the
4474          * features are assumed to never change.
4475          */
4476         ret = rbd_dev_v2_features(rbd_dev);
4477         if (ret)
4478                 goto out_err;
4479
4480         /* If the image supports layering, get the parent info */
4481
4482         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4483                 ret = rbd_dev_v2_parent_info(rbd_dev);
4484                 if (ret)
4485                         goto out_err;
4486                 /*
4487                  * Print a warning if this image has a parent.
4488                  * Don't print it if the image now being probed
4489                  * is itself a parent: we can tell because, at
4490                  * this point, a parent image's pool name is not
4491                  * yet known (only its pool id).
4492                  */
4493                 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4494                         rbd_warn(rbd_dev, "WARNING: kernel layering is EXPERIMENTAL!");
4496         }
4497
4498         /* If the image supports fancy striping, get its parameters */
4499
4500         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4501                 ret = rbd_dev_v2_striping_info(rbd_dev);
4502                 if (ret < 0)
4503                         goto out_err;
4504         }
4505         /* Crypto and compression types aren't supported for format 2 images */
4506
4507         return 0;
4508 out_err:
4509         rbd_dev->parent_overlap = 0;
4510         rbd_spec_put(rbd_dev->parent_spec);
4511         rbd_dev->parent_spec = NULL;
4512         kfree(rbd_dev->header_name);
4513         rbd_dev->header_name = NULL;
4514         kfree(rbd_dev->header.object_prefix);
4515         rbd_dev->header.object_prefix = NULL;
4516
4517         return ret;
4518 }
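
/*
 * For example, a v2 header with features 0x3 has RBD_FEATURE_LAYERING
 * (bit 0) and RBD_FEATURE_STRIPINGV2 (bit 1) set, so both branches
 * above run; a feature bit outside RBD_FEATURES_SUPPORTED is expected
 * to have been rejected by rbd_dev_v2_features() before this point.
 */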
4519
4520 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4521 {
4522         struct rbd_device *parent = NULL;
4523         struct rbd_spec *parent_spec;
4524         struct rbd_client *rbdc;
4525         int ret;
4526
4527         if (!rbd_dev->parent_spec)
4528                 return 0;
4529         /*
4530          * We need to pass a reference to the client and the parent
4531          * spec when creating the parent rbd_dev.  Images related by
4532          * parent/child relationships always share both.
4533          */
4534         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4535         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4536
4537         ret = -ENOMEM;
4538         parent = rbd_dev_create(rbdc, parent_spec);
4539         if (!parent)
4540                 goto out_err;
4541
4542         ret = rbd_dev_image_probe(parent, false);
4543         if (ret < 0)
4544                 goto out_err;
4545         rbd_dev->parent = parent;
4546
4547         return 0;
4548 out_err:
4549         if (parent) {
4550                 /* rbd_dev_destroy() drops the refs taken above;
4551                  * the caller's error path frees header_name */
4552                 rbd_dev_destroy(parent);
4553         } else {
4554                 rbd_put_client(rbdc);
4555                 rbd_spec_put(parent_spec);
4556         }
4557
4558         return ret;
4559 }
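
/*
 * For illustration: mapping a clone "c" whose parent is "base@snap1"
 * probes "c" first, then recurses (via rbd_dev_image_probe() with
 * mapping false, so no watch is set up on parent headers) to probe
 * "base", and so on up the chain until an image with no parent_spec
 * is reached.
 */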
4560
4561 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4562 {
4563         int ret;
4564
4565         /* generate unique id: find highest unique id, add one */
4566         rbd_dev_id_get(rbd_dev);
4567
4568         /* Fill in the device name, now that we have its id. */
4569         BUILD_BUG_ON(DEV_NAME_LEN
4570                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4571         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4572
4573         /* Get our block major device number. */
4574
4575         ret = register_blkdev(0, rbd_dev->name);
4576         if (ret < 0)
4577                 goto err_out_id;
4578         rbd_dev->major = ret;
4579
4580         /* Set up the blkdev mapping. */
4581
4582         ret = rbd_init_disk(rbd_dev);
4583         if (ret)
4584                 goto err_out_blkdev;
4585
4586         ret = rbd_dev_mapping_set(rbd_dev);
4587         if (ret)
4588                 goto err_out_disk;
4589         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4590
4591         ret = rbd_bus_add_dev(rbd_dev);
4592         if (ret)
4593                 goto err_out_mapping;
4594
4595         /* Everything's ready.  Announce the disk to the world. */
4596
4597         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4598         add_disk(rbd_dev->disk);
4599
4600         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4601                 (unsigned long long) rbd_dev->mapping.size);
4602
4603         return ret;
4604
4605 err_out_mapping:
4606         rbd_dev_mapping_clear(rbd_dev);
4607 err_out_disk:
4608         rbd_free_disk(rbd_dev);
4609 err_out_blkdev:
4610         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4611 err_out_id:
4612         rbd_dev_id_put(rbd_dev);
4614
4615         return ret;
4616 }
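
/*
 * Example of the result: with no other images mapped, the first one
 * gets dev_id 0, appears under /sys/bus/rbd/devices/0/, and its disk
 * is announced as "rbd0", which udev typically exposes as /dev/rbd0.
 */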
4617
4618 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4619 {
4620         struct rbd_spec *spec = rbd_dev->spec;
4621         size_t size;
4622
4623         /* Record the header object name for this rbd image. */
4624
4625         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4626
4627         if (rbd_dev->image_format == 1)
4628                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4629         else
4630                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4631
4632         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4633         if (!rbd_dev->header_name)
4634                 return -ENOMEM;
4635
4636         if (rbd_dev->image_format == 1)
4637                 sprintf(rbd_dev->header_name, "%s%s",
4638                         spec->image_name, RBD_SUFFIX);
4639         else
4640                 sprintf(rbd_dev->header_name, "%s%s",
4641                         RBD_HEADER_PREFIX, spec->image_id);
4642         return 0;
4643 }
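
/*
 * Concretely (using RBD_SUFFIX ".rbd" and RBD_HEADER_PREFIX
 * "rbd_header." from rbd_types.h, and a made-up image id): a format 1
 * image named "foo" has header object "foo.rbd", while a format 2
 * image with id "1014b2ae8944a" has header object
 * "rbd_header.1014b2ae8944a".
 */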
4644
4645 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4646 {
4647         rbd_dev_unprobe(rbd_dev);
4648         kfree(rbd_dev->header_name);
4649         rbd_dev->header_name = NULL;
4650         rbd_dev->image_format = 0;
4651         kfree(rbd_dev->spec->image_id);
4652         rbd_dev->spec->image_id = NULL;
4653
4654         rbd_dev_destroy(rbd_dev);
4655 }
4656
4657 /*
4658  * Probe for the existence of the header object for the given rbd
4659  * device.  If this image is the one being mapped (i.e., not a
4660  * parent), initiate a watch on its header object before using that
4661  * object to get detailed information about the rbd image.
4662  */
4663 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4664 {
4665         int ret;
4666         int tmp;
4667
4668         /*
4669          * Get the id from the image id object.  If it's not a
4670          * format 2 image, we'll get ENOENT back, and we'll assume
4671          * it's a format 1 image.
4672          */
4673         ret = rbd_dev_image_id(rbd_dev);
4674         if (ret)
4675                 return ret;
4676         rbd_assert(rbd_dev->spec->image_id);
4677         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4678
4679         ret = rbd_dev_header_name(rbd_dev);
4680         if (ret)
4681                 goto err_out_format;
4682
4683         if (mapping) {
4684                 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4685                 if (ret)
4686                         goto out_header_name;
4687         }
4688
4689         if (rbd_dev->image_format == 1)
4690                 ret = rbd_dev_v1_header_info(rbd_dev);
4691         else
4692                 ret = rbd_dev_v2_header_info(rbd_dev);
4693         if (ret)
4694                 goto err_out_watch;
4695
4696         ret = rbd_dev_spec_update(rbd_dev);
4697         if (ret)
4698                 goto err_out_probe;
4699
4700         ret = rbd_dev_probe_parent(rbd_dev);
4701         if (ret)
4702                 goto err_out_probe;
4703
4704         dout("discovered format %u image, header name is %s\n",
4705                 rbd_dev->image_format, rbd_dev->header_name);
4706
4707         return 0;
4708 err_out_probe:
4709         rbd_dev_unprobe(rbd_dev);
4710 err_out_watch:
4711         if (mapping) {
4712                 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4713                 if (tmp)
4714                         rbd_warn(rbd_dev, "unable to tear down watch request (%d)\n", tmp);
4716         }
4717 out_header_name:
4718         kfree(rbd_dev->header_name);
4719         rbd_dev->header_name = NULL;
4720 err_out_format:
4721         rbd_dev->image_format = 0;
4722         kfree(rbd_dev->spec->image_id);
4723         rbd_dev->spec->image_id = NULL;
4724
4725         dout("probe failed, returning %d\n", ret);
4726
4727         return ret;
4728 }
4729
4730 static ssize_t rbd_add(struct bus_type *bus,
4731                        const char *buf,
4732                        size_t count)
4733 {
4734         struct rbd_device *rbd_dev = NULL;
4735         struct ceph_options *ceph_opts = NULL;
4736         struct rbd_options *rbd_opts = NULL;
4737         struct rbd_spec *spec = NULL;
4738         struct rbd_client *rbdc;
4739         struct ceph_osd_client *osdc;
4740         bool read_only;
4741         int rc = -ENOMEM;
4742
4743         if (!try_module_get(THIS_MODULE))
4744                 return -ENODEV;
4745
4746         /* parse add command */
4747         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4748         if (rc < 0)
4749                 goto err_out_module;
4750         read_only = rbd_opts->read_only;
4751         kfree(rbd_opts);
4752         rbd_opts = NULL;        /* done with this */
4753
4754         rbdc = rbd_get_client(ceph_opts);
4755         if (IS_ERR(rbdc)) {
4756                 rc = PTR_ERR(rbdc);
4757                 goto err_out_args;
4758         }
4759         ceph_opts = NULL;       /* rbd_dev client now owns this */
4760
4761         /* pick the pool */
4762         osdc = &rbdc->client->osdc;
4763         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4764         if (rc < 0)
4765                 goto err_out_client;
4766         spec->pool_id = (u64)rc;
4767
4768         /* The ceph file layout needs to fit pool id in 32 bits */
4769
4770         if (spec->pool_id > (u64)U32_MAX) {
4771                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4772                                 (unsigned long long)spec->pool_id, U32_MAX);
4773                 rc = -EIO;
4774                 goto err_out_client;
4775         }
4776
4777         rbd_dev = rbd_dev_create(rbdc, spec);
4778         if (!rbd_dev)
4779                 goto err_out_client;
4780         rbdc = NULL;            /* rbd_dev now owns this */
4781         spec = NULL;            /* rbd_dev now owns this */
4782
4783         rc = rbd_dev_image_probe(rbd_dev, true);
4784         if (rc < 0)
4785                 goto err_out_rbd_dev;
4786
4787         /* If we are mapping a snapshot it must be marked read-only */
4788
4789         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4790                 read_only = true;
4791         rbd_dev->mapping.read_only = read_only;
4792
4793         rc = rbd_dev_device_setup(rbd_dev);
4794         if (!rc)
4795                 return count;
4796
4797         rbd_dev_image_release(rbd_dev);
4798         /* rbd_dev_image_release() destroyed rbd_dev; skip the destroy below */
4799         goto err_out_module;
4800 err_out_rbd_dev:
4799         rbd_dev_destroy(rbd_dev);
4800 err_out_client:
4801         rbd_put_client(rbdc);
4802 err_out_args:
4803         if (ceph_opts)
4804                 ceph_destroy_options(ceph_opts);
4805         kfree(rbd_opts);
4806         rbd_spec_put(spec);
4807 err_out_module:
4808         module_put(THIS_MODULE);
4809
4810         dout("Error adding device %s\n", buf);
4811
4812         return (ssize_t)rc;
4813 }
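
/*
 * Typical use (illustrative; the option string depends on the local
 * ceph configuration).  The add attribute takes monitor address(es),
 * options, pool name, image name, and an optional snapshot name:
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *         > /sys/bus/rbd/add
 *
 * maps image "foo" from pool "rbd" read/write; appending a snapshot
 * name (e.g. "... rbd foo snap1") maps that snapshot read-only.
 */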
4814
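/*
 * Find a mapped device by id.  The list lock only protects the walk
 * itself; the caller (rbd_remove()) holds ctl_mutex, which is what
 * keeps the returned device from disappearing in this driver's
 * removal path once the lock is dropped.
 */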
4815 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4816 {
4817         struct rbd_device *rbd_dev;
4818
4819         spin_lock(&rbd_dev_list_lock);
4820         list_for_each_entry(rbd_dev, &rbd_dev_list, node) {
4823                 if (rbd_dev->dev_id == dev_id) {
4824                         spin_unlock(&rbd_dev_list_lock);
4825                         return rbd_dev;
4826                 }
4827         }
4828         spin_unlock(&rbd_dev_list_lock);
4829         return NULL;
4830 }
4831
4832 static void rbd_dev_device_release(struct device *dev)
4833 {
4834         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4835
4836         rbd_free_disk(rbd_dev);
4837         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4838         rbd_dev_mapping_clear(rbd_dev);
4839         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4840         rbd_dev->major = 0;
4841         rbd_dev_id_put(rbd_dev);
4843 }
4844
4845 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4846 {
4847         while (rbd_dev->parent) {
4848                 struct rbd_device *first = rbd_dev;
4849                 struct rbd_device *second = first->parent;
4850                 struct rbd_device *third;
4851
4852                 /*
4853                  * Follow to the parent with no grandparent and
4854                  * remove it.
4855                  */
4856                 while (second && (third = second->parent)) {
4857                         first = second;
4858                         second = third;
4859                 }
4860                 rbd_assert(second);
4861                 rbd_dev_image_release(second);
4862                 first->parent = NULL;
4863                 first->parent_overlap = 0;
4864
4865                 rbd_assert(first->parent_spec);
4866                 rbd_spec_put(first->parent_spec);
4867                 first->parent_spec = NULL;
4868         }
4869 }
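
/*
 * Teardown runs deepest-first.  For illustration, with a chain
 * rbd0 -> parent -> grandparent, the grandparent image is released
 * on the first pass, the parent on the second, and the loop ends
 * when rbd0 itself has no parent left.
 */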
4870
4871 static ssize_t rbd_remove(struct bus_type *bus,
4872                           const char *buf,
4873                           size_t count)
4874 {
4875         struct rbd_device *rbd_dev = NULL;
4876         int target_id;
4877         unsigned long ul;
4878         int ret;
4879
4880         ret = strict_strtoul(buf, 10, &ul);
4881         if (ret)
4882                 return ret;
4883
4884         /* convert to int; abort if we lost anything in the conversion */
4885         target_id = (int) ul;
4886         if (target_id != ul)
4887                 return -EINVAL;
4888
4889         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4890
4891         rbd_dev = __rbd_get_dev(target_id);
4892         if (!rbd_dev) {
4893                 ret = -ENOENT;
4894                 goto done;
4895         }
4896
4897         spin_lock_irq(&rbd_dev->lock);
4898         if (rbd_dev->open_count)
4899                 ret = -EBUSY;
4900         else
4901                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4902         spin_unlock_irq(&rbd_dev->lock);
4903         if (ret < 0)
4904                 goto done;
4905         rbd_bus_del_dev(rbd_dev);
4906         ret = rbd_dev_header_watch_sync(rbd_dev, false);
4907         if (ret)
4908                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4909         rbd_dev_image_release(rbd_dev);
4910         module_put(THIS_MODULE);
4911         ret = count;
4912 done:
4913         mutex_unlock(&ctl_mutex);
4914
4915         return ret;
4916 }
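
/*
 * For example, "echo 0 > /sys/bus/rbd/remove" unmaps /dev/rbd0; the
 * write fails with EBUSY while the device is still open and with
 * ENOENT if no mapped device has that id.
 */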
4917
4918 /*
4919  * create control files in sysfs
4920  * /sys/bus/rbd/...
4921  */
4922 static int rbd_sysfs_init(void)
4923 {
4924         int ret;
4925
4926         ret = device_register(&rbd_root_dev);
4927         if (ret < 0)
4928                 return ret;
4929
4930         ret = bus_register(&rbd_bus_type);
4931         if (ret < 0)
4932                 device_unregister(&rbd_root_dev);
4933
4934         return ret;
4935 }
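
/*
 * After this succeeds the control files referred to above exist,
 * e.g. /sys/bus/rbd/add and /sys/bus/rbd/remove (the bus attributes
 * backed by rbd_add() and rbd_remove()), with rbd_root_dev acting as
 * the parent for each mapped device's sysfs entry.
 */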
4936
4937 static void rbd_sysfs_cleanup(void)
4938 {
4939         bus_unregister(&rbd_bus_type);
4940         device_unregister(&rbd_root_dev);
4941 }
4942
4943 static int rbd_slab_init(void)
4944 {
4945         rbd_assert(!rbd_img_request_cache);
4946         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4947                                         sizeof (struct rbd_img_request),
4948                                         __alignof__(struct rbd_img_request),
4949                                         0, NULL);
4950         if (!rbd_img_request_cache)
4951                 return -ENOMEM;
4952
4953         rbd_assert(!rbd_obj_request_cache);
4954         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4955                                         sizeof (struct rbd_obj_request),
4956                                         __alignof__(struct rbd_obj_request),
4957                                         0, NULL);
4958         if (!rbd_obj_request_cache)
4959                 goto out_err;
4960
4961         rbd_assert(!rbd_segment_name_cache);
4962         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
4963                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
4964         if (rbd_segment_name_cache)
4965                 return 0;
4966 out_err:
4967         if (rbd_obj_request_cache) {
4968                 kmem_cache_destroy(rbd_obj_request_cache);
4969                 rbd_obj_request_cache = NULL;
4970         }
4971
4972         kmem_cache_destroy(rbd_img_request_cache);
4973         rbd_img_request_cache = NULL;
4974
4975         return -ENOMEM;
4976 }
4977
4978 static void rbd_slab_exit(void)
4979 {
4980         rbd_assert(rbd_segment_name_cache);
4981         kmem_cache_destroy(rbd_segment_name_cache);
4982         rbd_segment_name_cache = NULL;
4983
4984         rbd_assert(rbd_obj_request_cache);
4985         kmem_cache_destroy(rbd_obj_request_cache);
4986         rbd_obj_request_cache = NULL;
4987
4988         rbd_assert(rbd_img_request_cache);
4989         kmem_cache_destroy(rbd_img_request_cache);
4990         rbd_img_request_cache = NULL;
4991 }
4992
4993 static int __init rbd_init(void)
4994 {
4995         int rc;
4996
4997         if (!libceph_compatible(NULL)) {
4998                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4999
5000                 return -EINVAL;
5001         }
5002         rc = rbd_slab_init();
5003         if (rc)
5004                 return rc;
5005         rc = rbd_sysfs_init();
5006         if (rc)
5007                 rbd_slab_exit();
5008         else
5009                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5010
5011         return rc;
5012 }
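
/*
 * On a successful "modprobe rbd" the kernel log shows the pr_info()
 * line above, i.e. something like:
 *
 *   loaded rbd (rados block device)
 */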
5013
5014 static void __exit rbd_exit(void)
5015 {
5016         rbd_sysfs_cleanup();
5017         rbd_slab_exit();
5018 }
5019
5020 module_init(rbd_init);
5021 module_exit(rbd_exit);
5022
5023 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5024 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5025 MODULE_DESCRIPTION("rados block device");
5026
5027 /* following authorship retained from original osdblk.c */
5028 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5029
5030 MODULE_LICENSE("GPL");