
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
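
/*
 * Illustrative arithmetic (not in the original source): for a 4-byte
 * int, (5 * sizeof (int)) / 2 + 1 = 11, which covers the longest
 * decimal rendering "-2147483648" (10 digits plus a sign).  The
 * formula slightly overestimates digits per byte (2.5 >= log10(256)
 * ~= 2.41), so it is safe for any int size.
 */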

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
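
/*
 * Illustrative example (values hypothetical, not from the original
 * source): mapping pool "rbd", image "foo" at snapshot "snap1" might
 * yield a spec like
 *
 *      { .pool_id = 2,  .pool_name = "rbd",
 *        .image_id = "1028ae8944a",  .image_name = "foo",
 *        .snap_id = 4,  .snap_name = "snap1" }
 *
 * For the base image (no snapshot mapped), snap_id is CEPH_NOSNAP and
 * the snapshot name is RBD_SNAP_HEAD_NAME ("-").
 */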

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
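
/*
 * Illustrative usage sketch (not in the original source): walking an
 * image request's object requests, e.g. to sum bytes transferred:
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 xferred = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 *
 * The _safe variant walks in reverse and tolerates deletion of the
 * current entry, which is what teardown paths need.
 */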

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
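
/*
 * Illustrative example (not in the original source): an option string
 * such as "read_only" or its alternate spelling "ro" is matched by
 * match_token() against rbd_opts_tokens above, so either form sets
 * rbd_opts->read_only = true.  An unrecognized option matches the
 * {-1, NULL} wildcard entry and makes parse_rbd_opts_token() return
 * -EINVAL.
 */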

/*
 * Get a ceph client with specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * This takes rbd_client_list_lock itself, so the caller must not
 * hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
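
/*
 * Illustrative arithmetic for the checks above (not in the original
 * source): the in-memory snapshot header occupies roughly
 *
 *      sizeof (struct ceph_snap_context)
 *              + snap_count * sizeof (__le64)  (snapshot ids)
 *              + snap_names_len                (NUL-terminated names)
 *
 * The first check bounds snap_count so the id array alone cannot
 * overflow a size_t; the second subtracts that array's size from the
 * remaining budget and requires snap_names_len to fit in what's left.
 */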

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_read() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more; fill in the header */

        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}
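
/*
 * Illustrative layout (not in the original source): for format 1,
 * header.snap_names is one buffer of consecutive NUL-terminated
 * strings, ordered like the snapshot id array, e.g.
 *
 *      "mon\0tue\0wed\0"
 *
 * so _rbd_dev_v1_snap_name(rbd_dev, 2) skips "mon" and "tue" and
 * returns a copy of "wed".
 */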

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
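
/*
 * Illustrative example (not in the original source): with
 * snapc->snaps = { 12, 7, 3 } (descending, as the osd keeps it),
 * rbd_dev_snap_index() returns 1 for snap_id 7 and BAD_SNAP_INDEX
 * for snap_id 5.  snapid_compare_reverse() inverts the usual
 * ordering so bsearch() works on the descending array.
 */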

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* name came from the slab cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
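
/*
 * Illustrative arithmetic (values hypothetical, not from the original
 * source): with obj_order 22 (4 MiB objects) and an object prefix of
 * "rb.0.1028.74b0dc51", image offset 0x01234567 maps to
 *
 *      segment:        0x01234567 >> 22        = 4
 *      object name:    "rb.0.1028.74b0dc51.000000000004"
 *      segment offset: 0x01234567 & 0x3fffff   = 0x234567
 *
 * and rbd_segment_length() clips a request's length so it never
 * crosses the end of that 4 MiB object.
 */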

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
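
/*
 * Illustrative walk-through (values hypothetical, not from the
 * original source): with 4 KiB pages, zero_pages(pages, 4090, 8200)
 * takes three iterations:
 *
 *      page 0: page_offset 4090, length 6      (tail of the page)
 *      page 1: page_offset 0,    length 4096   (whole page)
 *      page 2: page_offset 0,    length 8      (head of the page)
 */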

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
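
/*
 * Illustrative usage sketch (not in the original source): splitting
 * one source chain into consecutive per-segment clones:
 *
 *      struct bio *bio = rq_bio;       (cursor into the source chain)
 *      unsigned int offset = 0;        (cursor within that bio)
 *
 *      first = bio_chain_clone_range(&bio, &offset, seg1_len, GFP_NOIO);
 *      second = bio_chain_clone_range(&bio, &offset, seg2_len, GFP_NOIO);
 *
 * Because bio and offset are advanced by each call, the second clone
 * picks up exactly where the first one ended.
 */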

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, so
 * that the first ("doesn't exist") response arrives *after* the
 * second ("does exist").  In that case we ignore the later-arriving
 * "doesn't exist" response.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
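
/*
 * Illustrative state summary (not in the original source) for the
 * (KNOWN, EXISTS) flag pair:
 *
 *      KNOWN=0 EXISTS=0        existence not yet determined
 *      KNOWN=1 EXISTS=0        checked; object does not exist (yet)
 *      KNOWN=1 EXISTS=1        object exists (and always will)
 *
 * KNOWN=0 EXISTS=1 does not occur, since obj_request_existence_set()
 * sets EXISTS (when applicable) before setting KNOWN.
 */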

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
1644         struct ceph_osd_request *osd_req = obj_request->osd_req;
1645         u64 snap_id;
1646
1647         rbd_assert(osd_req != NULL);
1648
1649         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1650         ceph_osdc_build_request(osd_req, obj_request->offset,
1651                         NULL, snap_id, NULL);
1652 }
1653
1654 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1655 {
1656         struct rbd_img_request *img_request = obj_request->img_request;
1657         struct ceph_osd_request *osd_req = obj_request->osd_req;
1658         struct ceph_snap_context *snapc;
1659         struct timespec mtime = CURRENT_TIME;
1660
1661         rbd_assert(osd_req != NULL);
1662
1663         snapc = img_request ? img_request->snapc : NULL;
1664         ceph_osdc_build_request(osd_req, obj_request->offset,
1665                         snapc, CEPH_NOSNAP, &mtime);
1666 }
1667
1668 static struct ceph_osd_request *rbd_osd_req_create(
1669                                         struct rbd_device *rbd_dev,
1670                                         bool write_request,
1671                                         struct rbd_obj_request *obj_request)
1672 {
1673         struct ceph_snap_context *snapc = NULL;
1674         struct ceph_osd_client *osdc;
1675         struct ceph_osd_request *osd_req;
1676
1677         if (obj_request_img_data_test(obj_request)) {
1678                 struct rbd_img_request *img_request = obj_request->img_request;
1679
1680                 rbd_assert(write_request ==
1681                                 img_request_write_test(img_request));
1682                 if (write_request)
1683                         snapc = img_request->snapc;
1684         }
1685
1686         /* Allocate and initialize the request, for the single op */
1687
1688         osdc = &rbd_dev->rbd_client->client->osdc;
1689         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1690         if (!osd_req)
1691                 return NULL;    /* ENOMEM */
1692
1693         if (write_request)
1694                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1695         else
1696                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1697
1698         osd_req->r_callback = rbd_osd_req_callback;
1699         osd_req->r_priv = obj_request;
1700
1701         osd_req->r_oid_len = strlen(obj_request->object_name);
1702         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1703         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1704
1705         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1706
1707         return osd_req;
1708 }
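
/*
 * Sketch of how the constructors above are typically combined by the
 * callers later in this file (a read, for illustration):
 *
 *      obj_request = rbd_obj_request_create(name, offset, length, type);
 *      obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
 *                                              obj_request);
 *      osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
 *                                      offset, length, 0, 0);
 *      rbd_osd_req_format_read(obj_request);
 *      ret = rbd_obj_request_submit(osdc, obj_request);
 */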
1709
1710 /*
1711  * Create a copyup osd request based on the information in the
1712  * object request supplied.  A copyup request has two osd ops,
1713  * a copyup method call, and a "normal" write request.
1714  */
1715 static struct ceph_osd_request *
1716 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1717 {
1718         struct rbd_img_request *img_request;
1719         struct ceph_snap_context *snapc;
1720         struct rbd_device *rbd_dev;
1721         struct ceph_osd_client *osdc;
1722         struct ceph_osd_request *osd_req;
1723
1724         rbd_assert(obj_request_img_data_test(obj_request));
1725         img_request = obj_request->img_request;
1726         rbd_assert(img_request);
1727         rbd_assert(img_request_write_test(img_request));
1728
1729         /* Allocate and initialize the request, for the two ops */
1730
1731         snapc = img_request->snapc;
1732         rbd_dev = img_request->rbd_dev;
1733         osdc = &rbd_dev->rbd_client->client->osdc;
1734         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1735         if (!osd_req)
1736                 return NULL;    /* ENOMEM */
1737
1738         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1739         osd_req->r_callback = rbd_osd_req_callback;
1740         osd_req->r_priv = obj_request;
1741
1742         osd_req->r_oid_len = strlen(obj_request->object_name);
1743         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1744         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1745
1746         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1747
1748         return osd_req;
1749 }
1750
1751
1752 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1753 {
1754         ceph_osdc_put_request(osd_req);
1755 }
1756
1757 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1758
1759 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1760                                                 u64 offset, u64 length,
1761                                                 enum obj_request_type type)
1762 {
1763         struct rbd_obj_request *obj_request;
1764         size_t size;
1765         char *name;
1766
1767         rbd_assert(obj_request_type_valid(type));
1768
1769         size = strlen(object_name) + 1;
1770         name = kmalloc(size, GFP_KERNEL);
1771         if (!name)
1772                 return NULL;
1773
1774         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1775         if (!obj_request) {
1776                 kfree(name);
1777                 return NULL;
1778         }
1779
1780         obj_request->object_name = memcpy(name, object_name, size);
1781         obj_request->offset = offset;
1782         obj_request->length = length;
1783         obj_request->flags = 0;
1784         obj_request->which = BAD_WHICH;
1785         obj_request->type = type;
1786         INIT_LIST_HEAD(&obj_request->links);
1787         init_completion(&obj_request->completion);
1788         kref_init(&obj_request->kref);
1789
1790         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1791                 offset, length, (int)type, obj_request);
1792
1793         return obj_request;
1794 }
1795
1796 static void rbd_obj_request_destroy(struct kref *kref)
1797 {
1798         struct rbd_obj_request *obj_request;
1799
1800         obj_request = container_of(kref, struct rbd_obj_request, kref);
1801
1802         dout("%s: obj %p\n", __func__, obj_request);
1803
1804         rbd_assert(obj_request->img_request == NULL);
1805         rbd_assert(obj_request->which == BAD_WHICH);
1806
1807         if (obj_request->osd_req)
1808                 rbd_osd_req_destroy(obj_request->osd_req);
1809
1810         rbd_assert(obj_request_type_valid(obj_request->type));
1811         switch (obj_request->type) {
1812         case OBJ_REQUEST_NODATA:
1813                 break;          /* Nothing to do */
1814         case OBJ_REQUEST_BIO:
1815                 if (obj_request->bio_list)
1816                         bio_chain_put(obj_request->bio_list);
1817                 break;
1818         case OBJ_REQUEST_PAGES:
1819                 if (obj_request->pages)
1820                         ceph_release_page_vector(obj_request->pages,
1821                                                 obj_request->page_count);
1822                 break;
1823         }
1824
1825         kfree(obj_request->object_name);
1826         obj_request->object_name = NULL;
1827         kmem_cache_free(rbd_obj_request_cache, obj_request);
1828 }
1829
1830 /*
1831  * Caller is responsible for filling in the list of object requests
1832  * that comprises the image request, and the Linux request pointer
1833  * (if there is one).
1834  */
1835 static struct rbd_img_request *rbd_img_request_create(
1836                                         struct rbd_device *rbd_dev,
1837                                         u64 offset, u64 length,
1838                                         bool write_request,
1839                                         bool child_request)
1840 {
1841         struct rbd_img_request *img_request;
             struct ceph_snap_context *snapc = NULL;
1842
1843         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1844         if (!img_request)
1845                 return NULL;
1846
1847         if (write_request) {
1848                 down_read(&rbd_dev->header_rwsem);
1849                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1850                 up_read(&rbd_dev->header_rwsem);
1851         }
1852
1853         img_request->rq = NULL;
1854         img_request->rbd_dev = rbd_dev;
1855         img_request->offset = offset;
1856         img_request->length = length;
1857         img_request->flags = 0;
1858         if (write_request) {
1859                 img_request_write_set(img_request);
1860                 img_request->snapc = snapc;
1861         } else {
1862                 img_request->snap_id = rbd_dev->spec->snap_id;
1863         }
1864         if (child_request)
1865                 img_request_child_set(img_request);
1866         if (rbd_dev->parent_spec)
1867                 img_request_layered_set(img_request);
1868         spin_lock_init(&img_request->completion_lock);
1869         img_request->next_completion = 0;
1870         img_request->callback = NULL;
1871         img_request->result = 0;
1872         img_request->obj_request_count = 0;
1873         INIT_LIST_HEAD(&img_request->obj_requests);
1874         kref_init(&img_request->kref);
1875
1876         rbd_img_request_get(img_request);       /* Avoid a warning */
1877         rbd_img_request_put(img_request);       /* TEMPORARY */
1878
1879         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1880                 write_request ? "write" : "read", offset, length,
1881                 img_request);
1882
1883         return img_request;
1884 }
1885
1886 static void rbd_img_request_destroy(struct kref *kref)
1887 {
1888         struct rbd_img_request *img_request;
1889         struct rbd_obj_request *obj_request;
1890         struct rbd_obj_request *next_obj_request;
1891
1892         img_request = container_of(kref, struct rbd_img_request, kref);
1893
1894         dout("%s: img %p\n", __func__, img_request);
1895
1896         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1897                 rbd_img_obj_request_del(img_request, obj_request);
1898         rbd_assert(img_request->obj_request_count == 0);
1899
1900         if (img_request_write_test(img_request))
1901                 ceph_put_snap_context(img_request->snapc);
1902
1903         if (img_request_child_test(img_request))
1904                 rbd_obj_request_put(img_request->obj_request);
1905
1906         kmem_cache_free(rbd_img_request_cache, img_request);
1907 }
1908
1909 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1910 {
1911         struct rbd_img_request *img_request;
1912         unsigned int xferred;
1913         int result;
1914         bool more;
1915
1916         rbd_assert(obj_request_img_data_test(obj_request));
1917         img_request = obj_request->img_request;
1918
1919         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1920         xferred = (unsigned int)obj_request->xferred;
1921         result = obj_request->result;
1922         if (result) {
1923                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1924
1925                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1926                         img_request_write_test(img_request) ? "write" : "read",
1927                         obj_request->length, obj_request->img_offset,
1928                         obj_request->offset);
1929                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1930                         result, xferred);
1931                 if (!img_request->result)
1932                         img_request->result = result;
1933         }
1934
1935         /* Image object requests don't own their page array */
1936
1937         if (obj_request->type == OBJ_REQUEST_PAGES) {
1938                 obj_request->pages = NULL;
1939                 obj_request->page_count = 0;
1940         }
1941
1942         if (img_request_child_test(img_request)) {
1943                 rbd_assert(img_request->obj_request != NULL);
1944                 more = obj_request->which < img_request->obj_request_count - 1;
1945         } else {
1946                 rbd_assert(img_request->rq != NULL);
1947                 more = blk_end_request(img_request->rq, result, xferred);
1948         }
1949
1950         return more;
1951 }
1952
1953 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1954 {
1955         struct rbd_img_request *img_request;
1956         u32 which = obj_request->which;
1957         bool more = true;
1958
1959         rbd_assert(obj_request_img_data_test(obj_request));
1960         img_request = obj_request->img_request;
1961
1962         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1963         rbd_assert(img_request != NULL);
1964         rbd_assert(img_request->obj_request_count > 0);
1965         rbd_assert(which != BAD_WHICH);
1966         rbd_assert(which < img_request->obj_request_count);
1967         rbd_assert(which >= img_request->next_completion);
1968
1969         spin_lock_irq(&img_request->completion_lock);
1970         if (which != img_request->next_completion)
1971                 goto out;
1972
1973         for_each_obj_request_from(img_request, obj_request) {
1974                 rbd_assert(more);
1975                 rbd_assert(which < img_request->obj_request_count);
1976
1977                 if (!obj_request_done_test(obj_request))
1978                         break;
1979                 more = rbd_img_obj_end_request(obj_request);
1980                 which++;
1981         }
1982
1983         rbd_assert(more ^ (which == img_request->obj_request_count));
1984         img_request->next_completion = which;
1985 out:
1986         spin_unlock_irq(&img_request->completion_lock);
1987
1988         if (!more)
1989                 rbd_img_request_complete(img_request);
1990 }
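
/*
 * Example of the in-order completion above: if an image request has
 * object requests 0..3 and they complete in the order 2, 0, 1, 3,
 * then the callback for 2 records nothing (which != next_completion);
 * the callback for 0 ends request 0; the callback for 1 ends 1 and
 * then the already-done 2; and the callback for 3 ends 3 and, being
 * last, completes the image request.
 */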
1991
1992 /*
1993  * Split up an image request into one or more object requests, each
1994  * to a different object.  The "type" parameter indicates whether
1995  * "data_desc" is the pointer to the head of a list of bio
1996  * structures, or the base of a page array.  In either case this
1997  * function assumes data_desc describes memory sufficient to hold
1998  * all data described by the image request.
1999  */
2000 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2001                                         enum obj_request_type type,
2002                                         void *data_desc)
2003 {
2004         struct rbd_device *rbd_dev = img_request->rbd_dev;
2005         struct rbd_obj_request *obj_request = NULL;
2006         struct rbd_obj_request *next_obj_request;
2007         bool write_request = img_request_write_test(img_request);
2008         struct bio *bio_list;
2009         unsigned int bio_offset = 0;
2010         struct page **pages;
2011         u64 img_offset;
2012         u64 resid;
2013         u16 opcode;
2014
2015         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2016                 (int)type, data_desc);
2017
2018         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2019         img_offset = img_request->offset;
2020         resid = img_request->length;
2021         rbd_assert(resid > 0);
2022
2023         if (type == OBJ_REQUEST_BIO) {
2024                 bio_list = data_desc;
2025                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2026         } else {
2027                 rbd_assert(type == OBJ_REQUEST_PAGES);
2028                 pages = data_desc;
2029         }
2030
2031         while (resid) {
2032                 struct ceph_osd_request *osd_req;
2033                 const char *object_name;
2034                 u64 offset;
2035                 u64 length;
2036
2037                 object_name = rbd_segment_name(rbd_dev, img_offset);
2038                 if (!object_name)
2039                         goto out_unwind;
2040                 offset = rbd_segment_offset(rbd_dev, img_offset);
2041                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2042                 obj_request = rbd_obj_request_create(object_name,
2043                                                 offset, length, type);
2044                 /* object request has its own copy of the object name */
2045                 rbd_segment_name_free(object_name);
2046                 if (!obj_request)
2047                         goto out_unwind;
2048
2049                 if (type == OBJ_REQUEST_BIO) {
2050                         unsigned int clone_size;
2051
2052                         rbd_assert(length <= (u64)UINT_MAX);
2053                         clone_size = (unsigned int)length;
2054                         obj_request->bio_list =
2055                                         bio_chain_clone_range(&bio_list,
2056                                                                 &bio_offset,
2057                                                                 clone_size,
2058                                                                 GFP_ATOMIC);
2059                         if (!obj_request->bio_list)
2060                                 goto out_partial;
2061                 } else {
2062                         unsigned int page_count;
2063
2064                         obj_request->pages = pages;
2065                         page_count = (u32)calc_pages_for(offset, length);
2066                         obj_request->page_count = page_count;
2067                         if ((offset + length) & ~PAGE_MASK)
2068                                 page_count--;   /* more on last page */
2069                         pages += page_count;
2070                 }
2071
2072                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2073                                                 obj_request);
2074                 if (!osd_req)
2075                         goto out_partial;
2076                 obj_request->osd_req = osd_req;
2077                 obj_request->callback = rbd_img_obj_callback;
2078
2079                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2080                                                 0, 0);
2081                 if (type == OBJ_REQUEST_BIO)
2082                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2083                                         obj_request->bio_list, length);
2084                 else
2085                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2086                                         obj_request->pages, length,
2087                                         offset & ~PAGE_MASK, false, false);
2088
2089                 if (write_request)
2090                         rbd_osd_req_format_write(obj_request);
2091                 else
2092                         rbd_osd_req_format_read(obj_request);
2093
2094                 obj_request->img_offset = img_offset;
2095                 rbd_img_obj_request_add(img_request, obj_request);
2096
2097                 img_offset += length;
2098                 resid -= length;
2099         }
2100
2101         return 0;
2102
2103 out_partial:
2104         rbd_obj_request_put(obj_request);
2105 out_unwind:
2106         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2107                 rbd_obj_request_put(obj_request);
2108
2109         return -ENOMEM;
2110 }
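
/*
 * Worked example of the splitting above, assuming the default 4 MiB
 * objects (obj_order 22): a 6 MiB image request starting at image
 * offset 3 MiB becomes three object requests: 1 MiB at offset 3 MiB
 * in the first object, all 4 MiB of the second, and 1 MiB at offset
 * 0 in the third.
 */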
2111
2112 static void
2113 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2114 {
2115         struct rbd_img_request *img_request;
2116         struct rbd_device *rbd_dev;
2117         u64 length;
2118         u32 page_count;
2119
2120         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2121         rbd_assert(obj_request_img_data_test(obj_request));
2122         img_request = obj_request->img_request;
2123         rbd_assert(img_request);
2124
2125         rbd_dev = img_request->rbd_dev;
2126         rbd_assert(rbd_dev);
2127         length = (u64)1 << rbd_dev->header.obj_order;
2128         page_count = (u32)calc_pages_for(0, length);
2129
2130         rbd_assert(obj_request->copyup_pages);
2131         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2132         obj_request->copyup_pages = NULL;
2133
2134         /*
2135          * We want the transfer count to reflect the size of the
2136          * original write request.  There is no such thing as a
2137          * successful short write, so if the request was successful
2138          * we can just set it to the originally-requested length.
2139          */
2140         if (!obj_request->result)
2141                 obj_request->xferred = obj_request->length;
2142
2143         /* Finish up with the normal image object callback */
2144
2145         rbd_img_obj_callback(obj_request);
2146 }
2147
2148 static void
2149 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2150 {
2151         struct rbd_obj_request *orig_request;
2152         struct ceph_osd_request *osd_req;
2153         struct ceph_osd_client *osdc;
2154         struct rbd_device *rbd_dev;
2155         struct page **pages;
2156         int result;
2157         u64 obj_size;
2158         u64 xferred;
2159
2160         rbd_assert(img_request_child_test(img_request));
2161
2162         /* First get what we need from the image request */
2163
2164         pages = img_request->copyup_pages;
2165         rbd_assert(pages != NULL);
2166         img_request->copyup_pages = NULL;
2167
2168         orig_request = img_request->obj_request;
2169         rbd_assert(orig_request != NULL);
2170         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2171         result = img_request->result;
2172         obj_size = img_request->length;
2173         xferred = img_request->xferred;
2174
2175         rbd_dev = img_request->rbd_dev;
2176         rbd_assert(rbd_dev);
2177         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2178
2179         rbd_img_request_put(img_request);
2180
2181         if (result)
2182                 goto out_err;
2183
2184         /* Allocate the new copyup osd request for the original request */
2185
2186         result = -ENOMEM;
2187         rbd_assert(!orig_request->osd_req);
2188         osd_req = rbd_osd_req_create_copyup(orig_request);
2189         if (!osd_req)
2190                 goto out_err;
2191         orig_request->osd_req = osd_req;
2192         orig_request->copyup_pages = pages;
2193
2194         /* Initialize the copyup op */
2195
2196         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2197         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2198                                                 false, false);
2199
2200         /* Then the original write request op */
2201
2202         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2203                                         orig_request->offset,
2204                                         orig_request->length, 0, 0);
2205         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2206                                         orig_request->length);
2207
2208         rbd_osd_req_format_write(orig_request);
2209
2210         /* All set, send it off. */
2211
2212         orig_request->callback = rbd_img_obj_copyup_callback;
2213         osdc = &rbd_dev->rbd_client->client->osdc;
2214         result = rbd_obj_request_submit(osdc, orig_request);
2215         if (!result)
2216                 return;
2217 out_err:
2218         /* Record the error code and complete the request */
2219
2220         orig_request->result = result;
2221         orig_request->xferred = 0;
2222         obj_request_done_set(orig_request);
2223         rbd_obj_request_complete(orig_request);
2224 }
2225
2226 /*
2227  * Read from the parent image the range of data that covers the
2228  * entire target of the given object request.  This is used for
2229  * satisfying a layered image write request when the target of an
2230  * object request from the image request does not exist.
2231  *
2232  * A page array big enough to hold the returned data is allocated
2233  * and supplied to rbd_img_request_fill() as the "data descriptor."
2234  * When the read completes, this page array will be transferred to
2235  * the original object request for the copyup operation.
2236  *
2237  * If an error occurs, record it as the result of the original
2238  * object request and mark it done so it gets completed.
2239  */
2240 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2241 {
2242         struct rbd_img_request *img_request = NULL;
2243         struct rbd_img_request *parent_request = NULL;
2244         struct rbd_device *rbd_dev;
2245         u64 img_offset;
2246         u64 length;
2247         struct page **pages = NULL;
2248         u32 page_count;
2249         int result;
2250
2251         rbd_assert(obj_request_img_data_test(obj_request));
2252         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2253
2254         img_request = obj_request->img_request;
2255         rbd_assert(img_request != NULL);
2256         rbd_dev = img_request->rbd_dev;
2257         rbd_assert(rbd_dev->parent != NULL);
2258
2259         /*
2260          * First things first.  The original osd request is of no
2261          * use to us any more; we'll need a new one that can hold
2262          * the two ops in a copyup request.  We'll get that later,
2263          * but for now we can release the old one.
2264          */
2265         rbd_osd_req_destroy(obj_request->osd_req);
2266         obj_request->osd_req = NULL;
2267
2268         /*
2269          * Determine the byte range covered by the object in the
2270          * child image to which the original request was to be sent.
2271          */
2272         img_offset = obj_request->img_offset - obj_request->offset;
2273         length = (u64)1 << rbd_dev->header.obj_order;
2274
2275         /*
2276          * There is no defined parent data beyond the parent
2277          * overlap, so limit what we read at that boundary if
2278          * necessary.
2279          */
2280         if (img_offset + length > rbd_dev->parent_overlap) {
2281                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2282                 length = rbd_dev->parent_overlap - img_offset;
2283         }
2284
2285         /*
2286          * Allocate a page array big enough to receive the data read
2287          * from the parent.
2288          */
2289         page_count = (u32)calc_pages_for(0, length);
2290         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2291         if (IS_ERR(pages)) {
2292                 result = PTR_ERR(pages);
2293                 pages = NULL;
2294                 goto out_err;
2295         }
2296
2297         result = -ENOMEM;
2298         parent_request = rbd_img_request_create(rbd_dev->parent,
2299                                                 img_offset, length,
2300                                                 false, true);
2301         if (!parent_request)
2302                 goto out_err;
2303         rbd_obj_request_get(obj_request);
2304         parent_request->obj_request = obj_request;
2305
2306         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2307         if (result)
2308                 goto out_err;
2309         parent_request->copyup_pages = pages;
2310
2311         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2312         result = rbd_img_request_submit(parent_request);
2313         if (!result)
2314                 return 0;
2315
2316         parent_request->copyup_pages = NULL;
2317         parent_request->obj_request = NULL;
2318         rbd_obj_request_put(obj_request);
2319 out_err:
2320         if (pages)
2321                 ceph_release_page_vector(pages, page_count);
2322         if (parent_request)
2323                 rbd_img_request_put(parent_request);
2324         obj_request->result = result;
2325         obj_request->xferred = 0;
2326         obj_request_done_set(obj_request);
2327
2328         return result;
2329 }
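
/*
 * Clipping example for the overlap check above, assuming 4 MiB
 * objects: with a parent overlap of 6 MiB, a write targeting the
 * object that covers image range [4 MiB, 8 MiB) reads only
 * [4 MiB, 6 MiB) from the parent; anything beyond the overlap has
 * no defined parent data.
 */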
2330
2331 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2332 {
2333         struct rbd_obj_request *orig_request;
2334         int result;
2335
2336         rbd_assert(!obj_request_img_data_test(obj_request));
2337
2338         /*
2339          * All we need from the object request is the original
2340          * request and the result of the STAT op.  Grab those, then
2341          * we're done with the request.
2342          */
2343         orig_request = obj_request->obj_request;
2344         obj_request->obj_request = NULL;
2345         rbd_assert(orig_request);
2346         rbd_assert(orig_request->img_request);
2347
2348         result = obj_request->result;
2349         obj_request->result = 0;
2350
2351         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2352                 obj_request, orig_request, result,
2353                 obj_request->xferred, obj_request->length);
2354         rbd_obj_request_put(obj_request);
2355
2359         /*
2360          * Our only purpose here is to determine whether the object
2361          * exists, and we don't want to treat the non-existence as
2362          * an error.  If something else comes back, transfer the
2363          * error to the original request and complete it now.
2364          */
2365         if (!result) {
2366                 obj_request_existence_set(orig_request, true);
2367         } else if (result == -ENOENT) {
2368                 obj_request_existence_set(orig_request, false);
2369         } else {
2370                 orig_request->result = result;
2371                 goto out;
2372         }
2373
2374         /*
2375          * Resubmit the original request now that we have recorded
2376          * whether the target object exists.
2377          */
2378         orig_request->result = rbd_img_obj_request_submit(orig_request);
2379 out:
2380         if (orig_request->result)
2381                 rbd_obj_request_complete(orig_request);
2382         rbd_obj_request_put(orig_request);
2383 }
2384
2385 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2386 {
2387         struct rbd_obj_request *stat_request;
2388         struct rbd_device *rbd_dev;
2389         struct ceph_osd_client *osdc;
2390         struct page **pages = NULL;
2391         u32 page_count;
2392         size_t size;
2393         int ret;
2394
2395         /*
2396          * The response data for a STAT call consists of:
2397          *     le64 length;
2398          *     struct {
2399          *         le32 tv_sec;
2400          *         le32 tv_nsec;
2401          *     } mtime;
2402          */
2403         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2404         page_count = (u32)calc_pages_for(0, size);
2405         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2406         if (IS_ERR(pages))
2407                 return PTR_ERR(pages);
2408
2409         ret = -ENOMEM;
2410         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2411                                                         OBJ_REQUEST_PAGES);
2412         if (!stat_request)
2413                 goto out;
2414
2415         rbd_obj_request_get(obj_request);
2416         stat_request->obj_request = obj_request;
2417         stat_request->pages = pages;
2418         stat_request->page_count = page_count;
2419
2420         rbd_assert(obj_request->img_request);
2421         rbd_dev = obj_request->img_request->rbd_dev;
2422         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2423                                                 stat_request);
2424         if (!stat_request->osd_req)
2425                 goto out;
2426         stat_request->callback = rbd_img_obj_exists_callback;
2427
2428         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2429         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2430                                         false, false);
2431         rbd_osd_req_format_read(stat_request);
2432
2433         osdc = &rbd_dev->rbd_client->client->osdc;
2434         ret = rbd_obj_request_submit(osdc, stat_request);
2435 out:
2436         if (ret)
2437                 rbd_obj_request_put(obj_request);
2438
2439         return ret;
2440 }
2441
2442 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2443 {
2444         struct rbd_img_request *img_request;
2445         struct rbd_device *rbd_dev;
2446         bool known;
2447
2448         rbd_assert(obj_request_img_data_test(obj_request));
2449
2450         img_request = obj_request->img_request;
2451         rbd_assert(img_request);
2452         rbd_dev = img_request->rbd_dev;
2453
2454         /*
2455          * Only writes to layered images need special handling.
2456          * Reads and non-layered writes are simple object requests.
2457          * Layered writes that start beyond the end of the overlap
2458          * with the parent have no parent data, so they too are
2459          * simple object requests.  Finally, if the target object is
2460          * known to already exist, its parent data has already been
2461          * copied, so a write to the object can also be handled as a
2462          * simple object request.
2463          */
2464         if (!img_request_write_test(img_request) ||
2465                 !img_request_layered_test(img_request) ||
2466                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2467                 ((known = obj_request_known_test(obj_request)) &&
2468                         obj_request_exists_test(obj_request))) {
2469
2470                 struct rbd_device *rbd_dev;
2471                 struct ceph_osd_client *osdc;
2472
2473                 rbd_dev = obj_request->img_request->rbd_dev;
2474                 osdc = &rbd_dev->rbd_client->client->osdc;
2475
2476                 return rbd_obj_request_submit(osdc, obj_request);
2477         }
2478
2479         /*
2480          * It's a layered write.  The target object might exist but
2481          * we may not know that yet.  If we know it doesn't exist,
2482          * start by reading the data for the full target object from
2483          * the parent so we can use it for a copyup to the target.
2484          */
2485         if (known)
2486                 return rbd_img_obj_parent_read_full(obj_request);
2487
2488         /* We don't know whether the target exists.  Go find out. */
2489
2490         return rbd_img_obj_exists_submit(obj_request);
2491 }
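
/*
 * The dispatch above, summarized:
 *
 *      read, or non-layered write            -> plain object request
 *      layered write past the parent overlap -> plain object request
 *      layered write, target known to exist  -> plain object request
 *      layered write, known not to exist     -> parent read + copyup
 *      layered write, existence unknown      -> STAT, then resubmit
 */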
2492
2493 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2494 {
2495         struct rbd_obj_request *obj_request;
2496         struct rbd_obj_request *next_obj_request;
2497
2498         dout("%s: img %p\n", __func__, img_request);
2499         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2500                 int ret;
2501
2502                 ret = rbd_img_obj_request_submit(obj_request);
2503                 if (ret)
2504                         return ret;
2505         }
2506
2507         return 0;
2508 }
2509
2510 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2511 {
2512         struct rbd_obj_request *obj_request;
2513         struct rbd_device *rbd_dev;
2514         u64 obj_end;
2515
2516         rbd_assert(img_request_child_test(img_request));
2517
2518         obj_request = img_request->obj_request;
2519         rbd_assert(obj_request);
2520         rbd_assert(obj_request->img_request);
2521
2522         obj_request->result = img_request->result;
2523         if (obj_request->result)
2524                 goto out;
2525
2526         /*
2527          * We need to zero anything beyond the parent overlap
2528          * boundary.  Since rbd_img_obj_request_read_callback()
2529          * will zero anything beyond the end of a short read, an
2530          * easy way to do this is to pretend the data from the
2531          * parent came up short--ending at the overlap boundary.
2532          */
2533         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2534         obj_end = obj_request->img_offset + obj_request->length;
2535         rbd_dev = obj_request->img_request->rbd_dev;
2536         if (obj_end > rbd_dev->parent_overlap) {
2537                 u64 xferred = 0;
2538
2539                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2540                         xferred = rbd_dev->parent_overlap -
2541                                         obj_request->img_offset;
2542
2543                 obj_request->xferred = min(img_request->xferred, xferred);
2544         } else {
2545                 obj_request->xferred = img_request->xferred;
2546         }
2547 out:
2548         rbd_img_request_put(img_request);
2549         rbd_img_obj_request_read_callback(obj_request);
2550         rbd_obj_request_complete(obj_request);
2551 }
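
/*
 * Example of the clamping above: with a parent overlap of 6 MiB, a
 * child read covering image range [5 MiB, 7 MiB) has xferred clamped
 * to at most 1 MiB, so rbd_img_obj_request_read_callback() then
 * zero-fills everything past the overlap boundary as it would for a
 * short read.
 */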
2552
2553 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2554 {
2555         struct rbd_device *rbd_dev;
2556         struct rbd_img_request *img_request;
2557         int result;
2558
2559         rbd_assert(obj_request_img_data_test(obj_request));
2560         rbd_assert(obj_request->img_request != NULL);
2561         rbd_assert(obj_request->result == (s32) -ENOENT);
2562         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2563
2564         rbd_dev = obj_request->img_request->rbd_dev;
2565         rbd_assert(rbd_dev->parent != NULL);
2566         /* rbd_read_finish(obj_request, obj_request->length); */
2567         img_request = rbd_img_request_create(rbd_dev->parent,
2568                                                 obj_request->img_offset,
2569                                                 obj_request->length,
2570                                                 false, true);
2571         result = -ENOMEM;
2572         if (!img_request)
2573                 goto out_err;
2574
2575         rbd_obj_request_get(obj_request);
2576         img_request->obj_request = obj_request;
2577
2578         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2579                                         obj_request->bio_list);
2580         if (result)
2581                 goto out_err;
2582
2583         img_request->callback = rbd_img_parent_read_callback;
2584         result = rbd_img_request_submit(img_request);
2585         if (result)
2586                 goto out_err;
2587
2588         return;
2589 out_err:
2590         if (img_request)
2591                 rbd_img_request_put(img_request);
2592         obj_request->result = result;
2593         obj_request->xferred = 0;
2594         obj_request_done_set(obj_request);
2595 }
2596
2597 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2598 {
2599         struct rbd_obj_request *obj_request;
2600         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2601         int ret;
2602
2603         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2604                                                         OBJ_REQUEST_NODATA);
2605         if (!obj_request)
2606                 return -ENOMEM;
2607
2608         ret = -ENOMEM;
2609         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2610         if (!obj_request->osd_req)
2611                 goto out;
2612         obj_request->callback = rbd_obj_request_put;
2613
2614         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2615                                         notify_id, 0, 0);
2616         rbd_osd_req_format_read(obj_request);
2617
2618         ret = rbd_obj_request_submit(osdc, obj_request);
2619 out:
2620         if (ret)
2621                 rbd_obj_request_put(obj_request);
2622
2623         return ret;
2624 }
2625
2626 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2627 {
2628         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2629         int ret;
2630
2631         if (!rbd_dev)
2632                 return;
2633
2634         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2635                 rbd_dev->header_name, (unsigned long long)notify_id,
2636                 (unsigned int)opcode);
2637         ret = rbd_dev_refresh(rbd_dev);
2638         if (ret)
2639                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2640
2641         rbd_obj_notify_ack(rbd_dev, notify_id);
2642 }
2643
2644 /*
2645  * Request sync osd watch/unwatch.  The value of "start" determines
2646  * whether a watch request is being initiated or torn down.
2647  */
2648 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2649 {
2650         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2651         struct rbd_obj_request *obj_request;
2652         int ret;
2653
2654         rbd_assert(start ^ !!rbd_dev->watch_event);
2655         rbd_assert(start ^ !!rbd_dev->watch_request);
2656
2657         if (start) {
2658                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2659                                                 &rbd_dev->watch_event);
2660                 if (ret < 0)
2661                         return ret;
2662                 rbd_assert(rbd_dev->watch_event != NULL);
2663         }
2664
2665         ret = -ENOMEM;
2666         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2667                                                         OBJ_REQUEST_NODATA);
2668         if (!obj_request)
2669                 goto out_cancel;
2670
2671         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2672         if (!obj_request->osd_req)
2673                 goto out_cancel;
2674
2675         if (start)
2676                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2677         else
2678                 ceph_osdc_unregister_linger_request(osdc,
2679                                         rbd_dev->watch_request->osd_req);
2680
2681         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2682                                 rbd_dev->watch_event->cookie, 0, start);
2683         rbd_osd_req_format_write(obj_request);
2684
2685         ret = rbd_obj_request_submit(osdc, obj_request);
2686         if (ret)
2687                 goto out_cancel;
2688         ret = rbd_obj_request_wait(obj_request);
2689         if (ret)
2690                 goto out_cancel;
2691         ret = obj_request->result;
2692         if (ret)
2693                 goto out_cancel;
2694
2695         /*
2696          * A watch request is set to linger, so the underlying osd
2697          * request won't go away until we unregister it.  We retain
2698          * a pointer to the object request during that time (in
2699          * rbd_dev->watch_request), so we'll keep a reference to
2700          * it.  We'll drop that reference (below) after we've
2701          * unregistered it.
2702          */
2703         if (start) {
2704                 rbd_dev->watch_request = obj_request;
2705
2706                 return 0;
2707         }
2708
2709         /* We have successfully torn down the watch request */
2710
2711         rbd_obj_request_put(rbd_dev->watch_request);
2712         rbd_dev->watch_request = NULL;
2713 out_cancel:
2714         /* Cancel the event if we're tearing down, or on error */
2715         ceph_osdc_cancel_event(rbd_dev->watch_event);
2716         rbd_dev->watch_event = NULL;
2717         if (obj_request)
2718                 rbd_obj_request_put(obj_request);
2719
2720         return ret;
2721 }
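
/*
 * Usage sketch, modeled on the callers elsewhere in this file:
 *
 *      ret = rbd_dev_header_watch_sync(rbd_dev, 1);    (set up watch)
 *      ...
 *      ret = rbd_dev_header_watch_sync(rbd_dev, 0);    (tear it down)
 */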
2722
2723 /*
2724  * Synchronous osd object method call.  Returns the number of bytes
2725  * returned in the inbound buffer, or a negative error code.
2726  */
2727 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2728                              const char *object_name,
2729                              const char *class_name,
2730                              const char *method_name,
2731                              const void *outbound,
2732                              size_t outbound_size,
2733                              void *inbound,
2734                              size_t inbound_size)
2735 {
2736         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2737         struct rbd_obj_request *obj_request;
2738         struct page **pages;
2739         u32 page_count;
2740         int ret;
2741
2742         /*
2743          * Method calls are ultimately read operations.  The result
2744          * should be placed into the inbound buffer provided.  They
2745          * may also supply outbound data--parameters for the object
2746          * method.  Currently if this is present it will be a
2747          * snapshot id.
2748          */
2749         page_count = (u32)calc_pages_for(0, inbound_size);
2750         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2751         if (IS_ERR(pages))
2752                 return PTR_ERR(pages);
2753
2754         ret = -ENOMEM;
2755         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2756                                                         OBJ_REQUEST_PAGES);
2757         if (!obj_request)
2758                 goto out;
2759
2760         obj_request->pages = pages;
2761         obj_request->page_count = page_count;
2762
2763         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2764         if (!obj_request->osd_req)
2765                 goto out;
2766
2767         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2768                                         class_name, method_name);
2769         if (outbound_size) {
2770                 struct ceph_pagelist *pagelist;
2771
2772                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2773                 if (!pagelist)
2774                         goto out;
2775
2776                 ceph_pagelist_init(pagelist);
2777                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2778                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2779                                                 pagelist);
2780         }
2781         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2782                                         obj_request->pages, inbound_size,
2783                                         0, false, false);
2784         rbd_osd_req_format_read(obj_request);
2785
2786         ret = rbd_obj_request_submit(osdc, obj_request);
2787         if (ret)
2788                 goto out;
2789         ret = rbd_obj_request_wait(obj_request);
2790         if (ret)
2791                 goto out;
2792
2793         ret = obj_request->result;
2794         if (ret < 0)
2795                 goto out;
2796
2797         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2798         ret = (int)obj_request->xferred;
2799         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2800 out:
2801         if (obj_request)
2802                 rbd_obj_request_put(obj_request);
2803         else
2804                 ceph_release_page_vector(pages, page_count);
2805
2806         return ret;
2807 }
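
/*
 * Example call, roughly how the v2 image code later in this file
 * fetches an image's size via the "get_size" method of the "rbd"
 * class (buffer details elided):
 *
 *      __le64 snapid = cpu_to_le64(snap_id);
 *
 *      ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                              "rbd", "get_size",
 *                              &snapid, sizeof (snapid),
 *                              &size_buf, sizeof (size_buf));
 */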
2808
2809 static void rbd_request_fn(struct request_queue *q)
2810                 __releases(q->queue_lock) __acquires(q->queue_lock)
2811 {
2812         struct rbd_device *rbd_dev = q->queuedata;
2813         bool read_only = rbd_dev->mapping.read_only;
2814         struct request *rq;
2815         int result;
2816
2817         while ((rq = blk_fetch_request(q))) {
2818                 bool write_request = rq_data_dir(rq) == WRITE;
2819                 struct rbd_img_request *img_request;
2820                 u64 offset;
2821                 u64 length;
2822
2823                 /* Ignore any non-FS requests that filter through. */
2824
2825                 if (rq->cmd_type != REQ_TYPE_FS) {
2826                         dout("%s: non-fs request type %d\n", __func__,
2827                                 (int) rq->cmd_type);
2828                         __blk_end_request_all(rq, 0);
2829                         continue;
2830                 }
2831
2832                 /* Ignore/skip any zero-length requests */
2833
2834                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2835                 length = (u64) blk_rq_bytes(rq);
2836
2837                 if (!length) {
2838                         dout("%s: zero-length request\n", __func__);
2839                         __blk_end_request_all(rq, 0);
2840                         continue;
2841                 }
2842
2843                 spin_unlock_irq(q->queue_lock);
2844
2845                 /* Disallow writes to a read-only device */
2846
2847                 if (write_request) {
2848                         result = -EROFS;
2849                         if (read_only)
2850                                 goto end_request;
2851                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2852                 }
2853
2854                 /*
2855                  * Quit early if the mapped snapshot no longer
2856                  * exists.  It's still possible the snapshot will
2857                  * have disappeared by the time our request arrives
2858                  * at the osd, but there's no sense in sending it if
2859                  * we already know.
2860                  */
2861                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2862                         dout("request for non-existent snapshot\n");
2863                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2864                         result = -ENXIO;
2865                         goto end_request;
2866                 }
2867
2868                 result = -EINVAL;
2869                 if (offset && length > U64_MAX - offset + 1) {
2870                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2871                                 offset, length);
2872                         goto end_request;       /* Shouldn't happen */
2873                 }
2874
2875                 result = -EIO;
2876                 if (offset + length > rbd_dev->mapping.size) {
2877                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2878                                 offset, length, rbd_dev->mapping.size);
2879                         goto end_request;
2880                 }
2881
2882                 result = -ENOMEM;
2883                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2884                                                         write_request, false);
2885                 if (!img_request)
2886                         goto end_request;
2887
2888                 img_request->rq = rq;
2889
2890                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2891                                                 rq->bio);
2892                 if (!result)
2893                         result = rbd_img_request_submit(img_request);
2894                 if (result)
2895                         rbd_img_request_put(img_request);
2896 end_request:
2897                 spin_lock_irq(q->queue_lock);
2898                 if (result < 0) {
2899                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2900                                 write_request ? "write" : "read",
2901                                 length, offset, result);
2902
2903                         __blk_end_request_all(rq, result);
2904                 }
2905         }
2906 }
2907
2908 /*
2909  * A queue callback.  Makes sure that we don't create a bio that spans
2910  * multiple osd objects.  One exception would be a single-page bio,
2911  * which we handle later in bio_chain_clone_range().
2912  */
2913 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2914                           struct bio_vec *bvec)
2915 {
2916         struct rbd_device *rbd_dev = q->queuedata;
2917         sector_t sector_offset;
2918         sector_t sectors_per_obj;
2919         sector_t obj_sector_offset;
2920         int ret;
2921
2922         /*
2923          * Find how far into its rbd object the bio's start sector
2924          * falls.  The partition-relative start sector is first made
2925          * relative to the enclosing (whole) device.
2926          */
2927         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2928         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2929         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2930
2931         /*
2932          * Compute the number of bytes from that offset to the end
2933          * of the object.  Account for what's already used by the bio.
2934          */
2935         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2936         if (ret > bmd->bi_size)
2937                 ret -= bmd->bi_size;
2938         else
2939                 ret = 0;
2940
2941         /*
2942          * Don't send back more than was asked for.  And if the bio
2943          * was empty, let the whole thing through because:  "Note
2944          * that a block device *must* allow a single page to be
2945          * added to an empty bio."
2946          */
2947         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2948         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2949                 ret = (int) bvec->bv_len;
2950
2951         return ret;
2952 }
2953
2954 static void rbd_free_disk(struct rbd_device *rbd_dev)
2955 {
2956         struct gendisk *disk = rbd_dev->disk;
2957
2958         if (!disk)
2959                 return;
2960
2961         rbd_dev->disk = NULL;
2962         if (disk->flags & GENHD_FL_UP) {
2963                 del_gendisk(disk);
2964                 if (disk->queue)
2965                         blk_cleanup_queue(disk->queue);
2966         }
2967         put_disk(disk);
2968 }
2969
2970 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2971                                 const char *object_name,
2972                                 u64 offset, u64 length, void *buf)
2973 {
2975         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2976         struct rbd_obj_request *obj_request;
2977         struct page **pages = NULL;
2978         u32 page_count;
2979         size_t size;
2980         int ret;
2981
2982         page_count = (u32) calc_pages_for(offset, length);
2983         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2984         if (IS_ERR(pages))
2985                 return PTR_ERR(pages);
2986
2987         ret = -ENOMEM;
2988         obj_request = rbd_obj_request_create(object_name, offset, length,
2989                                                         OBJ_REQUEST_PAGES);
2990         if (!obj_request)
2991                 goto out;
2992
2993         obj_request->pages = pages;
2994         obj_request->page_count = page_count;
2995
2996         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2997         if (!obj_request->osd_req)
2998                 goto out;
2999
3000         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3001                                         offset, length, 0, 0);
3002         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3003                                         obj_request->pages,
3004                                         obj_request->length,
3005                                         obj_request->offset & ~PAGE_MASK,
3006                                         false, false);
3007         rbd_osd_req_format_read(obj_request);
3008
3009         ret = rbd_obj_request_submit(osdc, obj_request);
3010         if (ret)
3011                 goto out;
3012         ret = rbd_obj_request_wait(obj_request);
3013         if (ret)
3014                 goto out;
3015
3016         ret = obj_request->result;
3017         if (ret < 0)
3018                 goto out;
3019
3020         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3021         size = (size_t) obj_request->xferred;
3022         ceph_copy_from_page_vector(pages, buf, 0, size);
3023         rbd_assert(size <= (size_t)INT_MAX);
3024         ret = (int)size;
3025 out:
3026         if (obj_request)
3027                 rbd_obj_request_put(obj_request);
3028         else
3029                 ceph_release_page_vector(pages, page_count);
3030
3031         return ret;
3032 }
3033
3034 /*
3035  * Read the complete header for the given rbd device.
3036  *
3037  * Returns a pointer to a dynamically-allocated buffer containing
3038  * the complete and validated header (the caller must kfree() it),
3039  * or a pointer-coded errno if a failure occurs.
3043  */
3044 static struct rbd_image_header_ondisk *
3045 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3046 {
3047         struct rbd_image_header_ondisk *ondisk = NULL;
3048         u32 snap_count = 0;
3049         u64 names_size = 0;
3050         u32 want_count;
3051         int ret;
3052
3053         /*
3054          * The complete header will include an array of its 64-bit
3055          * snapshot ids, followed by the names of those snapshots as
3056          * a contiguous block of NUL-terminated strings.  Note that
3057          * the number of snapshots could change by the time we read
3058          * it in, in which case we re-read it.
3059          */
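             /*
              * A sketch of the buffer being sized below:
              *
              *   struct rbd_image_header_ondisk              (fixed part)
              *   struct rbd_image_snap_ondisk [snap_count]   (snapshot ids/sizes)
              *   snapshot names blob                         (names_size bytes)
              */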
3060         do {
3061                 size_t size;
3062
3063                 kfree(ondisk);
3064
3065                 size = sizeof (*ondisk);
3066                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3067                 size += names_size;
3068                 ondisk = kmalloc(size, GFP_KERNEL);
3069                 if (!ondisk)
3070                         return ERR_PTR(-ENOMEM);
3071
3072                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3073                                        0, size, ondisk);
3074                 if (ret < 0)
3075                         goto out_err;
3076                 if ((size_t)ret < size) {
3077                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3078                                 size, ret);
3079                         ret = -ENXIO;
3080                         goto out_err;
3081                 }
3082                 if (!rbd_dev_ondisk_valid(ondisk)) {
3083                         ret = -ENXIO;
3084                         rbd_warn(rbd_dev, "invalid header");
3085                         goto out_err;
3086                 }
3087
3088                 names_size = le64_to_cpu(ondisk->snap_names_len);
3089                 want_count = snap_count;
3090                 snap_count = le32_to_cpu(ondisk->snap_count);
3091         } while (snap_count != want_count);
3092
3093         return ondisk;
3094
3095 out_err:
3096         kfree(ondisk);
3097
3098         return ERR_PTR(ret);
3099 }
3100
3101 /*
3102  * read the on-disk header and convert it to the in-memory image header
3103  */
3104 static int rbd_read_header(struct rbd_device *rbd_dev,
3105                            struct rbd_image_header *header)
3106 {
3107         struct rbd_image_header_ondisk *ondisk;
3108         int ret;
3109
3110         ondisk = rbd_dev_v1_header_read(rbd_dev);
3111         if (IS_ERR(ondisk))
3112                 return PTR_ERR(ondisk);
3113         ret = rbd_header_from_disk(header, ondisk);
3114         kfree(ondisk);
3115
3116         return ret;
3117 }
3118
3119 /*
3120  * re-read the complete on-disk header and update the in-memory copy
3121  */
3122 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3123 {
3124         int ret;
3125         struct rbd_image_header h;
3126
3127         memset(&h, 0, sizeof (h));
3128         ret = rbd_read_header(rbd_dev, &h);
3129         if (ret < 0)
3130                 return ret;
3131
3132         down_write(&rbd_dev->header_rwsem);
3133
3134         /* Update image size, and check for resize of mapped image */
3135         rbd_dev->header.image_size = h.image_size;
3136         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3137                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3138                         rbd_dev->mapping.size = rbd_dev->header.image_size;
3139
3140         /* rbd_dev->header.object_prefix shouldn't change */
3141         kfree(rbd_dev->header.snap_sizes);
3142         kfree(rbd_dev->header.snap_names);
3143         /* osd requests may still refer to snapc */
3144         ceph_put_snap_context(rbd_dev->header.snapc);
3145
3147         rbd_dev->header.snapc = h.snapc;
3148         rbd_dev->header.snap_names = h.snap_names;
3149         rbd_dev->header.snap_sizes = h.snap_sizes;
3150         /* Free the extra copy of the object prefix */
3151         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3152                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3153         kfree(h.object_prefix);
3154
3155         up_write(&rbd_dev->header_rwsem);
3156
3157         return ret;
3158 }
3159
3160 /*
3161  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3162  * has disappeared from the (just updated) snapshot context.
3163  */
3164 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3165 {
3166         u64 snap_id;
3167
3168         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3169                 return;
3170
3171         snap_id = rbd_dev->spec->snap_id;
3172         if (snap_id == CEPH_NOSNAP)
3173                 return;
3174
3175         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3176                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3177 }
3178
3179 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3180 {
3181         u64 mapping_size;
3182         int ret;
3183
3184         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3185         mapping_size = rbd_dev->mapping.size;
3186         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3187         if (rbd_dev->image_format == 1)
3188                 ret = rbd_dev_v1_refresh(rbd_dev);
3189         else
3190                 ret = rbd_dev_v2_refresh(rbd_dev);
3191
3192         /* If it's a mapped snapshot, validate its EXISTS flag */
3193
3194         rbd_exists_validate(rbd_dev);
3195         mutex_unlock(&ctl_mutex);
3196         if (mapping_size != rbd_dev->mapping.size) {
3197                 sector_t size;
3198
3199                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3200                 dout("setting size to %llu sectors", (unsigned long long)size);
3201                 set_capacity(rbd_dev->disk, size);
3202                 revalidate_disk(rbd_dev->disk);
3203         }
3204
3205         return ret;
3206 }
3207
3208 static int rbd_init_disk(struct rbd_device *rbd_dev)
3209 {
3210         struct gendisk *disk;
3211         struct request_queue *q;
3212         u64 segment_size;
3213
3214         /* create gendisk info */
3215         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3216         if (!disk)
3217                 return -ENOMEM;
3218
3219         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3220                  rbd_dev->dev_id);
3221         disk->major = rbd_dev->major;
3222         disk->first_minor = 0;
3223         disk->fops = &rbd_bd_ops;
3224         disk->private_data = rbd_dev;
3225
3226         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3227         if (!q)
3228                 goto out_disk;
3229
3230         /* We use the default size, but let's be explicit about it. */
3231         blk_queue_physical_block_size(q, SECTOR_SIZE);
3232
3233         /* set io sizes to object size */
3234         segment_size = rbd_obj_bytes(&rbd_dev->header);
3235         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3236         blk_queue_max_segment_size(q, segment_size);
3237         blk_queue_io_min(q, segment_size);
3238         blk_queue_io_opt(q, segment_size);
3239
3240         blk_queue_merge_bvec(q, rbd_merge_bvec);
3241         disk->queue = q;
3242
3243         q->queuedata = rbd_dev;
3244
3245         rbd_dev->disk = disk;
3246
3247         return 0;
3248 out_disk:
3249         put_disk(disk);
3250
3251         return -ENOMEM;
3252 }
3253
3254 /*
3255   sysfs
3256 */
3257
3258 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3259 {
3260         return container_of(dev, struct rbd_device, dev);
3261 }
3262
3263 static ssize_t rbd_size_show(struct device *dev,
3264                              struct device_attribute *attr, char *buf)
3265 {
3266         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3267
3268         return sprintf(buf, "%llu\n",
3269                 (unsigned long long)rbd_dev->mapping.size);
3270 }
3271
3272 /*
3273  * Note this shows the features for whatever's mapped, which is not
3274  * necessarily the base image.
3275  */
3276 static ssize_t rbd_features_show(struct device *dev,
3277                              struct device_attribute *attr, char *buf)
3278 {
3279         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3280
3281         return sprintf(buf, "0x%016llx\n",
3282                         (unsigned long long)rbd_dev->mapping.features);
3283 }
3284
3285 static ssize_t rbd_major_show(struct device *dev,
3286                               struct device_attribute *attr, char *buf)
3287 {
3288         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3289
3290         if (rbd_dev->major)
3291                 return sprintf(buf, "%d\n", rbd_dev->major);
3292
3293         return sprintf(buf, "(none)\n");
3294 }
3296
3297 static ssize_t rbd_client_id_show(struct device *dev,
3298                                   struct device_attribute *attr, char *buf)
3299 {
3300         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3301
3302         return sprintf(buf, "client%lld\n",
3303                         ceph_client_id(rbd_dev->rbd_client->client));
3304 }
3305
3306 static ssize_t rbd_pool_show(struct device *dev,
3307                              struct device_attribute *attr, char *buf)
3308 {
3309         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3310
3311         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3312 }
3313
3314 static ssize_t rbd_pool_id_show(struct device *dev,
3315                              struct device_attribute *attr, char *buf)
3316 {
3317         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3318
3319         return sprintf(buf, "%llu\n",
3320                         (unsigned long long) rbd_dev->spec->pool_id);
3321 }
3322
3323 static ssize_t rbd_name_show(struct device *dev,
3324                              struct device_attribute *attr, char *buf)
3325 {
3326         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3327
3328         if (rbd_dev->spec->image_name)
3329                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3330
3331         return sprintf(buf, "(unknown)\n");
3332 }
3333
3334 static ssize_t rbd_image_id_show(struct device *dev,
3335                              struct device_attribute *attr, char *buf)
3336 {
3337         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3338
3339         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3340 }
3341
3342 /*
3343  * Shows the name of the currently-mapped snapshot (or
3344  * RBD_SNAP_HEAD_NAME for the base image).
3345  */
3346 static ssize_t rbd_snap_show(struct device *dev,
3347                              struct device_attribute *attr,
3348                              char *buf)
3349 {
3350         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3351
3352         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3353 }
3354
3355 /*
3356  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3357  * for the parent image.  If there is no parent, simply shows
3358  * "(no parent image)".
3359  */
3360 static ssize_t rbd_parent_show(struct device *dev,
3361                              struct device_attribute *attr,
3362                              char *buf)
3363 {
3364         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3365         struct rbd_spec *spec = rbd_dev->parent_spec;
3366         int count;
3367         char *bufp = buf;
3368
3369         if (!spec)
3370                 return sprintf(buf, "(no parent image)\n");
3371
3372         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3373                         (unsigned long long) spec->pool_id, spec->pool_name);
3374         if (count < 0)
3375                 return count;
3376         bufp += count;
3377
3378         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3379                         spec->image_name ? spec->image_name : "(unknown)");
3380         if (count < 0)
3381                 return count;
3382         bufp += count;
3383
3384         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3385                         (unsigned long long) spec->snap_id, spec->snap_name);
3386         if (count < 0)
3387                 return count;
3388         bufp += count;
3389
3390         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3391         if (count < 0)
3392                 return count;
3393         bufp += count;
3394
3395         return (ssize_t) (bufp - buf);
3396 }
3397
3398 static ssize_t rbd_image_refresh(struct device *dev,
3399                                  struct device_attribute *attr,
3400                                  const char *buf,
3401                                  size_t size)
3402 {
3403         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3404         int ret;
3405
3406         ret = rbd_dev_refresh(rbd_dev);
3407         if (ret)
3408                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3409
3410         return ret < 0 ? ret : size;
3411 }
3412
3413 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3414 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3415 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3416 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3417 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3418 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3419 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3420 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3421 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3422 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3423 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3424
3425 static struct attribute *rbd_attrs[] = {
3426         &dev_attr_size.attr,
3427         &dev_attr_features.attr,
3428         &dev_attr_major.attr,
3429         &dev_attr_client_id.attr,
3430         &dev_attr_pool.attr,
3431         &dev_attr_pool_id.attr,
3432         &dev_attr_name.attr,
3433         &dev_attr_image_id.attr,
3434         &dev_attr_current_snap.attr,
3435         &dev_attr_parent.attr,
3436         &dev_attr_refresh.attr,
3437         NULL
3438 };
3439
3440 static struct attribute_group rbd_attr_group = {
3441         .attrs = rbd_attrs,
3442 };
3443
3444 static const struct attribute_group *rbd_attr_groups[] = {
3445         &rbd_attr_group,
3446         NULL
3447 };
3448
3449 static void rbd_sysfs_dev_release(struct device *dev)
3450 {
3451 }
3452
3453 static struct device_type rbd_device_type = {
3454         .name           = "rbd",
3455         .groups         = rbd_attr_groups,
3456         .release        = rbd_sysfs_dev_release,
3457 };
3458
3459 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3460 {
3461         kref_get(&spec->kref);
3462
3463         return spec;
3464 }
3465
3466 static void rbd_spec_free(struct kref *kref);
3467 static void rbd_spec_put(struct rbd_spec *spec)
3468 {
3469         if (spec)
3470                 kref_put(&spec->kref, rbd_spec_free);
3471 }
3472
3473 static struct rbd_spec *rbd_spec_alloc(void)
3474 {
3475         struct rbd_spec *spec;
3476
3477         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3478         if (!spec)
3479                 return NULL;
3480         kref_init(&spec->kref);
3481
3482         return spec;
3483 }
3484
3485 static void rbd_spec_free(struct kref *kref)
3486 {
3487         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3488
3489         kfree(spec->pool_name);
3490         kfree(spec->image_id);
3491         kfree(spec->image_name);
3492         kfree(spec->snap_name);
3493         kfree(spec);
3494 }
3495
3496 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3497                                 struct rbd_spec *spec)
3498 {
3499         struct rbd_device *rbd_dev;
3500
3501         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3502         if (!rbd_dev)
3503                 return NULL;
3504
3505         spin_lock_init(&rbd_dev->lock);
3506         rbd_dev->flags = 0;
3507         INIT_LIST_HEAD(&rbd_dev->node);
3508         init_rwsem(&rbd_dev->header_rwsem);
3509
3510         rbd_dev->spec = spec;
3511         rbd_dev->rbd_client = rbdc;
3512
3513         /* Initialize the layout used for all rbd requests */
3514
3515         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3516         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3517         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3518         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3519
3520         return rbd_dev;
3521 }
3522
3523 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3524 {
3525         rbd_put_client(rbd_dev->rbd_client);
3526         rbd_spec_put(rbd_dev->spec);
3527         kfree(rbd_dev);
3528 }
3529
3530 /*
3531  * Get the size and object order for an image snapshot, or if
3532  * snap_id is CEPH_NOSNAP, gets this information for the base
3533  * image.
3534  */
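     /*
      * The order returned is log2 of the image's object size; for
      * example (illustrative), the common default order of 22
      * corresponds to 1 << 22 = 4 MiB objects.
      */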
3535 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3536                                 u8 *order, u64 *snap_size)
3537 {
3538         __le64 snapid = cpu_to_le64(snap_id);
3539         int ret;
3540         struct {
3541                 u8 order;
3542                 __le64 size;
3543         } __attribute__ ((packed)) size_buf = { 0 };
3544
3545         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3546                                 "rbd", "get_size",
3547                                 &snapid, sizeof (snapid),
3548                                 &size_buf, sizeof (size_buf));
3549         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3550         if (ret < 0)
3551                 return ret;
3552         if (ret < sizeof (size_buf))
3553                 return -ERANGE;
3554
3555         if (order)
3556                 *order = size_buf.order;
3557         *snap_size = le64_to_cpu(size_buf.size);
3558
3559         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3560                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3561                 (unsigned long long)*snap_size);
3562
3563         return 0;
3564 }
3565
3566 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3567 {
3568         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3569                                         &rbd_dev->header.obj_order,
3570                                         &rbd_dev->header.image_size);
3571 }
3572
3573 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3574 {
3575         void *reply_buf;
3576         int ret;
3577         void *p;
3578
3579         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3580         if (!reply_buf)
3581                 return -ENOMEM;
3582
3583         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3584                                 "rbd", "get_object_prefix", NULL, 0,
3585                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3586         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3587         if (ret < 0)
3588                 goto out;
3589
3590         p = reply_buf;
3591         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3592                                                 p + ret, NULL, GFP_NOIO);
3593         if (IS_ERR(rbd_dev->header.object_prefix)) {
3594                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3595                 rbd_dev->header.object_prefix = NULL;
3596         } else {
3597                 ret = 0;
3598                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3599         }
3601 out:
3602         kfree(reply_buf);
3603
3604         return ret;
3605 }
3606
3607 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3608                 u64 *snap_features)
3609 {
3610         __le64 snapid = cpu_to_le64(snap_id);
3611         struct {
3612                 __le64 features;
3613                 __le64 incompat;
3614         } __attribute__ ((packed)) features_buf = { 0 };
3615         u64 incompat;
3616         int ret;
3617
3618         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3619                                 "rbd", "get_features",
3620                                 &snapid, sizeof (snapid),
3621                                 &features_buf, sizeof (features_buf));
3622         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3623         if (ret < 0)
3624                 return ret;
3625         if (ret < sizeof (features_buf))
3626                 return -ERANGE;
3627
3628         incompat = le64_to_cpu(features_buf.incompat);
3629         if (incompat & ~RBD_FEATURES_SUPPORTED)
3630                 return -ENXIO;
3631
3632         *snap_features = le64_to_cpu(features_buf.features);
3633
3634         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3635                 (unsigned long long)snap_id,
3636                 (unsigned long long)*snap_features,
3637                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3638
3639         return 0;
3640 }
3641
3642 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3643 {
3644         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3645                                                 &rbd_dev->header.features);
3646 }
3647
3648 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3649 {
3650         struct rbd_spec *parent_spec;
3651         size_t size;
3652         void *reply_buf = NULL;
3653         __le64 snapid;
3654         void *p;
3655         void *end;
3656         char *image_id;
3657         u64 overlap;
3658         int ret;
3659
3660         parent_spec = rbd_spec_alloc();
3661         if (!parent_spec)
3662                 return -ENOMEM;
3663
3664         size = sizeof (__le64) +                                /* pool_id */
3665                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3666                 sizeof (__le64) +                               /* snap_id */
3667                 sizeof (__le64);                                /* overlap */
3668         reply_buf = kmalloc(size, GFP_KERNEL);
3669         if (!reply_buf) {
3670                 ret = -ENOMEM;
3671                 goto out_err;
3672         }
3673
3674         snapid = cpu_to_le64(CEPH_NOSNAP);
3675         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3676                                 "rbd", "get_parent",
3677                                 &snapid, sizeof (snapid),
3678                                 reply_buf, size);
3679         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3680         if (ret < 0)
3681                 goto out_err;
3682
3683         p = reply_buf;
3684         end = reply_buf + ret;
3685         ret = -ERANGE;
3686         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3687         if (parent_spec->pool_id == CEPH_NOPOOL)
3688                 goto out;       /* No parent?  No problem. */
3689
3690         /* The ceph file layout needs to fit pool id in 32 bits */
3691
3692         ret = -EIO;
3693         if (parent_spec->pool_id > (u64)U32_MAX) {
3694                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3695                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3696                 goto out_err;
3697         }
3698
3699         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3700         if (IS_ERR(image_id)) {
3701                 ret = PTR_ERR(image_id);
3702                 goto out_err;
3703         }
3704         parent_spec->image_id = image_id;
3705         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3706         ceph_decode_64_safe(&p, end, overlap, out_err);
3707
3708         rbd_dev->parent_overlap = overlap;
3709         rbd_dev->parent_spec = parent_spec;
3710         parent_spec = NULL;     /* rbd_dev now owns this */
3711 out:
3712         ret = 0;
3713 out_err:
3714         kfree(reply_buf);
3715         rbd_spec_put(parent_spec);
3716
3717         return ret;
3718 }
3719
3720 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3721 {
3722         struct {
3723                 __le64 stripe_unit;
3724                 __le64 stripe_count;
3725         } __attribute__ ((packed)) striping_info_buf = { 0 };
3726         size_t size = sizeof (striping_info_buf);
3727         void *p;
3728         u64 obj_size;
3729         u64 stripe_unit;
3730         u64 stripe_count;
3731         int ret;
3732
3733         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3734                                 "rbd", "get_stripe_unit_count", NULL, 0,
3735                                 (char *)&striping_info_buf, size);
3736         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3737         if (ret < 0)
3738                 return ret;
3739         if (ret < size)
3740                 return -ERANGE;
3741
3742         /*
3743          * We don't actually support the "fancy striping" feature
3744          * (STRIPINGV2) yet, but if the striping sizes are the
3745          * defaults the behavior is the same as before.  So find
3746          * out, and only fail if the image has non-default values.
3747          */
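             /*
              * Concretely (illustrative), an image with obj_order 22 is
              * only accepted if it reports stripe_unit == 1 << 22 =
              * 4194304 and stripe_count == 1, which the checks below
              * enforce.
              */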
3749         obj_size = (u64)1 << rbd_dev->header.obj_order;
3750         p = &striping_info_buf;
3751         stripe_unit = ceph_decode_64(&p);
3752         if (stripe_unit != obj_size) {
3753                 rbd_warn(rbd_dev, "unsupported stripe unit "
3754                                 "(got %llu want %llu)",
3755                                 stripe_unit, obj_size);
3756                 return -EINVAL;
3757         }
3758         stripe_count = ceph_decode_64(&p);
3759         if (stripe_count != 1) {
3760                 rbd_warn(rbd_dev, "unsupported stripe count "
3761                                 "(got %llu want 1)", stripe_count);
3762                 return -EINVAL;
3763         }
3764         rbd_dev->header.stripe_unit = stripe_unit;
3765         rbd_dev->header.stripe_count = stripe_count;
3766
3767         return 0;
3768 }
3769
3770 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3771 {
3772         size_t image_id_size;
3773         char *image_id;
3774         void *p;
3775         void *end;
3776         size_t size;
3777         void *reply_buf = NULL;
3778         size_t len = 0;
3779         char *image_name = NULL;
3780         int ret;
3781
3782         rbd_assert(!rbd_dev->spec->image_name);
3783
3784         len = strlen(rbd_dev->spec->image_id);
3785         image_id_size = sizeof (__le32) + len;
3786         image_id = kmalloc(image_id_size, GFP_KERNEL);
3787         if (!image_id)
3788                 return NULL;
3789
3790         p = image_id;
3791         end = image_id + image_id_size;
3792         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3793
3794         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3795         reply_buf = kmalloc(size, GFP_KERNEL);
3796         if (!reply_buf)
3797                 goto out;
3798
3799         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3800                                 "rbd", "dir_get_name",
3801                                 image_id, image_id_size,
3802                                 reply_buf, size);
3803         if (ret < 0)
3804                 goto out;
3805         p = reply_buf;
3806         end = reply_buf + ret;
3807
3808         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3809         if (IS_ERR(image_name))
3810                 image_name = NULL;
3811         else
3812                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3813 out:
3814         kfree(reply_buf);
3815         kfree(image_id);
3816
3817         return image_name;
3818 }
3819
3820 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3821 {
3822         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3823         const char *snap_name;
3824         u32 which = 0;
3825
3826         /* Skip over names until we find the one we are looking for */
3827
3828         snap_name = rbd_dev->header.snap_names;
3829         while (which < snapc->num_snaps) {
3830                 if (!strcmp(name, snap_name))
3831                         return snapc->snaps[which];
3832                 snap_name += strlen(snap_name) + 1;
3833                 which++;
3834         }
3835         return CEPH_NOSNAP;
3836 }
3837
3838 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3839 {
3840         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3841         u32 which;
3842         bool found = false;
3843         u64 snap_id;
3844
3845         for (which = 0; !found && which < snapc->num_snaps; which++) {
3846                 const char *snap_name;
3847
3848                 snap_id = snapc->snaps[which];
3849                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3850                 if (IS_ERR(snap_name))
3851                         break;
3852                 found = !strcmp(name, snap_name);
3853                 kfree(snap_name);
3854         }
3855         return found ? snap_id : CEPH_NOSNAP;
3856 }
3857
3858 /*
3859  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3860  * no snapshot by that name is found, or if an error occurs.
3861  */
3862 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3863 {
3864         if (rbd_dev->image_format == 1)
3865                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3866
3867         return rbd_v2_snap_id_by_name(rbd_dev, name);
3868 }
3869
3870 /*
3871  * When an rbd image has a parent image, it is identified by the
3872  * pool, image, and snapshot ids (not names).  This function fills
3873  * in the names for those ids.  (It's OK if we can't figure out the
3874  * name for an image id, but the pool and snapshot ids should always
3875  * exist and have names.)  All names in an rbd spec are dynamically
3876  * allocated.
3877  *
3878  * When an image being mapped (not a parent) is probed, we have the
3879  * pool name and pool id, image name and image id, and the snapshot
3880  * name.  The only thing we're missing is the snapshot id.
3881  */
3882 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3883 {
3884         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3885         struct rbd_spec *spec = rbd_dev->spec;
3886         const char *pool_name;
3887         const char *image_name;
3888         const char *snap_name;
3889         int ret;
3890
3891         /*
3892          * An image being mapped will have the pool name (etc.), but
3893          * we need to look up the snapshot id.
3894          */
3895         if (spec->pool_name) {
3896                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3897                         u64 snap_id;
3898
3899                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3900                         if (snap_id == CEPH_NOSNAP)
3901                                 return -ENOENT;
3902                         spec->snap_id = snap_id;
3903                 } else {
3904                         spec->snap_id = CEPH_NOSNAP;
3905                 }
3906
3907                 return 0;
3908         }
3909
3910         /* Get the pool name; we have to make our own copy of this */
3911
3912         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3913         if (!pool_name) {
3914                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3915                 return -EIO;
3916         }
3917         pool_name = kstrdup(pool_name, GFP_KERNEL);
3918         if (!pool_name)
3919                 return -ENOMEM;
3920
3921         /* Fetch the image name; tolerate failure here */
3922
3923         image_name = rbd_dev_image_name(rbd_dev);
3924         if (!image_name)
3925                 rbd_warn(rbd_dev, "unable to get image name");
3926
3927         /* Look up the snapshot name, and make a copy */
3928
3929         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3930         if (!snap_name) {
3931                 ret = -ENOMEM;
3932                 goto out_err;
3933         }
3934
3935         spec->pool_name = pool_name;
3936         spec->image_name = image_name;
3937         spec->snap_name = snap_name;
3938
3939         return 0;
3940 out_err:
3941         kfree(image_name);
3942         kfree(pool_name);
3943
3944         return ret;
3945 }
3946
3947 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3948 {
3949         size_t size;
3950         int ret;
3951         void *reply_buf;
3952         void *p;
3953         void *end;
3954         u64 seq;
3955         u32 snap_count;
3956         struct ceph_snap_context *snapc;
3957         u32 i;
3958
3959         /*
3960          * We'll need room for the seq value (maximum snapshot id),
3961          * snapshot count, and array of that many snapshot ids.
3962          * For now we have a fixed upper limit on the number we're
3963          * prepared to receive.
3964          */
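             /*
              * Worked out, that is 8 (seq) + 4 (snap_count) +
              * RBD_MAX_SNAP_COUNT * 8 = 12 + 510 * 8 = 4092 bytes,
              * which fits within a single 4 KiB page.
              */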
3965         size = sizeof (__le64) + sizeof (__le32) +
3966                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3967         reply_buf = kzalloc(size, GFP_KERNEL);
3968         if (!reply_buf)
3969                 return -ENOMEM;
3970
3971         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3972                                 "rbd", "get_snapcontext", NULL, 0,
3973                                 reply_buf, size);
3974         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3975         if (ret < 0)
3976                 goto out;
3977
3978         p = reply_buf;
3979         end = reply_buf + ret;
3980         ret = -ERANGE;
3981         ceph_decode_64_safe(&p, end, seq, out);
3982         ceph_decode_32_safe(&p, end, snap_count, out);
3983
3984         /*
3985          * Make sure the reported number of snapshot ids wouldn't go
3986          * beyond the end of our buffer.  But before checking that,
3987          * make sure the computed size of the snapshot context we
3988          * allocate is representable in a size_t.
3989          */
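             /* i.e., sizeof (*snapc) + snap_count * sizeof (u64) must fit in a size_t */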
3990         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3991                                  / sizeof (u64)) {
3992                 ret = -EINVAL;
3993                 goto out;
3994         }
3995         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3996                 goto out;
3997         ret = 0;
3998
3999         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4000         if (!snapc) {
4001                 ret = -ENOMEM;
4002                 goto out;
4003         }
4004         snapc->seq = seq;
4005         for (i = 0; i < snap_count; i++)
4006                 snapc->snaps[i] = ceph_decode_64(&p);
4007
4008         ceph_put_snap_context(rbd_dev->header.snapc);
4009         rbd_dev->header.snapc = snapc;
4010
4011         dout("  snap context seq = %llu, snap_count = %u\n",
4012                 (unsigned long long)seq, (unsigned int)snap_count);
4013 out:
4014         kfree(reply_buf);
4015
4016         return ret;
4017 }
4018
4019 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4020                                         u64 snap_id)
4021 {
4022         size_t size;
4023         void *reply_buf;
4024         __le64 snapid;
4025         int ret;
4026         void *p;
4027         void *end;
4028         char *snap_name;
4029
4030         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4031         reply_buf = kmalloc(size, GFP_KERNEL);
4032         if (!reply_buf)
4033                 return ERR_PTR(-ENOMEM);
4034
4035         snapid = cpu_to_le64(snap_id);
4036         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4037                                 "rbd", "get_snapshot_name",
4038                                 &snapid, sizeof (snapid),
4039                                 reply_buf, size);
4040         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4041         if (ret < 0) {
4042                 snap_name = ERR_PTR(ret);
4043                 goto out;
4044         }
4045
4046         p = reply_buf;
4047         end = reply_buf + ret;
4048         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4049         if (IS_ERR(snap_name))
4050                 goto out;
4051
4052         dout("  snap_id 0x%016llx snap_name = %s\n",
4053                 (unsigned long long)snap_id, snap_name);
4054 out:
4055         kfree(reply_buf);
4056
4057         return snap_name;
4058 }
4059
4060 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4061 {
4062         int ret;
4063
4064         down_write(&rbd_dev->header_rwsem);
4065
4066         ret = rbd_dev_v2_image_size(rbd_dev);
4067         if (ret)
4068                 goto out;
4069         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4070                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4071                         rbd_dev->mapping.size = rbd_dev->header.image_size;
4072
4073         ret = rbd_dev_v2_snap_context(rbd_dev);
4074         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4077 out:
4078         up_write(&rbd_dev->header_rwsem);
4079
4080         return ret;
4081 }
4082
4083 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4084 {
4085         struct device *dev;
4086         int ret;
4087
4088         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4089
4090         dev = &rbd_dev->dev;
4091         dev->bus = &rbd_bus_type;
4092         dev->type = &rbd_device_type;
4093         dev->parent = &rbd_root_dev;
4094         dev->release = rbd_dev_device_release;
4095         dev_set_name(dev, "%d", rbd_dev->dev_id);
4096         ret = device_register(dev);
4097
4098         mutex_unlock(&ctl_mutex);
4099
4100         return ret;
4101 }
4102
4103 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4104 {
4105         device_unregister(&rbd_dev->dev);
4106 }
4107
4108 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4109
4110 /*
4111  * Get a unique rbd identifier for the given new rbd_dev, and add
4112  * the rbd_dev to the global list.  The minimum rbd id is 1.
4113  */
4114 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4115 {
4116         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4117
4118         spin_lock(&rbd_dev_list_lock);
4119         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4120         spin_unlock(&rbd_dev_list_lock);
4121         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4122                 (unsigned long long) rbd_dev->dev_id);
4123 }
4124
4125 /*
4126  * Remove an rbd_dev from the global list, and record that its
4127  * identifier is no longer in use.
4128  */
4129 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4130 {
4131         struct list_head *tmp;
4132         int rbd_id = rbd_dev->dev_id;
4133         int max_id;
4134
4135         rbd_assert(rbd_id > 0);
4136
4137         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4138                 (unsigned long long) rbd_dev->dev_id);
4139         spin_lock(&rbd_dev_list_lock);
4140         list_del_init(&rbd_dev->node);
4141
4142         /*
4143          * If the id being "put" is not the current maximum, there
4144          * is nothing special we need to do.
4145          */
4146         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4147                 spin_unlock(&rbd_dev_list_lock);
4148                 return;
4149         }
4150
4151         /*
4152          * We need to update the current maximum id.  Search the
4153          * list to find out what it is.  We're more likely to find
4154          * the maximum at the end, so search the list backward.
4155          */
4156         max_id = 0;
4157         list_for_each_prev(tmp, &rbd_dev_list) {
4158                 struct rbd_device *rbd_dev;
4159
4160                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4161                 if (rbd_dev->dev_id > max_id)
4162                         max_id = rbd_dev->dev_id;
4163         }
4164         spin_unlock(&rbd_dev_list_lock);
4165
4166         /*
4167          * The max id could have been updated by rbd_dev_id_get(), in
4168          * which case it now accurately reflects the new maximum.
4169          * Be careful not to overwrite the maximum value in that
4170          * case.
4171          */
4172         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4173         dout("  max dev id has been reset\n");
4174 }
4175
4176 /*
4177  * Skips over white space at *buf, and updates *buf to point to the
4178  * first found non-space character (if any). Returns the length of
4179  * the token (string of non-white space characters) found.  Note
4180  * that *buf must be terminated with '\0'.
4181  */
4182 static inline size_t next_token(const char **buf)
4183 {
4184         /*
4185          * These are the characters that produce nonzero for
4186          * isspace() in the "C" and "POSIX" locales.
4187          */
4188         const char *spaces = " \f\n\r\t\v";
4189
4190         *buf += strspn(*buf, spaces);   /* Find start of token */
4191
4192         return strcspn(*buf, spaces);   /* Return token length */
4193 }
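     /*
      * For example (input illustrative): with *buf pointing at
      * "  pool1 image1\n", next_token() advances *buf past the two
      * spaces and returns 5, the length of "pool1".  dup_token()
      * (below) would additionally copy out "pool1" and leave *buf
      * just past it.
      */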
4194
4195 /*
4196  * Finds the next token in *buf, and if the provided token buffer is
4197  * big enough, copies the found token into it.  The result, if
4198  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4199  * must be terminated with '\0' on entry.
4200  *
4201  * Returns the length of the token found (not including the '\0').
4202  * Return value will be 0 if no token is found, and it will be >=
4203  * token_size if the token would not fit.
4204  *
4205  * The *buf pointer will be updated to point beyond the end of the
4206  * found token.  Note that this occurs even if the token buffer is
4207  * too small to hold it.
4208  */
4209 static inline size_t copy_token(const char **buf,
4210                                 char *token,
4211                                 size_t token_size)
4212 {
4213         size_t len;
4214
4215         len = next_token(buf);
4216         if (len < token_size) {
4217                 memcpy(token, *buf, len);
4218                 *(token + len) = '\0';
4219         }
4220         *buf += len;
4221
4222         return len;
4223 }
4224
4225 /*
4226  * Finds the next token in *buf, dynamically allocates a buffer big
4227  * enough to hold a copy of it, and copies the token into the new
4228  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4229  * that a duplicate buffer is created even for a zero-length token.
4230  *
4231  * Returns a pointer to the newly-allocated duplicate, or a null
4232  * pointer if memory for the duplicate was not available.  If
4233  * the lenp argument is a non-null pointer, the length of the token
4234  * (not including the '\0') is returned in *lenp.
4235  *
4236  * If successful, the *buf pointer will be updated to point beyond
4237  * the end of the found token.
4238  *
4239  * Note: uses GFP_KERNEL for allocation.
4240  */
4241 static inline char *dup_token(const char **buf, size_t *lenp)
4242 {
4243         char *dup;
4244         size_t len;
4245
4246         len = next_token(buf);
4247         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4248         if (!dup)
4249                 return NULL;
4250         *(dup + len) = '\0';
4251         *buf += len;
4252
4253         if (lenp)
4254                 *lenp = len;
4255
4256         return dup;
4257 }
4258
4259 /*
4260  * Parse the options provided for an "rbd add" (i.e., rbd image
4261  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4262  * and the data written is passed here via a NUL-terminated buffer.
4263  * Returns 0 if successful or an error code otherwise.
4264  *
4265  * The information extracted from these options is recorded in
4266  * the other parameters which return dynamically-allocated
4267  * structures:
4268  *  ceph_opts
4269  *      The address of a pointer that will refer to a ceph options
4270  *      structure.  Caller must release the returned pointer using
4271  *      ceph_destroy_options() when it is no longer needed.
4272  *  rbd_opts
4273  *      Address of an rbd options pointer.  Fully initialized by
4274  *      this function; caller must release with kfree().
4275  *  spec
4276  *      Address of an rbd image specification pointer.  Fully
4277  *      initialized by this function based on parsed options.
4278  *      Caller must release with rbd_spec_put().
4279  *
4280  * The options passed take this form:
4281  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4282  * where:
4283  *  <mon_addrs>
4284  *      A comma-separated list of one or more monitor addresses.
4285  *      A monitor address is an ip address, optionally followed
4286  *      by a port number (separated by a colon).
4287  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4288  *  <options>
4289  *      A comma-separated list of ceph and/or rbd options.
4290  *  <pool_name>
4291  *      The name of the rados pool containing the rbd image.
4292  *  <image_name>
4293  *      The name of the image in that pool to map.
4294  *  <snap_name>
4295  *      An optional snapshot name.  If provided, the mapping will
4296  *      present data from the image at the time that snapshot was
4297  *      created.  The image head is used if no snapshot name is
4298  *      provided.  Snapshot mappings are always read-only.
4299  */
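     /*
      * For example (all values hypothetical), a request written to
      * /sys/bus/rbd/add might look like:
      *
      *   1.2.3.4:6789,1.2.3.5:6789 name=admin,secret=AQBx mypool myimage mysnap
      *
      * which maps snapshot "mysnap" of image "myimage" in pool
      * "mypool", using two monitor addresses and cephx options for
      * authentication.
      */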
4300 static int rbd_add_parse_args(const char *buf,
4301                                 struct ceph_options **ceph_opts,
4302                                 struct rbd_options **opts,
4303                                 struct rbd_spec **rbd_spec)
4304 {
4305         size_t len;
4306         char *options;
4307         const char *mon_addrs;
4308         char *snap_name;
4309         size_t mon_addrs_size;
4310         struct rbd_spec *spec = NULL;
4311         struct rbd_options *rbd_opts = NULL;
4312         struct ceph_options *copts;
4313         int ret;
4314
4315         /* The first four tokens are required */
4316
4317         len = next_token(&buf);
4318         if (!len) {
4319                 rbd_warn(NULL, "no monitor address(es) provided");
4320                 return -EINVAL;
4321         }
4322         mon_addrs = buf;
4323         mon_addrs_size = len + 1;
4324         buf += len;
4325
4326         ret = -EINVAL;
4327         options = dup_token(&buf, NULL);
4328         if (!options)
4329                 return -ENOMEM;
4330         if (!*options) {
4331                 rbd_warn(NULL, "no options provided");
4332                 goto out_err;
4333         }
4334
4335         spec = rbd_spec_alloc();
4336         if (!spec)
4337                 goto out_mem;
4338
4339         spec->pool_name = dup_token(&buf, NULL);
4340         if (!spec->pool_name)
4341                 goto out_mem;
4342         if (!*spec->pool_name) {
4343                 rbd_warn(NULL, "no pool name provided");
4344                 goto out_err;
4345         }
4346
4347         spec->image_name = dup_token(&buf, NULL);
4348         if (!spec->image_name)
4349                 goto out_mem;
4350         if (!*spec->image_name) {
4351                 rbd_warn(NULL, "no image name provided");
4352                 goto out_err;
4353         }
4354
4355         /*
4356          * Snapshot name is optional; default is to use "-"
4357          * (indicating the head/no snapshot).
4358          */
4359         len = next_token(&buf);
4360         if (!len) {
4361                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4362                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4363         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4364                 ret = -ENAMETOOLONG;
4365                 goto out_err;
4366         }
4367         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4368         if (!snap_name)
4369                 goto out_mem;
4370         *(snap_name + len) = '\0';
4371         spec->snap_name = snap_name;
4372
4373         /* Initialize all rbd options to the defaults */
4374
4375         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4376         if (!rbd_opts)
4377                 goto out_mem;
4378
4379         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4380
4381         copts = ceph_parse_options(options, mon_addrs,
4382                                         mon_addrs + mon_addrs_size - 1,
4383                                         parse_rbd_opts_token, rbd_opts);
4384         if (IS_ERR(copts)) {
4385                 ret = PTR_ERR(copts);
4386                 goto out_err;
4387         }
4388         kfree(options);
4389
4390         *ceph_opts = copts;
4391         *opts = rbd_opts;
4392         *rbd_spec = spec;
4393
4394         return 0;
4395 out_mem:
4396         ret = -ENOMEM;
4397 out_err:
4398         kfree(rbd_opts);
4399         rbd_spec_put(spec);
4400         kfree(options);
4401
4402         return ret;
4403 }
4404
4405 /*
4406  * An rbd format 2 image has a unique identifier, distinct from the
4407  * name given to it by the user.  Internally, that identifier is
4408  * what's used to specify the names of objects related to the image.
4409  *
4410  * A special "rbd id" object is used to map an rbd image name to its
4411  * id.  If that object doesn't exist, then there is no v2 rbd image
4412  * with the supplied name.
4413  *
4414  * This function will record the given rbd_dev's image_id field if
4415  * it can be determined, and in that case will return 0.  If any
4416  * errors occur a negative errno will be returned and the rbd_dev's
4417  * image_id field will be unchanged (and should be NULL).
4418  */
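     /*
      * For an image named "foo" (hypothetical), the id object is
      * formed by prepending RBD_ID_PREFIX, giving "rbd_id.foo"; see
      * the sprintf() below.
      */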
4419 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4420 {
4421         int ret;
4422         size_t size;
4423         char *object_name;
4424         void *response;
4425         char *image_id;
4426
4427         /*
4428          * When probing a parent image, the image id is already
4429          * known (and the image name likely is not).  There's no
4430          * need to fetch the image id again in this case.  We
4431          * do still need to set the image format though.
4432          */
4433         if (rbd_dev->spec->image_id) {
4434                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4435
4436                 return 0;
4437         }
4438
4439         /*
4440          * First, see if the format 2 image id file exists, and if
4441          * so, get the image's persistent id from it.
4442          */
4443         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4444         object_name = kmalloc(size, GFP_NOIO);
4445         if (!object_name)
4446                 return -ENOMEM;
4447         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4448         dout("rbd id object name is %s\n", object_name);
4449
4450         /* Response will be an encoded string, which includes a length */
4451
4452         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4453         response = kzalloc(size, GFP_NOIO);
4454         if (!response) {
4455                 ret = -ENOMEM;
4456                 goto out;
4457         }
4458
4459         /* If it doesn't exist we'll assume it's a format 1 image */
4460
4461         ret = rbd_obj_method_sync(rbd_dev, object_name,
4462                                 "rbd", "get_id", NULL, 0,
4463                                 response, RBD_IMAGE_ID_LEN_MAX);
4464         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4465         if (ret == -ENOENT) {
4466                 image_id = kstrdup("", GFP_KERNEL);
4467                 ret = image_id ? 0 : -ENOMEM;
4468                 if (!ret)
4469                         rbd_dev->image_format = 1;
4470         } else if (ret > (int) sizeof (__le32)) {
4471                 void *p = response;
4472
4473                 image_id = ceph_extract_encoded_string(&p, p + ret,
4474                                                 NULL, GFP_NOIO);
4475                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4476                 if (!ret)
4477                         rbd_dev->image_format = 2;
4478         } else {
4479                 ret = -EINVAL;
4480         }
4481
4482         if (!ret) {
4483                 rbd_dev->spec->image_id = image_id;
4484                 dout("image_id is %s\n", image_id);
4485         }
4486 out:
4487         kfree(response);
4488         kfree(object_name);
4489
4490         return ret;
4491 }
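
/*
 * A worked example of the exchange above (a sketch: the image name
 * and id are made up, and the RBD_ID_PREFIX of "rbd_id." is assumed
 * from rbd_types.h).  Mapping a format 2 image named "foo" invokes
 * the "get_id" class method on the object "rbd_id.foo".  The reply
 * is a ceph-encoded string -- a little-endian 32-bit length followed
 * by that many bytes -- so an image id of "10.74b0dc51" arrives as
 * the 15-byte buffer:
 *
 *      0b 00 00 00  31 30 2e 37 34 62 30 64 63 35 31
 *      |--le32 11--|  "10.74b0dc51"
 *
 * which ceph_extract_encoded_string() converts to the NUL-terminated
 * id stored in rbd_dev->spec->image_id.
 */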
4492
4493 /* Undo whatever state changes are made by v1 or v2 image probe */
4494
4495 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4496 {
4497         struct rbd_image_header *header;
4498
4499         rbd_dev_remove_parent(rbd_dev);
4500         rbd_spec_put(rbd_dev->parent_spec);
4501         rbd_dev->parent_spec = NULL;
4502         rbd_dev->parent_overlap = 0;
4503
4504         /* Free dynamic fields from the header, then zero it out */
4505
4506         header = &rbd_dev->header;
4507         ceph_put_snap_context(header->snapc);
4508         kfree(header->snap_sizes);
4509         kfree(header->snap_names);
4510         kfree(header->object_prefix);
4511         memset(header, 0, sizeof (*header));
4512 }
4513
4514 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4515 {
4516         int ret;
4517
4518         /* Populate rbd image metadata */
4519
4520         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4521         if (ret < 0)
4522                 goto out_err;
4523
4524         /* Version 1 images have no parent (no layering) */
4525
4526         rbd_dev->parent_spec = NULL;
4527         rbd_dev->parent_overlap = 0;
4528
4529         dout("discovered version 1 image, header name is %s\n",
4530                 rbd_dev->header_name);
4531
4532         return 0;
4533
4534 out_err:
4535         kfree(rbd_dev->header_name);
4536         rbd_dev->header_name = NULL;
4537         kfree(rbd_dev->spec->image_id);
4538         rbd_dev->spec->image_id = NULL;
4539
4540         return ret;
4541 }
4542
4543 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4544 {
4545         int ret;
4546
4547         ret = rbd_dev_v2_image_size(rbd_dev);
4548         if (ret)
4549                 goto out_err;
4550
4551         /* Get the object prefix (a.k.a. block_name) for the image */
4552
4553         ret = rbd_dev_v2_object_prefix(rbd_dev);
4554         if (ret)
4555                 goto out_err;
4556
        /* Get and check the features for the image */
4558
4559         ret = rbd_dev_v2_features(rbd_dev);
4560         if (ret)
4561                 goto out_err;
4562
4563         /* If the image supports layering, get the parent info */
4564
4565         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4566                 ret = rbd_dev_v2_parent_info(rbd_dev);
4567                 if (ret)
4568                         goto out_err;
                /*
                 * Warn if this image has a parent, but don't warn
                 * when the image being probed is itself a parent.
                 * We can tell the latter at this point because a
                 * parent's pool name is not yet known (only its
                 * pool id is).
                 */
4576                 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4577                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4578                                         "is EXPERIMENTAL!");
4579         }
4580
4581         /* If the image supports fancy striping, get its parameters */
4582
4583         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4584                 ret = rbd_dev_v2_striping_info(rbd_dev);
4585                 if (ret < 0)
4586                         goto out_err;
4587         }
4588
4589         /* crypto and compression type aren't (yet) supported for v2 images */
4590
4591         rbd_dev->header.crypt_type = 0;
4592         rbd_dev->header.comp_type = 0;
4593
        /* Get the snapshot context */
4595
4596         ret = rbd_dev_v2_snap_context(rbd_dev);
4597         if (ret)
4598                 goto out_err;
4599
4600         dout("discovered version 2 image, header name is %s\n",
4601                 rbd_dev->header_name);
4602
4603         return 0;
4604 out_err:
4605         rbd_dev->parent_overlap = 0;
4606         rbd_spec_put(rbd_dev->parent_spec);
4607         rbd_dev->parent_spec = NULL;
4608         kfree(rbd_dev->header_name);
4609         rbd_dev->header_name = NULL;
4610         kfree(rbd_dev->header.object_prefix);
4611         rbd_dev->header.object_prefix = NULL;
4612
4613         return ret;
4614 }
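
/*
 * Note the contrast with rbd_dev_v1_probe() above: a format 1 header
 * is a single on-disk structure read back in one piece, while format
 * 2 metadata is assembled piecewise by the class method calls above
 * (size, object prefix, features, parent info, striping and snapshot
 * context), each of which is a separate round trip to an OSD.
 */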
4615
4616 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4617 {
4618         struct rbd_device *parent = NULL;
4619         struct rbd_spec *parent_spec;
4620         struct rbd_client *rbdc;
4621         int ret;
4622
4623         if (!rbd_dev->parent_spec)
4624                 return 0;
4625         /*
4626          * We need to pass a reference to the client and the parent
4627          * spec when creating the parent rbd_dev.  Images related by
4628          * parent/child relationships always share both.
4629          */
4630         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4631         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4632
4633         ret = -ENOMEM;
4634         parent = rbd_dev_create(rbdc, parent_spec);
4635         if (!parent)
4636                 goto out_err;
4637
4638         ret = rbd_dev_image_probe(parent, true);
4639         if (ret < 0)
4640                 goto out_err;
4641         rbd_dev->parent = parent;
4642
4643         return 0;
4644 out_err:
        if (parent) {
                /*
                 * rbd_dev_destroy() drops the references the parent
                 * device holds on the client and the parent spec.
                 * Don't drop the child's parent_spec reference or
                 * free its header_name here as well; doing so would
                 * over-put the spec and double-free the name in the
                 * caller's error path.
                 */
                rbd_dev_destroy(parent);
        } else {
                rbd_put_client(rbdc);
                rbd_spec_put(parent_spec);
        }
4653
4654         return ret;
4655 }
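
/*
 * Parent probing is recursive: rbd_dev_image_probe() on the parent
 * ends by calling back into rbd_dev_probe_parent(), so mapping a
 * clone of a clone builds up the whole ancestor chain, each link
 * sharing the same client as described in the comment above.
 */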
4656
4657 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4658 {
4659         int ret;
4660
4661         /* generate unique id: find highest unique id, add one */
4662         rbd_dev_id_get(rbd_dev);
4663
4664         /* Fill in the device name, now that we have its id. */
4665         BUILD_BUG_ON(DEV_NAME_LEN
4666                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4667         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4668
4669         /* Get our block major device number. */
4670
4671         ret = register_blkdev(0, rbd_dev->name);
4672         if (ret < 0)
4673                 goto err_out_id;
4674         rbd_dev->major = ret;
4675
4676         /* Set up the blkdev mapping. */
4677
4678         ret = rbd_init_disk(rbd_dev);
4679         if (ret)
4680                 goto err_out_blkdev;
4681
4682         ret = rbd_dev_mapping_set(rbd_dev);
4683         if (ret)
4684                 goto err_out_disk;
4685         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4686
4687         ret = rbd_bus_add_dev(rbd_dev);
4688         if (ret)
4689                 goto err_out_mapping;
4690
4691         /* Everything's ready.  Announce the disk to the world. */
4692
4693         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4694         add_disk(rbd_dev->disk);
4695
4696         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4697                 (unsigned long long) rbd_dev->mapping.size);
4698
4699         return ret;
4700
4701 err_out_mapping:
4702         rbd_dev_mapping_clear(rbd_dev);
4703 err_out_disk:
4704         rbd_free_disk(rbd_dev);
4705 err_out_blkdev:
4706         unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
4710
4711         return ret;
4712 }
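
/*
 * The unwind labels above mirror the setup order (device id, block
 * major, disk, mapping, bus device), so a failure at any step
 * releases exactly what the steps before it acquired and nothing
 * more.
 */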
4713
4714 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4715 {
4716         struct rbd_spec *spec = rbd_dev->spec;
4717         size_t size;
4718
4719         /* Record the header object name for this rbd image. */
4720
4721         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4722
4723         if (rbd_dev->image_format == 1)
4724                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4725         else
4726                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4727
4728         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4729         if (!rbd_dev->header_name)
4730                 return -ENOMEM;
4731
4732         if (rbd_dev->image_format == 1)
4733                 sprintf(rbd_dev->header_name, "%s%s",
4734                         spec->image_name, RBD_SUFFIX);
4735         else
4736                 sprintf(rbd_dev->header_name, "%s%s",
4737                         RBD_HEADER_PREFIX, spec->image_id);
4738         return 0;
4739 }
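
/*
 * For example (assuming the RBD_SUFFIX of ".rbd" and the
 * RBD_HEADER_PREFIX of "rbd_header." defined in rbd_types.h, with a
 * made-up id): a format 1 image named "foo" gets the header object
 * "foo.rbd", while a format 2 image whose id is "10.74b0dc51" gets
 * "rbd_header.10.74b0dc51".
 */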
4740
4741 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4742 {
4743         int ret;
4744
4745         rbd_dev_unprobe(rbd_dev);
4746         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4747         if (ret)
4748                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4749         kfree(rbd_dev->header_name);
4750         rbd_dev->header_name = NULL;
4751         rbd_dev->image_format = 0;
4752         kfree(rbd_dev->spec->image_id);
4753         rbd_dev->spec->image_id = NULL;
4754
4755         rbd_dev_destroy(rbd_dev);
4756 }
4757
/*
 * Probe for the existence of the header object for the given rbd
 * device.  The image id is determined first, which also establishes
 * the image format; the format then selects whether the v1 or v2
 * routine fills in the rest of the image metadata.
 */
4763 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
4764 {
4765         int ret;
4766         int tmp;
4767
4768         /*
4769          * Get the id from the image id object.  If it's not a
4770          * format 2 image, we'll get ENOENT back, and we'll assume
4771          * it's a format 1 image.
4772          */
4773         ret = rbd_dev_image_id(rbd_dev);
4774         if (ret)
4775                 return ret;
4776         rbd_assert(rbd_dev->spec->image_id);
4777         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4778
4779         ret = rbd_dev_header_name(rbd_dev);
4780         if (ret)
4781                 goto err_out_format;
4782
4783         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4784         if (ret)
4785                 goto out_header_name;
4786
4787         if (rbd_dev->image_format == 1)
4788                 ret = rbd_dev_v1_probe(rbd_dev);
4789         else
4790                 ret = rbd_dev_v2_probe(rbd_dev);
4791         if (ret)
4792                 goto err_out_watch;
4793
4794         ret = rbd_dev_spec_update(rbd_dev);
4795         if (ret)
4796                 goto err_out_probe;
4797
4798         /* If we are mapping a snapshot it must be marked read-only */
4799
4800         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4801                 read_only = true;
4802         rbd_dev->mapping.read_only = read_only;
4803
4804         ret = rbd_dev_probe_parent(rbd_dev);
4805         if (!ret)
4806                 return 0;
4807
4808 err_out_probe:
4809         rbd_dev_unprobe(rbd_dev);
4810 err_out_watch:
4811         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4812         if (tmp)
4813                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4814 out_header_name:
4815         kfree(rbd_dev->header_name);
4816         rbd_dev->header_name = NULL;
4817 err_out_format:
4818         rbd_dev->image_format = 0;
4819         kfree(rbd_dev->spec->image_id);
4820         rbd_dev->spec->image_id = NULL;
4821
4822         dout("probe failed, returning %d\n", ret);
4823
4824         return ret;
4825 }
4826
4827 static ssize_t rbd_add(struct bus_type *bus,
4828                        const char *buf,
4829                        size_t count)
4830 {
4831         struct rbd_device *rbd_dev = NULL;
4832         struct ceph_options *ceph_opts = NULL;
4833         struct rbd_options *rbd_opts = NULL;
4834         struct rbd_spec *spec = NULL;
4835         struct rbd_client *rbdc;
4836         struct ceph_osd_client *osdc;
4837         bool read_only;
4838         int rc = -ENOMEM;
4839
4840         if (!try_module_get(THIS_MODULE))
4841                 return -ENODEV;
4842
4843         /* parse add command */
4844         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4845         if (rc < 0)
4846                 goto err_out_module;
4847         read_only = rbd_opts->read_only;
4848         kfree(rbd_opts);
4849         rbd_opts = NULL;        /* done with this */
4850
4851         rbdc = rbd_get_client(ceph_opts);
4852         if (IS_ERR(rbdc)) {
4853                 rc = PTR_ERR(rbdc);
4854                 goto err_out_args;
4855         }
        ceph_opts = NULL;       /* rbdc now owns this */
4857
4858         /* pick the pool */
4859         osdc = &rbdc->client->osdc;
4860         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4861         if (rc < 0)
4862                 goto err_out_client;
4863         spec->pool_id = (u64)rc;
4864
4865         /* The ceph file layout needs to fit pool id in 32 bits */
4866
4867         if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)",
                                (unsigned long long)spec->pool_id, U32_MAX);
4870                 rc = -EIO;
4871                 goto err_out_client;
4872         }
4873
4874         rbd_dev = rbd_dev_create(rbdc, spec);
4875         if (!rbd_dev)
4876                 goto err_out_client;
4877         rbdc = NULL;            /* rbd_dev now owns this */
4878         spec = NULL;            /* rbd_dev now owns this */
4879
4880         rc = rbd_dev_image_probe(rbd_dev, read_only);
4881         if (rc < 0)
4882                 goto err_out_rbd_dev;
4883
4884         rc = rbd_dev_device_setup(rbd_dev);
4885         if (!rc)
4886                 return count;
4887
4888         rbd_dev_image_release(rbd_dev);
4889 err_out_rbd_dev:
4890         rbd_dev_destroy(rbd_dev);
4891 err_out_client:
4892         rbd_put_client(rbdc);
4893 err_out_args:
4894         if (ceph_opts)
4895                 ceph_destroy_options(ceph_opts);
4896         kfree(rbd_opts);
4897         rbd_spec_put(spec);
4898 err_out_module:
4899         module_put(THIS_MODULE);
4900
4901         dout("Error adding device %s\n", buf);
4902
4903         return (ssize_t)rc;
4904 }
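
/*
 * A usage sketch (the monitor address, key and image name below are
 * made up; the authoritative syntax lives in
 * Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *      # echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo" \
 *              > /sys/bus/rbd/add
 *
 * On success the whole buffer is consumed (count is returned) and a
 * /dev/rbd<id> block device appears; on failure the unwinding above
 * runs and the negative errno is returned to the writer.
 */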
4905
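/*
 * Find the rbd device with the given id on the global device list,
 * or return NULL.  The list lock is dropped before returning, so the
 * result remains valid only because callers (rbd_remove() below)
 * serialize against device removal with ctl_mutex.
 */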
4906 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4907 {
4908         struct list_head *tmp;
4909         struct rbd_device *rbd_dev;
4910
4911         spin_lock(&rbd_dev_list_lock);
4912         list_for_each(tmp, &rbd_dev_list) {
4913                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4914                 if (rbd_dev->dev_id == dev_id) {
4915                         spin_unlock(&rbd_dev_list_lock);
4916                         return rbd_dev;
4917                 }
4918         }
4919         spin_unlock(&rbd_dev_list_lock);
4920         return NULL;
4921 }
4922
4923 static void rbd_dev_device_release(struct device *dev)
4924 {
4925         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4926
        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
4934 }
4935
4936 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4937 {
4938         while (rbd_dev->parent) {
4939                 struct rbd_device *first = rbd_dev;
4940                 struct rbd_device *second = first->parent;
4941                 struct rbd_device *third;
4942
4943                 /*
4944                  * Follow to the parent with no grandparent and
4945                  * remove it.
4946                  */
4947                 while (second && (third = second->parent)) {
4948                         first = second;
4949                         second = third;
4950                 }
4951                 rbd_assert(second);
4952                 rbd_dev_image_release(second);
4953                 first->parent = NULL;
4954                 first->parent_overlap = 0;
4955
4956                 rbd_assert(first->parent_spec);
4957                 rbd_spec_put(first->parent_spec);
4958                 first->parent_spec = NULL;
4959         }
4960 }
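
/*
 * For instance, with a chain dev -> p1 -> p2 (p2 being the ultimate
 * ancestor), each pass of the outer loop walks to the deepest
 * remaining ancestor and releases it: p2 first, then p1, after which
 * dev itself has no parent and the loop ends.
 */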
4961
4962 static ssize_t rbd_remove(struct bus_type *bus,
4963                           const char *buf,
4964                           size_t count)
4965 {
4966         struct rbd_device *rbd_dev = NULL;
4967         int target_id;
4968         unsigned long ul;
4969         int ret;
4970
        ret = kstrtoul(buf, 10, &ul);
4972         if (ret)
4973                 return ret;
4974
4975         /* convert to int; abort if we lost anything in the conversion */
4976         target_id = (int) ul;
4977         if (target_id != ul)
4978                 return -EINVAL;
4979
4980         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4981
4982         rbd_dev = __rbd_get_dev(target_id);
4983         if (!rbd_dev) {
4984                 ret = -ENOENT;
4985                 goto done;
4986         }
4987
4988         spin_lock_irq(&rbd_dev->lock);
4989         if (rbd_dev->open_count)
4990                 ret = -EBUSY;
4991         else
4992                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4993         spin_unlock_irq(&rbd_dev->lock);
4994         if (ret < 0)
4995                 goto done;
4996         ret = count;
4997         rbd_bus_del_dev(rbd_dev);
4998         rbd_dev_image_release(rbd_dev);
4999         module_put(THIS_MODULE);
5000 done:
5001         mutex_unlock(&ctl_mutex);
5002
5003         return ret;
5004 }
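
/*
 * The matching removal sketch: the id written is the <id> from the
 * device's /dev/rbd<id> name (also its directory name under
 * /sys/bus/rbd/devices/):
 *
 *      # echo 1 > /sys/bus/rbd/remove
 */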
5005
5006 /*
5007  * create control files in sysfs
5008  * /sys/bus/rbd/...
5009  */
5010 static int rbd_sysfs_init(void)
5011 {
5012         int ret;
5013
5014         ret = device_register(&rbd_root_dev);
5015         if (ret < 0)
5016                 return ret;
5017
5018         ret = bus_register(&rbd_bus_type);
5019         if (ret < 0)
5020                 device_unregister(&rbd_root_dev);
5021
5022         return ret;
5023 }
5024
5025 static void rbd_sysfs_cleanup(void)
5026 {
5027         bus_unregister(&rbd_bus_type);
5028         device_unregister(&rbd_root_dev);
5029 }
5030
5031 static int rbd_slab_init(void)
5032 {
5033         rbd_assert(!rbd_img_request_cache);
5034         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5035                                         sizeof (struct rbd_img_request),
5036                                         __alignof__(struct rbd_img_request),
5037                                         0, NULL);
5038         if (!rbd_img_request_cache)
5039                 return -ENOMEM;
5040
5041         rbd_assert(!rbd_obj_request_cache);
5042         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5043                                         sizeof (struct rbd_obj_request),
5044                                         __alignof__(struct rbd_obj_request),
5045                                         0, NULL);
5046         if (!rbd_obj_request_cache)
5047                 goto out_err;
5048
5049         rbd_assert(!rbd_segment_name_cache);
5050         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5051                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5052         if (rbd_segment_name_cache)
5053                 return 0;
5054 out_err:
5055         if (rbd_obj_request_cache) {
5056                 kmem_cache_destroy(rbd_obj_request_cache);
5057                 rbd_obj_request_cache = NULL;
5058         }
5059
5060         kmem_cache_destroy(rbd_img_request_cache);
5061         rbd_img_request_cache = NULL;
5062
5063         return -ENOMEM;
5064 }
5065
5066 static void rbd_slab_exit(void)
5067 {
5068         rbd_assert(rbd_segment_name_cache);
5069         kmem_cache_destroy(rbd_segment_name_cache);
5070         rbd_segment_name_cache = NULL;
5071
5072         rbd_assert(rbd_obj_request_cache);
5073         kmem_cache_destroy(rbd_obj_request_cache);
5074         rbd_obj_request_cache = NULL;
5075
5076         rbd_assert(rbd_img_request_cache);
5077         kmem_cache_destroy(rbd_img_request_cache);
5078         rbd_img_request_cache = NULL;
5079 }
5080
5081 static int __init rbd_init(void)
5082 {
5083         int rc;
5084
5085         if (!libceph_compatible(NULL)) {
5086                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5087
5088                 return -EINVAL;
5089         }
5090         rc = rbd_slab_init();
5091         if (rc)
5092                 return rc;
5093         rc = rbd_sysfs_init();
5094         if (rc)
5095                 rbd_slab_exit();
5096         else
5097                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5098
5099         return rc;
5100 }
5101
5102 static void __exit rbd_exit(void)
5103 {
5104         rbd_sysfs_cleanup();
5105         rbd_slab_exit();
5106 }
5107
5108 module_init(rbd_init);
5109 module_exit(rbd_exit);
5110
5111 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5112 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5113 MODULE_DESCRIPTION("rados block device");
5114
5115 /* following authorship retained from original osdblk.c */
5116 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5117
5118 MODULE_LICENSE("GPL");