]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: zero format 1 header structure earlier
[~andy/linux] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44
45 #include "rbd_types.h"
46
47 #define RBD_DEBUG       /* Activate rbd_assert() calls */
48
49 /*
50  * The basic unit of block I/O is a sector.  It is interpreted in a
51  * number of contexts in Linux (blk, bio, genhd), but the default is
52  * universally 512 bytes.  These symbols are just slightly more
53  * meaningful than the bare numbers they represent.
54  */
55 #define SECTOR_SHIFT    9
56 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
57
58 #define RBD_DRV_NAME "rbd"
59 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
60
61 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
62
63 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
64 #define RBD_MAX_SNAP_NAME_LEN   \
65                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
66
67 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
68
69 #define RBD_SNAP_HEAD_NAME      "-"
70
71 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
72
73 /* This allows a single page to hold an image name sent by OSD */
74 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
75 #define RBD_IMAGE_ID_LEN_MAX    64
76
77 #define RBD_OBJ_PREFIX_LEN_MAX  64
78
79 /* Feature bits */
80
81 #define RBD_FEATURE_LAYERING    (1<<0)
82 #define RBD_FEATURE_STRIPINGV2  (1<<1)
83 #define RBD_FEATURES_ALL \
84             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
85
86 /* Features supported by this (client software) implementation. */
87
88 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
89
90 /*
91  * An RBD device name will be "rbd#", where the "rbd" comes from
92  * RBD_DRV_NAME above, and # is a unique integer identifier.
93  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
94  * enough to hold all possible device names.
95  */
96 #define DEV_NAME_LEN            32
97 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
98
99 /*
100  * block device image metadata (in-memory version)
101  */
102 struct rbd_image_header {
103         /* These six fields never change for a given rbd image */
104         char *object_prefix;
105         __u8 obj_order;
106         __u8 crypt_type;
107         __u8 comp_type;
108         u64 stripe_unit;
109         u64 stripe_count;
110         u64 features;           /* Might be changeable someday? */
111
112         /* The remaining fields need to be updated occasionally */
113         u64 image_size;
114         struct ceph_snap_context *snapc;
115         char *snap_names;       /* format 1 only */
116         u64 *snap_sizes;        /* format 1 only */
117 };
118
119 /*
120  * An rbd image specification.
121  *
122  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
123  * identify an image.  Each rbd_dev structure includes a pointer to
124  * an rbd_spec structure that encapsulates this identity.
125  *
126  * Each of the id's in an rbd_spec has an associated name.  For a
127  * user-mapped image, the names are supplied and the id's associated
128  * with them are looked up.  For a layered image, a parent image is
129  * defined by the tuple, and the names are looked up.
130  *
131  * An rbd_dev structure contains a parent_spec pointer which is
132  * non-null if the image it represents is a child in a layered
133  * image.  This pointer will refer to the rbd_spec structure used
134  * by the parent rbd_dev for its own identity (i.e., the structure
135  * is shared between the parent and child).
136  *
137  * Since these structures are populated once, during the discovery
138  * phase of image construction, they are effectively immutable so
139  * we make no effort to synchronize access to them.
140  *
141  * Note that code herein does not assume the image name is known (it
142  * could be a null pointer).
143  */
144 struct rbd_spec {
145         u64             pool_id;
146         const char      *pool_name;
147
148         const char      *image_id;
149         const char      *image_name;
150
151         u64             snap_id;
152         const char      *snap_name;
153
154         struct kref     kref;
155 };
156
157 /*
158  * an instance of the client.  multiple devices may share an rbd client.
159  */
160 struct rbd_client {
161         struct ceph_client      *client;
162         struct kref             kref;
163         struct list_head        node;
164 };
165
166 struct rbd_img_request;
167 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
168
169 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
170
171 struct rbd_obj_request;
172 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
173
174 enum obj_request_type {
175         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
176 };
177
178 enum obj_req_flags {
179         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
180         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
181         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
182         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
183 };
184
185 struct rbd_obj_request {
186         const char              *object_name;
187         u64                     offset;         /* object start byte */
188         u64                     length;         /* bytes from offset */
189         unsigned long           flags;
190
191         /*
192          * An object request associated with an image will have its
193          * img_data flag set; a standalone object request will not.
194          *
195          * A standalone object request will have which == BAD_WHICH
196          * and a null obj_request pointer.
197          *
198          * An object request initiated in support of a layered image
199          * object (to check for its existence before a write) will
200          * have which == BAD_WHICH and a non-null obj_request pointer.
201          *
202          * Finally, an object request for rbd image data will have
203          * which != BAD_WHICH, and will have a non-null img_request
204          * pointer.  The value of which will be in the range
205          * 0..(img_request->obj_request_count-1).
206          */
207         union {
208                 struct rbd_obj_request  *obj_request;   /* STAT op */
209                 struct {
210                         struct rbd_img_request  *img_request;
211                         u64                     img_offset;
212                         /* links for img_request->obj_requests list */
213                         struct list_head        links;
214                 };
215         };
216         u32                     which;          /* posn image request list */
217
218         enum obj_request_type   type;
219         union {
220                 struct bio      *bio_list;
221                 struct {
222                         struct page     **pages;
223                         u32             page_count;
224                 };
225         };
226         struct page             **copyup_pages;
227
228         struct ceph_osd_request *osd_req;
229
230         u64                     xferred;        /* bytes transferred */
231         int                     result;
232
233         rbd_obj_callback_t      callback;
234         struct completion       completion;
235
236         struct kref             kref;
237 };
238
239 enum img_req_flags {
240         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
241         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
242         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
243 };
244
245 struct rbd_img_request {
246         struct rbd_device       *rbd_dev;
247         u64                     offset; /* starting image byte offset */
248         u64                     length; /* byte count from offset */
249         unsigned long           flags;
250         union {
251                 u64                     snap_id;        /* for reads */
252                 struct ceph_snap_context *snapc;        /* for writes */
253         };
254         union {
255                 struct request          *rq;            /* block request */
256                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
257         };
258         struct page             **copyup_pages;
259         spinlock_t              completion_lock;/* protects next_completion */
260         u32                     next_completion;
261         rbd_img_callback_t      callback;
262         u64                     xferred;/* aggregate bytes transferred */
263         int                     result; /* first nonzero obj_request result */
264
265         u32                     obj_request_count;
266         struct list_head        obj_requests;   /* rbd_obj_request structs */
267
268         struct kref             kref;
269 };
270
271 #define for_each_obj_request(ireq, oreq) \
272         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
273 #define for_each_obj_request_from(ireq, oreq) \
274         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
275 #define for_each_obj_request_safe(ireq, oreq, n) \
276         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
277
278 struct rbd_mapping {
279         u64                     size;
280         u64                     features;
281         bool                    read_only;
282 };
283
284 /*
285  * a single device
286  */
287 struct rbd_device {
288         int                     dev_id;         /* blkdev unique id */
289
290         int                     major;          /* blkdev assigned major */
291         struct gendisk          *disk;          /* blkdev's gendisk and rq */
292
293         u32                     image_format;   /* Either 1 or 2 */
294         struct rbd_client       *rbd_client;
295
296         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
297
298         spinlock_t              lock;           /* queue, flags, open_count */
299
300         struct rbd_image_header header;
301         unsigned long           flags;          /* possibly lock protected */
302         struct rbd_spec         *spec;
303
304         char                    *header_name;
305
306         struct ceph_file_layout layout;
307
308         struct ceph_osd_event   *watch_event;
309         struct rbd_obj_request  *watch_request;
310
311         struct rbd_spec         *parent_spec;
312         u64                     parent_overlap;
313         struct rbd_device       *parent;
314
315         /* protects updating the header */
316         struct rw_semaphore     header_rwsem;
317
318         struct rbd_mapping      mapping;
319
320         struct list_head        node;
321
322         /* sysfs related */
323         struct device           dev;
324         unsigned long           open_count;     /* protected by lock */
325 };
326
327 /*
328  * Flag bits for rbd_dev->flags.  If atomicity is required,
329  * rbd_dev->lock is used to protect access.
330  *
331  * Currently, only the "removing" flag (which is coupled with the
332  * "open_count" field) requires atomic access.
333  */
334 enum rbd_dev_flags {
335         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
336         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
337 };
338
339 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
340
341 static LIST_HEAD(rbd_dev_list);    /* devices */
342 static DEFINE_SPINLOCK(rbd_dev_list_lock);
343
344 static LIST_HEAD(rbd_client_list);              /* clients */
345 static DEFINE_SPINLOCK(rbd_client_list_lock);
346
347 /* Slab caches for frequently-allocated structures */
348
349 static struct kmem_cache        *rbd_img_request_cache;
350 static struct kmem_cache        *rbd_obj_request_cache;
351 static struct kmem_cache        *rbd_segment_name_cache;
352
353 static int rbd_img_request_submit(struct rbd_img_request *img_request);
354
355 static void rbd_dev_device_release(struct device *dev);
356
357 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
358                        size_t count);
359 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
360                           size_t count);
361 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);
362
363 static struct bus_attribute rbd_bus_attrs[] = {
364         __ATTR(add, S_IWUSR, NULL, rbd_add),
365         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
366         __ATTR_NULL
367 };
368
369 static struct bus_type rbd_bus_type = {
370         .name           = "rbd",
371         .bus_attrs      = rbd_bus_attrs,
372 };
373
374 static void rbd_root_dev_release(struct device *dev)
375 {
376 }
377
378 static struct device rbd_root_dev = {
379         .init_name =    "rbd",
380         .release =      rbd_root_dev_release,
381 };
382
383 static __printf(2, 3)
384 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
385 {
386         struct va_format vaf;
387         va_list args;
388
389         va_start(args, fmt);
390         vaf.fmt = fmt;
391         vaf.va = &args;
392
393         if (!rbd_dev)
394                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
395         else if (rbd_dev->disk)
396                 printk(KERN_WARNING "%s: %s: %pV\n",
397                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
398         else if (rbd_dev->spec && rbd_dev->spec->image_name)
399                 printk(KERN_WARNING "%s: image %s: %pV\n",
400                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
401         else if (rbd_dev->spec && rbd_dev->spec->image_id)
402                 printk(KERN_WARNING "%s: id %s: %pV\n",
403                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
404         else    /* punt */
405                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
406                         RBD_DRV_NAME, rbd_dev, &vaf);
407         va_end(args);
408 }
409
410 #ifdef RBD_DEBUG
411 #define rbd_assert(expr)                                                \
412                 if (unlikely(!(expr))) {                                \
413                         printk(KERN_ERR "\nAssertion failure in %s() "  \
414                                                 "at line %d:\n\n"       \
415                                         "\trbd_assert(%s);\n\n",        \
416                                         __func__, __LINE__, #expr);     \
417                         BUG();                                          \
418                 }
419 #else /* !RBD_DEBUG */
420 #  define rbd_assert(expr)      ((void) 0)
421 #endif /* !RBD_DEBUG */
422
423 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
424 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
425 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
426
427 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
428 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
429 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
430                                         u64 snap_id);
431 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
432                                 u8 *order, u64 *snap_size);
433 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
434                 u64 *snap_features);
435 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
436
437 static int rbd_open(struct block_device *bdev, fmode_t mode)
438 {
439         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
440         bool removing = false;
441
442         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
443                 return -EROFS;
444
445         spin_lock_irq(&rbd_dev->lock);
446         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
447                 removing = true;
448         else
449                 rbd_dev->open_count++;
450         spin_unlock_irq(&rbd_dev->lock);
451         if (removing)
452                 return -ENOENT;
453
454         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
455         (void) get_device(&rbd_dev->dev);
456         set_device_ro(bdev, rbd_dev->mapping.read_only);
457         mutex_unlock(&ctl_mutex);
458
459         return 0;
460 }
461
462 static int rbd_release(struct gendisk *disk, fmode_t mode)
463 {
464         struct rbd_device *rbd_dev = disk->private_data;
465         unsigned long open_count_before;
466
467         spin_lock_irq(&rbd_dev->lock);
468         open_count_before = rbd_dev->open_count--;
469         spin_unlock_irq(&rbd_dev->lock);
470         rbd_assert(open_count_before > 0);
471
472         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
473         put_device(&rbd_dev->dev);
474         mutex_unlock(&ctl_mutex);
475
476         return 0;
477 }
478
479 static const struct block_device_operations rbd_bd_ops = {
480         .owner                  = THIS_MODULE,
481         .open                   = rbd_open,
482         .release                = rbd_release,
483 };
484
485 /*
486  * Initialize an rbd client instance.
487  * We own *ceph_opts.
488  */
489 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
490 {
491         struct rbd_client *rbdc;
492         int ret = -ENOMEM;
493
494         dout("%s:\n", __func__);
495         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
496         if (!rbdc)
497                 goto out_opt;
498
499         kref_init(&rbdc->kref);
500         INIT_LIST_HEAD(&rbdc->node);
501
502         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
503
504         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
505         if (IS_ERR(rbdc->client))
506                 goto out_mutex;
507         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
508
509         ret = ceph_open_session(rbdc->client);
510         if (ret < 0)
511                 goto out_err;
512
513         spin_lock(&rbd_client_list_lock);
514         list_add_tail(&rbdc->node, &rbd_client_list);
515         spin_unlock(&rbd_client_list_lock);
516
517         mutex_unlock(&ctl_mutex);
518         dout("%s: rbdc %p\n", __func__, rbdc);
519
520         return rbdc;
521
522 out_err:
523         ceph_destroy_client(rbdc->client);
524 out_mutex:
525         mutex_unlock(&ctl_mutex);
526         kfree(rbdc);
527 out_opt:
528         if (ceph_opts)
529                 ceph_destroy_options(ceph_opts);
530         dout("%s: error %d\n", __func__, ret);
531
532         return ERR_PTR(ret);
533 }
534
535 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
536 {
537         kref_get(&rbdc->kref);
538
539         return rbdc;
540 }
541
542 /*
543  * Find a ceph client with specific addr and configuration.  If
544  * found, bump its reference count.
545  */
546 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
547 {
548         struct rbd_client *client_node;
549         bool found = false;
550
551         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
552                 return NULL;
553
554         spin_lock(&rbd_client_list_lock);
555         list_for_each_entry(client_node, &rbd_client_list, node) {
556                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
557                         __rbd_get_client(client_node);
558
559                         found = true;
560                         break;
561                 }
562         }
563         spin_unlock(&rbd_client_list_lock);
564
565         return found ? client_node : NULL;
566 }
567
568 /*
569  * mount options
570  */
571 enum {
572         Opt_last_int,
573         /* int args above */
574         Opt_last_string,
575         /* string args above */
576         Opt_read_only,
577         Opt_read_write,
578         /* Boolean args above */
579         Opt_last_bool,
580 };
581
582 static match_table_t rbd_opts_tokens = {
583         /* int args above */
584         /* string args above */
585         {Opt_read_only, "read_only"},
586         {Opt_read_only, "ro"},          /* Alternate spelling */
587         {Opt_read_write, "read_write"},
588         {Opt_read_write, "rw"},         /* Alternate spelling */
589         /* Boolean args above */
590         {-1, NULL}
591 };
592
593 struct rbd_options {
594         bool    read_only;
595 };
596
597 #define RBD_READ_ONLY_DEFAULT   false
598
599 static int parse_rbd_opts_token(char *c, void *private)
600 {
601         struct rbd_options *rbd_opts = private;
602         substring_t argstr[MAX_OPT_ARGS];
603         int token, intval, ret;
604
605         token = match_token(c, rbd_opts_tokens, argstr);
606         if (token < 0)
607                 return -EINVAL;
608
609         if (token < Opt_last_int) {
610                 ret = match_int(&argstr[0], &intval);
611                 if (ret < 0) {
612                         pr_err("bad mount option arg (not int) "
613                                "at '%s'\n", c);
614                         return ret;
615                 }
616                 dout("got int token %d val %d\n", token, intval);
617         } else if (token > Opt_last_int && token < Opt_last_string) {
618                 dout("got string token %d val %s\n", token,
619                      argstr[0].from);
620         } else if (token > Opt_last_string && token < Opt_last_bool) {
621                 dout("got Boolean token %d\n", token);
622         } else {
623                 dout("got token %d\n", token);
624         }
625
626         switch (token) {
627         case Opt_read_only:
628                 rbd_opts->read_only = true;
629                 break;
630         case Opt_read_write:
631                 rbd_opts->read_only = false;
632                 break;
633         default:
634                 rbd_assert(false);
635                 break;
636         }
637         return 0;
638 }
639
640 /*
641  * Get a ceph client with specific addr and configuration, if one does
642  * not exist create it.
643  */
644 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
645 {
646         struct rbd_client *rbdc;
647
648         rbdc = rbd_client_find(ceph_opts);
649         if (rbdc)       /* using an existing client */
650                 ceph_destroy_options(ceph_opts);
651         else
652                 rbdc = rbd_client_create(ceph_opts);
653
654         return rbdc;
655 }
656
657 /*
658  * Destroy ceph client
659  *
660  * Caller must hold rbd_client_list_lock.
661  */
662 static void rbd_client_release(struct kref *kref)
663 {
664         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
665
666         dout("%s: rbdc %p\n", __func__, rbdc);
667         spin_lock(&rbd_client_list_lock);
668         list_del(&rbdc->node);
669         spin_unlock(&rbd_client_list_lock);
670
671         ceph_destroy_client(rbdc->client);
672         kfree(rbdc);
673 }
674
675 /*
676  * Drop reference to ceph client node. If it's not referenced anymore, release
677  * it.
678  */
679 static void rbd_put_client(struct rbd_client *rbdc)
680 {
681         if (rbdc)
682                 kref_put(&rbdc->kref, rbd_client_release);
683 }
684
685 static bool rbd_image_format_valid(u32 image_format)
686 {
687         return image_format == 1 || image_format == 2;
688 }
689
690 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
691 {
692         size_t size;
693         u32 snap_count;
694
695         /* The header has to start with the magic rbd header text */
696         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
697                 return false;
698
699         /* The bio layer requires at least sector-sized I/O */
700
701         if (ondisk->options.order < SECTOR_SHIFT)
702                 return false;
703
704         /* If we use u64 in a few spots we may be able to loosen this */
705
706         if (ondisk->options.order > 8 * sizeof (int) - 1)
707                 return false;
708
709         /*
710          * The size of a snapshot header has to fit in a size_t, and
711          * that limits the number of snapshots.
712          */
713         snap_count = le32_to_cpu(ondisk->snap_count);
714         size = SIZE_MAX - sizeof (struct ceph_snap_context);
715         if (snap_count > size / sizeof (__le64))
716                 return false;
717
718         /*
719          * Not only that, but the size of the entire the snapshot
720          * header must also be representable in a size_t.
721          */
722         size -= snap_count * sizeof (__le64);
723         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
724                 return false;
725
726         return true;
727 }
728
729 /*
730  * Create a new header structure, translate header format from the on-disk
731  * header.
732  */
733 static int rbd_header_from_disk(struct rbd_image_header *header,
734                                  struct rbd_image_header_ondisk *ondisk)
735 {
736         u32 snap_count;
737         size_t len;
738         size_t size;
739         u32 i;
740
741         snap_count = le32_to_cpu(ondisk->snap_count);
742
743         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
745         if (!header->object_prefix)
746                 return -ENOMEM;
747         memcpy(header->object_prefix, ondisk->object_prefix, len);
748         header->object_prefix[len] = '\0';
749
750         if (snap_count) {
751                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
752
753                 /* Save a copy of the snapshot names */
754
755                 if (snap_names_len > (u64) SIZE_MAX)
756                         return -EIO;
757                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
758                 if (!header->snap_names)
759                         goto out_err;
760                 /*
761                  * Note that rbd_dev_v1_header_read() guarantees
762                  * the ondisk buffer we're working with has
763                  * snap_names_len bytes beyond the end of the
764                  * snapshot id array, this memcpy() is safe.
765                  */
766                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
767                         snap_names_len);
768
769                 /* Record each snapshot's size */
770
771                 size = snap_count * sizeof (*header->snap_sizes);
772                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
773                 if (!header->snap_sizes)
774                         goto out_err;
775                 for (i = 0; i < snap_count; i++)
776                         header->snap_sizes[i] =
777                                 le64_to_cpu(ondisk->snaps[i].image_size);
778         } else {
779                 header->snap_names = NULL;
780                 header->snap_sizes = NULL;
781         }
782
783         header->features = 0;   /* No features support in v1 images */
784         header->obj_order = ondisk->options.order;
785         header->crypt_type = ondisk->options.crypt_type;
786         header->comp_type = ondisk->options.comp_type;
787
788         /* Allocate and fill in the snapshot context */
789
790         header->image_size = le64_to_cpu(ondisk->image_size);
791
792         header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
793         if (!header->snapc)
794                 goto out_err;
795         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
796         for (i = 0; i < snap_count; i++)
797                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
798
799         return 0;
800
801 out_err:
802         kfree(header->snap_sizes);
803         header->snap_sizes = NULL;
804         kfree(header->snap_names);
805         header->snap_names = NULL;
806         kfree(header->object_prefix);
807         header->object_prefix = NULL;
808
809         return -ENOMEM;
810 }
811
812 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
813 {
814         const char *snap_name;
815
816         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
817
818         /* Skip over names until we find the one we are looking for */
819
820         snap_name = rbd_dev->header.snap_names;
821         while (which--)
822                 snap_name += strlen(snap_name) + 1;
823
824         return kstrdup(snap_name, GFP_KERNEL);
825 }
826
827 /*
828  * Snapshot id comparison function for use with qsort()/bsearch().
829  * Note that result is for snapshots in *descending* order.
830  */
831 static int snapid_compare_reverse(const void *s1, const void *s2)
832 {
833         u64 snap_id1 = *(u64 *)s1;
834         u64 snap_id2 = *(u64 *)s2;
835
836         if (snap_id1 < snap_id2)
837                 return 1;
838         return snap_id1 == snap_id2 ? 0 : -1;
839 }
840
841 /*
842  * Search a snapshot context to see if the given snapshot id is
843  * present.
844  *
845  * Returns the position of the snapshot id in the array if it's found,
846  * or BAD_SNAP_INDEX otherwise.
847  *
848  * Note: The snapshot array is in kept sorted (by the osd) in
849  * reverse order, highest snapshot id first.
850  */
851 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
852 {
853         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
854         u64 *found;
855
856         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
857                                 sizeof (snap_id), snapid_compare_reverse);
858
859         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
860 }
861
862 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
863                                         u64 snap_id)
864 {
865         u32 which;
866
867         which = rbd_dev_snap_index(rbd_dev, snap_id);
868         if (which == BAD_SNAP_INDEX)
869                 return NULL;
870
871         return _rbd_dev_v1_snap_name(rbd_dev, which);
872 }
873
874 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
875 {
876         if (snap_id == CEPH_NOSNAP)
877                 return RBD_SNAP_HEAD_NAME;
878
879         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
880         if (rbd_dev->image_format == 1)
881                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
882
883         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
884 }
885
886 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
887                                 u64 *snap_size)
888 {
889         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
890         if (snap_id == CEPH_NOSNAP) {
891                 *snap_size = rbd_dev->header.image_size;
892         } else if (rbd_dev->image_format == 1) {
893                 u32 which;
894
895                 which = rbd_dev_snap_index(rbd_dev, snap_id);
896                 if (which == BAD_SNAP_INDEX)
897                         return -ENOENT;
898
899                 *snap_size = rbd_dev->header.snap_sizes[which];
900         } else {
901                 u64 size = 0;
902                 int ret;
903
904                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
905                 if (ret)
906                         return ret;
907
908                 *snap_size = size;
909         }
910         return 0;
911 }
912
913 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
914                         u64 *snap_features)
915 {
916         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
917         if (snap_id == CEPH_NOSNAP) {
918                 *snap_features = rbd_dev->header.features;
919         } else if (rbd_dev->image_format == 1) {
920                 *snap_features = 0;     /* No features for format 1 */
921         } else {
922                 u64 features = 0;
923                 int ret;
924
925                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
926                 if (ret)
927                         return ret;
928
929                 *snap_features = features;
930         }
931         return 0;
932 }
933
934 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
935 {
936         u64 snap_id = rbd_dev->spec->snap_id;
937         u64 size = 0;
938         u64 features = 0;
939         int ret;
940
941         ret = rbd_snap_size(rbd_dev, snap_id, &size);
942         if (ret)
943                 return ret;
944         ret = rbd_snap_features(rbd_dev, snap_id, &features);
945         if (ret)
946                 return ret;
947
948         rbd_dev->mapping.size = size;
949         rbd_dev->mapping.features = features;
950
951         return 0;
952 }
953
954 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
955 {
956         rbd_dev->mapping.size = 0;
957         rbd_dev->mapping.features = 0;
958 }
959
960 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
961 {
962         char *name;
963         u64 segment;
964         int ret;
965
966         name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
967         if (!name)
968                 return NULL;
969         segment = offset >> rbd_dev->header.obj_order;
970         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
971                         rbd_dev->header.object_prefix, segment);
972         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
973                 pr_err("error formatting segment name for #%llu (%d)\n",
974                         segment, ret);
975                 kfree(name);
976                 name = NULL;
977         }
978
979         return name;
980 }
981
982 static void rbd_segment_name_free(const char *name)
983 {
984         /* The explicit cast here is needed to drop the const qualifier */
985
986         kmem_cache_free(rbd_segment_name_cache, (void *)name);
987 }
988
989 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
990 {
991         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
992
993         return offset & (segment_size - 1);
994 }
995
996 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
997                                 u64 offset, u64 length)
998 {
999         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1000
1001         offset &= segment_size - 1;
1002
1003         rbd_assert(length <= U64_MAX - offset);
1004         if (offset + length > segment_size)
1005                 length = segment_size - offset;
1006
1007         return length;
1008 }
1009
1010 /*
1011  * returns the size of an object in the image
1012  */
1013 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1014 {
1015         return 1 << header->obj_order;
1016 }
1017
1018 /*
1019  * bio helpers
1020  */
1021
1022 static void bio_chain_put(struct bio *chain)
1023 {
1024         struct bio *tmp;
1025
1026         while (chain) {
1027                 tmp = chain;
1028                 chain = chain->bi_next;
1029                 bio_put(tmp);
1030         }
1031 }
1032
1033 /*
1034  * zeros a bio chain, starting at specific offset
1035  */
1036 static void zero_bio_chain(struct bio *chain, int start_ofs)
1037 {
1038         struct bio_vec *bv;
1039         unsigned long flags;
1040         void *buf;
1041         int i;
1042         int pos = 0;
1043
1044         while (chain) {
1045                 bio_for_each_segment(bv, chain, i) {
1046                         if (pos + bv->bv_len > start_ofs) {
1047                                 int remainder = max(start_ofs - pos, 0);
1048                                 buf = bvec_kmap_irq(bv, &flags);
1049                                 memset(buf + remainder, 0,
1050                                        bv->bv_len - remainder);
1051                                 bvec_kunmap_irq(buf, &flags);
1052                         }
1053                         pos += bv->bv_len;
1054                 }
1055
1056                 chain = chain->bi_next;
1057         }
1058 }
1059
1060 /*
1061  * similar to zero_bio_chain(), zeros data defined by a page array,
1062  * starting at the given byte offset from the start of the array and
1063  * continuing up to the given end offset.  The pages array is
1064  * assumed to be big enough to hold all bytes up to the end.
1065  */
1066 static void zero_pages(struct page **pages, u64 offset, u64 end)
1067 {
1068         struct page **page = &pages[offset >> PAGE_SHIFT];
1069
1070         rbd_assert(end > offset);
1071         rbd_assert(end - offset <= (u64)SIZE_MAX);
1072         while (offset < end) {
1073                 size_t page_offset;
1074                 size_t length;
1075                 unsigned long flags;
1076                 void *kaddr;
1077
1078                 page_offset = (size_t)(offset & ~PAGE_MASK);
1079                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1080                 local_irq_save(flags);
1081                 kaddr = kmap_atomic(*page);
1082                 memset(kaddr + page_offset, 0, length);
1083                 kunmap_atomic(kaddr);
1084                 local_irq_restore(flags);
1085
1086                 offset += length;
1087                 page++;
1088         }
1089 }
1090
1091 /*
1092  * Clone a portion of a bio, starting at the given byte offset
1093  * and continuing for the number of bytes indicated.
1094  */
1095 static struct bio *bio_clone_range(struct bio *bio_src,
1096                                         unsigned int offset,
1097                                         unsigned int len,
1098                                         gfp_t gfpmask)
1099 {
1100         struct bio_vec *bv;
1101         unsigned int resid;
1102         unsigned short idx;
1103         unsigned int voff;
1104         unsigned short end_idx;
1105         unsigned short vcnt;
1106         struct bio *bio;
1107
1108         /* Handle the easy case for the caller */
1109
1110         if (!offset && len == bio_src->bi_size)
1111                 return bio_clone(bio_src, gfpmask);
1112
1113         if (WARN_ON_ONCE(!len))
1114                 return NULL;
1115         if (WARN_ON_ONCE(len > bio_src->bi_size))
1116                 return NULL;
1117         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1118                 return NULL;
1119
1120         /* Find first affected segment... */
1121
1122         resid = offset;
1123         __bio_for_each_segment(bv, bio_src, idx, 0) {
1124                 if (resid < bv->bv_len)
1125                         break;
1126                 resid -= bv->bv_len;
1127         }
1128         voff = resid;
1129
1130         /* ...and the last affected segment */
1131
1132         resid += len;
1133         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1134                 if (resid <= bv->bv_len)
1135                         break;
1136                 resid -= bv->bv_len;
1137         }
1138         vcnt = end_idx - idx + 1;
1139
1140         /* Build the clone */
1141
1142         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1143         if (!bio)
1144                 return NULL;    /* ENOMEM */
1145
1146         bio->bi_bdev = bio_src->bi_bdev;
1147         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1148         bio->bi_rw = bio_src->bi_rw;
1149         bio->bi_flags |= 1 << BIO_CLONED;
1150
1151         /*
1152          * Copy over our part of the bio_vec, then update the first
1153          * and last (or only) entries.
1154          */
1155         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1156                         vcnt * sizeof (struct bio_vec));
1157         bio->bi_io_vec[0].bv_offset += voff;
1158         if (vcnt > 1) {
1159                 bio->bi_io_vec[0].bv_len -= voff;
1160                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1161         } else {
1162                 bio->bi_io_vec[0].bv_len = len;
1163         }
1164
1165         bio->bi_vcnt = vcnt;
1166         bio->bi_size = len;
1167         bio->bi_idx = 0;
1168
1169         return bio;
1170 }
1171
1172 /*
1173  * Clone a portion of a bio chain, starting at the given byte offset
1174  * into the first bio in the source chain and continuing for the
1175  * number of bytes indicated.  The result is another bio chain of
1176  * exactly the given length, or a null pointer on error.
1177  *
1178  * The bio_src and offset parameters are both in-out.  On entry they
1179  * refer to the first source bio and the offset into that bio where
1180  * the start of data to be cloned is located.
1181  *
1182  * On return, bio_src is updated to refer to the bio in the source
1183  * chain that contains first un-cloned byte, and *offset will
1184  * contain the offset of that byte within that bio.
1185  */
1186 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1187                                         unsigned int *offset,
1188                                         unsigned int len,
1189                                         gfp_t gfpmask)
1190 {
1191         struct bio *bi = *bio_src;
1192         unsigned int off = *offset;
1193         struct bio *chain = NULL;
1194         struct bio **end;
1195
1196         /* Build up a chain of clone bios up to the limit */
1197
1198         if (!bi || off >= bi->bi_size || !len)
1199                 return NULL;            /* Nothing to clone */
1200
1201         end = &chain;
1202         while (len) {
1203                 unsigned int bi_size;
1204                 struct bio *bio;
1205
1206                 if (!bi) {
1207                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1208                         goto out_err;   /* EINVAL; ran out of bio's */
1209                 }
1210                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1211                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1212                 if (!bio)
1213                         goto out_err;   /* ENOMEM */
1214
1215                 *end = bio;
1216                 end = &bio->bi_next;
1217
1218                 off += bi_size;
1219                 if (off == bi->bi_size) {
1220                         bi = bi->bi_next;
1221                         off = 0;
1222                 }
1223                 len -= bi_size;
1224         }
1225         *bio_src = bi;
1226         *offset = off;
1227
1228         return chain;
1229 out_err:
1230         bio_chain_put(chain);
1231
1232         return NULL;
1233 }
1234
1235 /*
1236  * The default/initial value for all object request flags is 0.  For
1237  * each flag, once its value is set to 1 it is never reset to 0
1238  * again.
1239  */
1240 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1241 {
1242         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1243                 struct rbd_device *rbd_dev;
1244
1245                 rbd_dev = obj_request->img_request->rbd_dev;
1246                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1247                         obj_request);
1248         }
1249 }
1250
1251 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1252 {
1253         smp_mb();
1254         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1255 }
1256
1257 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1258 {
1259         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1260                 struct rbd_device *rbd_dev = NULL;
1261
1262                 if (obj_request_img_data_test(obj_request))
1263                         rbd_dev = obj_request->img_request->rbd_dev;
1264                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1265                         obj_request);
1266         }
1267 }
1268
1269 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1270 {
1271         smp_mb();
1272         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1273 }
1274
1275 /*
1276  * This sets the KNOWN flag after (possibly) setting the EXISTS
1277  * flag.  The latter is set based on the "exists" value provided.
1278  *
1279  * Note that for our purposes once an object exists it never goes
1280  * away again.  It's possible that the response from two existence
1281  * checks are separated by the creation of the target object, and
1282  * the first ("doesn't exist") response arrives *after* the second
1283  * ("does exist").  In that case we ignore the second one.
1284  */
1285 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1286                                 bool exists)
1287 {
1288         if (exists)
1289                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1290         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1291         smp_mb();
1292 }
1293
1294 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1295 {
1296         smp_mb();
1297         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1298 }
1299
1300 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1301 {
1302         smp_mb();
1303         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1304 }
1305
1306 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1307 {
1308         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1309                 atomic_read(&obj_request->kref.refcount));
1310         kref_get(&obj_request->kref);
1311 }
1312
1313 static void rbd_obj_request_destroy(struct kref *kref);
1314 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1315 {
1316         rbd_assert(obj_request != NULL);
1317         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1318                 atomic_read(&obj_request->kref.refcount));
1319         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1320 }
1321
1322 static void rbd_img_request_get(struct rbd_img_request *img_request)
1323 {
1324         dout("%s: img %p (was %d)\n", __func__, img_request,
1325                 atomic_read(&img_request->kref.refcount));
1326         kref_get(&img_request->kref);
1327 }
1328
1329 static void rbd_img_request_destroy(struct kref *kref);
1330 static void rbd_img_request_put(struct rbd_img_request *img_request)
1331 {
1332         rbd_assert(img_request != NULL);
1333         dout("%s: img %p (was %d)\n", __func__, img_request,
1334                 atomic_read(&img_request->kref.refcount));
1335         kref_put(&img_request->kref, rbd_img_request_destroy);
1336 }
1337
1338 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1339                                         struct rbd_obj_request *obj_request)
1340 {
1341         rbd_assert(obj_request->img_request == NULL);
1342
1343         /* Image request now owns object's original reference */
1344         obj_request->img_request = img_request;
1345         obj_request->which = img_request->obj_request_count;
1346         rbd_assert(!obj_request_img_data_test(obj_request));
1347         obj_request_img_data_set(obj_request);
1348         rbd_assert(obj_request->which != BAD_WHICH);
1349         img_request->obj_request_count++;
1350         list_add_tail(&obj_request->links, &img_request->obj_requests);
1351         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1352                 obj_request->which);
1353 }
1354
1355 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1356                                         struct rbd_obj_request *obj_request)
1357 {
1358         rbd_assert(obj_request->which != BAD_WHICH);
1359
1360         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1361                 obj_request->which);
1362         list_del(&obj_request->links);
1363         rbd_assert(img_request->obj_request_count > 0);
1364         img_request->obj_request_count--;
1365         rbd_assert(obj_request->which == img_request->obj_request_count);
1366         obj_request->which = BAD_WHICH;
1367         rbd_assert(obj_request_img_data_test(obj_request));
1368         rbd_assert(obj_request->img_request == img_request);
1369         obj_request->img_request = NULL;
1370         obj_request->callback = NULL;
1371         rbd_obj_request_put(obj_request);
1372 }
1373
1374 static bool obj_request_type_valid(enum obj_request_type type)
1375 {
1376         switch (type) {
1377         case OBJ_REQUEST_NODATA:
1378         case OBJ_REQUEST_BIO:
1379         case OBJ_REQUEST_PAGES:
1380                 return true;
1381         default:
1382                 return false;
1383         }
1384 }
1385
1386 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1387                                 struct rbd_obj_request *obj_request)
1388 {
1389         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1390
1391         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1392 }
1393
1394 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1395 {
1396
1397         dout("%s: img %p\n", __func__, img_request);
1398
1399         /*
1400          * If no error occurred, compute the aggregate transfer
1401          * count for the image request.  We could instead use
1402          * atomic64_cmpxchg() to update it as each object request
1403          * completes; not clear which way is better off hand.
1404          */
1405         if (!img_request->result) {
1406                 struct rbd_obj_request *obj_request;
1407                 u64 xferred = 0;
1408
1409                 for_each_obj_request(img_request, obj_request)
1410                         xferred += obj_request->xferred;
1411                 img_request->xferred = xferred;
1412         }
1413
1414         if (img_request->callback)
1415                 img_request->callback(img_request);
1416         else
1417                 rbd_img_request_put(img_request);
1418 }
1419
1420 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1421
1422 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1423 {
1424         dout("%s: obj %p\n", __func__, obj_request);
1425
1426         return wait_for_completion_interruptible(&obj_request->completion);
1427 }
1428
1429 /*
1430  * The default/initial value for all image request flags is 0.  Each
1431  * is conditionally set to 1 at image request initialization time
1432  * and currently never change thereafter.
1433  */
1434 static void img_request_write_set(struct rbd_img_request *img_request)
1435 {
1436         set_bit(IMG_REQ_WRITE, &img_request->flags);
1437         smp_mb();
1438 }
1439
1440 static bool img_request_write_test(struct rbd_img_request *img_request)
1441 {
1442         smp_mb();
1443         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1444 }
1445
1446 static void img_request_child_set(struct rbd_img_request *img_request)
1447 {
1448         set_bit(IMG_REQ_CHILD, &img_request->flags);
1449         smp_mb();
1450 }
1451
1452 static bool img_request_child_test(struct rbd_img_request *img_request)
1453 {
1454         smp_mb();
1455         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1456 }
1457
1458 static void img_request_layered_set(struct rbd_img_request *img_request)
1459 {
1460         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1461         smp_mb();
1462 }
1463
1464 static bool img_request_layered_test(struct rbd_img_request *img_request)
1465 {
1466         smp_mb();
1467         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1468 }
1469
1470 static void
1471 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1472 {
1473         u64 xferred = obj_request->xferred;
1474         u64 length = obj_request->length;
1475
1476         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1477                 obj_request, obj_request->img_request, obj_request->result,
1478                 xferred, length);
1479         /*
1480          * ENOENT means a hole in the image.  We zero-fill the
1481          * entire length of the request.  A short read also implies
1482          * zero-fill to the end of the request.  Either way we
1483          * update the xferred count to indicate the whole request
1484          * was satisfied.
1485          */
1486         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1487         if (obj_request->result == -ENOENT) {
1488                 if (obj_request->type == OBJ_REQUEST_BIO)
1489                         zero_bio_chain(obj_request->bio_list, 0);
1490                 else
1491                         zero_pages(obj_request->pages, 0, length);
1492                 obj_request->result = 0;
1493                 obj_request->xferred = length;
1494         } else if (xferred < length && !obj_request->result) {
1495                 if (obj_request->type == OBJ_REQUEST_BIO)
1496                         zero_bio_chain(obj_request->bio_list, xferred);
1497                 else
1498                         zero_pages(obj_request->pages, xferred, length);
1499                 obj_request->xferred = length;
1500         }
1501         obj_request_done_set(obj_request);
1502 }
1503
1504 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1505 {
1506         dout("%s: obj %p cb %p\n", __func__, obj_request,
1507                 obj_request->callback);
1508         if (obj_request->callback)
1509                 obj_request->callback(obj_request);
1510         else
1511                 complete_all(&obj_request->completion);
1512 }
1513
1514 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1515 {
1516         dout("%s: obj %p\n", __func__, obj_request);
1517         obj_request_done_set(obj_request);
1518 }
1519
1520 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1521 {
1522         struct rbd_img_request *img_request = NULL;
1523         struct rbd_device *rbd_dev = NULL;
1524         bool layered = false;
1525
1526         if (obj_request_img_data_test(obj_request)) {
1527                 img_request = obj_request->img_request;
1528                 layered = img_request && img_request_layered_test(img_request);
1529                 rbd_dev = img_request->rbd_dev;
1530         }
1531
1532         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1533                 obj_request, img_request, obj_request->result,
1534                 obj_request->xferred, obj_request->length);
1535         if (layered && obj_request->result == -ENOENT &&
1536                         obj_request->img_offset < rbd_dev->parent_overlap)
1537                 rbd_img_parent_read(obj_request);
1538         else if (img_request)
1539                 rbd_img_obj_request_read_callback(obj_request);
1540         else
1541                 obj_request_done_set(obj_request);
1542 }
1543
1544 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1545 {
1546         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1547                 obj_request->result, obj_request->length);
1548         /*
1549          * There is no such thing as a successful short write.  Set
1550          * it to our originally-requested length.
1551          */
1552         obj_request->xferred = obj_request->length;
1553         obj_request_done_set(obj_request);
1554 }
1555
1556 /*
1557  * For a simple stat call there's nothing to do.  We'll do more if
1558  * this is part of a write sequence for a layered image.
1559  */
1560 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1561 {
1562         dout("%s: obj %p\n", __func__, obj_request);
1563         obj_request_done_set(obj_request);
1564 }
1565
1566 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1567                                 struct ceph_msg *msg)
1568 {
1569         struct rbd_obj_request *obj_request = osd_req->r_priv;
1570         u16 opcode;
1571
1572         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1573         rbd_assert(osd_req == obj_request->osd_req);
1574         if (obj_request_img_data_test(obj_request)) {
1575                 rbd_assert(obj_request->img_request);
1576                 rbd_assert(obj_request->which != BAD_WHICH);
1577         } else {
1578                 rbd_assert(obj_request->which == BAD_WHICH);
1579         }
1580
1581         if (osd_req->r_result < 0)
1582                 obj_request->result = osd_req->r_result;
1583
1584         BUG_ON(osd_req->r_num_ops > 2);
1585
1586         /*
1587          * We support a 64-bit length, but ultimately it has to be
1588          * passed to blk_end_request(), which takes an unsigned int.
1589          */
1590         obj_request->xferred = osd_req->r_reply_op_len[0];
1591         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1592         opcode = osd_req->r_ops[0].op;
1593         switch (opcode) {
1594         case CEPH_OSD_OP_READ:
1595                 rbd_osd_read_callback(obj_request);
1596                 break;
1597         case CEPH_OSD_OP_WRITE:
1598                 rbd_osd_write_callback(obj_request);
1599                 break;
1600         case CEPH_OSD_OP_STAT:
1601                 rbd_osd_stat_callback(obj_request);
1602                 break;
1603         case CEPH_OSD_OP_CALL:
1604         case CEPH_OSD_OP_NOTIFY_ACK:
1605         case CEPH_OSD_OP_WATCH:
1606                 rbd_osd_trivial_callback(obj_request);
1607                 break;
1608         default:
1609                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1610                         obj_request->object_name, (unsigned short) opcode);
1611                 break;
1612         }
1613
1614         if (obj_request_done_test(obj_request))
1615                 rbd_obj_request_complete(obj_request);
1616 }
1617
1618 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1619 {
1620         struct rbd_img_request *img_request = obj_request->img_request;
1621         struct ceph_osd_request *osd_req = obj_request->osd_req;
1622         u64 snap_id;
1623
1624         rbd_assert(osd_req != NULL);
1625
1626         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1627         ceph_osdc_build_request(osd_req, obj_request->offset,
1628                         NULL, snap_id, NULL);
1629 }
1630
1631 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1632 {
1633         struct rbd_img_request *img_request = obj_request->img_request;
1634         struct ceph_osd_request *osd_req = obj_request->osd_req;
1635         struct ceph_snap_context *snapc;
1636         struct timespec mtime = CURRENT_TIME;
1637
1638         rbd_assert(osd_req != NULL);
1639
1640         snapc = img_request ? img_request->snapc : NULL;
1641         ceph_osdc_build_request(osd_req, obj_request->offset,
1642                         snapc, CEPH_NOSNAP, &mtime);
1643 }
1644
1645 static struct ceph_osd_request *rbd_osd_req_create(
1646                                         struct rbd_device *rbd_dev,
1647                                         bool write_request,
1648                                         struct rbd_obj_request *obj_request)
1649 {
1650         struct ceph_snap_context *snapc = NULL;
1651         struct ceph_osd_client *osdc;
1652         struct ceph_osd_request *osd_req;
1653
1654         if (obj_request_img_data_test(obj_request)) {
1655                 struct rbd_img_request *img_request = obj_request->img_request;
1656
1657                 rbd_assert(write_request ==
1658                                 img_request_write_test(img_request));
1659                 if (write_request)
1660                         snapc = img_request->snapc;
1661         }
1662
1663         /* Allocate and initialize the request, for the single op */
1664
1665         osdc = &rbd_dev->rbd_client->client->osdc;
1666         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1667         if (!osd_req)
1668                 return NULL;    /* ENOMEM */
1669
1670         if (write_request)
1671                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1672         else
1673                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1674
1675         osd_req->r_callback = rbd_osd_req_callback;
1676         osd_req->r_priv = obj_request;
1677
1678         osd_req->r_oid_len = strlen(obj_request->object_name);
1679         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1680         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1681
1682         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1683
1684         return osd_req;
1685 }
1686
1687 /*
1688  * Create a copyup osd request based on the information in the
1689  * object request supplied.  A copyup request has two osd ops,
1690  * a copyup method call, and a "normal" write request.
1691  */
1692 static struct ceph_osd_request *
1693 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1694 {
1695         struct rbd_img_request *img_request;
1696         struct ceph_snap_context *snapc;
1697         struct rbd_device *rbd_dev;
1698         struct ceph_osd_client *osdc;
1699         struct ceph_osd_request *osd_req;
1700
1701         rbd_assert(obj_request_img_data_test(obj_request));
1702         img_request = obj_request->img_request;
1703         rbd_assert(img_request);
1704         rbd_assert(img_request_write_test(img_request));
1705
1706         /* Allocate and initialize the request, for the two ops */
1707
1708         snapc = img_request->snapc;
1709         rbd_dev = img_request->rbd_dev;
1710         osdc = &rbd_dev->rbd_client->client->osdc;
1711         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1712         if (!osd_req)
1713                 return NULL;    /* ENOMEM */
1714
1715         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1716         osd_req->r_callback = rbd_osd_req_callback;
1717         osd_req->r_priv = obj_request;
1718
1719         osd_req->r_oid_len = strlen(obj_request->object_name);
1720         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1721         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1722
1723         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1724
1725         return osd_req;
1726 }
1727
1728
1729 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1730 {
1731         ceph_osdc_put_request(osd_req);
1732 }
1733
1734 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1735
1736 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1737                                                 u64 offset, u64 length,
1738                                                 enum obj_request_type type)
1739 {
1740         struct rbd_obj_request *obj_request;
1741         size_t size;
1742         char *name;
1743
1744         rbd_assert(obj_request_type_valid(type));
1745
1746         size = strlen(object_name) + 1;
1747         name = kmalloc(size, GFP_KERNEL);
1748         if (!name)
1749                 return NULL;
1750
1751         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1752         if (!obj_request) {
1753                 kfree(name);
1754                 return NULL;
1755         }
1756
1757         obj_request->object_name = memcpy(name, object_name, size);
1758         obj_request->offset = offset;
1759         obj_request->length = length;
1760         obj_request->flags = 0;
1761         obj_request->which = BAD_WHICH;
1762         obj_request->type = type;
1763         INIT_LIST_HEAD(&obj_request->links);
1764         init_completion(&obj_request->completion);
1765         kref_init(&obj_request->kref);
1766
1767         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1768                 offset, length, (int)type, obj_request);
1769
1770         return obj_request;
1771 }
1772
1773 static void rbd_obj_request_destroy(struct kref *kref)
1774 {
1775         struct rbd_obj_request *obj_request;
1776
1777         obj_request = container_of(kref, struct rbd_obj_request, kref);
1778
1779         dout("%s: obj %p\n", __func__, obj_request);
1780
1781         rbd_assert(obj_request->img_request == NULL);
1782         rbd_assert(obj_request->which == BAD_WHICH);
1783
1784         if (obj_request->osd_req)
1785                 rbd_osd_req_destroy(obj_request->osd_req);
1786
1787         rbd_assert(obj_request_type_valid(obj_request->type));
1788         switch (obj_request->type) {
1789         case OBJ_REQUEST_NODATA:
1790                 break;          /* Nothing to do */
1791         case OBJ_REQUEST_BIO:
1792                 if (obj_request->bio_list)
1793                         bio_chain_put(obj_request->bio_list);
1794                 break;
1795         case OBJ_REQUEST_PAGES:
1796                 if (obj_request->pages)
1797                         ceph_release_page_vector(obj_request->pages,
1798                                                 obj_request->page_count);
1799                 break;
1800         }
1801
1802         kfree(obj_request->object_name);
1803         obj_request->object_name = NULL;
1804         kmem_cache_free(rbd_obj_request_cache, obj_request);
1805 }
1806
1807 /*
1808  * Caller is responsible for filling in the list of object requests
1809  * that comprises the image request, and the Linux request pointer
1810  * (if there is one).
1811  */
1812 static struct rbd_img_request *rbd_img_request_create(
1813                                         struct rbd_device *rbd_dev,
1814                                         u64 offset, u64 length,
1815                                         bool write_request,
1816                                         bool child_request)
1817 {
1818         struct rbd_img_request *img_request;
1819
1820         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1821         if (!img_request)
1822                 return NULL;
1823
1824         if (write_request) {
1825                 down_read(&rbd_dev->header_rwsem);
1826                 ceph_get_snap_context(rbd_dev->header.snapc);
1827                 up_read(&rbd_dev->header_rwsem);
1828         }
1829
1830         img_request->rq = NULL;
1831         img_request->rbd_dev = rbd_dev;
1832         img_request->offset = offset;
1833         img_request->length = length;
1834         img_request->flags = 0;
1835         if (write_request) {
1836                 img_request_write_set(img_request);
1837                 img_request->snapc = rbd_dev->header.snapc;
1838         } else {
1839                 img_request->snap_id = rbd_dev->spec->snap_id;
1840         }
1841         if (child_request)
1842                 img_request_child_set(img_request);
1843         if (rbd_dev->parent_spec)
1844                 img_request_layered_set(img_request);
1845         spin_lock_init(&img_request->completion_lock);
1846         img_request->next_completion = 0;
1847         img_request->callback = NULL;
1848         img_request->result = 0;
1849         img_request->obj_request_count = 0;
1850         INIT_LIST_HEAD(&img_request->obj_requests);
1851         kref_init(&img_request->kref);
1852
1853         rbd_img_request_get(img_request);       /* Avoid a warning */
1854         rbd_img_request_put(img_request);       /* TEMPORARY */
1855
1856         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1857                 write_request ? "write" : "read", offset, length,
1858                 img_request);
1859
1860         return img_request;
1861 }
1862
1863 static void rbd_img_request_destroy(struct kref *kref)
1864 {
1865         struct rbd_img_request *img_request;
1866         struct rbd_obj_request *obj_request;
1867         struct rbd_obj_request *next_obj_request;
1868
1869         img_request = container_of(kref, struct rbd_img_request, kref);
1870
1871         dout("%s: img %p\n", __func__, img_request);
1872
1873         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1874                 rbd_img_obj_request_del(img_request, obj_request);
1875         rbd_assert(img_request->obj_request_count == 0);
1876
1877         if (img_request_write_test(img_request))
1878                 ceph_put_snap_context(img_request->snapc);
1879
1880         if (img_request_child_test(img_request))
1881                 rbd_obj_request_put(img_request->obj_request);
1882
1883         kmem_cache_free(rbd_img_request_cache, img_request);
1884 }
1885
1886 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1887 {
1888         struct rbd_img_request *img_request;
1889         unsigned int xferred;
1890         int result;
1891         bool more;
1892
1893         rbd_assert(obj_request_img_data_test(obj_request));
1894         img_request = obj_request->img_request;
1895
1896         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1897         xferred = (unsigned int)obj_request->xferred;
1898         result = obj_request->result;
1899         if (result) {
1900                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1901
1902                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1903                         img_request_write_test(img_request) ? "write" : "read",
1904                         obj_request->length, obj_request->img_offset,
1905                         obj_request->offset);
1906                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1907                         result, xferred);
1908                 if (!img_request->result)
1909                         img_request->result = result;
1910         }
1911
1912         /* Image object requests don't own their page array */
1913
1914         if (obj_request->type == OBJ_REQUEST_PAGES) {
1915                 obj_request->pages = NULL;
1916                 obj_request->page_count = 0;
1917         }
1918
1919         if (img_request_child_test(img_request)) {
1920                 rbd_assert(img_request->obj_request != NULL);
1921                 more = obj_request->which < img_request->obj_request_count - 1;
1922         } else {
1923                 rbd_assert(img_request->rq != NULL);
1924                 more = blk_end_request(img_request->rq, result, xferred);
1925         }
1926
1927         return more;
1928 }
1929
1930 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1931 {
1932         struct rbd_img_request *img_request;
1933         u32 which = obj_request->which;
1934         bool more = true;
1935
1936         rbd_assert(obj_request_img_data_test(obj_request));
1937         img_request = obj_request->img_request;
1938
1939         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1940         rbd_assert(img_request != NULL);
1941         rbd_assert(img_request->obj_request_count > 0);
1942         rbd_assert(which != BAD_WHICH);
1943         rbd_assert(which < img_request->obj_request_count);
1944         rbd_assert(which >= img_request->next_completion);
1945
1946         spin_lock_irq(&img_request->completion_lock);
1947         if (which != img_request->next_completion)
1948                 goto out;
1949
1950         for_each_obj_request_from(img_request, obj_request) {
1951                 rbd_assert(more);
1952                 rbd_assert(which < img_request->obj_request_count);
1953
1954                 if (!obj_request_done_test(obj_request))
1955                         break;
1956                 more = rbd_img_obj_end_request(obj_request);
1957                 which++;
1958         }
1959
1960         rbd_assert(more ^ (which == img_request->obj_request_count));
1961         img_request->next_completion = which;
1962 out:
1963         spin_unlock_irq(&img_request->completion_lock);
1964
1965         if (!more)
1966                 rbd_img_request_complete(img_request);
1967 }
1968
1969 /*
1970  * Split up an image request into one or more object requests, each
1971  * to a different object.  The "type" parameter indicates whether
1972  * "data_desc" is the pointer to the head of a list of bio
1973  * structures, or the base of a page array.  In either case this
1974  * function assumes data_desc describes memory sufficient to hold
1975  * all data described by the image request.
1976  */
1977 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1978                                         enum obj_request_type type,
1979                                         void *data_desc)
1980 {
1981         struct rbd_device *rbd_dev = img_request->rbd_dev;
1982         struct rbd_obj_request *obj_request = NULL;
1983         struct rbd_obj_request *next_obj_request;
1984         bool write_request = img_request_write_test(img_request);
1985         struct bio *bio_list;
1986         unsigned int bio_offset = 0;
1987         struct page **pages;
1988         u64 img_offset;
1989         u64 resid;
1990         u16 opcode;
1991
1992         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1993                 (int)type, data_desc);
1994
1995         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1996         img_offset = img_request->offset;
1997         resid = img_request->length;
1998         rbd_assert(resid > 0);
1999
2000         if (type == OBJ_REQUEST_BIO) {
2001                 bio_list = data_desc;
2002                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2003         } else {
2004                 rbd_assert(type == OBJ_REQUEST_PAGES);
2005                 pages = data_desc;
2006         }
2007
2008         while (resid) {
2009                 struct ceph_osd_request *osd_req;
2010                 const char *object_name;
2011                 u64 offset;
2012                 u64 length;
2013
2014                 object_name = rbd_segment_name(rbd_dev, img_offset);
2015                 if (!object_name)
2016                         goto out_unwind;
2017                 offset = rbd_segment_offset(rbd_dev, img_offset);
2018                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2019                 obj_request = rbd_obj_request_create(object_name,
2020                                                 offset, length, type);
2021                 /* object request has its own copy of the object name */
2022                 rbd_segment_name_free(object_name);
2023                 if (!obj_request)
2024                         goto out_unwind;
2025
2026                 if (type == OBJ_REQUEST_BIO) {
2027                         unsigned int clone_size;
2028
2029                         rbd_assert(length <= (u64)UINT_MAX);
2030                         clone_size = (unsigned int)length;
2031                         obj_request->bio_list =
2032                                         bio_chain_clone_range(&bio_list,
2033                                                                 &bio_offset,
2034                                                                 clone_size,
2035                                                                 GFP_ATOMIC);
2036                         if (!obj_request->bio_list)
2037                                 goto out_partial;
2038                 } else {
2039                         unsigned int page_count;
2040
2041                         obj_request->pages = pages;
2042                         page_count = (u32)calc_pages_for(offset, length);
2043                         obj_request->page_count = page_count;
2044                         if ((offset + length) & ~PAGE_MASK)
2045                                 page_count--;   /* more on last page */
2046                         pages += page_count;
2047                 }
2048
2049                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2050                                                 obj_request);
2051                 if (!osd_req)
2052                         goto out_partial;
2053                 obj_request->osd_req = osd_req;
2054                 obj_request->callback = rbd_img_obj_callback;
2055
2056                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2057                                                 0, 0);
2058                 if (type == OBJ_REQUEST_BIO)
2059                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2060                                         obj_request->bio_list, length);
2061                 else
2062                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2063                                         obj_request->pages, length,
2064                                         offset & ~PAGE_MASK, false, false);
2065
2066                 if (write_request)
2067                         rbd_osd_req_format_write(obj_request);
2068                 else
2069                         rbd_osd_req_format_read(obj_request);
2070
2071                 obj_request->img_offset = img_offset;
2072                 rbd_img_obj_request_add(img_request, obj_request);
2073
2074                 img_offset += length;
2075                 resid -= length;
2076         }
2077
2078         return 0;
2079
2080 out_partial:
2081         rbd_obj_request_put(obj_request);
2082 out_unwind:
2083         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2084                 rbd_obj_request_put(obj_request);
2085
2086         return -ENOMEM;
2087 }
2088
2089 static void
2090 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2091 {
2092         struct rbd_img_request *img_request;
2093         struct rbd_device *rbd_dev;
2094         u64 length;
2095         u32 page_count;
2096
2097         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2098         rbd_assert(obj_request_img_data_test(obj_request));
2099         img_request = obj_request->img_request;
2100         rbd_assert(img_request);
2101
2102         rbd_dev = img_request->rbd_dev;
2103         rbd_assert(rbd_dev);
2104         length = (u64)1 << rbd_dev->header.obj_order;
2105         page_count = (u32)calc_pages_for(0, length);
2106
2107         rbd_assert(obj_request->copyup_pages);
2108         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2109         obj_request->copyup_pages = NULL;
2110
2111         /*
2112          * We want the transfer count to reflect the size of the
2113          * original write request.  There is no such thing as a
2114          * successful short write, so if the request was successful
2115          * we can just set it to the originally-requested length.
2116          */
2117         if (!obj_request->result)
2118                 obj_request->xferred = obj_request->length;
2119
2120         /* Finish up with the normal image object callback */
2121
2122         rbd_img_obj_callback(obj_request);
2123 }
2124
2125 static void
2126 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2127 {
2128         struct rbd_obj_request *orig_request;
2129         struct ceph_osd_request *osd_req;
2130         struct ceph_osd_client *osdc;
2131         struct rbd_device *rbd_dev;
2132         struct page **pages;
2133         int result;
2134         u64 obj_size;
2135         u64 xferred;
2136
2137         rbd_assert(img_request_child_test(img_request));
2138
2139         /* First get what we need from the image request */
2140
2141         pages = img_request->copyup_pages;
2142         rbd_assert(pages != NULL);
2143         img_request->copyup_pages = NULL;
2144
2145         orig_request = img_request->obj_request;
2146         rbd_assert(orig_request != NULL);
2147         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2148         result = img_request->result;
2149         obj_size = img_request->length;
2150         xferred = img_request->xferred;
2151
2152         rbd_dev = img_request->rbd_dev;
2153         rbd_assert(rbd_dev);
2154         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2155
2156         rbd_img_request_put(img_request);
2157
2158         if (result)
2159                 goto out_err;
2160
2161         /* Allocate the new copyup osd request for the original request */
2162
2163         result = -ENOMEM;
2164         rbd_assert(!orig_request->osd_req);
2165         osd_req = rbd_osd_req_create_copyup(orig_request);
2166         if (!osd_req)
2167                 goto out_err;
2168         orig_request->osd_req = osd_req;
2169         orig_request->copyup_pages = pages;
2170
2171         /* Initialize the copyup op */
2172
2173         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2174         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2175                                                 false, false);
2176
2177         /* Then the original write request op */
2178
2179         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2180                                         orig_request->offset,
2181                                         orig_request->length, 0, 0);
2182         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2183                                         orig_request->length);
2184
2185         rbd_osd_req_format_write(orig_request);
2186
2187         /* All set, send it off. */
2188
2189         orig_request->callback = rbd_img_obj_copyup_callback;
2190         osdc = &rbd_dev->rbd_client->client->osdc;
2191         result = rbd_obj_request_submit(osdc, orig_request);
2192         if (!result)
2193                 return;
2194 out_err:
2195         /* Record the error code and complete the request */
2196
2197         orig_request->result = result;
2198         orig_request->xferred = 0;
2199         obj_request_done_set(orig_request);
2200         rbd_obj_request_complete(orig_request);
2201 }
2202
2203 /*
2204  * Read from the parent image the range of data that covers the
2205  * entire target of the given object request.  This is used for
2206  * satisfying a layered image write request when the target of an
2207  * object request from the image request does not exist.
2208  *
2209  * A page array big enough to hold the returned data is allocated
2210  * and supplied to rbd_img_request_fill() as the "data descriptor."
2211  * When the read completes, this page array will be transferred to
2212  * the original object request for the copyup operation.
2213  *
2214  * If an error occurs, record it as the result of the original
2215  * object request and mark it done so it gets completed.
2216  */
2217 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2218 {
2219         struct rbd_img_request *img_request = NULL;
2220         struct rbd_img_request *parent_request = NULL;
2221         struct rbd_device *rbd_dev;
2222         u64 img_offset;
2223         u64 length;
2224         struct page **pages = NULL;
2225         u32 page_count;
2226         int result;
2227
2228         rbd_assert(obj_request_img_data_test(obj_request));
2229         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2230
2231         img_request = obj_request->img_request;
2232         rbd_assert(img_request != NULL);
2233         rbd_dev = img_request->rbd_dev;
2234         rbd_assert(rbd_dev->parent != NULL);
2235
2236         /*
2237          * First things first.  The original osd request is of no
2238          * use to use any more, we'll need a new one that can hold
2239          * the two ops in a copyup request.  We'll get that later,
2240          * but for now we can release the old one.
2241          */
2242         rbd_osd_req_destroy(obj_request->osd_req);
2243         obj_request->osd_req = NULL;
2244
2245         /*
2246          * Determine the byte range covered by the object in the
2247          * child image to which the original request was to be sent.
2248          */
2249         img_offset = obj_request->img_offset - obj_request->offset;
2250         length = (u64)1 << rbd_dev->header.obj_order;
2251
2252         /*
2253          * There is no defined parent data beyond the parent
2254          * overlap, so limit what we read at that boundary if
2255          * necessary.
2256          */
2257         if (img_offset + length > rbd_dev->parent_overlap) {
2258                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2259                 length = rbd_dev->parent_overlap - img_offset;
2260         }
2261
2262         /*
2263          * Allocate a page array big enough to receive the data read
2264          * from the parent.
2265          */
2266         page_count = (u32)calc_pages_for(0, length);
2267         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2268         if (IS_ERR(pages)) {
2269                 result = PTR_ERR(pages);
2270                 pages = NULL;
2271                 goto out_err;
2272         }
2273
2274         result = -ENOMEM;
2275         parent_request = rbd_img_request_create(rbd_dev->parent,
2276                                                 img_offset, length,
2277                                                 false, true);
2278         if (!parent_request)
2279                 goto out_err;
2280         rbd_obj_request_get(obj_request);
2281         parent_request->obj_request = obj_request;
2282
2283         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2284         if (result)
2285                 goto out_err;
2286         parent_request->copyup_pages = pages;
2287
2288         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2289         result = rbd_img_request_submit(parent_request);
2290         if (!result)
2291                 return 0;
2292
2293         parent_request->copyup_pages = NULL;
2294         parent_request->obj_request = NULL;
2295         rbd_obj_request_put(obj_request);
2296 out_err:
2297         if (pages)
2298                 ceph_release_page_vector(pages, page_count);
2299         if (parent_request)
2300                 rbd_img_request_put(parent_request);
2301         obj_request->result = result;
2302         obj_request->xferred = 0;
2303         obj_request_done_set(obj_request);
2304
2305         return result;
2306 }
2307
2308 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2309 {
2310         struct rbd_obj_request *orig_request;
2311         int result;
2312
2313         rbd_assert(!obj_request_img_data_test(obj_request));
2314
2315         /*
2316          * All we need from the object request is the original
2317          * request and the result of the STAT op.  Grab those, then
2318          * we're done with the request.
2319          */
2320         orig_request = obj_request->obj_request;
2321         obj_request->obj_request = NULL;
2322         rbd_assert(orig_request);
2323         rbd_assert(orig_request->img_request);
2324
2325         result = obj_request->result;
2326         obj_request->result = 0;
2327
2328         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2329                 obj_request, orig_request, result,
2330                 obj_request->xferred, obj_request->length);
2331         rbd_obj_request_put(obj_request);
2332
2333         rbd_assert(orig_request);
2334         rbd_assert(orig_request->img_request);
2335
2336         /*
2337          * Our only purpose here is to determine whether the object
2338          * exists, and we don't want to treat the non-existence as
2339          * an error.  If something else comes back, transfer the
2340          * error to the original request and complete it now.
2341          */
2342         if (!result) {
2343                 obj_request_existence_set(orig_request, true);
2344         } else if (result == -ENOENT) {
2345                 obj_request_existence_set(orig_request, false);
2346         } else if (result) {
2347                 orig_request->result = result;
2348                 goto out;
2349         }
2350
2351         /*
2352          * Resubmit the original request now that we have recorded
2353          * whether the target object exists.
2354          */
2355         orig_request->result = rbd_img_obj_request_submit(orig_request);
2356 out:
2357         if (orig_request->result)
2358                 rbd_obj_request_complete(orig_request);
2359         rbd_obj_request_put(orig_request);
2360 }
2361
2362 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2363 {
2364         struct rbd_obj_request *stat_request;
2365         struct rbd_device *rbd_dev;
2366         struct ceph_osd_client *osdc;
2367         struct page **pages = NULL;
2368         u32 page_count;
2369         size_t size;
2370         int ret;
2371
2372         /*
2373          * The response data for a STAT call consists of:
2374          *     le64 length;
2375          *     struct {
2376          *         le32 tv_sec;
2377          *         le32 tv_nsec;
2378          *     } mtime;
2379          */
2380         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2381         page_count = (u32)calc_pages_for(0, size);
2382         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2383         if (IS_ERR(pages))
2384                 return PTR_ERR(pages);
2385
2386         ret = -ENOMEM;
2387         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2388                                                         OBJ_REQUEST_PAGES);
2389         if (!stat_request)
2390                 goto out;
2391
2392         rbd_obj_request_get(obj_request);
2393         stat_request->obj_request = obj_request;
2394         stat_request->pages = pages;
2395         stat_request->page_count = page_count;
2396
2397         rbd_assert(obj_request->img_request);
2398         rbd_dev = obj_request->img_request->rbd_dev;
2399         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2400                                                 stat_request);
2401         if (!stat_request->osd_req)
2402                 goto out;
2403         stat_request->callback = rbd_img_obj_exists_callback;
2404
2405         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2406         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2407                                         false, false);
2408         rbd_osd_req_format_read(stat_request);
2409
2410         osdc = &rbd_dev->rbd_client->client->osdc;
2411         ret = rbd_obj_request_submit(osdc, stat_request);
2412 out:
2413         if (ret)
2414                 rbd_obj_request_put(obj_request);
2415
2416         return ret;
2417 }
2418
2419 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2420 {
2421         struct rbd_img_request *img_request;
2422         struct rbd_device *rbd_dev;
2423         bool known;
2424
2425         rbd_assert(obj_request_img_data_test(obj_request));
2426
2427         img_request = obj_request->img_request;
2428         rbd_assert(img_request);
2429         rbd_dev = img_request->rbd_dev;
2430
2431         /*
2432          * Only writes to layered images need special handling.
2433          * Reads and non-layered writes are simple object requests.
2434          * Layered writes that start beyond the end of the overlap
2435          * with the parent have no parent data, so they too are
2436          * simple object requests.  Finally, if the target object is
2437          * known to already exist, its parent data has already been
2438          * copied, so a write to the object can also be handled as a
2439          * simple object request.
2440          */
2441         if (!img_request_write_test(img_request) ||
2442                 !img_request_layered_test(img_request) ||
2443                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2444                 ((known = obj_request_known_test(obj_request)) &&
2445                         obj_request_exists_test(obj_request))) {
2446
2447                 struct rbd_device *rbd_dev;
2448                 struct ceph_osd_client *osdc;
2449
2450                 rbd_dev = obj_request->img_request->rbd_dev;
2451                 osdc = &rbd_dev->rbd_client->client->osdc;
2452
2453                 return rbd_obj_request_submit(osdc, obj_request);
2454         }
2455
2456         /*
2457          * It's a layered write.  The target object might exist but
2458          * we may not know that yet.  If we know it doesn't exist,
2459          * start by reading the data for the full target object from
2460          * the parent so we can use it for a copyup to the target.
2461          */
2462         if (known)
2463                 return rbd_img_obj_parent_read_full(obj_request);
2464
2465         /* We don't know whether the target exists.  Go find out. */
2466
2467         return rbd_img_obj_exists_submit(obj_request);
2468 }
2469
2470 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2471 {
2472         struct rbd_obj_request *obj_request;
2473         struct rbd_obj_request *next_obj_request;
2474
2475         dout("%s: img %p\n", __func__, img_request);
2476         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2477                 int ret;
2478
2479                 ret = rbd_img_obj_request_submit(obj_request);
2480                 if (ret)
2481                         return ret;
2482         }
2483
2484         return 0;
2485 }
2486
2487 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2488 {
2489         struct rbd_obj_request *obj_request;
2490         struct rbd_device *rbd_dev;
2491         u64 obj_end;
2492
2493         rbd_assert(img_request_child_test(img_request));
2494
2495         obj_request = img_request->obj_request;
2496         rbd_assert(obj_request);
2497         rbd_assert(obj_request->img_request);
2498
2499         obj_request->result = img_request->result;
2500         if (obj_request->result)
2501                 goto out;
2502
2503         /*
2504          * We need to zero anything beyond the parent overlap
2505          * boundary.  Since rbd_img_obj_request_read_callback()
2506          * will zero anything beyond the end of a short read, an
2507          * easy way to do this is to pretend the data from the
2508          * parent came up short--ending at the overlap boundary.
2509          */
2510         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2511         obj_end = obj_request->img_offset + obj_request->length;
2512         rbd_dev = obj_request->img_request->rbd_dev;
2513         if (obj_end > rbd_dev->parent_overlap) {
2514                 u64 xferred = 0;
2515
2516                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2517                         xferred = rbd_dev->parent_overlap -
2518                                         obj_request->img_offset;
2519
2520                 obj_request->xferred = min(img_request->xferred, xferred);
2521         } else {
2522                 obj_request->xferred = img_request->xferred;
2523         }
2524 out:
2525         rbd_img_request_put(img_request);
2526         rbd_img_obj_request_read_callback(obj_request);
2527         rbd_obj_request_complete(obj_request);
2528 }
2529
2530 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2531 {
2532         struct rbd_device *rbd_dev;
2533         struct rbd_img_request *img_request;
2534         int result;
2535
2536         rbd_assert(obj_request_img_data_test(obj_request));
2537         rbd_assert(obj_request->img_request != NULL);
2538         rbd_assert(obj_request->result == (s32) -ENOENT);
2539         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2540
2541         rbd_dev = obj_request->img_request->rbd_dev;
2542         rbd_assert(rbd_dev->parent != NULL);
2543         /* rbd_read_finish(obj_request, obj_request->length); */
2544         img_request = rbd_img_request_create(rbd_dev->parent,
2545                                                 obj_request->img_offset,
2546                                                 obj_request->length,
2547                                                 false, true);
2548         result = -ENOMEM;
2549         if (!img_request)
2550                 goto out_err;
2551
2552         rbd_obj_request_get(obj_request);
2553         img_request->obj_request = obj_request;
2554
2555         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2556                                         obj_request->bio_list);
2557         if (result)
2558                 goto out_err;
2559
2560         img_request->callback = rbd_img_parent_read_callback;
2561         result = rbd_img_request_submit(img_request);
2562         if (result)
2563                 goto out_err;
2564
2565         return;
2566 out_err:
2567         if (img_request)
2568                 rbd_img_request_put(img_request);
2569         obj_request->result = result;
2570         obj_request->xferred = 0;
2571         obj_request_done_set(obj_request);
2572 }
2573
2574 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2575 {
2576         struct rbd_obj_request *obj_request;
2577         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2578         int ret;
2579
2580         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2581                                                         OBJ_REQUEST_NODATA);
2582         if (!obj_request)
2583                 return -ENOMEM;
2584
2585         ret = -ENOMEM;
2586         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2587         if (!obj_request->osd_req)
2588                 goto out;
2589         obj_request->callback = rbd_obj_request_put;
2590
2591         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2592                                         notify_id, 0, 0);
2593         rbd_osd_req_format_read(obj_request);
2594
2595         ret = rbd_obj_request_submit(osdc, obj_request);
2596 out:
2597         if (ret)
2598                 rbd_obj_request_put(obj_request);
2599
2600         return ret;
2601 }
2602
2603 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2604 {
2605         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2606         int ret;
2607
2608         if (!rbd_dev)
2609                 return;
2610
2611         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2612                 rbd_dev->header_name, (unsigned long long)notify_id,
2613                 (unsigned int)opcode);
2614         ret = rbd_dev_refresh(rbd_dev);
2615         if (ret)
2616                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2617
2618         rbd_obj_notify_ack(rbd_dev, notify_id);
2619 }
2620
2621 /*
2622  * Request sync osd watch/unwatch.  The value of "start" determines
2623  * whether a watch request is being initiated or torn down.
2624  */
2625 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2626 {
2627         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2628         struct rbd_obj_request *obj_request;
2629         int ret;
2630
2631         rbd_assert(start ^ !!rbd_dev->watch_event);
2632         rbd_assert(start ^ !!rbd_dev->watch_request);
2633
2634         if (start) {
2635                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2636                                                 &rbd_dev->watch_event);
2637                 if (ret < 0)
2638                         return ret;
2639                 rbd_assert(rbd_dev->watch_event != NULL);
2640         }
2641
2642         ret = -ENOMEM;
2643         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2644                                                         OBJ_REQUEST_NODATA);
2645         if (!obj_request)
2646                 goto out_cancel;
2647
2648         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2649         if (!obj_request->osd_req)
2650                 goto out_cancel;
2651
2652         if (start)
2653                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2654         else
2655                 ceph_osdc_unregister_linger_request(osdc,
2656                                         rbd_dev->watch_request->osd_req);
2657
2658         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2659                                 rbd_dev->watch_event->cookie, 0, start);
2660         rbd_osd_req_format_write(obj_request);
2661
2662         ret = rbd_obj_request_submit(osdc, obj_request);
2663         if (ret)
2664                 goto out_cancel;
2665         ret = rbd_obj_request_wait(obj_request);
2666         if (ret)
2667                 goto out_cancel;
2668         ret = obj_request->result;
2669         if (ret)
2670                 goto out_cancel;
2671
2672         /*
2673          * A watch request is set to linger, so the underlying osd
2674          * request won't go away until we unregister it.  We retain
2675          * a pointer to the object request during that time (in
2676          * rbd_dev->watch_request), so we'll keep a reference to
2677          * it.  We'll drop that reference (below) after we've
2678          * unregistered it.
2679          */
2680         if (start) {
2681                 rbd_dev->watch_request = obj_request;
2682
2683                 return 0;
2684         }
2685
2686         /* We have successfully torn down the watch request */
2687
2688         rbd_obj_request_put(rbd_dev->watch_request);
2689         rbd_dev->watch_request = NULL;
2690 out_cancel:
2691         /* Cancel the event if we're tearing down, or on error */
2692         ceph_osdc_cancel_event(rbd_dev->watch_event);
2693         rbd_dev->watch_event = NULL;
2694         if (obj_request)
2695                 rbd_obj_request_put(obj_request);
2696
2697         return ret;
2698 }
2699
2700 /*
2701  * Synchronous osd object method call.  Returns the number of bytes
2702  * returned in the outbound buffer, or a negative error code.
2703  */
2704 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2705                              const char *object_name,
2706                              const char *class_name,
2707                              const char *method_name,
2708                              const void *outbound,
2709                              size_t outbound_size,
2710                              void *inbound,
2711                              size_t inbound_size)
2712 {
2713         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2714         struct rbd_obj_request *obj_request;
2715         struct page **pages;
2716         u32 page_count;
2717         int ret;
2718
2719         /*
2720          * Method calls are ultimately read operations.  The result
2721          * should placed into the inbound buffer provided.  They
2722          * also supply outbound data--parameters for the object
2723          * method.  Currently if this is present it will be a
2724          * snapshot id.
2725          */
2726         page_count = (u32)calc_pages_for(0, inbound_size);
2727         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2728         if (IS_ERR(pages))
2729                 return PTR_ERR(pages);
2730
2731         ret = -ENOMEM;
2732         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2733                                                         OBJ_REQUEST_PAGES);
2734         if (!obj_request)
2735                 goto out;
2736
2737         obj_request->pages = pages;
2738         obj_request->page_count = page_count;
2739
2740         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2741         if (!obj_request->osd_req)
2742                 goto out;
2743
2744         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2745                                         class_name, method_name);
2746         if (outbound_size) {
2747                 struct ceph_pagelist *pagelist;
2748
2749                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2750                 if (!pagelist)
2751                         goto out;
2752
2753                 ceph_pagelist_init(pagelist);
2754                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2755                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2756                                                 pagelist);
2757         }
2758         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2759                                         obj_request->pages, inbound_size,
2760                                         0, false, false);
2761         rbd_osd_req_format_read(obj_request);
2762
2763         ret = rbd_obj_request_submit(osdc, obj_request);
2764         if (ret)
2765                 goto out;
2766         ret = rbd_obj_request_wait(obj_request);
2767         if (ret)
2768                 goto out;
2769
2770         ret = obj_request->result;
2771         if (ret < 0)
2772                 goto out;
2773
2774         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2775         ret = (int)obj_request->xferred;
2776         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2777 out:
2778         if (obj_request)
2779                 rbd_obj_request_put(obj_request);
2780         else
2781                 ceph_release_page_vector(pages, page_count);
2782
2783         return ret;
2784 }
2785
2786 static void rbd_request_fn(struct request_queue *q)
2787                 __releases(q->queue_lock) __acquires(q->queue_lock)
2788 {
2789         struct rbd_device *rbd_dev = q->queuedata;
2790         bool read_only = rbd_dev->mapping.read_only;
2791         struct request *rq;
2792         int result;
2793
2794         while ((rq = blk_fetch_request(q))) {
2795                 bool write_request = rq_data_dir(rq) == WRITE;
2796                 struct rbd_img_request *img_request;
2797                 u64 offset;
2798                 u64 length;
2799
2800                 /* Ignore any non-FS requests that filter through. */
2801
2802                 if (rq->cmd_type != REQ_TYPE_FS) {
2803                         dout("%s: non-fs request type %d\n", __func__,
2804                                 (int) rq->cmd_type);
2805                         __blk_end_request_all(rq, 0);
2806                         continue;
2807                 }
2808
2809                 /* Ignore/skip any zero-length requests */
2810
2811                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2812                 length = (u64) blk_rq_bytes(rq);
2813
2814                 if (!length) {
2815                         dout("%s: zero-length request\n", __func__);
2816                         __blk_end_request_all(rq, 0);
2817                         continue;
2818                 }
2819
2820                 spin_unlock_irq(q->queue_lock);
2821
2822                 /* Disallow writes to a read-only device */
2823
2824                 if (write_request) {
2825                         result = -EROFS;
2826                         if (read_only)
2827                                 goto end_request;
2828                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2829                 }
2830
2831                 /*
2832                  * Quit early if the mapped snapshot no longer
2833                  * exists.  It's still possible the snapshot will
2834                  * have disappeared by the time our request arrives
2835                  * at the osd, but there's no sense in sending it if
2836                  * we already know.
2837                  */
2838                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2839                         dout("request for non-existent snapshot");
2840                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2841                         result = -ENXIO;
2842                         goto end_request;
2843                 }
2844
2845                 result = -EINVAL;
2846                 if (offset && length > U64_MAX - offset + 1) {
2847                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2848                                 offset, length);
2849                         goto end_request;       /* Shouldn't happen */
2850                 }
2851
2852                 result = -EIO;
2853                 if (offset + length > rbd_dev->mapping.size) {
2854                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2855                                 offset, length, rbd_dev->mapping.size);
2856                         goto end_request;
2857                 }
2858
2859                 result = -ENOMEM;
2860                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2861                                                         write_request, false);
2862                 if (!img_request)
2863                         goto end_request;
2864
2865                 img_request->rq = rq;
2866
2867                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2868                                                 rq->bio);
2869                 if (!result)
2870                         result = rbd_img_request_submit(img_request);
2871                 if (result)
2872                         rbd_img_request_put(img_request);
2873 end_request:
2874                 spin_lock_irq(q->queue_lock);
2875                 if (result < 0) {
2876                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2877                                 write_request ? "write" : "read",
2878                                 length, offset, result);
2879
2880                         __blk_end_request_all(rq, result);
2881                 }
2882         }
2883 }
2884
2885 /*
2886  * a queue callback. Makes sure that we don't create a bio that spans across
2887  * multiple osd objects. One exception would be with a single page bios,
2888  * which we handle later at bio_chain_clone_range()
2889  */
2890 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2891                           struct bio_vec *bvec)
2892 {
2893         struct rbd_device *rbd_dev = q->queuedata;
2894         sector_t sector_offset;
2895         sector_t sectors_per_obj;
2896         sector_t obj_sector_offset;
2897         int ret;
2898
2899         /*
2900          * Find how far into its rbd object the partition-relative
2901          * bio start sector is to offset relative to the enclosing
2902          * device.
2903          */
2904         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2905         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2906         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2907
2908         /*
2909          * Compute the number of bytes from that offset to the end
2910          * of the object.  Account for what's already used by the bio.
2911          */
2912         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2913         if (ret > bmd->bi_size)
2914                 ret -= bmd->bi_size;
2915         else
2916                 ret = 0;
2917
2918         /*
2919          * Don't send back more than was asked for.  And if the bio
2920          * was empty, let the whole thing through because:  "Note
2921          * that a block device *must* allow a single page to be
2922          * added to an empty bio."
2923          */
2924         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2925         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2926                 ret = (int) bvec->bv_len;
2927
2928         return ret;
2929 }
2930
2931 static void rbd_free_disk(struct rbd_device *rbd_dev)
2932 {
2933         struct gendisk *disk = rbd_dev->disk;
2934
2935         if (!disk)
2936                 return;
2937
2938         rbd_dev->disk = NULL;
2939         if (disk->flags & GENHD_FL_UP) {
2940                 del_gendisk(disk);
2941                 if (disk->queue)
2942                         blk_cleanup_queue(disk->queue);
2943         }
2944         put_disk(disk);
2945 }
2946
2947 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2948                                 const char *object_name,
2949                                 u64 offset, u64 length, void *buf)
2950
2951 {
2952         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2953         struct rbd_obj_request *obj_request;
2954         struct page **pages = NULL;
2955         u32 page_count;
2956         size_t size;
2957         int ret;
2958
2959         page_count = (u32) calc_pages_for(offset, length);
2960         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2961         if (IS_ERR(pages))
2962                 ret = PTR_ERR(pages);
2963
2964         ret = -ENOMEM;
2965         obj_request = rbd_obj_request_create(object_name, offset, length,
2966                                                         OBJ_REQUEST_PAGES);
2967         if (!obj_request)
2968                 goto out;
2969
2970         obj_request->pages = pages;
2971         obj_request->page_count = page_count;
2972
2973         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2974         if (!obj_request->osd_req)
2975                 goto out;
2976
2977         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2978                                         offset, length, 0, 0);
2979         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2980                                         obj_request->pages,
2981                                         obj_request->length,
2982                                         obj_request->offset & ~PAGE_MASK,
2983                                         false, false);
2984         rbd_osd_req_format_read(obj_request);
2985
2986         ret = rbd_obj_request_submit(osdc, obj_request);
2987         if (ret)
2988                 goto out;
2989         ret = rbd_obj_request_wait(obj_request);
2990         if (ret)
2991                 goto out;
2992
2993         ret = obj_request->result;
2994         if (ret < 0)
2995                 goto out;
2996
2997         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2998         size = (size_t) obj_request->xferred;
2999         ceph_copy_from_page_vector(pages, buf, 0, size);
3000         rbd_assert(size <= (size_t)INT_MAX);
3001         ret = (int)size;
3002 out:
3003         if (obj_request)
3004                 rbd_obj_request_put(obj_request);
3005         else
3006                 ceph_release_page_vector(pages, page_count);
3007
3008         return ret;
3009 }
3010
3011 /*
3012  * Read the complete header for the given rbd device.
3013  *
3014  * Returns a pointer to a dynamically-allocated buffer containing
3015  * the complete and validated header.  Caller can pass the address
3016  * of a variable that will be filled in with the version of the
3017  * header object at the time it was read.
3018  *
3019  * Returns a pointer-coded errno if a failure occurs.
3020  */
3021 static struct rbd_image_header_ondisk *
3022 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3023 {
3024         struct rbd_image_header_ondisk *ondisk = NULL;
3025         u32 snap_count = 0;
3026         u64 names_size = 0;
3027         u32 want_count;
3028         int ret;
3029
3030         /*
3031          * The complete header will include an array of its 64-bit
3032          * snapshot ids, followed by the names of those snapshots as
3033          * a contiguous block of NUL-terminated strings.  Note that
3034          * the number of snapshots could change by the time we read
3035          * it in, in which case we re-read it.
3036          */
3037         do {
3038                 size_t size;
3039
3040                 kfree(ondisk);
3041
3042                 size = sizeof (*ondisk);
3043                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3044                 size += names_size;
3045                 ondisk = kmalloc(size, GFP_KERNEL);
3046                 if (!ondisk)
3047                         return ERR_PTR(-ENOMEM);
3048
3049                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3050                                        0, size, ondisk);
3051                 if (ret < 0)
3052                         goto out_err;
3053                 if ((size_t)ret < size) {
3054                         ret = -ENXIO;
3055                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3056                                 size, ret);
3057                         goto out_err;
3058                 }
3059                 if (!rbd_dev_ondisk_valid(ondisk)) {
3060                         ret = -ENXIO;
3061                         rbd_warn(rbd_dev, "invalid header");
3062                         goto out_err;
3063                 }
3064
3065                 names_size = le64_to_cpu(ondisk->snap_names_len);
3066                 want_count = snap_count;
3067                 snap_count = le32_to_cpu(ondisk->snap_count);
3068         } while (snap_count != want_count);
3069
3070         return ondisk;
3071
3072 out_err:
3073         kfree(ondisk);
3074
3075         return ERR_PTR(ret);
3076 }
3077
3078 /*
3079  * reload the ondisk the header
3080  */
3081 static int rbd_read_header(struct rbd_device *rbd_dev,
3082                            struct rbd_image_header *header)
3083 {
3084         struct rbd_image_header_ondisk *ondisk;
3085         int ret;
3086
3087         ondisk = rbd_dev_v1_header_read(rbd_dev);
3088         if (IS_ERR(ondisk))
3089                 return PTR_ERR(ondisk);
3090         ret = rbd_header_from_disk(header, ondisk);
3091         kfree(ondisk);
3092
3093         return ret;
3094 }
3095
3096 /*
3097  * only read the first part of the ondisk header, without the snaps info
3098  */
3099 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3100 {
3101         int ret;
3102         struct rbd_image_header h;
3103
3104         memset(&h, 0, sizeof (h));
3105         ret = rbd_read_header(rbd_dev, &h);
3106         if (ret < 0)
3107                 return ret;
3108
3109         down_write(&rbd_dev->header_rwsem);
3110
3111         /* Update image size, and check for resize of mapped image */
3112         rbd_dev->header.image_size = h.image_size;
3113         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3114                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3115                         rbd_dev->mapping.size = rbd_dev->header.image_size;
3116
3117         /* rbd_dev->header.object_prefix shouldn't change */
3118         kfree(rbd_dev->header.snap_sizes);
3119         kfree(rbd_dev->header.snap_names);
3120         /* osd requests may still refer to snapc */
3121         ceph_put_snap_context(rbd_dev->header.snapc);
3122
3123         rbd_dev->header.image_size = h.image_size;
3124         rbd_dev->header.snapc = h.snapc;
3125         rbd_dev->header.snap_names = h.snap_names;
3126         rbd_dev->header.snap_sizes = h.snap_sizes;
3127         /* Free the extra copy of the object prefix */
3128         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3129                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3130         kfree(h.object_prefix);
3131
3132         up_write(&rbd_dev->header_rwsem);
3133
3134         return ret;
3135 }
3136
3137 /*
3138  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3139  * has disappeared from the (just updated) snapshot context.
3140  */
3141 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3142 {
3143         u64 snap_id;
3144
3145         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3146                 return;
3147
3148         snap_id = rbd_dev->spec->snap_id;
3149         if (snap_id == CEPH_NOSNAP)
3150                 return;
3151
3152         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3153                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3154 }
3155
3156 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3157 {
3158         u64 mapping_size;
3159         int ret;
3160
3161         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3162         mapping_size = rbd_dev->mapping.size;
3163         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3164         if (rbd_dev->image_format == 1)
3165                 ret = rbd_dev_v1_refresh(rbd_dev);
3166         else
3167                 ret = rbd_dev_v2_refresh(rbd_dev);
3168
3169         /* If it's a mapped snapshot, validate its EXISTS flag */
3170
3171         rbd_exists_validate(rbd_dev);
3172         mutex_unlock(&ctl_mutex);
3173         if (mapping_size != rbd_dev->mapping.size) {
3174                 sector_t size;
3175
3176                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3177                 dout("setting size to %llu sectors", (unsigned long long)size);
3178                 set_capacity(rbd_dev->disk, size);
3179                 revalidate_disk(rbd_dev->disk);
3180         }
3181
3182         return ret;
3183 }
3184
3185 static int rbd_init_disk(struct rbd_device *rbd_dev)
3186 {
3187         struct gendisk *disk;
3188         struct request_queue *q;
3189         u64 segment_size;
3190
3191         /* create gendisk info */
3192         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3193         if (!disk)
3194                 return -ENOMEM;
3195
3196         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3197                  rbd_dev->dev_id);
3198         disk->major = rbd_dev->major;
3199         disk->first_minor = 0;
3200         disk->fops = &rbd_bd_ops;
3201         disk->private_data = rbd_dev;
3202
3203         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3204         if (!q)
3205                 goto out_disk;
3206
3207         /* We use the default size, but let's be explicit about it. */
3208         blk_queue_physical_block_size(q, SECTOR_SIZE);
3209
3210         /* set io sizes to object size */
3211         segment_size = rbd_obj_bytes(&rbd_dev->header);
3212         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3213         blk_queue_max_segment_size(q, segment_size);
3214         blk_queue_io_min(q, segment_size);
3215         blk_queue_io_opt(q, segment_size);
3216
3217         blk_queue_merge_bvec(q, rbd_merge_bvec);
3218         disk->queue = q;
3219
3220         q->queuedata = rbd_dev;
3221
3222         rbd_dev->disk = disk;
3223
3224         return 0;
3225 out_disk:
3226         put_disk(disk);
3227
3228         return -ENOMEM;
3229 }
3230
3231 /*
3232   sysfs
3233 */
3234
3235 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3236 {
3237         return container_of(dev, struct rbd_device, dev);
3238 }
3239
3240 static ssize_t rbd_size_show(struct device *dev,
3241                              struct device_attribute *attr, char *buf)
3242 {
3243         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3244
3245         return sprintf(buf, "%llu\n",
3246                 (unsigned long long)rbd_dev->mapping.size);
3247 }
3248
3249 /*
3250  * Note this shows the features for whatever's mapped, which is not
3251  * necessarily the base image.
3252  */
3253 static ssize_t rbd_features_show(struct device *dev,
3254                              struct device_attribute *attr, char *buf)
3255 {
3256         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3257
3258         return sprintf(buf, "0x%016llx\n",
3259                         (unsigned long long)rbd_dev->mapping.features);
3260 }
3261
3262 static ssize_t rbd_major_show(struct device *dev,
3263                               struct device_attribute *attr, char *buf)
3264 {
3265         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3266
3267         if (rbd_dev->major)
3268                 return sprintf(buf, "%d\n", rbd_dev->major);
3269
3270         return sprintf(buf, "(none)\n");
3271
3272 }
3273
3274 static ssize_t rbd_client_id_show(struct device *dev,
3275                                   struct device_attribute *attr, char *buf)
3276 {
3277         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3278
3279         return sprintf(buf, "client%lld\n",
3280                         ceph_client_id(rbd_dev->rbd_client->client));
3281 }
3282
3283 static ssize_t rbd_pool_show(struct device *dev,
3284                              struct device_attribute *attr, char *buf)
3285 {
3286         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3287
3288         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3289 }
3290
3291 static ssize_t rbd_pool_id_show(struct device *dev,
3292                              struct device_attribute *attr, char *buf)
3293 {
3294         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3295
3296         return sprintf(buf, "%llu\n",
3297                         (unsigned long long) rbd_dev->spec->pool_id);
3298 }
3299
3300 static ssize_t rbd_name_show(struct device *dev,
3301                              struct device_attribute *attr, char *buf)
3302 {
3303         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3304
3305         if (rbd_dev->spec->image_name)
3306                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3307
3308         return sprintf(buf, "(unknown)\n");
3309 }
3310
3311 static ssize_t rbd_image_id_show(struct device *dev,
3312                              struct device_attribute *attr, char *buf)
3313 {
3314         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3315
3316         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3317 }
3318
3319 /*
3320  * Shows the name of the currently-mapped snapshot (or
3321  * RBD_SNAP_HEAD_NAME for the base image).
3322  */
3323 static ssize_t rbd_snap_show(struct device *dev,
3324                              struct device_attribute *attr,
3325                              char *buf)
3326 {
3327         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3328
3329         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3330 }
3331
3332 /*
3333  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3334  * for the parent image.  If there is no parent, simply shows
3335  * "(no parent image)".
3336  */
3337 static ssize_t rbd_parent_show(struct device *dev,
3338                              struct device_attribute *attr,
3339                              char *buf)
3340 {
3341         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3342         struct rbd_spec *spec = rbd_dev->parent_spec;
3343         int count;
3344         char *bufp = buf;
3345
3346         if (!spec)
3347                 return sprintf(buf, "(no parent image)\n");
3348
3349         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3350                         (unsigned long long) spec->pool_id, spec->pool_name);
3351         if (count < 0)
3352                 return count;
3353         bufp += count;
3354
3355         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3356                         spec->image_name ? spec->image_name : "(unknown)");
3357         if (count < 0)
3358                 return count;
3359         bufp += count;
3360
3361         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3362                         (unsigned long long) spec->snap_id, spec->snap_name);
3363         if (count < 0)
3364                 return count;
3365         bufp += count;
3366
3367         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3368         if (count < 0)
3369                 return count;
3370         bufp += count;
3371
3372         return (ssize_t) (bufp - buf);
3373 }
3374
3375 static ssize_t rbd_image_refresh(struct device *dev,
3376                                  struct device_attribute *attr,
3377                                  const char *buf,
3378                                  size_t size)
3379 {
3380         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3381         int ret;
3382
3383         ret = rbd_dev_refresh(rbd_dev);
3384         if (ret)
3385                 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3386
3387         return ret < 0 ? ret : size;
3388 }
3389
3390 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3391 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3392 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3393 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3394 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3395 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3396 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3397 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3398 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3399 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3400 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3401
3402 static struct attribute *rbd_attrs[] = {
3403         &dev_attr_size.attr,
3404         &dev_attr_features.attr,
3405         &dev_attr_major.attr,
3406         &dev_attr_client_id.attr,
3407         &dev_attr_pool.attr,
3408         &dev_attr_pool_id.attr,
3409         &dev_attr_name.attr,
3410         &dev_attr_image_id.attr,
3411         &dev_attr_current_snap.attr,
3412         &dev_attr_parent.attr,
3413         &dev_attr_refresh.attr,
3414         NULL
3415 };
3416
3417 static struct attribute_group rbd_attr_group = {
3418         .attrs = rbd_attrs,
3419 };
3420
3421 static const struct attribute_group *rbd_attr_groups[] = {
3422         &rbd_attr_group,
3423         NULL
3424 };
3425
3426 static void rbd_sysfs_dev_release(struct device *dev)
3427 {
3428 }
3429
3430 static struct device_type rbd_device_type = {
3431         .name           = "rbd",
3432         .groups         = rbd_attr_groups,
3433         .release        = rbd_sysfs_dev_release,
3434 };
3435
3436 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3437 {
3438         kref_get(&spec->kref);
3439
3440         return spec;
3441 }
3442
3443 static void rbd_spec_free(struct kref *kref);
3444 static void rbd_spec_put(struct rbd_spec *spec)
3445 {
3446         if (spec)
3447                 kref_put(&spec->kref, rbd_spec_free);
3448 }
3449
3450 static struct rbd_spec *rbd_spec_alloc(void)
3451 {
3452         struct rbd_spec *spec;
3453
3454         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3455         if (!spec)
3456                 return NULL;
3457         kref_init(&spec->kref);
3458
3459         return spec;
3460 }
3461
3462 static void rbd_spec_free(struct kref *kref)
3463 {
3464         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3465
3466         kfree(spec->pool_name);
3467         kfree(spec->image_id);
3468         kfree(spec->image_name);
3469         kfree(spec->snap_name);
3470         kfree(spec);
3471 }
3472
3473 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3474                                 struct rbd_spec *spec)
3475 {
3476         struct rbd_device *rbd_dev;
3477
3478         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3479         if (!rbd_dev)
3480                 return NULL;
3481
3482         spin_lock_init(&rbd_dev->lock);
3483         rbd_dev->flags = 0;
3484         INIT_LIST_HEAD(&rbd_dev->node);
3485         init_rwsem(&rbd_dev->header_rwsem);
3486
3487         rbd_dev->spec = spec;
3488         rbd_dev->rbd_client = rbdc;
3489
3490         /* Initialize the layout used for all rbd requests */
3491
3492         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3493         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3494         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3495         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3496
3497         return rbd_dev;
3498 }
3499
3500 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3501 {
3502         rbd_put_client(rbd_dev->rbd_client);
3503         rbd_spec_put(rbd_dev->spec);
3504         kfree(rbd_dev);
3505 }
3506
3507 /*
3508  * Get the size and object order for an image snapshot, or if
3509  * snap_id is CEPH_NOSNAP, gets this information for the base
3510  * image.
3511  */
3512 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3513                                 u8 *order, u64 *snap_size)
3514 {
3515         __le64 snapid = cpu_to_le64(snap_id);
3516         int ret;
3517         struct {
3518                 u8 order;
3519                 __le64 size;
3520         } __attribute__ ((packed)) size_buf = { 0 };
3521
3522         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3523                                 "rbd", "get_size",
3524                                 &snapid, sizeof (snapid),
3525                                 &size_buf, sizeof (size_buf));
3526         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3527         if (ret < 0)
3528                 return ret;
3529         if (ret < sizeof (size_buf))
3530                 return -ERANGE;
3531
3532         if (order)
3533                 *order = size_buf.order;
3534         *snap_size = le64_to_cpu(size_buf.size);
3535
3536         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3537                 (unsigned long long)snap_id, (unsigned int)*order,
3538                 (unsigned long long)*snap_size);
3539
3540         return 0;
3541 }
3542
3543 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3544 {
3545         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3546                                         &rbd_dev->header.obj_order,
3547                                         &rbd_dev->header.image_size);
3548 }
3549
3550 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3551 {
3552         void *reply_buf;
3553         int ret;
3554         void *p;
3555
3556         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3557         if (!reply_buf)
3558                 return -ENOMEM;
3559
3560         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3561                                 "rbd", "get_object_prefix", NULL, 0,
3562                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3563         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3564         if (ret < 0)
3565                 goto out;
3566
3567         p = reply_buf;
3568         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3569                                                 p + ret, NULL, GFP_NOIO);
3570         ret = 0;
3571
3572         if (IS_ERR(rbd_dev->header.object_prefix)) {
3573                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3574                 rbd_dev->header.object_prefix = NULL;
3575         } else {
3576                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3577         }
3578 out:
3579         kfree(reply_buf);
3580
3581         return ret;
3582 }
3583
3584 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3585                 u64 *snap_features)
3586 {
3587         __le64 snapid = cpu_to_le64(snap_id);
3588         struct {
3589                 __le64 features;
3590                 __le64 incompat;
3591         } __attribute__ ((packed)) features_buf = { 0 };
3592         u64 incompat;
3593         int ret;
3594
3595         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3596                                 "rbd", "get_features",
3597                                 &snapid, sizeof (snapid),
3598                                 &features_buf, sizeof (features_buf));
3599         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3600         if (ret < 0)
3601                 return ret;
3602         if (ret < sizeof (features_buf))
3603                 return -ERANGE;
3604
3605         incompat = le64_to_cpu(features_buf.incompat);
3606         if (incompat & ~RBD_FEATURES_SUPPORTED)
3607                 return -ENXIO;
3608
3609         *snap_features = le64_to_cpu(features_buf.features);
3610
3611         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3612                 (unsigned long long)snap_id,
3613                 (unsigned long long)*snap_features,
3614                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3615
3616         return 0;
3617 }
3618
3619 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3620 {
3621         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3622                                                 &rbd_dev->header.features);
3623 }
3624
3625 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3626 {
3627         struct rbd_spec *parent_spec;
3628         size_t size;
3629         void *reply_buf = NULL;
3630         __le64 snapid;
3631         void *p;
3632         void *end;
3633         char *image_id;
3634         u64 overlap;
3635         int ret;
3636
3637         parent_spec = rbd_spec_alloc();
3638         if (!parent_spec)
3639                 return -ENOMEM;
3640
3641         size = sizeof (__le64) +                                /* pool_id */
3642                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3643                 sizeof (__le64) +                               /* snap_id */
3644                 sizeof (__le64);                                /* overlap */
3645         reply_buf = kmalloc(size, GFP_KERNEL);
3646         if (!reply_buf) {
3647                 ret = -ENOMEM;
3648                 goto out_err;
3649         }
3650
3651         snapid = cpu_to_le64(CEPH_NOSNAP);
3652         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3653                                 "rbd", "get_parent",
3654                                 &snapid, sizeof (snapid),
3655                                 reply_buf, size);
3656         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3657         if (ret < 0)
3658                 goto out_err;
3659
3660         p = reply_buf;
3661         end = reply_buf + ret;
3662         ret = -ERANGE;
3663         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3664         if (parent_spec->pool_id == CEPH_NOPOOL)
3665                 goto out;       /* No parent?  No problem. */
3666
3667         /* The ceph file layout needs to fit pool id in 32 bits */
3668
3669         ret = -EIO;
3670         if (parent_spec->pool_id > (u64)U32_MAX) {
3671                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3672                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3673                 goto out_err;
3674         }
3675
3676         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3677         if (IS_ERR(image_id)) {
3678                 ret = PTR_ERR(image_id);
3679                 goto out_err;
3680         }
3681         parent_spec->image_id = image_id;
3682         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3683         ceph_decode_64_safe(&p, end, overlap, out_err);
3684
3685         rbd_dev->parent_overlap = overlap;
3686         rbd_dev->parent_spec = parent_spec;
3687         parent_spec = NULL;     /* rbd_dev now owns this */
3688 out:
3689         ret = 0;
3690 out_err:
3691         kfree(reply_buf);
3692         rbd_spec_put(parent_spec);
3693
3694         return ret;
3695 }
3696
3697 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3698 {
3699         struct {
3700                 __le64 stripe_unit;
3701                 __le64 stripe_count;
3702         } __attribute__ ((packed)) striping_info_buf = { 0 };
3703         size_t size = sizeof (striping_info_buf);
3704         void *p;
3705         u64 obj_size;
3706         u64 stripe_unit;
3707         u64 stripe_count;
3708         int ret;
3709
3710         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3711                                 "rbd", "get_stripe_unit_count", NULL, 0,
3712                                 (char *)&striping_info_buf, size);
3713         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3714         if (ret < 0)
3715                 return ret;
3716         if (ret < size)
3717                 return -ERANGE;
3718
3719         /*
3720          * We don't actually support the "fancy striping" feature
3721          * (STRIPINGV2) yet, but if the striping sizes are the
3722          * defaults the behavior is the same as before.  So find
3723          * out, and only fail if the image has non-default values.
3724          */
3725         ret = -EINVAL;
3726         obj_size = (u64)1 << rbd_dev->header.obj_order;
3727         p = &striping_info_buf;
3728         stripe_unit = ceph_decode_64(&p);
3729         if (stripe_unit != obj_size) {
3730                 rbd_warn(rbd_dev, "unsupported stripe unit "
3731                                 "(got %llu want %llu)",
3732                                 stripe_unit, obj_size);
3733                 return -EINVAL;
3734         }
3735         stripe_count = ceph_decode_64(&p);
3736         if (stripe_count != 1) {
3737                 rbd_warn(rbd_dev, "unsupported stripe count "
3738                                 "(got %llu want 1)", stripe_count);
3739                 return -EINVAL;
3740         }
3741         rbd_dev->header.stripe_unit = stripe_unit;
3742         rbd_dev->header.stripe_count = stripe_count;
3743
3744         return 0;
3745 }
3746
3747 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3748 {
3749         size_t image_id_size;
3750         char *image_id;
3751         void *p;
3752         void *end;
3753         size_t size;
3754         void *reply_buf = NULL;
3755         size_t len = 0;
3756         char *image_name = NULL;
3757         int ret;
3758
3759         rbd_assert(!rbd_dev->spec->image_name);
3760
3761         len = strlen(rbd_dev->spec->image_id);
3762         image_id_size = sizeof (__le32) + len;
3763         image_id = kmalloc(image_id_size, GFP_KERNEL);
3764         if (!image_id)
3765                 return NULL;
3766
3767         p = image_id;
3768         end = image_id + image_id_size;
3769         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3770
3771         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3772         reply_buf = kmalloc(size, GFP_KERNEL);
3773         if (!reply_buf)
3774                 goto out;
3775
3776         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3777                                 "rbd", "dir_get_name",
3778                                 image_id, image_id_size,
3779                                 reply_buf, size);
3780         if (ret < 0)
3781                 goto out;
3782         p = reply_buf;
3783         end = reply_buf + ret;
3784
3785         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3786         if (IS_ERR(image_name))
3787                 image_name = NULL;
3788         else
3789                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3790 out:
3791         kfree(reply_buf);
3792         kfree(image_id);
3793
3794         return image_name;
3795 }
3796
3797 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3798 {
3799         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3800         const char *snap_name;
3801         u32 which = 0;
3802
3803         /* Skip over names until we find the one we are looking for */
3804
3805         snap_name = rbd_dev->header.snap_names;
3806         while (which < snapc->num_snaps) {
3807                 if (!strcmp(name, snap_name))
3808                         return snapc->snaps[which];
3809                 snap_name += strlen(snap_name) + 1;
3810                 which++;
3811         }
3812         return CEPH_NOSNAP;
3813 }
3814
3815 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3816 {
3817         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3818         u32 which;
3819         bool found = false;
3820         u64 snap_id;
3821
3822         for (which = 0; !found && which < snapc->num_snaps; which++) {
3823                 const char *snap_name;
3824
3825                 snap_id = snapc->snaps[which];
3826                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3827                 if (IS_ERR(snap_name))
3828                         break;
3829                 found = !strcmp(name, snap_name);
3830                 kfree(snap_name);
3831         }
3832         return found ? snap_id : CEPH_NOSNAP;
3833 }
3834
3835 /*
3836  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3837  * no snapshot by that name is found, or if an error occurs.
3838  */
3839 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3840 {
3841         if (rbd_dev->image_format == 1)
3842                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3843
3844         return rbd_v2_snap_id_by_name(rbd_dev, name);
3845 }
3846
3847 /*
3848  * When an rbd image has a parent image, it is identified by the
3849  * pool, image, and snapshot ids (not names).  This function fills
3850  * in the names for those ids.  (It's OK if we can't figure out the
3851  * name for an image id, but the pool and snapshot ids should always
3852  * exist and have names.)  All names in an rbd spec are dynamically
3853  * allocated.
3854  *
3855  * When an image being mapped (not a parent) is probed, we have the
3856  * pool name and pool id, image name and image id, and the snapshot
3857  * name.  The only thing we're missing is the snapshot id.
3858  */
3859 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3860 {
3861         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3862         struct rbd_spec *spec = rbd_dev->spec;
3863         const char *pool_name;
3864         const char *image_name;
3865         const char *snap_name;
3866         int ret;
3867
3868         /*
3869          * An image being mapped will have the pool name (etc.), but
3870          * we need to look up the snapshot id.
3871          */
3872         if (spec->pool_name) {
3873                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3874                         u64 snap_id;
3875
3876                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3877                         if (snap_id == CEPH_NOSNAP)
3878                                 return -ENOENT;
3879                         spec->snap_id = snap_id;
3880                 } else {
3881                         spec->snap_id = CEPH_NOSNAP;
3882                 }
3883
3884                 return 0;
3885         }
3886
3887         /* Get the pool name; we have to make our own copy of this */
3888
3889         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3890         if (!pool_name) {
3891                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3892                 return -EIO;
3893         }
3894         pool_name = kstrdup(pool_name, GFP_KERNEL);
3895         if (!pool_name)
3896                 return -ENOMEM;
3897
3898         /* Fetch the image name; tolerate failure here */
3899
3900         image_name = rbd_dev_image_name(rbd_dev);
3901         if (!image_name)
3902                 rbd_warn(rbd_dev, "unable to get image name");
3903
3904         /* Look up the snapshot name, and make a copy */
3905
3906         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3907         if (!snap_name) {
3908                 ret = -ENOMEM;
3909                 goto out_err;
3910         }
3911
3912         spec->pool_name = pool_name;
3913         spec->image_name = image_name;
3914         spec->snap_name = snap_name;
3915
3916         return 0;
3917 out_err:
3918         kfree(image_name);
3919         kfree(pool_name);
3920
3921         return ret;
3922 }
3923
3924 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3925 {
3926         size_t size;
3927         int ret;
3928         void *reply_buf;
3929         void *p;
3930         void *end;
3931         u64 seq;
3932         u32 snap_count;
3933         struct ceph_snap_context *snapc;
3934         u32 i;
3935
3936         /*
3937          * We'll need room for the seq value (maximum snapshot id),
3938          * snapshot count, and array of that many snapshot ids.
3939          * For now we have a fixed upper limit on the number we're
3940          * prepared to receive.
3941          */
3942         size = sizeof (__le64) + sizeof (__le32) +
3943                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3944         reply_buf = kzalloc(size, GFP_KERNEL);
3945         if (!reply_buf)
3946                 return -ENOMEM;
3947
3948         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3949                                 "rbd", "get_snapcontext", NULL, 0,
3950                                 reply_buf, size);
3951         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3952         if (ret < 0)
3953                 goto out;
3954
3955         p = reply_buf;
3956         end = reply_buf + ret;
3957         ret = -ERANGE;
3958         ceph_decode_64_safe(&p, end, seq, out);
3959         ceph_decode_32_safe(&p, end, snap_count, out);
3960
3961         /*
3962          * Make sure the reported number of snapshot ids wouldn't go
3963          * beyond the end of our buffer.  But before checking that,
3964          * make sure the computed size of the snapshot context we
3965          * allocate is representable in a size_t.
3966          */
3967         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3968                                  / sizeof (u64)) {
3969                 ret = -EINVAL;
3970                 goto out;
3971         }
3972         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3973                 goto out;
3974         ret = 0;
3975
3976         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3977         if (!snapc) {
3978                 ret = -ENOMEM;
3979                 goto out;
3980         }
3981         snapc->seq = seq;
3982         for (i = 0; i < snap_count; i++)
3983                 snapc->snaps[i] = ceph_decode_64(&p);
3984
3985         ceph_put_snap_context(rbd_dev->header.snapc);
3986         rbd_dev->header.snapc = snapc;
3987
3988         dout("  snap context seq = %llu, snap_count = %u\n",
3989                 (unsigned long long)seq, (unsigned int)snap_count);
3990 out:
3991         kfree(reply_buf);
3992
3993         return ret;
3994 }
3995
3996 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3997                                         u64 snap_id)
3998 {
3999         size_t size;
4000         void *reply_buf;
4001         __le64 snapid;
4002         int ret;
4003         void *p;
4004         void *end;
4005         char *snap_name;
4006
4007         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4008         reply_buf = kmalloc(size, GFP_KERNEL);
4009         if (!reply_buf)
4010                 return ERR_PTR(-ENOMEM);
4011
4012         snapid = cpu_to_le64(snap_id);
4013         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4014                                 "rbd", "get_snapshot_name",
4015                                 &snapid, sizeof (snapid),
4016                                 reply_buf, size);
4017         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4018         if (ret < 0) {
4019                 snap_name = ERR_PTR(ret);
4020                 goto out;
4021         }
4022
4023         p = reply_buf;
4024         end = reply_buf + ret;
4025         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4026         if (IS_ERR(snap_name))
4027                 goto out;
4028
4029         dout("  snap_id 0x%016llx snap_name = %s\n",
4030                 (unsigned long long)snap_id, snap_name);
4031 out:
4032         kfree(reply_buf);
4033
4034         return snap_name;
4035 }
4036
4037 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4038 {
4039         int ret;
4040
4041         down_write(&rbd_dev->header_rwsem);
4042
4043         ret = rbd_dev_v2_image_size(rbd_dev);
4044         if (ret)
4045                 goto out;
4046         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4047                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4048                         rbd_dev->mapping.size = rbd_dev->header.image_size;
4049
4050         ret = rbd_dev_v2_snap_context(rbd_dev);
4051         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4052         if (ret)
4053                 goto out;
4054 out:
4055         up_write(&rbd_dev->header_rwsem);
4056
4057         return ret;
4058 }
4059
4060 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4061 {
4062         struct device *dev;
4063         int ret;
4064
4065         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4066
4067         dev = &rbd_dev->dev;
4068         dev->bus = &rbd_bus_type;
4069         dev->type = &rbd_device_type;
4070         dev->parent = &rbd_root_dev;
4071         dev->release = rbd_dev_device_release;
4072         dev_set_name(dev, "%d", rbd_dev->dev_id);
4073         ret = device_register(dev);
4074
4075         mutex_unlock(&ctl_mutex);
4076
4077         return ret;
4078 }
4079
4080 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4081 {
4082         device_unregister(&rbd_dev->dev);
4083 }
4084
4085 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4086
4087 /*
4088  * Get a unique rbd identifier for the given new rbd_dev, and add
4089  * the rbd_dev to the global list.  The minimum rbd id is 1.
4090  */
4091 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4092 {
4093         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4094
4095         spin_lock(&rbd_dev_list_lock);
4096         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4097         spin_unlock(&rbd_dev_list_lock);
4098         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4099                 (unsigned long long) rbd_dev->dev_id);
4100 }
4101
4102 /*
4103  * Remove an rbd_dev from the global list, and record that its
4104  * identifier is no longer in use.
4105  */
4106 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4107 {
4108         struct list_head *tmp;
4109         int rbd_id = rbd_dev->dev_id;
4110         int max_id;
4111
4112         rbd_assert(rbd_id > 0);
4113
4114         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4115                 (unsigned long long) rbd_dev->dev_id);
4116         spin_lock(&rbd_dev_list_lock);
4117         list_del_init(&rbd_dev->node);
4118
4119         /*
4120          * If the id being "put" is not the current maximum, there
4121          * is nothing special we need to do.
4122          */
4123         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4124                 spin_unlock(&rbd_dev_list_lock);
4125                 return;
4126         }
4127
4128         /*
4129          * We need to update the current maximum id.  Search the
4130          * list to find out what it is.  We're more likely to find
4131          * the maximum at the end, so search the list backward.
4132          */
4133         max_id = 0;
4134         list_for_each_prev(tmp, &rbd_dev_list) {
4135                 struct rbd_device *rbd_dev;
4136
4137                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4138                 if (rbd_dev->dev_id > max_id)
4139                         max_id = rbd_dev->dev_id;
4140         }
4141         spin_unlock(&rbd_dev_list_lock);
4142
4143         /*
4144          * The max id could have been updated by rbd_dev_id_get(), in
4145          * which case it now accurately reflects the new maximum.
4146          * Be careful not to overwrite the maximum value in that
4147          * case.
4148          */
4149         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4150         dout("  max dev id has been reset\n");
4151 }
4152
4153 /*
4154  * Skips over white space at *buf, and updates *buf to point to the
4155  * first found non-space character (if any). Returns the length of
4156  * the token (string of non-white space characters) found.  Note
4157  * that *buf must be terminated with '\0'.
4158  */
4159 static inline size_t next_token(const char **buf)
4160 {
4161         /*
4162         * These are the characters that produce nonzero for
4163         * isspace() in the "C" and "POSIX" locales.
4164         */
4165         const char *spaces = " \f\n\r\t\v";
4166
4167         *buf += strspn(*buf, spaces);   /* Find start of token */
4168
4169         return strcspn(*buf, spaces);   /* Return token length */
4170 }
4171
4172 /*
4173  * Finds the next token in *buf, and if the provided token buffer is
4174  * big enough, copies the found token into it.  The result, if
4175  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4176  * must be terminated with '\0' on entry.
4177  *
4178  * Returns the length of the token found (not including the '\0').
4179  * Return value will be 0 if no token is found, and it will be >=
4180  * token_size if the token would not fit.
4181  *
4182  * The *buf pointer will be updated to point beyond the end of the
4183  * found token.  Note that this occurs even if the token buffer is
4184  * too small to hold it.
4185  */
4186 static inline size_t copy_token(const char **buf,
4187                                 char *token,
4188                                 size_t token_size)
4189 {
4190         size_t len;
4191
4192         len = next_token(buf);
4193         if (len < token_size) {
4194                 memcpy(token, *buf, len);
4195                 *(token + len) = '\0';
4196         }
4197         *buf += len;
4198
4199         return len;
4200 }
4201
4202 /*
4203  * Finds the next token in *buf, dynamically allocates a buffer big
4204  * enough to hold a copy of it, and copies the token into the new
4205  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4206  * that a duplicate buffer is created even for a zero-length token.
4207  *
4208  * Returns a pointer to the newly-allocated duplicate, or a null
4209  * pointer if memory for the duplicate was not available.  If
4210  * the lenp argument is a non-null pointer, the length of the token
4211  * (not including the '\0') is returned in *lenp.
4212  *
4213  * If successful, the *buf pointer will be updated to point beyond
4214  * the end of the found token.
4215  *
4216  * Note: uses GFP_KERNEL for allocation.
4217  */
4218 static inline char *dup_token(const char **buf, size_t *lenp)
4219 {
4220         char *dup;
4221         size_t len;
4222
4223         len = next_token(buf);
4224         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4225         if (!dup)
4226                 return NULL;
4227         *(dup + len) = '\0';
4228         *buf += len;
4229
4230         if (lenp)
4231                 *lenp = len;
4232
4233         return dup;
4234 }
4235
4236 /*
4237  * Parse the options provided for an "rbd add" (i.e., rbd image
4238  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4239  * and the data written is passed here via a NUL-terminated buffer.
4240  * Returns 0 if successful or an error code otherwise.
4241  *
4242  * The information extracted from these options is recorded in
4243  * the other parameters which return dynamically-allocated
4244  * structures:
4245  *  ceph_opts
4246  *      The address of a pointer that will refer to a ceph options
4247  *      structure.  Caller must release the returned pointer using
4248  *      ceph_destroy_options() when it is no longer needed.
4249  *  rbd_opts
4250  *      Address of an rbd options pointer.  Fully initialized by
4251  *      this function; caller must release with kfree().
4252  *  spec
4253  *      Address of an rbd image specification pointer.  Fully
4254  *      initialized by this function based on parsed options.
4255  *      Caller must release with rbd_spec_put().
4256  *
4257  * The options passed take this form:
4258  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4259  * where:
4260  *  <mon_addrs>
4261  *      A comma-separated list of one or more monitor addresses.
4262  *      A monitor address is an ip address, optionally followed
4263  *      by a port number (separated by a colon).
4264  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4265  *  <options>
4266  *      A comma-separated list of ceph and/or rbd options.
4267  *  <pool_name>
4268  *      The name of the rados pool containing the rbd image.
4269  *  <image_name>
4270  *      The name of the image in that pool to map.
4271  *  <snap_id>
4272  *      An optional snapshot id.  If provided, the mapping will
4273  *      present data from the image at the time that snapshot was
4274  *      created.  The image head is used if no snapshot id is
4275  *      provided.  Snapshot mappings are always read-only.
4276  */
4277 static int rbd_add_parse_args(const char *buf,
4278                                 struct ceph_options **ceph_opts,
4279                                 struct rbd_options **opts,
4280                                 struct rbd_spec **rbd_spec)
4281 {
4282         size_t len;
4283         char *options;
4284         const char *mon_addrs;
4285         char *snap_name;
4286         size_t mon_addrs_size;
4287         struct rbd_spec *spec = NULL;
4288         struct rbd_options *rbd_opts = NULL;
4289         struct ceph_options *copts;
4290         int ret;
4291
4292         /* The first four tokens are required */
4293
4294         len = next_token(&buf);
4295         if (!len) {
4296                 rbd_warn(NULL, "no monitor address(es) provided");
4297                 return -EINVAL;
4298         }
4299         mon_addrs = buf;
4300         mon_addrs_size = len + 1;
4301         buf += len;
4302
4303         ret = -EINVAL;
4304         options = dup_token(&buf, NULL);
4305         if (!options)
4306                 return -ENOMEM;
4307         if (!*options) {
4308                 rbd_warn(NULL, "no options provided");
4309                 goto out_err;
4310         }
4311
4312         spec = rbd_spec_alloc();
4313         if (!spec)
4314                 goto out_mem;
4315
4316         spec->pool_name = dup_token(&buf, NULL);
4317         if (!spec->pool_name)
4318                 goto out_mem;
4319         if (!*spec->pool_name) {
4320                 rbd_warn(NULL, "no pool name provided");
4321                 goto out_err;
4322         }
4323
4324         spec->image_name = dup_token(&buf, NULL);
4325         if (!spec->image_name)
4326                 goto out_mem;
4327         if (!*spec->image_name) {
4328                 rbd_warn(NULL, "no image name provided");
4329                 goto out_err;
4330         }
4331
4332         /*
4333          * Snapshot name is optional; default is to use "-"
4334          * (indicating the head/no snapshot).
4335          */
4336         len = next_token(&buf);
4337         if (!len) {
4338                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4339                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4340         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4341                 ret = -ENAMETOOLONG;
4342                 goto out_err;
4343         }
4344         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4345         if (!snap_name)
4346                 goto out_mem;
4347         *(snap_name + len) = '\0';
4348         spec->snap_name = snap_name;
4349
4350         /* Initialize all rbd options to the defaults */
4351
4352         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4353         if (!rbd_opts)
4354                 goto out_mem;
4355
4356         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4357
4358         copts = ceph_parse_options(options, mon_addrs,
4359                                         mon_addrs + mon_addrs_size - 1,
4360                                         parse_rbd_opts_token, rbd_opts);
4361         if (IS_ERR(copts)) {
4362                 ret = PTR_ERR(copts);
4363                 goto out_err;
4364         }
4365         kfree(options);
4366
4367         *ceph_opts = copts;
4368         *opts = rbd_opts;
4369         *rbd_spec = spec;
4370
4371         return 0;
4372 out_mem:
4373         ret = -ENOMEM;
4374 out_err:
4375         kfree(rbd_opts);
4376         rbd_spec_put(spec);
4377         kfree(options);
4378
4379         return ret;
4380 }
4381
4382 /*
4383  * An rbd format 2 image has a unique identifier, distinct from the
4384  * name given to it by the user.  Internally, that identifier is
4385  * what's used to specify the names of objects related to the image.
4386  *
4387  * A special "rbd id" object is used to map an rbd image name to its
4388  * id.  If that object doesn't exist, then there is no v2 rbd image
4389  * with the supplied name.
4390  *
4391  * This function will record the given rbd_dev's image_id field if
4392  * it can be determined, and in that case will return 0.  If any
4393  * errors occur a negative errno will be returned and the rbd_dev's
4394  * image_id field will be unchanged (and should be NULL).
4395  */
4396 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4397 {
4398         int ret;
4399         size_t size;
4400         char *object_name;
4401         void *response;
4402         char *image_id;
4403
4404         /*
4405          * When probing a parent image, the image id is already
4406          * known (and the image name likely is not).  There's no
4407          * need to fetch the image id again in this case.  We
4408          * do still need to set the image format though.
4409          */
4410         if (rbd_dev->spec->image_id) {
4411                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4412
4413                 return 0;
4414         }
4415
4416         /*
4417          * First, see if the format 2 image id file exists, and if
4418          * so, get the image's persistent id from it.
4419          */
4420         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4421         object_name = kmalloc(size, GFP_NOIO);
4422         if (!object_name)
4423                 return -ENOMEM;
4424         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4425         dout("rbd id object name is %s\n", object_name);
4426
4427         /* Response will be an encoded string, which includes a length */
4428
4429         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4430         response = kzalloc(size, GFP_NOIO);
4431         if (!response) {
4432                 ret = -ENOMEM;
4433                 goto out;
4434         }
4435
4436         /* If it doesn't exist we'll assume it's a format 1 image */
4437
4438         ret = rbd_obj_method_sync(rbd_dev, object_name,
4439                                 "rbd", "get_id", NULL, 0,
4440                                 response, RBD_IMAGE_ID_LEN_MAX);
4441         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4442         if (ret == -ENOENT) {
4443                 image_id = kstrdup("", GFP_KERNEL);
4444                 ret = image_id ? 0 : -ENOMEM;
4445                 if (!ret)
4446                         rbd_dev->image_format = 1;
4447         } else if (ret > sizeof (__le32)) {
4448                 void *p = response;
4449
4450                 image_id = ceph_extract_encoded_string(&p, p + ret,
4451                                                 NULL, GFP_NOIO);
4452                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4453                 if (!ret)
4454                         rbd_dev->image_format = 2;
4455         } else {
4456                 ret = -EINVAL;
4457         }
4458
4459         if (!ret) {
4460                 rbd_dev->spec->image_id = image_id;
4461                 dout("image_id is %s\n", image_id);
4462         }
4463 out:
4464         kfree(response);
4465         kfree(object_name);
4466
4467         return ret;
4468 }
4469
4470 /* Undo whatever state changes are made by v1 or v2 image probe */
4471
4472 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4473 {
4474         struct rbd_image_header *header;
4475
4476         rbd_dev_remove_parent(rbd_dev);
4477         rbd_spec_put(rbd_dev->parent_spec);
4478         rbd_dev->parent_spec = NULL;
4479         rbd_dev->parent_overlap = 0;
4480
4481         /* Free dynamic fields from the header, then zero it out */
4482
4483         header = &rbd_dev->header;
4484         ceph_put_snap_context(header->snapc);
4485         kfree(header->snap_sizes);
4486         kfree(header->snap_names);
4487         kfree(header->object_prefix);
4488         memset(header, 0, sizeof (*header));
4489 }
4490
4491 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4492 {
4493         int ret;
4494
4495         /* Populate rbd image metadata */
4496
4497         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4498         if (ret < 0)
4499                 goto out_err;
4500
4501         /* Version 1 images have no parent (no layering) */
4502
4503         rbd_dev->parent_spec = NULL;
4504         rbd_dev->parent_overlap = 0;
4505
4506         dout("discovered version 1 image, header name is %s\n",
4507                 rbd_dev->header_name);
4508
4509         return 0;
4510
4511 out_err:
4512         kfree(rbd_dev->header_name);
4513         rbd_dev->header_name = NULL;
4514         kfree(rbd_dev->spec->image_id);
4515         rbd_dev->spec->image_id = NULL;
4516
4517         return ret;
4518 }
4519
4520 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4521 {
4522         int ret;
4523
4524         ret = rbd_dev_v2_image_size(rbd_dev);
4525         if (ret)
4526                 goto out_err;
4527
4528         /* Get the object prefix (a.k.a. block_name) for the image */
4529
4530         ret = rbd_dev_v2_object_prefix(rbd_dev);
4531         if (ret)
4532                 goto out_err;
4533
4534         /* Get the and check features for the image */
4535
4536         ret = rbd_dev_v2_features(rbd_dev);
4537         if (ret)
4538                 goto out_err;
4539
4540         /* If the image supports layering, get the parent info */
4541
4542         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4543                 ret = rbd_dev_v2_parent_info(rbd_dev);
4544                 if (ret)
4545                         goto out_err;
4546                 /*
4547                  * Print a warning if this image has a parent.
4548                  * Don't print it if the image now being probed
4549                  * is itself a parent.  We can tell at this point
4550                  * because we won't know its pool name yet (just its
4551                  * pool id).
4552                  */
4553                 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4554                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4555                                         "is EXPERIMENTAL!");
4556         }
4557
4558         /* If the image supports fancy striping, get its parameters */
4559
4560         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4561                 ret = rbd_dev_v2_striping_info(rbd_dev);
4562                 if (ret < 0)
4563                         goto out_err;
4564         }
4565
4566         /* crypto and compression type aren't (yet) supported for v2 images */
4567
4568         rbd_dev->header.crypt_type = 0;
4569         rbd_dev->header.comp_type = 0;
4570
4571         /* Get the snapshot context, plus the header version */
4572
4573         ret = rbd_dev_v2_snap_context(rbd_dev);
4574         if (ret)
4575                 goto out_err;
4576
4577         dout("discovered version 2 image, header name is %s\n",
4578                 rbd_dev->header_name);
4579
4580         return 0;
4581 out_err:
4582         rbd_dev->parent_overlap = 0;
4583         rbd_spec_put(rbd_dev->parent_spec);
4584         rbd_dev->parent_spec = NULL;
4585         kfree(rbd_dev->header_name);
4586         rbd_dev->header_name = NULL;
4587         kfree(rbd_dev->header.object_prefix);
4588         rbd_dev->header.object_prefix = NULL;
4589
4590         return ret;
4591 }
4592
4593 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4594 {
4595         struct rbd_device *parent = NULL;
4596         struct rbd_spec *parent_spec;
4597         struct rbd_client *rbdc;
4598         int ret;
4599
4600         if (!rbd_dev->parent_spec)
4601                 return 0;
4602         /*
4603          * We need to pass a reference to the client and the parent
4604          * spec when creating the parent rbd_dev.  Images related by
4605          * parent/child relationships always share both.
4606          */
4607         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4608         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4609
4610         ret = -ENOMEM;
4611         parent = rbd_dev_create(rbdc, parent_spec);
4612         if (!parent)
4613                 goto out_err;
4614
4615         ret = rbd_dev_image_probe(parent, true);
4616         if (ret < 0)
4617                 goto out_err;
4618         rbd_dev->parent = parent;
4619
4620         return 0;
4621 out_err:
4622         if (parent) {
4623                 rbd_spec_put(rbd_dev->parent_spec);
4624                 kfree(rbd_dev->header_name);
4625                 rbd_dev_destroy(parent);
4626         } else {
4627                 rbd_put_client(rbdc);
4628                 rbd_spec_put(parent_spec);
4629         }
4630
4631         return ret;
4632 }
4633
4634 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4635 {
4636         int ret;
4637
4638         /* generate unique id: find highest unique id, add one */
4639         rbd_dev_id_get(rbd_dev);
4640
4641         /* Fill in the device name, now that we have its id. */
4642         BUILD_BUG_ON(DEV_NAME_LEN
4643                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4644         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4645
4646         /* Get our block major device number. */
4647
4648         ret = register_blkdev(0, rbd_dev->name);
4649         if (ret < 0)
4650                 goto err_out_id;
4651         rbd_dev->major = ret;
4652
4653         /* Set up the blkdev mapping. */
4654
4655         ret = rbd_init_disk(rbd_dev);
4656         if (ret)
4657                 goto err_out_blkdev;
4658
4659         ret = rbd_dev_mapping_set(rbd_dev);
4660         if (ret)
4661                 goto err_out_disk;
4662         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4663
4664         ret = rbd_bus_add_dev(rbd_dev);
4665         if (ret)
4666                 goto err_out_mapping;
4667
4668         /* Everything's ready.  Announce the disk to the world. */
4669
4670         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4671         add_disk(rbd_dev->disk);
4672
4673         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4674                 (unsigned long long) rbd_dev->mapping.size);
4675
4676         return ret;
4677
4678 err_out_mapping:
4679         rbd_dev_mapping_clear(rbd_dev);
4680 err_out_disk:
4681         rbd_free_disk(rbd_dev);
4682 err_out_blkdev:
4683         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4684 err_out_id:
4685         rbd_dev_id_put(rbd_dev);
4686         rbd_dev_mapping_clear(rbd_dev);
4687
4688         return ret;
4689 }
4690
4691 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4692 {
4693         struct rbd_spec *spec = rbd_dev->spec;
4694         size_t size;
4695
4696         /* Record the header object name for this rbd image. */
4697
4698         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4699
4700         if (rbd_dev->image_format == 1)
4701                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4702         else
4703                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4704
4705         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4706         if (!rbd_dev->header_name)
4707                 return -ENOMEM;
4708
4709         if (rbd_dev->image_format == 1)
4710                 sprintf(rbd_dev->header_name, "%s%s",
4711                         spec->image_name, RBD_SUFFIX);
4712         else
4713                 sprintf(rbd_dev->header_name, "%s%s",
4714                         RBD_HEADER_PREFIX, spec->image_id);
4715         return 0;
4716 }
4717
4718 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4719 {
4720         int ret;
4721
4722         rbd_dev_unprobe(rbd_dev);
4723         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4724         if (ret)
4725                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4726         kfree(rbd_dev->header_name);
4727         rbd_dev->header_name = NULL;
4728         rbd_dev->image_format = 0;
4729         kfree(rbd_dev->spec->image_id);
4730         rbd_dev->spec->image_id = NULL;
4731
4732         rbd_dev_destroy(rbd_dev);
4733 }
4734
4735 /*
4736  * Probe for the existence of the header object for the given rbd
4737  * device.  For format 2 images this includes determining the image
4738  * id.
4739  */
4740 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
4741 {
4742         int ret;
4743         int tmp;
4744
4745         /*
4746          * Get the id from the image id object.  If it's not a
4747          * format 2 image, we'll get ENOENT back, and we'll assume
4748          * it's a format 1 image.
4749          */
4750         ret = rbd_dev_image_id(rbd_dev);
4751         if (ret)
4752                 return ret;
4753         rbd_assert(rbd_dev->spec->image_id);
4754         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4755
4756         ret = rbd_dev_header_name(rbd_dev);
4757         if (ret)
4758                 goto err_out_format;
4759
4760         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4761         if (ret)
4762                 goto out_header_name;
4763
4764         if (rbd_dev->image_format == 1)
4765                 ret = rbd_dev_v1_probe(rbd_dev);
4766         else
4767                 ret = rbd_dev_v2_probe(rbd_dev);
4768         if (ret)
4769                 goto err_out_watch;
4770
4771         ret = rbd_dev_spec_update(rbd_dev);
4772         if (ret)
4773                 goto err_out_probe;
4774
4775         /* If we are mapping a snapshot it must be marked read-only */
4776
4777         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4778                 read_only = true;
4779         rbd_dev->mapping.read_only = read_only;
4780
4781         ret = rbd_dev_probe_parent(rbd_dev);
4782         if (!ret)
4783                 return 0;
4784
4785 err_out_probe:
4786         rbd_dev_unprobe(rbd_dev);
4787 err_out_watch:
4788         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4789         if (tmp)
4790                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4791 out_header_name:
4792         kfree(rbd_dev->header_name);
4793         rbd_dev->header_name = NULL;
4794 err_out_format:
4795         rbd_dev->image_format = 0;
4796         kfree(rbd_dev->spec->image_id);
4797         rbd_dev->spec->image_id = NULL;
4798
4799         dout("probe failed, returning %d\n", ret);
4800
4801         return ret;
4802 }
4803
4804 static ssize_t rbd_add(struct bus_type *bus,
4805                        const char *buf,
4806                        size_t count)
4807 {
4808         struct rbd_device *rbd_dev = NULL;
4809         struct ceph_options *ceph_opts = NULL;
4810         struct rbd_options *rbd_opts = NULL;
4811         struct rbd_spec *spec = NULL;
4812         struct rbd_client *rbdc;
4813         struct ceph_osd_client *osdc;
4814         bool read_only;
4815         int rc = -ENOMEM;
4816
4817         if (!try_module_get(THIS_MODULE))
4818                 return -ENODEV;
4819
4820         /* parse add command */
4821         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4822         if (rc < 0)
4823                 goto err_out_module;
4824         read_only = rbd_opts->read_only;
4825         kfree(rbd_opts);
4826         rbd_opts = NULL;        /* done with this */
4827
4828         rbdc = rbd_get_client(ceph_opts);
4829         if (IS_ERR(rbdc)) {
4830                 rc = PTR_ERR(rbdc);
4831                 goto err_out_args;
4832         }
4833         ceph_opts = NULL;       /* rbd_dev client now owns this */
4834
4835         /* pick the pool */
4836         osdc = &rbdc->client->osdc;
4837         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4838         if (rc < 0)
4839                 goto err_out_client;
4840         spec->pool_id = (u64)rc;
4841
4842         /* The ceph file layout needs to fit pool id in 32 bits */
4843
4844         if (spec->pool_id > (u64)U32_MAX) {
4845                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4846                                 (unsigned long long)spec->pool_id, U32_MAX);
4847                 rc = -EIO;
4848                 goto err_out_client;
4849         }
4850
4851         rbd_dev = rbd_dev_create(rbdc, spec);
4852         if (!rbd_dev)
4853                 goto err_out_client;
4854         rbdc = NULL;            /* rbd_dev now owns this */
4855         spec = NULL;            /* rbd_dev now owns this */
4856
4857         rc = rbd_dev_image_probe(rbd_dev, read_only);
4858         if (rc < 0)
4859                 goto err_out_rbd_dev;
4860
4861         rc = rbd_dev_device_setup(rbd_dev);
4862         if (!rc)
4863                 return count;
4864
4865         rbd_dev_image_release(rbd_dev);
4866 err_out_rbd_dev:
4867         rbd_dev_destroy(rbd_dev);
4868 err_out_client:
4869         rbd_put_client(rbdc);
4870 err_out_args:
4871         if (ceph_opts)
4872                 ceph_destroy_options(ceph_opts);
4873         kfree(rbd_opts);
4874         rbd_spec_put(spec);
4875 err_out_module:
4876         module_put(THIS_MODULE);
4877
4878         dout("Error adding device %s\n", buf);
4879
4880         return (ssize_t)rc;
4881 }
4882
4883 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4884 {
4885         struct list_head *tmp;
4886         struct rbd_device *rbd_dev;
4887
4888         spin_lock(&rbd_dev_list_lock);
4889         list_for_each(tmp, &rbd_dev_list) {
4890                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4891                 if (rbd_dev->dev_id == dev_id) {
4892                         spin_unlock(&rbd_dev_list_lock);
4893                         return rbd_dev;
4894                 }
4895         }
4896         spin_unlock(&rbd_dev_list_lock);
4897         return NULL;
4898 }
4899
4900 static void rbd_dev_device_release(struct device *dev)
4901 {
4902         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4903
4904         rbd_free_disk(rbd_dev);
4905         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4906         rbd_dev_mapping_clear(rbd_dev);
4907         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4908         rbd_dev->major = 0;
4909         rbd_dev_id_put(rbd_dev);
4910         rbd_dev_mapping_clear(rbd_dev);
4911 }
4912
4913 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4914 {
4915         while (rbd_dev->parent) {
4916                 struct rbd_device *first = rbd_dev;
4917                 struct rbd_device *second = first->parent;
4918                 struct rbd_device *third;
4919
4920                 /*
4921                  * Follow to the parent with no grandparent and
4922                  * remove it.
4923                  */
4924                 while (second && (third = second->parent)) {
4925                         first = second;
4926                         second = third;
4927                 }
4928                 rbd_assert(second);
4929                 rbd_dev_image_release(second);
4930                 first->parent = NULL;
4931                 first->parent_overlap = 0;
4932
4933                 rbd_assert(first->parent_spec);
4934                 rbd_spec_put(first->parent_spec);
4935                 first->parent_spec = NULL;
4936         }
4937 }
4938
4939 static ssize_t rbd_remove(struct bus_type *bus,
4940                           const char *buf,
4941                           size_t count)
4942 {
4943         struct rbd_device *rbd_dev = NULL;
4944         int target_id;
4945         unsigned long ul;
4946         int ret;
4947
4948         ret = strict_strtoul(buf, 10, &ul);
4949         if (ret)
4950                 return ret;
4951
4952         /* convert to int; abort if we lost anything in the conversion */
4953         target_id = (int) ul;
4954         if (target_id != ul)
4955                 return -EINVAL;
4956
4957         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4958
4959         rbd_dev = __rbd_get_dev(target_id);
4960         if (!rbd_dev) {
4961                 ret = -ENOENT;
4962                 goto done;
4963         }
4964
4965         spin_lock_irq(&rbd_dev->lock);
4966         if (rbd_dev->open_count)
4967                 ret = -EBUSY;
4968         else
4969                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4970         spin_unlock_irq(&rbd_dev->lock);
4971         if (ret < 0)
4972                 goto done;
4973         ret = count;
4974         rbd_bus_del_dev(rbd_dev);
4975         rbd_dev_image_release(rbd_dev);
4976         module_put(THIS_MODULE);
4977 done:
4978         mutex_unlock(&ctl_mutex);
4979
4980         return ret;
4981 }
4982
4983 /*
4984  * create control files in sysfs
4985  * /sys/bus/rbd/...
4986  */
4987 static int rbd_sysfs_init(void)
4988 {
4989         int ret;
4990
4991         ret = device_register(&rbd_root_dev);
4992         if (ret < 0)
4993                 return ret;
4994
4995         ret = bus_register(&rbd_bus_type);
4996         if (ret < 0)
4997                 device_unregister(&rbd_root_dev);
4998
4999         return ret;
5000 }
5001
5002 static void rbd_sysfs_cleanup(void)
5003 {
5004         bus_unregister(&rbd_bus_type);
5005         device_unregister(&rbd_root_dev);
5006 }
5007
5008 static int rbd_slab_init(void)
5009 {
5010         rbd_assert(!rbd_img_request_cache);
5011         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5012                                         sizeof (struct rbd_img_request),
5013                                         __alignof__(struct rbd_img_request),
5014                                         0, NULL);
5015         if (!rbd_img_request_cache)
5016                 return -ENOMEM;
5017
5018         rbd_assert(!rbd_obj_request_cache);
5019         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5020                                         sizeof (struct rbd_obj_request),
5021                                         __alignof__(struct rbd_obj_request),
5022                                         0, NULL);
5023         if (!rbd_obj_request_cache)
5024                 goto out_err;
5025
5026         rbd_assert(!rbd_segment_name_cache);
5027         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5028                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5029         if (rbd_segment_name_cache)
5030                 return 0;
5031 out_err:
5032         if (rbd_obj_request_cache) {
5033                 kmem_cache_destroy(rbd_obj_request_cache);
5034                 rbd_obj_request_cache = NULL;
5035         }
5036
5037         kmem_cache_destroy(rbd_img_request_cache);
5038         rbd_img_request_cache = NULL;
5039
5040         return -ENOMEM;
5041 }
5042
5043 static void rbd_slab_exit(void)
5044 {
5045         rbd_assert(rbd_segment_name_cache);
5046         kmem_cache_destroy(rbd_segment_name_cache);
5047         rbd_segment_name_cache = NULL;
5048
5049         rbd_assert(rbd_obj_request_cache);
5050         kmem_cache_destroy(rbd_obj_request_cache);
5051         rbd_obj_request_cache = NULL;
5052
5053         rbd_assert(rbd_img_request_cache);
5054         kmem_cache_destroy(rbd_img_request_cache);
5055         rbd_img_request_cache = NULL;
5056 }
5057
5058 static int __init rbd_init(void)
5059 {
5060         int rc;
5061
5062         if (!libceph_compatible(NULL)) {
5063                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5064
5065                 return -EINVAL;
5066         }
5067         rc = rbd_slab_init();
5068         if (rc)
5069                 return rc;
5070         rc = rbd_sysfs_init();
5071         if (rc)
5072                 rbd_slab_exit();
5073         else
5074                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5075
5076         return rc;
5077 }
5078
5079 static void __exit rbd_exit(void)
5080 {
5081         rbd_sysfs_cleanup();
5082         rbd_slab_exit();
5083 }
5084
5085 module_init(rbd_init);
5086 module_exit(rbd_exit);
5087
5088 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5089 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5090 MODULE_DESCRIPTION("rados block device");
5091
5092 /* following authorship retained from original osdblk.c */
5093 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5094
5095 MODULE_LICENSE("GPL");