/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

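   A rough sketch of that interface (the ABI document above is
   authoritative, and the exact option syntax varies by version):

                 echo "<mon addrs> <options> <pool> <image> [<snap>]" \
                                 > /sys/bus/rbd/add
                 echo <dev-id> > /sys/bus/rbd/remove
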
 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
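/* (Sanity check on that limit: 510 snapshot ids * 8 bytes = 4080 bytes,
 * which leaves room in a 4 KB buffer for the snapshot context's few
 * remaining fields.) */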

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
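/* (For a 4-byte int this works out to (5 * 4) / 2 + 1 = 11, enough for
 * "-2147483648": 5/2 decimal digits per byte over-approximates the true
 * log10(256) ~= 2.41, and the +1 covers a sign.) */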

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

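/*
 * Iterators over an image request's object requests.  The _safe
 * variant walks the list in reverse and tolerates removal of the
 * current entry, which suits request teardown.
 */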
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

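/*
 * Emit a warning that identifies the device by the most specific
 * name available: the mapped disk name, then the image name, then
 * the image id, falling back to the rbd_device pointer itself.
 */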
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

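/*
 * Open path: refuse writes on a read-only mapping, refuse any open
 * while the device is being removed, and otherwise bump open_count
 * and take a reference on the device.
 */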
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

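/* Undo rbd_open(): drop the open count and the device reference. */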
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
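/* The option strings above ("read_only"/"ro", "read_write"/"rw")
 * arrive in the options field of an add command. */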

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to unlink the client from the
 * list, so the caller must not already hold that lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

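/*
 * Build the name of the object backing the segment that contains the
 * given image byte offset: the image's object prefix followed by the
 * segment number as 12 hex digits (e.g. "<prefix>.000000000001").
 */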
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* Slab-cache memory must go back to its cache, not kfree() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

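/*
 * Segment helpers: a segment is one object's worth of image data
 * (1 << obj_order bytes).  rbd_segment_offset() gives the offset of
 * an image byte within its containing object; rbd_segment_length()
 * clamps a length so a request does not cross an object boundary.
 */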
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
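/*
 * (A reading of the smp_mb() usage in the helpers below: each setter
 * orders its bit operation before later accesses, and each tester
 * issues a barrier before reading, so a flag set on one CPU is seen
 * by a subsequent test on another.)
 */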
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, so
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the later-arriving
 * ("doesn't exist") response.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it is not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
1472 static void img_request_write_set(struct rbd_img_request *img_request)
1473 {
1474         set_bit(IMG_REQ_WRITE, &img_request->flags);
1475         smp_mb();
1476 }
1477
1478 static bool img_request_write_test(struct rbd_img_request *img_request)
1479 {
1480         smp_mb();
1481         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1482 }
1483
1484 static void img_request_child_set(struct rbd_img_request *img_request)
1485 {
1486         set_bit(IMG_REQ_CHILD, &img_request->flags);
1487         smp_mb();
1488 }
1489
1490 static bool img_request_child_test(struct rbd_img_request *img_request)
1491 {
1492         smp_mb();
1493         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1494 }
1495
1496 static void img_request_layered_set(struct rbd_img_request *img_request)
1497 {
1498         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1499         smp_mb();
1500 }
1501
1502 static bool img_request_layered_test(struct rbd_img_request *img_request)
1503 {
1504         smp_mb();
1505         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1506 }
1507
1508 static void
1509 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1510 {
1511         u64 xferred = obj_request->xferred;
1512         u64 length = obj_request->length;
1513
1514         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1515                 obj_request, obj_request->img_request, obj_request->result,
1516                 xferred, length);
1517         /*
1518          * ENOENT means a hole in the image.  We zero-fill the
1519          * entire length of the request.  A short read also implies
1520          * zero-fill to the end of the request.  Either way we
1521          * update the xferred count to indicate the whole request
1522          * was satisfied.
1523          */
1524         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1525         if (obj_request->result == -ENOENT) {
1526                 if (obj_request->type == OBJ_REQUEST_BIO)
1527                         zero_bio_chain(obj_request->bio_list, 0);
1528                 else
1529                         zero_pages(obj_request->pages, 0, length);
1530                 obj_request->result = 0;
1531                 obj_request->xferred = length;
1532         } else if (xferred < length && !obj_request->result) {
1533                 if (obj_request->type == OBJ_REQUEST_BIO)
1534                         zero_bio_chain(obj_request->bio_list, xferred);
1535                 else
1536                         zero_pages(obj_request->pages, xferred, length);
1537                 obj_request->xferred = length;
1538         }
1539         obj_request_done_set(obj_request);
1540 }
1541
1542 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1543 {
1544         dout("%s: obj %p cb %p\n", __func__, obj_request,
1545                 obj_request->callback);
1546         if (obj_request->callback)
1547                 obj_request->callback(obj_request);
1548         else
1549                 complete_all(&obj_request->completion);
1550 }
1551
1552 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1553 {
1554         dout("%s: obj %p\n", __func__, obj_request);
1555         obj_request_done_set(obj_request);
1556 }
1557
1558 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1559 {
1560         struct rbd_img_request *img_request = NULL;
1561         struct rbd_device *rbd_dev = NULL;
1562         bool layered = false;
1563
1564         if (obj_request_img_data_test(obj_request)) {
1565                 img_request = obj_request->img_request;
1566                 layered = img_request && img_request_layered_test(img_request);
1567                 rbd_dev = img_request->rbd_dev;
1568         }
1569
1570         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1571                 obj_request, img_request, obj_request->result,
1572                 obj_request->xferred, obj_request->length);
1573         if (layered && obj_request->result == -ENOENT &&
1574                         obj_request->img_offset < rbd_dev->parent_overlap)
1575                 rbd_img_parent_read(obj_request);
1576         else if (img_request)
1577                 rbd_img_obj_request_read_callback(obj_request);
1578         else
1579                 obj_request_done_set(obj_request);
1580 }
1581
1582 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1583 {
1584         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1585                 obj_request->result, obj_request->length);
1586         /*
1587          * There is no such thing as a successful short write.  Set
1588          * the transfer count to the originally-requested length.
1589          */
1590         obj_request->xferred = obj_request->length;
1591         obj_request_done_set(obj_request);
1592 }
1593
1594 /*
1595  * For a simple stat call there's nothing to do.  We'll do more if
1596  * this is part of a write sequence for a layered image.
1597  */
1598 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1599 {
1600         dout("%s: obj %p\n", __func__, obj_request);
1601         obj_request_done_set(obj_request);
1602 }
1603
1604 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1605                                 struct ceph_msg *msg)
1606 {
1607         struct rbd_obj_request *obj_request = osd_req->r_priv;
1608         u16 opcode;
1609
1610         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1611         rbd_assert(osd_req == obj_request->osd_req);
1612         if (obj_request_img_data_test(obj_request)) {
1613                 rbd_assert(obj_request->img_request);
1614                 rbd_assert(obj_request->which != BAD_WHICH);
1615         } else {
1616                 rbd_assert(obj_request->which == BAD_WHICH);
1617         }
1618
1619         if (osd_req->r_result < 0)
1620                 obj_request->result = osd_req->r_result;
1621
1622         BUG_ON(osd_req->r_num_ops > 2);
1623
1624         /*
1625          * We support a 64-bit length, but ultimately it has to be
1626          * passed to blk_end_request(), which takes an unsigned int.
1627          */
1628         obj_request->xferred = osd_req->r_reply_op_len[0];
1629         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1630         opcode = osd_req->r_ops[0].op;
1631         switch (opcode) {
1632         case CEPH_OSD_OP_READ:
1633                 rbd_osd_read_callback(obj_request);
1634                 break;
1635         case CEPH_OSD_OP_WRITE:
1636                 rbd_osd_write_callback(obj_request);
1637                 break;
1638         case CEPH_OSD_OP_STAT:
1639                 rbd_osd_stat_callback(obj_request);
1640                 break;
1641         case CEPH_OSD_OP_CALL:
1642         case CEPH_OSD_OP_NOTIFY_ACK:
1643         case CEPH_OSD_OP_WATCH:
1644                 rbd_osd_trivial_callback(obj_request);
1645                 break;
1646         default:
1647                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1648                         obj_request->object_name, (unsigned short) opcode);
1649                 break;
1650         }
1651
1652         if (obj_request_done_test(obj_request))
1653                 rbd_obj_request_complete(obj_request);
1654 }
1655
1656 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1657 {
1658         struct rbd_img_request *img_request = obj_request->img_request;
1659         struct ceph_osd_request *osd_req = obj_request->osd_req;
1660         u64 snap_id;
1661
1662         rbd_assert(osd_req != NULL);
1663
1664         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1665         ceph_osdc_build_request(osd_req, obj_request->offset,
1666                         NULL, snap_id, NULL);
1667 }
1668
1669 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1670 {
1671         struct rbd_img_request *img_request = obj_request->img_request;
1672         struct ceph_osd_request *osd_req = obj_request->osd_req;
1673         struct ceph_snap_context *snapc;
1674         struct timespec mtime = CURRENT_TIME;
1675
1676         rbd_assert(osd_req != NULL);
1677
1678         snapc = img_request ? img_request->snapc : NULL;
1679         ceph_osdc_build_request(osd_req, obj_request->offset,
1680                         snapc, CEPH_NOSNAP, &mtime);
1681 }
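
/*
 * Editorial note: the two formatters above differ only in the
 * snapshot arguments handed to ceph_osdc_build_request().  A read is
 * pinned to a single snapshot id (CEPH_NOSNAP for the head), while a
 * write carries the full snapshot context plus a modification time:
 *
 *	read:	ceph_osdc_build_request(osd_req, offset, NULL, snap_id, NULL);
 *	write:	ceph_osdc_build_request(osd_req, offset, snapc, CEPH_NOSNAP, &mtime);
 */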
1682
1683 static struct ceph_osd_request *rbd_osd_req_create(
1684                                         struct rbd_device *rbd_dev,
1685                                         bool write_request,
1686                                         struct rbd_obj_request *obj_request)
1687 {
1688         struct ceph_snap_context *snapc = NULL;
1689         struct ceph_osd_client *osdc;
1690         struct ceph_osd_request *osd_req;
1691
1692         if (obj_request_img_data_test(obj_request)) {
1693                 struct rbd_img_request *img_request = obj_request->img_request;
1694
1695                 rbd_assert(write_request ==
1696                                 img_request_write_test(img_request));
1697                 if (write_request)
1698                         snapc = img_request->snapc;
1699         }
1700
1701         /* Allocate and initialize the request, for the single op */
1702
1703         osdc = &rbd_dev->rbd_client->client->osdc;
1704         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1705         if (!osd_req)
1706                 return NULL;    /* ENOMEM */
1707
1708         if (write_request)
1709                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1710         else
1711                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1712
1713         osd_req->r_callback = rbd_osd_req_callback;
1714         osd_req->r_priv = obj_request;
1715
1716         osd_req->r_oid_len = strlen(obj_request->object_name);
1717         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1718         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1719
1720         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1721
1722         return osd_req;
1723 }
1724
1725 /*
1726  * Create a copyup osd request based on the information in the
1727  * object request supplied.  A copyup request has two osd ops:
1728  * a copyup method call and a "normal" write request.
1729  */
1730 static struct ceph_osd_request *
1731 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1732 {
1733         struct rbd_img_request *img_request;
1734         struct ceph_snap_context *snapc;
1735         struct rbd_device *rbd_dev;
1736         struct ceph_osd_client *osdc;
1737         struct ceph_osd_request *osd_req;
1738
1739         rbd_assert(obj_request_img_data_test(obj_request));
1740         img_request = obj_request->img_request;
1741         rbd_assert(img_request);
1742         rbd_assert(img_request_write_test(img_request));
1743
1744         /* Allocate and initialize the request, for the two ops */
1745
1746         snapc = img_request->snapc;
1747         rbd_dev = img_request->rbd_dev;
1748         osdc = &rbd_dev->rbd_client->client->osdc;
1749         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1750         if (!osd_req)
1751                 return NULL;    /* ENOMEM */
1752
1753         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1754         osd_req->r_callback = rbd_osd_req_callback;
1755         osd_req->r_priv = obj_request;
1756
1757         osd_req->r_oid_len = strlen(obj_request->object_name);
1758         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1759         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1760
1761         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1762
1763         return osd_req;
1764 }
1765
1766
1767 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1768 {
1769         ceph_osdc_put_request(osd_req);
1770 }
1771
1772 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1773
1774 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1775                                                 u64 offset, u64 length,
1776                                                 enum obj_request_type type)
1777 {
1778         struct rbd_obj_request *obj_request;
1779         size_t size;
1780         char *name;
1781
1782         rbd_assert(obj_request_type_valid(type));
1783
1784         size = strlen(object_name) + 1;
1785         name = kmalloc(size, GFP_KERNEL);
1786         if (!name)
1787                 return NULL;
1788
1789         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1790         if (!obj_request) {
1791                 kfree(name);
1792                 return NULL;
1793         }
1794
1795         obj_request->object_name = memcpy(name, object_name, size);
1796         obj_request->offset = offset;
1797         obj_request->length = length;
1798         obj_request->flags = 0;
1799         obj_request->which = BAD_WHICH;
1800         obj_request->type = type;
1801         INIT_LIST_HEAD(&obj_request->links);
1802         init_completion(&obj_request->completion);
1803         kref_init(&obj_request->kref);
1804
1805         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1806                 offset, length, (int)type, obj_request);
1807
1808         return obj_request;
1809 }
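
/*
 * Illustrative lifecycle sketch (editorial), following the pattern
 * of the synchronous callers later in this file; op setup and error
 * handling are elided:
 *
 *	obj_request = rbd_obj_request_create(object_name, 0, length,
 *						OBJ_REQUEST_PAGES);
 *	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
 *						obj_request);
 *	rbd_osd_req_format_read(obj_request);
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 *	if (!ret)
 *		ret = rbd_obj_request_wait(obj_request);
 *	rbd_obj_request_put(obj_request);
 */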
1810
1811 static void rbd_obj_request_destroy(struct kref *kref)
1812 {
1813         struct rbd_obj_request *obj_request;
1814
1815         obj_request = container_of(kref, struct rbd_obj_request, kref);
1816
1817         dout("%s: obj %p\n", __func__, obj_request);
1818
1819         rbd_assert(obj_request->img_request == NULL);
1820         rbd_assert(obj_request->which == BAD_WHICH);
1821
1822         if (obj_request->osd_req)
1823                 rbd_osd_req_destroy(obj_request->osd_req);
1824
1825         rbd_assert(obj_request_type_valid(obj_request->type));
1826         switch (obj_request->type) {
1827         case OBJ_REQUEST_NODATA:
1828                 break;          /* Nothing to do */
1829         case OBJ_REQUEST_BIO:
1830                 if (obj_request->bio_list)
1831                         bio_chain_put(obj_request->bio_list);
1832                 break;
1833         case OBJ_REQUEST_PAGES:
1834                 if (obj_request->pages)
1835                         ceph_release_page_vector(obj_request->pages,
1836                                                 obj_request->page_count);
1837                 break;
1838         }
1839
1840         kfree(obj_request->object_name);
1841         obj_request->object_name = NULL;
1842         kmem_cache_free(rbd_obj_request_cache, obj_request);
1843 }
1844
1845 /*
1846  * Caller is responsible for filling in the list of object requests
1847  * that comprises the image request, and the Linux request pointer
1848  * (if there is one).
1849  */
1850 static struct rbd_img_request *rbd_img_request_create(
1851                                         struct rbd_device *rbd_dev,
1852                                         u64 offset, u64 length,
1853                                         bool write_request,
1854                                         bool child_request)
1855 {
1856         struct rbd_img_request *img_request;
1857
1858         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1859         if (!img_request)
1860                 return NULL;
1861
1862         if (write_request) {
1863                 down_read(&rbd_dev->header_rwsem);
1864                 ceph_get_snap_context(rbd_dev->header.snapc);
1865                 up_read(&rbd_dev->header_rwsem);
1866         }
1867
1868         img_request->rq = NULL;
1869         img_request->rbd_dev = rbd_dev;
1870         img_request->offset = offset;
1871         img_request->length = length;
1872         img_request->flags = 0;
1873         if (write_request) {
1874                 img_request_write_set(img_request);
1875                 img_request->snapc = rbd_dev->header.snapc;
1876         } else {
1877                 img_request->snap_id = rbd_dev->spec->snap_id;
1878         }
1879         if (child_request)
1880                 img_request_child_set(img_request);
1881         if (rbd_dev->parent_spec)
1882                 img_request_layered_set(img_request);
1883         spin_lock_init(&img_request->completion_lock);
1884         img_request->next_completion = 0;
1885         img_request->callback = NULL;
1886         img_request->result = 0;
1887         img_request->obj_request_count = 0;
1888         INIT_LIST_HEAD(&img_request->obj_requests);
1889         kref_init(&img_request->kref);
1890
1891         rbd_img_request_get(img_request);       /* Avoid a warning */
1892         rbd_img_request_put(img_request);       /* TEMPORARY */
1893
1894         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1895                 write_request ? "write" : "read", offset, length,
1896                 img_request);
1897
1898         return img_request;
1899 }
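
/*
 * Illustrative sketch (editorial): rbd_request_fn() below drives
 * this as create -> fill -> submit, roughly:
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request, false);
 *	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *					rq->bio);
 *	if (!result)
 *		result = rbd_img_request_submit(img_request);
 *	if (result)
 *		rbd_img_request_put(img_request);
 */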
1900
1901 static void rbd_img_request_destroy(struct kref *kref)
1902 {
1903         struct rbd_img_request *img_request;
1904         struct rbd_obj_request *obj_request;
1905         struct rbd_obj_request *next_obj_request;
1906
1907         img_request = container_of(kref, struct rbd_img_request, kref);
1908
1909         dout("%s: img %p\n", __func__, img_request);
1910
1911         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1912                 rbd_img_obj_request_del(img_request, obj_request);
1913         rbd_assert(img_request->obj_request_count == 0);
1914
1915         if (img_request_write_test(img_request))
1916                 ceph_put_snap_context(img_request->snapc);
1917
1918         if (img_request_child_test(img_request))
1919                 rbd_obj_request_put(img_request->obj_request);
1920
1921         kmem_cache_free(rbd_img_request_cache, img_request);
1922 }
1923
1924 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1925 {
1926         struct rbd_img_request *img_request;
1927         unsigned int xferred;
1928         int result;
1929         bool more;
1930
1931         rbd_assert(obj_request_img_data_test(obj_request));
1932         img_request = obj_request->img_request;
1933
1934         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1935         xferred = (unsigned int)obj_request->xferred;
1936         result = obj_request->result;
1937         if (result) {
1938                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1939
1940                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1941                         img_request_write_test(img_request) ? "write" : "read",
1942                         obj_request->length, obj_request->img_offset,
1943                         obj_request->offset);
1944                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1945                         result, xferred);
1946                 if (!img_request->result)
1947                         img_request->result = result;
1948         }
1949
1950         /* Image object requests don't own their page array */
1951
1952         if (obj_request->type == OBJ_REQUEST_PAGES) {
1953                 obj_request->pages = NULL;
1954                 obj_request->page_count = 0;
1955         }
1956
1957         if (img_request_child_test(img_request)) {
1958                 rbd_assert(img_request->obj_request != NULL);
1959                 more = obj_request->which < img_request->obj_request_count - 1;
1960         } else {
1961                 rbd_assert(img_request->rq != NULL);
1962                 more = blk_end_request(img_request->rq, result, xferred);
1963         }
1964
1965         return more;
1966 }
1967
1968 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1969 {
1970         struct rbd_img_request *img_request;
1971         u32 which = obj_request->which;
1972         bool more = true;
1973
1974         rbd_assert(obj_request_img_data_test(obj_request));
1975         img_request = obj_request->img_request;
1976
1977         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1978         rbd_assert(img_request != NULL);
1979         rbd_assert(img_request->obj_request_count > 0);
1980         rbd_assert(which != BAD_WHICH);
1981         rbd_assert(which < img_request->obj_request_count);
1982         rbd_assert(which >= img_request->next_completion);
1983
1984         spin_lock_irq(&img_request->completion_lock);
1985         if (which != img_request->next_completion)
1986                 goto out;
1987
1988         for_each_obj_request_from(img_request, obj_request) {
1989                 rbd_assert(more);
1990                 rbd_assert(which < img_request->obj_request_count);
1991
1992                 if (!obj_request_done_test(obj_request))
1993                         break;
1994                 more = rbd_img_obj_end_request(obj_request);
1995                 which++;
1996         }
1997
1998         rbd_assert(more ^ (which == img_request->obj_request_count));
1999         img_request->next_completion = which;
2000 out:
2001         spin_unlock_irq(&img_request->completion_lock);
2002
2003         if (!more)
2004                 rbd_img_request_complete(img_request);
2005 }
2006
2007 /*
2008  * Split up an image request into one or more object requests, each
2009  * to a different object.  The "type" parameter indicates whether
2010  * "data_desc" is the pointer to the head of a list of bio
2011  * structures, or the base of a page array.  In either case this
2012  * function assumes data_desc describes memory sufficient to hold
2013  * all data described by the image request.
2014  */
2015 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2016                                         enum obj_request_type type,
2017                                         void *data_desc)
2018 {
2019         struct rbd_device *rbd_dev = img_request->rbd_dev;
2020         struct rbd_obj_request *obj_request = NULL;
2021         struct rbd_obj_request *next_obj_request;
2022         bool write_request = img_request_write_test(img_request);
2023         struct bio *bio_list;
2024         unsigned int bio_offset = 0;
2025         struct page **pages;
2026         u64 img_offset;
2027         u64 resid;
2028         u16 opcode;
2029
2030         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2031                 (int)type, data_desc);
2032
2033         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2034         img_offset = img_request->offset;
2035         resid = img_request->length;
2036         rbd_assert(resid > 0);
2037
2038         if (type == OBJ_REQUEST_BIO) {
2039                 bio_list = data_desc;
2040                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2041         } else {
2042                 rbd_assert(type == OBJ_REQUEST_PAGES);
2043                 pages = data_desc;
2044         }
2045
2046         while (resid) {
2047                 struct ceph_osd_request *osd_req;
2048                 const char *object_name;
2049                 u64 offset;
2050                 u64 length;
2051
2052                 object_name = rbd_segment_name(rbd_dev, img_offset);
2053                 if (!object_name)
2054                         goto out_unwind;
2055                 offset = rbd_segment_offset(rbd_dev, img_offset);
2056                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2057                 obj_request = rbd_obj_request_create(object_name,
2058                                                 offset, length, type);
2059                 /* object request has its own copy of the object name */
2060                 rbd_segment_name_free(object_name);
2061                 if (!obj_request)
2062                         goto out_unwind;
2063
2064                 if (type == OBJ_REQUEST_BIO) {
2065                         unsigned int clone_size;
2066
2067                         rbd_assert(length <= (u64)UINT_MAX);
2068                         clone_size = (unsigned int)length;
2069                         obj_request->bio_list =
2070                                         bio_chain_clone_range(&bio_list,
2071                                                                 &bio_offset,
2072                                                                 clone_size,
2073                                                                 GFP_ATOMIC);
2074                         if (!obj_request->bio_list)
2075                                 goto out_partial;
2076                 } else {
2077                         unsigned int page_count;
2078
2079                         obj_request->pages = pages;
2080                         page_count = (u32)calc_pages_for(offset, length);
2081                         obj_request->page_count = page_count;
2082                         if ((offset + length) & ~PAGE_MASK)
2083                                 page_count--;   /* more on last page */
2084                         pages += page_count;
2085                 }
2086
2087                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2088                                                 obj_request);
2089                 if (!osd_req)
2090                         goto out_partial;
2091                 obj_request->osd_req = osd_req;
2092                 obj_request->callback = rbd_img_obj_callback;
2093
2094                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2095                                                 0, 0);
2096                 if (type == OBJ_REQUEST_BIO)
2097                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2098                                         obj_request->bio_list, length);
2099                 else
2100                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2101                                         obj_request->pages, length,
2102                                         offset & ~PAGE_MASK, false, false);
2103
2104                 if (write_request)
2105                         rbd_osd_req_format_write(obj_request);
2106                 else
2107                         rbd_osd_req_format_read(obj_request);
2108
2109                 obj_request->img_offset = img_offset;
2110                 rbd_img_obj_request_add(img_request, obj_request);
2111
2112                 img_offset += length;
2113                 resid -= length;
2114         }
2115
2116         return 0;
2117
2118 out_partial:
2119         rbd_obj_request_put(obj_request);
2120 out_unwind:
2121         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2122                 rbd_obj_request_put(obj_request);
2123
2124         return -ENOMEM;
2125 }
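
/*
 * Worked example (editorial), assuming the default 4 MiB objects
 * (obj_order 22): a 6 MiB request starting at image offset 3 MiB is
 * split into three object requests:
 *
 *	first object:	offset 3 MiB, length 1 MiB  (to the segment end)
 *	second object:	offset 0,     length 4 MiB  (a whole object)
 *	third object:	offset 0,     length 1 MiB  (the remainder)
 */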
2126
2127 static void
2128 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2129 {
2130         struct rbd_img_request *img_request;
2131         struct rbd_device *rbd_dev;
2132         u64 length;
2133         u32 page_count;
2134
2135         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2136         rbd_assert(obj_request_img_data_test(obj_request));
2137         img_request = obj_request->img_request;
2138         rbd_assert(img_request);
2139
2140         rbd_dev = img_request->rbd_dev;
2141         rbd_assert(rbd_dev);
2142         length = (u64)1 << rbd_dev->header.obj_order;
2143         page_count = (u32)calc_pages_for(0, length);
2144
2145         rbd_assert(obj_request->copyup_pages);
2146         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2147         obj_request->copyup_pages = NULL;
2148
2149         /*
2150          * We want the transfer count to reflect the size of the
2151          * original write request.  There is no such thing as a
2152          * successful short write, so if the request was successful
2153          * we can just set it to the originally-requested length.
2154          */
2155         if (!obj_request->result)
2156                 obj_request->xferred = obj_request->length;
2157
2158         /* Finish up with the normal image object callback */
2159
2160         rbd_img_obj_callback(obj_request);
2161 }
2162
2163 static void
2164 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2165 {
2166         struct rbd_obj_request *orig_request;
2167         struct ceph_osd_request *osd_req;
2168         struct ceph_osd_client *osdc;
2169         struct rbd_device *rbd_dev;
2170         struct page **pages;
2171         int result;
2172         u64 obj_size;
2173         u64 xferred;
2174
2175         rbd_assert(img_request_child_test(img_request));
2176
2177         /* First get what we need from the image request */
2178
2179         pages = img_request->copyup_pages;
2180         rbd_assert(pages != NULL);
2181         img_request->copyup_pages = NULL;
2182
2183         orig_request = img_request->obj_request;
2184         rbd_assert(orig_request != NULL);
2185         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2186         result = img_request->result;
2187         obj_size = img_request->length;
2188         xferred = img_request->xferred;
2189         rbd_img_request_put(img_request);
2190
2191         rbd_assert(orig_request->img_request);
2192         rbd_dev = orig_request->img_request->rbd_dev;
2193         rbd_assert(rbd_dev);
2194         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2195
2196         if (result)
2197                 goto out_err;
2198
2199         /* Allocate the new copyup osd request for the original request */
2200
2201         result = -ENOMEM;
2202         rbd_assert(!orig_request->osd_req);
2203         osd_req = rbd_osd_req_create_copyup(orig_request);
2204         if (!osd_req)
2205                 goto out_err;
2206         orig_request->osd_req = osd_req;
2207         orig_request->copyup_pages = pages;
2208
2209         /* Initialize the copyup op */
2210
2211         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2212         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2213                                                 false, false);
2214
2215         /* Then the original write request op */
2216
2217         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2218                                         orig_request->offset,
2219                                         orig_request->length, 0, 0);
2220         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2221                                         orig_request->length);
2222
2223         rbd_osd_req_format_write(orig_request);
2224
2225         /* All set, send it off. */
2226
2227         orig_request->callback = rbd_img_obj_copyup_callback;
2228         osdc = &rbd_dev->rbd_client->client->osdc;
2229         result = rbd_obj_request_submit(osdc, orig_request);
2230         if (!result)
2231                 return;
2232 out_err:
2233         /* Record the error code and complete the request */
2234
2235         orig_request->result = result;
2236         orig_request->xferred = 0;
2237         obj_request_done_set(orig_request);
2238         rbd_obj_request_complete(orig_request);
2239 }
2240
2241 /*
2242  * Read from the parent image the range of data that covers the
2243  * entire target of the given object request.  This is used for
2244  * satisfying a layered image write request when the target of an
2245  * object request from the image request does not exist.
2246  *
2247  * A page array big enough to hold the returned data is allocated
2248  * and supplied to rbd_img_request_fill() as the "data descriptor."
2249  * When the read completes, this page array will be transferred to
2250  * the original object request for the copyup operation.
2251  *
2252  * If an error occurs, record it as the result of the original
2253  * object request and mark it done so it gets completed.
2254  */
2255 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2256 {
2257         struct rbd_img_request *img_request = NULL;
2258         struct rbd_img_request *parent_request = NULL;
2259         struct rbd_device *rbd_dev;
2260         u64 img_offset;
2261         u64 length;
2262         struct page **pages = NULL;
2263         u32 page_count;
2264         int result;
2265
2266         rbd_assert(obj_request_img_data_test(obj_request));
2267         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2268
2269         img_request = obj_request->img_request;
2270         rbd_assert(img_request != NULL);
2271         rbd_dev = img_request->rbd_dev;
2272         rbd_assert(rbd_dev->parent != NULL);
2273
2274         /*
2275          * First things first.  The original osd request is of no
2276          * use to us any more; we'll need a new one that can hold
2277          * the two ops in a copyup request.  We'll get that later,
2278          * but for now we can release the old one.
2279          */
2280         rbd_osd_req_destroy(obj_request->osd_req);
2281         obj_request->osd_req = NULL;
2282
2283         /*
2284          * Determine the byte range covered by the object in the
2285          * child image to which the original request was to be sent.
2286          */
2287         img_offset = obj_request->img_offset - obj_request->offset;
2288         length = (u64)1 << rbd_dev->header.obj_order;
2289
2290         /*
2291          * There is no defined parent data beyond the parent
2292          * overlap, so limit what we read at that boundary if
2293          * necessary.
2294          */
2295         if (img_offset + length > rbd_dev->parent_overlap) {
2296                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2297                 length = rbd_dev->parent_overlap - img_offset;
2298         }
2299
2300         /*
2301          * Allocate a page array big enough to receive the data read
2302          * from the parent.
2303          */
2304         page_count = (u32)calc_pages_for(0, length);
2305         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2306         if (IS_ERR(pages)) {
2307                 result = PTR_ERR(pages);
2308                 pages = NULL;
2309                 goto out_err;
2310         }
2311
2312         result = -ENOMEM;
2313         parent_request = rbd_img_request_create(rbd_dev->parent,
2314                                                 img_offset, length,
2315                                                 false, true);
2316         if (!parent_request)
2317                 goto out_err;
2318         rbd_obj_request_get(obj_request);
2319         parent_request->obj_request = obj_request;
2320
2321         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2322         if (result)
2323                 goto out_err;
2324         parent_request->copyup_pages = pages;
2325
2326         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2327         result = rbd_img_request_submit(parent_request);
2328         if (!result)
2329                 return 0;
2330
2331         parent_request->copyup_pages = NULL;
2332         parent_request->obj_request = NULL;
2333         rbd_obj_request_put(obj_request);
2334 out_err:
2335         if (pages)
2336                 ceph_release_page_vector(pages, page_count);
2337         if (parent_request)
2338                 rbd_img_request_put(parent_request);
2339         obj_request->result = result;
2340         obj_request->xferred = 0;
2341         obj_request_done_set(obj_request);
2342
2343         return result;
2344 }
2345
2346 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2347 {
2348         struct rbd_obj_request *orig_request;
2349         int result;
2350
2351         rbd_assert(!obj_request_img_data_test(obj_request));
2352
2353         /*
2354          * All we need from the object request is the original
2355          * request and the result of the STAT op.  Grab those, then
2356          * we're done with the request.
2357          */
2358         orig_request = obj_request->obj_request;
2359         obj_request->obj_request = NULL;
2360         rbd_assert(orig_request);
2361         rbd_assert(orig_request->img_request);
2362
2363         result = obj_request->result;
2364         obj_request->result = 0;
2365
2366         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2367                 obj_request, orig_request, result,
2368                 obj_request->xferred, obj_request->length);
2369         rbd_obj_request_put(obj_request);
2370
2371         rbd_assert(orig_request);
2372         rbd_assert(orig_request->img_request);
2373
2374         /*
2375          * Our only purpose here is to determine whether the object
2376          * exists, and we don't want to treat the non-existence as
2377          * an error.  If something else comes back, transfer the
2378          * error to the original request and complete it now.
2379          */
2380         if (!result) {
2381                 obj_request_existence_set(orig_request, true);
2382         } else if (result == -ENOENT) {
2383                 obj_request_existence_set(orig_request, false);
2384         } else {
2385                 orig_request->result = result;
2386                 goto out;
2387         }
2388
2389         /*
2390          * Resubmit the original request now that we have recorded
2391          * whether the target object exists.
2392          */
2393         orig_request->result = rbd_img_obj_request_submit(orig_request);
2394 out:
2395         if (orig_request->result)
2396                 rbd_obj_request_complete(orig_request);
2397         rbd_obj_request_put(orig_request);
2398 }
2399
2400 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2401 {
2402         struct rbd_obj_request *stat_request;
2403         struct rbd_device *rbd_dev;
2404         struct ceph_osd_client *osdc;
2405         struct page **pages = NULL;
2406         u32 page_count;
2407         size_t size;
2408         int ret;
2409
2410         /*
2411          * The response data for a STAT call consists of:
2412          *     le64 length;
2413          *     struct {
2414          *         le32 tv_sec;
2415          *         le32 tv_nsec;
2416          *     } mtime;
2417          */
2418         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2419         page_count = (u32)calc_pages_for(0, size);
2420         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2421         if (IS_ERR(pages))
2422                 return PTR_ERR(pages);
2423
2424         ret = -ENOMEM;
2425         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2426                                                         OBJ_REQUEST_PAGES);
2427         if (!stat_request)
2428                 goto out;
2429
2430         rbd_obj_request_get(obj_request);
2431         stat_request->obj_request = obj_request;
2432         stat_request->pages = pages;
2433         stat_request->page_count = page_count;
2434
2435         rbd_assert(obj_request->img_request);
2436         rbd_dev = obj_request->img_request->rbd_dev;
2437         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2438                                                 stat_request);
2439         if (!stat_request->osd_req)
2440                 goto out;
2441         stat_request->callback = rbd_img_obj_exists_callback;
2442
2443         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2444         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2445                                         false, false);
2446         rbd_osd_req_format_read(stat_request);
2447
2448         osdc = &rbd_dev->rbd_client->client->osdc;
2449         ret = rbd_obj_request_submit(osdc, stat_request);
2450 out:
2451         if (ret)
2452                 rbd_obj_request_put(obj_request);
2453
2454         return ret;
2455 }
2456
2457 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2458 {
2459         struct rbd_img_request *img_request;
2460         struct rbd_device *rbd_dev;
2461         bool known;
2462
2463         rbd_assert(obj_request_img_data_test(obj_request));
2464
2465         img_request = obj_request->img_request;
2466         rbd_assert(img_request);
2467         rbd_dev = img_request->rbd_dev;
2468
2469         /*
2470          * Only writes to layered images need special handling.
2471          * Reads and non-layered writes are simple object requests.
2472          * Layered writes that start beyond the end of the overlap
2473          * with the parent have no parent data, so they too are
2474          * simple object requests.  Finally, if the target object is
2475          * known to already exist, its parent data has already been
2476          * copied, so a write to the object can also be handled as a
2477          * simple object request.
2478          */
2479         if (!img_request_write_test(img_request) ||
2480                 !img_request_layered_test(img_request) ||
2481                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2482                 ((known = obj_request_known_test(obj_request)) &&
2483                         obj_request_exists_test(obj_request))) {
2484
2485                 struct rbd_device *rbd_dev;
2486                 struct ceph_osd_client *osdc;
2487
2488                 rbd_dev = obj_request->img_request->rbd_dev;
2489                 osdc = &rbd_dev->rbd_client->client->osdc;
2490
2491                 return rbd_obj_request_submit(osdc, obj_request);
2492         }
2493
2494         /*
2495          * It's a layered write.  The target object might exist but
2496          * we may not know that yet.  If we know it doesn't exist,
2497          * start by reading the data for the full target object from
2498          * the parent so we can use it for a copyup to the target.
2499          */
2500         if (known)
2501                 return rbd_img_obj_parent_read_full(obj_request);
2502
2503         /* We don't know whether the target exists.  Go find out. */
2504
2505         return rbd_img_obj_exists_submit(obj_request);
2506 }
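
/*
 * Editorial summary of the dispatch above, for a layered write whose
 * target object lies within the parent overlap:
 *
 *	existence unknown	-> rbd_img_obj_exists_submit() (STAT first)
 *	known, doesn't exist	-> rbd_img_obj_parent_read_full() (copyup)
 *	known to exist		-> plain rbd_obj_request_submit()
 */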
2507
2508 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2509 {
2510         struct rbd_obj_request *obj_request;
2511         struct rbd_obj_request *next_obj_request;
2512
2513         dout("%s: img %p\n", __func__, img_request);
2514         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2515                 int ret;
2516
2517                 ret = rbd_img_obj_request_submit(obj_request);
2518                 if (ret)
2519                         return ret;
2520         }
2521
2522         return 0;
2523 }
2524
2525 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2526 {
2527         struct rbd_obj_request *obj_request;
2528         struct rbd_device *rbd_dev;
2529         u64 obj_end;
2530
2531         rbd_assert(img_request_child_test(img_request));
2532
2533         obj_request = img_request->obj_request;
2534         rbd_assert(obj_request);
2535         rbd_assert(obj_request->img_request);
2536
2537         obj_request->result = img_request->result;
2538         if (obj_request->result)
2539                 goto out;
2540
2541         /*
2542          * We need to zero anything beyond the parent overlap
2543          * boundary.  Since rbd_img_obj_request_read_callback()
2544          * will zero anything beyond the end of a short read, an
2545          * easy way to do this is to pretend the data from the
2546          * parent came up short--ending at the overlap boundary.
2547          */
2548         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2549         obj_end = obj_request->img_offset + obj_request->length;
2550         rbd_dev = obj_request->img_request->rbd_dev;
2551         if (obj_end > rbd_dev->parent_overlap) {
2552                 u64 xferred = 0;
2553
2554                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2555                         xferred = rbd_dev->parent_overlap -
2556                                         obj_request->img_offset;
2557
2558                 obj_request->xferred = min(img_request->xferred, xferred);
2559         } else {
2560                 obj_request->xferred = img_request->xferred;
2561         }
2562 out:
2563         rbd_img_request_put(img_request);
2564         rbd_img_obj_request_read_callback(obj_request);
2565         rbd_obj_request_complete(obj_request);
2566 }
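
/*
 * Worked example (editorial): with a 1 MiB parent overlap, a child
 * read covering image offsets 512 KiB..1536 KiB that falls through
 * to the parent has its xferred count clamped to 512 KiB here; the
 * read callback then zero-fills the trailing 512 KiB that lies
 * beyond the overlap boundary, exactly as for a short read.
 */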
2567
2568 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2569 {
2570         struct rbd_device *rbd_dev;
2571         struct rbd_img_request *img_request;
2572         int result;
2573
2574         rbd_assert(obj_request_img_data_test(obj_request));
2575         rbd_assert(obj_request->img_request != NULL);
2576         rbd_assert(obj_request->result == (s32) -ENOENT);
2577         rbd_assert(obj_request_type_valid(obj_request->type));
2578
2579         rbd_dev = obj_request->img_request->rbd_dev;
2580         rbd_assert(rbd_dev->parent != NULL);
2581         /* rbd_read_finish(obj_request, obj_request->length); */
2582         img_request = rbd_img_request_create(rbd_dev->parent,
2583                                                 obj_request->img_offset,
2584                                                 obj_request->length,
2585                                                 false, true);
2586         result = -ENOMEM;
2587         if (!img_request)
2588                 goto out_err;
2589
2590         rbd_obj_request_get(obj_request);
2591         img_request->obj_request = obj_request;
2592
2593         if (obj_request->type == OBJ_REQUEST_BIO)
2594                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2595                                                 obj_request->bio_list);
2596         else
2597                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2598                                                 obj_request->pages);
2599         if (result)
2600                 goto out_err;
2601
2602         img_request->callback = rbd_img_parent_read_callback;
2603         result = rbd_img_request_submit(img_request);
2604         if (result)
2605                 goto out_err;
2606
2607         return;
2608 out_err:
2609         if (img_request)
2610                 rbd_img_request_put(img_request);
2611         obj_request->result = result;
2612         obj_request->xferred = 0;
2613         obj_request_done_set(obj_request);
2614 }
2615
2616 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2617 {
2618         struct rbd_obj_request *obj_request;
2619         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2620         int ret;
2621
2622         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2623                                                         OBJ_REQUEST_NODATA);
2624         if (!obj_request)
2625                 return -ENOMEM;
2626
2627         ret = -ENOMEM;
2628         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2629         if (!obj_request->osd_req)
2630                 goto out;
2631         obj_request->callback = rbd_obj_request_put;
2632
2633         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2634                                         notify_id, 0, 0);
2635         rbd_osd_req_format_read(obj_request);
2636
2637         ret = rbd_obj_request_submit(osdc, obj_request);
2638 out:
2639         if (ret)
2640                 rbd_obj_request_put(obj_request);
2641
2642         return ret;
2643 }
2644
2645 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2646 {
2647         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2648         int ret;
2649
2650         if (!rbd_dev)
2651                 return;
2652
2653         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2654                 rbd_dev->header_name, (unsigned long long)notify_id,
2655                 (unsigned int)opcode);
2656         ret = rbd_dev_refresh(rbd_dev);
2657         if (ret)
2658                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2659
2660         rbd_obj_notify_ack(rbd_dev, notify_id);
2661 }
2662
2663 /*
2664  * Request sync osd watch/unwatch.  The value of "start" determines
2665  * whether a watch request is being initiated or torn down.
2666  */
2667 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2668 {
2669         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2670         struct rbd_obj_request *obj_request;
2671         int ret;
2672
2673         rbd_assert(start ^ !!rbd_dev->watch_event);
2674         rbd_assert(start ^ !!rbd_dev->watch_request);
2675
2676         if (start) {
2677                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2678                                                 &rbd_dev->watch_event);
2679                 if (ret < 0)
2680                         return ret;
2681                 rbd_assert(rbd_dev->watch_event != NULL);
2682         }
2683
2684         ret = -ENOMEM;
2685         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2686                                                         OBJ_REQUEST_NODATA);
2687         if (!obj_request)
2688                 goto out_cancel;
2689
2690         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2691         if (!obj_request->osd_req)
2692                 goto out_cancel;
2693
2694         if (start)
2695                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2696         else
2697                 ceph_osdc_unregister_linger_request(osdc,
2698                                         rbd_dev->watch_request->osd_req);
2699
2700         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2701                                 rbd_dev->watch_event->cookie, 0, start);
2702         rbd_osd_req_format_write(obj_request);
2703
2704         ret = rbd_obj_request_submit(osdc, obj_request);
2705         if (ret)
2706                 goto out_cancel;
2707         ret = rbd_obj_request_wait(obj_request);
2708         if (ret)
2709                 goto out_cancel;
2710         ret = obj_request->result;
2711         if (ret)
2712                 goto out_cancel;
2713
2714         /*
2715          * A watch request is set to linger, so the underlying osd
2716          * request won't go away until we unregister it.  We retain
2717          * a pointer to the object request during that time (in
2718          * rbd_dev->watch_request), so we'll keep a reference to
2719          * it.  We'll drop that reference (below) after we've
2720          * unregistered it.
2721          */
2722         if (start) {
2723                 rbd_dev->watch_request = obj_request;
2724
2725                 return 0;
2726         }
2727
2728         /* We have successfully torn down the watch request */
2729
2730         rbd_obj_request_put(rbd_dev->watch_request);
2731         rbd_dev->watch_request = NULL;
2732 out_cancel:
2733         /* Cancel the event if we're tearing down, or on error */
2734         ceph_osdc_cancel_event(rbd_dev->watch_event);
2735         rbd_dev->watch_event = NULL;
2736         if (obj_request)
2737                 rbd_obj_request_put(obj_request);
2738
2739         return ret;
2740 }
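
/*
 * Illustrative usage (editorial): a watch is set up and later torn
 * down with the same function, distinguished by "start":
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	(initiate)
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	(tear down)
 */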
2741
2742 /*
2743  * Synchronous osd object method call.  Returns the number of bytes
2744  * returned in the outbound buffer, or a negative error code.
2745  */
2746 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2747                              const char *object_name,
2748                              const char *class_name,
2749                              const char *method_name,
2750                              const void *outbound,
2751                              size_t outbound_size,
2752                              void *inbound,
2753                              size_t inbound_size)
2754 {
2755         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2756         struct rbd_obj_request *obj_request;
2757         struct page **pages;
2758         u32 page_count;
2759         int ret;
2760
2761         /*
2762          * Method calls are ultimately read operations.  The result
2763          * should be placed into the inbound buffer provided.  They
2764          * may also supply outbound data--parameters for the object
2765          * method.  Currently, if this is present, it will be a
2766          * snapshot id.
2767          */
2768         page_count = (u32)calc_pages_for(0, inbound_size);
2769         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2770         if (IS_ERR(pages))
2771                 return PTR_ERR(pages);
2772
2773         ret = -ENOMEM;
2774         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2775                                                         OBJ_REQUEST_PAGES);
2776         if (!obj_request)
2777                 goto out;
2778
2779         obj_request->pages = pages;
2780         obj_request->page_count = page_count;
2781
2782         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2783         if (!obj_request->osd_req)
2784                 goto out;
2785
2786         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2787                                         class_name, method_name);
2788         if (outbound_size) {
2789                 struct ceph_pagelist *pagelist;
2790
2791                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2792                 if (!pagelist)
2793                         goto out;
2794
2795                 ceph_pagelist_init(pagelist);
2796                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2797                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2798                                                 pagelist);
2799         }
2800         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2801                                         obj_request->pages, inbound_size,
2802                                         0, false, false);
2803         rbd_osd_req_format_read(obj_request);
2804
2805         ret = rbd_obj_request_submit(osdc, obj_request);
2806         if (ret)
2807                 goto out;
2808         ret = rbd_obj_request_wait(obj_request);
2809         if (ret)
2810                 goto out;
2811
2812         ret = obj_request->result;
2813         if (ret < 0)
2814                 goto out;
2815
2816         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2817         ret = (int)obj_request->xferred;
2818         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2819 out:
2820         if (obj_request)
2821                 rbd_obj_request_put(obj_request);
2822         else
2823                 ceph_release_page_vector(pages, page_count);
2824
2825         return ret;
2826 }
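
/*
 * Illustrative call (editorial), modeled on the v2 image size probe
 * used elsewhere in this driver:
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 */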
2827
2828 static void rbd_request_fn(struct request_queue *q)
2829                 __releases(q->queue_lock) __acquires(q->queue_lock)
2830 {
2831         struct rbd_device *rbd_dev = q->queuedata;
2832         bool read_only = rbd_dev->mapping.read_only;
2833         struct request *rq;
2834         int result;
2835
2836         while ((rq = blk_fetch_request(q))) {
2837                 bool write_request = rq_data_dir(rq) == WRITE;
2838                 struct rbd_img_request *img_request;
2839                 u64 offset;
2840                 u64 length;
2841
2842                 /* Ignore any non-FS requests that filter through. */
2843
2844                 if (rq->cmd_type != REQ_TYPE_FS) {
2845                         dout("%s: non-fs request type %d\n", __func__,
2846                                 (int) rq->cmd_type);
2847                         __blk_end_request_all(rq, 0);
2848                         continue;
2849                 }
2850
2851                 /* Ignore/skip any zero-length requests */
2852
2853                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2854                 length = (u64) blk_rq_bytes(rq);
2855
2856                 if (!length) {
2857                         dout("%s: zero-length request\n", __func__);
2858                         __blk_end_request_all(rq, 0);
2859                         continue;
2860                 }
2861
2862                 spin_unlock_irq(q->queue_lock);
2863
2864                 /* Disallow writes to a read-only device */
2865
2866                 if (write_request) {
2867                         result = -EROFS;
2868                         if (read_only)
2869                                 goto end_request;
2870                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2871                 }
2872
2873                 /*
2874                  * Quit early if the mapped snapshot no longer
2875                  * exists.  It's still possible the snapshot will
2876                  * have disappeared by the time our request arrives
2877                  * at the osd, but there's no sense in sending it if
2878                  * we already know.
2879                  */
2880                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2881                         dout("request for non-existent snapshot");
2882                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2883                         result = -ENXIO;
2884                         goto end_request;
2885                 }
2886
2887                 result = -EINVAL;
2888                 if (offset && length > U64_MAX - offset + 1) {
2889                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2890                                 offset, length);
2891                         goto end_request;       /* Shouldn't happen */
2892                 }
2893
2894                 result = -EIO;
2895                 if (offset + length > rbd_dev->mapping.size) {
2896                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2897                                 offset, length, rbd_dev->mapping.size);
2898                         goto end_request;
2899                 }
2900
2901                 result = -ENOMEM;
2902                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2903                                                         write_request, false);
2904                 if (!img_request)
2905                         goto end_request;
2906
2907                 img_request->rq = rq;
2908
2909                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2910                                                 rq->bio);
2911                 if (!result)
2912                         result = rbd_img_request_submit(img_request);
2913                 if (result)
2914                         rbd_img_request_put(img_request);
2915 end_request:
2916                 spin_lock_irq(q->queue_lock);
2917                 if (result < 0) {
2918                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2919                                 write_request ? "write" : "read",
2920                                 length, offset, result);
2921
2922                         __blk_end_request_all(rq, result);
2923                 }
2924         }
2925 }
2926
2927 /*
2928  * A queue callback.  Makes sure that we don't create a bio that spans
2929  * multiple osd objects.  One exception is single-page bios,
2930  * which we handle later in bio_chain_clone_range().
2931  */
2932 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2933                           struct bio_vec *bvec)
2934 {
2935         struct rbd_device *rbd_dev = q->queuedata;
2936         sector_t sector_offset;
2937         sector_t sectors_per_obj;
2938         sector_t obj_sector_offset;
2939         int ret;
2940
2941         /*
2942          * Convert the partition-relative bio start sector to an
2943          * offset relative to the enclosing device, then find how
2944          * far into its rbd object that offset falls.
2945          */
2946         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2947         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2948         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
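
             /*
              * Worked example (values chosen for illustration only):
              * with an obj_order of 22 (4 MiB objects), sectors_per_obj
              * is 1 << (22 - 9) = 8192, so a device-relative start
              * sector of 10000 sits 10000 & 8191 = 1808 sectors into
              * its enclosing object.
              */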
2949
2950         /*
2951          * Compute the number of bytes from that offset to the end
2952          * of the object.  Account for what's already used by the bio.
2953          */
2954         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2955         if (ret > bmd->bi_size)
2956                 ret -= bmd->bi_size;
2957         else
2958                 ret = 0;
2959
2960         /*
2961          * Don't send back more than was asked for.  And if the bio
2962          * was empty, let the whole thing through because:  "Note
2963          * that a block device *must* allow a single page to be
2964          * added to an empty bio."
2965          */
2966         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2967         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2968                 ret = (int) bvec->bv_len;
2969
2970         return ret;
2971 }
2972
2973 static void rbd_free_disk(struct rbd_device *rbd_dev)
2974 {
2975         struct gendisk *disk = rbd_dev->disk;
2976
2977         if (!disk)
2978                 return;
2979
2980         rbd_dev->disk = NULL;
2981         if (disk->flags & GENHD_FL_UP) {
2982                 del_gendisk(disk);
2983                 if (disk->queue)
2984                         blk_cleanup_queue(disk->queue);
2985         }
2986         put_disk(disk);
2987 }
2988
2989 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2990                                 const char *object_name,
2991                                 u64 offset, u64 length, void *buf)
2992
2993 {
2994         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2995         struct rbd_obj_request *obj_request;
2996         struct page **pages = NULL;
2997         u32 page_count;
2998         size_t size;
2999         int ret;
3000
3001         page_count = (u32) calc_pages_for(offset, length);
3002         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3003         if (IS_ERR(pages))
3004                 return PTR_ERR(pages);
3005
3006         ret = -ENOMEM;
3007         obj_request = rbd_obj_request_create(object_name, offset, length,
3008                                                         OBJ_REQUEST_PAGES);
3009         if (!obj_request)
3010                 goto out;
3011
3012         obj_request->pages = pages;
3013         obj_request->page_count = page_count;
3014
3015         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3016         if (!obj_request->osd_req)
3017                 goto out;
3018
3019         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3020                                         offset, length, 0, 0);
3021         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3022                                         obj_request->pages,
3023                                         obj_request->length,
3024                                         obj_request->offset & ~PAGE_MASK,
3025                                         false, false);
3026         rbd_osd_req_format_read(obj_request);
3027
3028         ret = rbd_obj_request_submit(osdc, obj_request);
3029         if (ret)
3030                 goto out;
3031         ret = rbd_obj_request_wait(obj_request);
3032         if (ret)
3033                 goto out;
3034
3035         ret = obj_request->result;
3036         if (ret < 0)
3037                 goto out;
3038
3039         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3040         size = (size_t) obj_request->xferred;
3041         ceph_copy_from_page_vector(pages, buf, 0, size);
3042         rbd_assert(size <= (size_t)INT_MAX);
3043         ret = (int)size;
3044 out:
3045         if (obj_request)
3046                 rbd_obj_request_put(obj_request);
3047         else
3048                 ceph_release_page_vector(pages, page_count);
3049
3050         return ret;
3051 }
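
     /*
      * Example use of rbd_obj_read_sync() (a sketch; the buffer and its
      * size are illustrative, not taken from the driver): synchronously
      * read the first 4 KiB of the image's header object:
      *
      *	char buf[4096];
      *	int ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
      *				0, sizeof (buf), buf);
      *
      * On success the return value is the number of bytes actually
      * transferred, which may be less than was asked for.
      */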
3052
3053 /*
3054  * Read the complete header for the given rbd device.  On successful
3055  * return, the rbd_dev->header field will contain up-to-date
3056  * information about the image.
3057  */
3058 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3059 {
3060         struct rbd_image_header_ondisk *ondisk = NULL;
3061         u32 snap_count = 0;
3062         u64 names_size = 0;
3063         u32 want_count;
3064         int ret;
3065
3066         /*
3067          * The complete header will include an array of its 64-bit
3068          * snapshot ids, followed by the names of those snapshots as
3069          * a contiguous block of NUL-terminated strings.  Note that
3070          * the number of snapshots could change by the time we read
3071          * it in, in which case we re-read it.
3072          */
3073         do {
3074                 size_t size;
3075
3076                 kfree(ondisk);
3077
3078                 size = sizeof (*ondisk);
3079                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3080                 size += names_size;
3081                 ondisk = kmalloc(size, GFP_KERNEL);
3082                 if (!ondisk)
3083                         return -ENOMEM;
3084
3085                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3086                                        0, size, ondisk);
3087                 if (ret < 0)
3088                         goto out;
3089                 if ((size_t)ret < size) {
3090                         ret = -ENXIO;
3091                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3092                                 size, ret);
3093                         goto out;
3094                 }
3095                 if (!rbd_dev_ondisk_valid(ondisk)) {
3096                         ret = -ENXIO;
3097                         rbd_warn(rbd_dev, "invalid header");
3098                         goto out;
3099                 }
3100
3101                 names_size = le64_to_cpu(ondisk->snap_names_len);
3102                 want_count = snap_count;
3103                 snap_count = le32_to_cpu(ondisk->snap_count);
3104         } while (snap_count != want_count);
3105
3106         ret = rbd_header_from_disk(rbd_dev, ondisk);
3107 out:
3108         kfree(ondisk);
3109
3110         return ret;
3111 }
3112
3113 /*
3114  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3115  * has disappeared from the (just updated) snapshot context.
3116  */
3117 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3118 {
3119         u64 snap_id;
3120
3121         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3122                 return;
3123
3124         snap_id = rbd_dev->spec->snap_id;
3125         if (snap_id == CEPH_NOSNAP)
3126                 return;
3127
3128         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3129                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3130 }
3131
3132 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3133 {
3134         u64 mapping_size;
3135         int ret;
3136
3137         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3138         mapping_size = rbd_dev->mapping.size;
3139         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3140         if (rbd_dev->image_format == 1)
3141                 ret = rbd_dev_v1_header_info(rbd_dev);
3142         else
3143                 ret = rbd_dev_v2_header_info(rbd_dev);
3144
3145         /* If it's a mapped snapshot, validate its EXISTS flag */
3146
3147         rbd_exists_validate(rbd_dev);
3148         mutex_unlock(&ctl_mutex);
3149         if (mapping_size != rbd_dev->mapping.size) {
3150                 sector_t size;
3151
3152                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3153                 dout("setting size to %llu sectors", (unsigned long long)size);
3154                 set_capacity(rbd_dev->disk, size);
3155                 revalidate_disk(rbd_dev->disk);
3156         }
3157
3158         return ret;
3159 }
3160
3161 static int rbd_init_disk(struct rbd_device *rbd_dev)
3162 {
3163         struct gendisk *disk;
3164         struct request_queue *q;
3165         u64 segment_size;
3166
3167         /* create gendisk info */
3168         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3169         if (!disk)
3170                 return -ENOMEM;
3171
3172         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3173                  rbd_dev->dev_id);
3174         disk->major = rbd_dev->major;
3175         disk->first_minor = 0;
3176         disk->fops = &rbd_bd_ops;
3177         disk->private_data = rbd_dev;
3178
3179         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3180         if (!q)
3181                 goto out_disk;
3182
3183         /* We use the default size, but let's be explicit about it. */
3184         blk_queue_physical_block_size(q, SECTOR_SIZE);
3185
3186         /* set io sizes to object size */
3187         segment_size = rbd_obj_bytes(&rbd_dev->header);
3188         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3189         blk_queue_max_segment_size(q, segment_size);
3190         blk_queue_io_min(q, segment_size);
3191         blk_queue_io_opt(q, segment_size);
3192
3193         blk_queue_merge_bvec(q, rbd_merge_bvec);
3194         disk->queue = q;
3195
3196         q->queuedata = rbd_dev;
3197
3198         rbd_dev->disk = disk;
3199
3200         return 0;
3201 out_disk:
3202         put_disk(disk);
3203
3204         return -ENOMEM;
3205 }
3206
3207 /*
3208   sysfs
3209 */
3210
3211 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3212 {
3213         return container_of(dev, struct rbd_device, dev);
3214 }
3215
3216 static ssize_t rbd_size_show(struct device *dev,
3217                              struct device_attribute *attr, char *buf)
3218 {
3219         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3220
3221         return sprintf(buf, "%llu\n",
3222                 (unsigned long long)rbd_dev->mapping.size);
3223 }
3224
3225 /*
3226  * Note this shows the features for whatever's mapped, which is not
3227  * necessarily the base image.
3228  */
3229 static ssize_t rbd_features_show(struct device *dev,
3230                              struct device_attribute *attr, char *buf)
3231 {
3232         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3233
3234         return sprintf(buf, "0x%016llx\n",
3235                         (unsigned long long)rbd_dev->mapping.features);
3236 }
3237
3238 static ssize_t rbd_major_show(struct device *dev,
3239                               struct device_attribute *attr, char *buf)
3240 {
3241         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3242
3243         if (rbd_dev->major)
3244                 return sprintf(buf, "%d\n", rbd_dev->major);
3245
3246         return sprintf(buf, "(none)\n");
3247         return sprintf(buf, "(none)\n");
3249
3250 static ssize_t rbd_client_id_show(struct device *dev,
3251                                   struct device_attribute *attr, char *buf)
3252 {
3253         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3254
3255         return sprintf(buf, "client%lld\n",
3256                         ceph_client_id(rbd_dev->rbd_client->client));
3257 }
3258
3259 static ssize_t rbd_pool_show(struct device *dev,
3260                              struct device_attribute *attr, char *buf)
3261 {
3262         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3263
3264         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3265 }
3266
3267 static ssize_t rbd_pool_id_show(struct device *dev,
3268                              struct device_attribute *attr, char *buf)
3269 {
3270         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3271
3272         return sprintf(buf, "%llu\n",
3273                         (unsigned long long) rbd_dev->spec->pool_id);
3274 }
3275
3276 static ssize_t rbd_name_show(struct device *dev,
3277                              struct device_attribute *attr, char *buf)
3278 {
3279         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3280
3281         if (rbd_dev->spec->image_name)
3282                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3283
3284         return sprintf(buf, "(unknown)\n");
3285 }
3286
3287 static ssize_t rbd_image_id_show(struct device *dev,
3288                              struct device_attribute *attr, char *buf)
3289 {
3290         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3291
3292         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3293 }
3294
3295 /*
3296  * Shows the name of the currently-mapped snapshot (or
3297  * RBD_SNAP_HEAD_NAME for the base image).
3298  */
3299 static ssize_t rbd_snap_show(struct device *dev,
3300                              struct device_attribute *attr,
3301                              char *buf)
3302 {
3303         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3304
3305         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3306 }
3307
3308 /*
3309  * For an rbd v2 image, shows the pool, image, and snapshot ids and
3310  * names for the parent image, along with the overlap.  If there is
3311  * no parent, simply shows "(no parent image)".
3312  */
3313 static ssize_t rbd_parent_show(struct device *dev,
3314                              struct device_attribute *attr,
3315                              char *buf)
3316 {
3317         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3318         struct rbd_spec *spec = rbd_dev->parent_spec;
3319         int count;
3320         char *bufp = buf;
3321
3322         if (!spec)
3323                 return sprintf(buf, "(no parent image)\n");
3324
3325         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3326                         (unsigned long long) spec->pool_id, spec->pool_name);
3327         if (count < 0)
3328                 return count;
3329         bufp += count;
3330
3331         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3332                         spec->image_name ? spec->image_name : "(unknown)");
3333         if (count < 0)
3334                 return count;
3335         bufp += count;
3336
3337         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3338                         (unsigned long long) spec->snap_id, spec->snap_name);
3339         if (count < 0)
3340                 return count;
3341         bufp += count;
3342
3343         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3344         if (count < 0)
3345                 return count;
3346         bufp += count;
3347
3348         return (ssize_t) (bufp - buf);
3349 }
3350
3351 static ssize_t rbd_image_refresh(struct device *dev,
3352                                  struct device_attribute *attr,
3353                                  const char *buf,
3354                                  size_t size)
3355 {
3356         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3357         int ret;
3358
3359         ret = rbd_dev_refresh(rbd_dev);
3360         if (ret)
3361                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3362
3363         return ret < 0 ? ret : size;
3364 }
3365
3366 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3367 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3368 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3369 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3370 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3371 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3372 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3373 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3374 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3375 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3376 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3377
3378 static struct attribute *rbd_attrs[] = {
3379         &dev_attr_size.attr,
3380         &dev_attr_features.attr,
3381         &dev_attr_major.attr,
3382         &dev_attr_client_id.attr,
3383         &dev_attr_pool.attr,
3384         &dev_attr_pool_id.attr,
3385         &dev_attr_name.attr,
3386         &dev_attr_image_id.attr,
3387         &dev_attr_current_snap.attr,
3388         &dev_attr_parent.attr,
3389         &dev_attr_refresh.attr,
3390         NULL
3391 };
3392
3393 static struct attribute_group rbd_attr_group = {
3394         .attrs = rbd_attrs,
3395 };
3396
3397 static const struct attribute_group *rbd_attr_groups[] = {
3398         &rbd_attr_group,
3399         NULL
3400 };
3401
3402 static void rbd_sysfs_dev_release(struct device *dev)
3403 {
3404 }
3405
3406 static struct device_type rbd_device_type = {
3407         .name           = "rbd",
3408         .groups         = rbd_attr_groups,
3409         .release        = rbd_sysfs_dev_release,
3410 };
3411
3412 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3413 {
3414         kref_get(&spec->kref);
3415
3416         return spec;
3417 }
3418
3419 static void rbd_spec_free(struct kref *kref);
3420 static void rbd_spec_put(struct rbd_spec *spec)
3421 {
3422         if (spec)
3423                 kref_put(&spec->kref, rbd_spec_free);
3424 }
3425
3426 static struct rbd_spec *rbd_spec_alloc(void)
3427 {
3428         struct rbd_spec *spec;
3429
3430         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3431         if (!spec)
3432                 return NULL;
3433         kref_init(&spec->kref);
3434
3435         return spec;
3436 }
3437
3438 static void rbd_spec_free(struct kref *kref)
3439 {
3440         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3441
3442         kfree(spec->pool_name);
3443         kfree(spec->image_id);
3444         kfree(spec->image_name);
3445         kfree(spec->snap_name);
3446         kfree(spec);
3447 }
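
     /*
      * A note on rbd_spec lifetimes (descriptive, not new mechanism):
      * rbd_spec_alloc() returns a spec holding one reference.  Anyone
      * sharing the spec (e.g. the parent created in
      * rbd_dev_probe_parent()) takes an extra reference with
      * rbd_spec_get(), and every holder drops its reference with
      * rbd_spec_put(); the final put frees the spec via rbd_spec_free().
      */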
3448
3449 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3450                                 struct rbd_spec *spec)
3451 {
3452         struct rbd_device *rbd_dev;
3453
3454         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3455         if (!rbd_dev)
3456                 return NULL;
3457
3458         spin_lock_init(&rbd_dev->lock);
3459         rbd_dev->flags = 0;
3460         INIT_LIST_HEAD(&rbd_dev->node);
3461         init_rwsem(&rbd_dev->header_rwsem);
3462
3463         rbd_dev->spec = spec;
3464         rbd_dev->rbd_client = rbdc;
3465
3466         /* Initialize the layout used for all rbd requests */
3467
3468         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3469         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3470         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3471         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3472
3473         return rbd_dev;
3474 }
3475
3476 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3477 {
3478         rbd_put_client(rbd_dev->rbd_client);
3479         rbd_spec_put(rbd_dev->spec);
3480         kfree(rbd_dev);
3481 }
3482
3483 /*
3484  * Get the size and object order for an image snapshot, or if
3485  * snap_id is CEPH_NOSNAP, gets this information for the base
3486  * image.
3487  */
3488 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3489                                 u8 *order, u64 *snap_size)
3490 {
3491         __le64 snapid = cpu_to_le64(snap_id);
3492         int ret;
3493         struct {
3494                 u8 order;
3495                 __le64 size;
3496         } __attribute__ ((packed)) size_buf = { 0 };
3497
3498         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3499                                 "rbd", "get_size",
3500                                 &snapid, sizeof (snapid),
3501                                 &size_buf, sizeof (size_buf));
3502         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3503         if (ret < 0)
3504                 return ret;
3505         if (ret < sizeof (size_buf))
3506                 return -ERANGE;
3507
3508         if (order)
3509                 *order = size_buf.order;
3510         *snap_size = le64_to_cpu(size_buf.size);
3511
3512         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3513                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3514                 (unsigned long long)*snap_size);
3515
3516         return 0;
3517 }
3518
3519 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3520 {
3521         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3522                                         &rbd_dev->header.obj_order,
3523                                         &rbd_dev->header.image_size);
3524 }
3525
3526 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3527 {
3528         void *reply_buf;
3529         int ret;
3530         void *p;
3531
3532         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3533         if (!reply_buf)
3534                 return -ENOMEM;
3535
3536         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3537                                 "rbd", "get_object_prefix", NULL, 0,
3538                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3539         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3540         if (ret < 0)
3541                 goto out;
3542
3543         p = reply_buf;
3544         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3545                                                 p + ret, NULL, GFP_NOIO);
3546         ret = 0;
3547
3548         if (IS_ERR(rbd_dev->header.object_prefix)) {
3549                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3550                 rbd_dev->header.object_prefix = NULL;
3551         } else {
3552                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3553         }
3554 out:
3555         kfree(reply_buf);
3556
3557         return ret;
3558 }
3559
3560 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3561                 u64 *snap_features)
3562 {
3563         __le64 snapid = cpu_to_le64(snap_id);
3564         struct {
3565                 __le64 features;
3566                 __le64 incompat;
3567         } __attribute__ ((packed)) features_buf = { 0 };
3568         u64 incompat;
3569         int ret;
3570
3571         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3572                                 "rbd", "get_features",
3573                                 &snapid, sizeof (snapid),
3574                                 &features_buf, sizeof (features_buf));
3575         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3576         if (ret < 0)
3577                 return ret;
3578         if (ret < sizeof (features_buf))
3579                 return -ERANGE;
3580
3581         incompat = le64_to_cpu(features_buf.incompat);
3582         if (incompat & ~RBD_FEATURES_SUPPORTED)
3583                 return -ENXIO;
3584
3585         *snap_features = le64_to_cpu(features_buf.features);
3586
3587         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3588                 (unsigned long long)snap_id,
3589                 (unsigned long long)*snap_features,
3590                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3591
3592         return 0;
3593 }
3594
3595 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3596 {
3597         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3598                                                 &rbd_dev->header.features);
3599 }
3600
3601 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3602 {
3603         struct rbd_spec *parent_spec;
3604         size_t size;
3605         void *reply_buf = NULL;
3606         __le64 snapid;
3607         void *p;
3608         void *end;
3609         char *image_id;
3610         u64 overlap;
3611         int ret;
3612
3613         parent_spec = rbd_spec_alloc();
3614         if (!parent_spec)
3615                 return -ENOMEM;
3616
3617         size = sizeof (__le64) +                                /* pool_id */
3618                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3619                 sizeof (__le64) +                               /* snap_id */
3620                 sizeof (__le64);                                /* overlap */
3621         reply_buf = kmalloc(size, GFP_KERNEL);
3622         if (!reply_buf) {
3623                 ret = -ENOMEM;
3624                 goto out_err;
3625         }
3626
3627         snapid = cpu_to_le64(CEPH_NOSNAP);
3628         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3629                                 "rbd", "get_parent",
3630                                 &snapid, sizeof (snapid),
3631                                 reply_buf, size);
3632         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3633         if (ret < 0)
3634                 goto out_err;
3635
3636         p = reply_buf;
3637         end = reply_buf + ret;
3638         ret = -ERANGE;
3639         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3640         if (parent_spec->pool_id == CEPH_NOPOOL)
3641                 goto out;       /* No parent?  No problem. */
3642
3643         /* The ceph file layout needs to fit pool id in 32 bits */
3644
3645         ret = -EIO;
3646         if (parent_spec->pool_id > (u64)U32_MAX) {
3647                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3648                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3649                 goto out_err;
3650         }
3651
3652         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3653         if (IS_ERR(image_id)) {
3654                 ret = PTR_ERR(image_id);
3655                 goto out_err;
3656         }
3657         parent_spec->image_id = image_id;
3658         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3659         ceph_decode_64_safe(&p, end, overlap, out_err);
3660
3661         rbd_dev->parent_overlap = overlap;
3662         rbd_dev->parent_spec = parent_spec;
3663         parent_spec = NULL;     /* rbd_dev now owns this */
3664 out:
3665         ret = 0;
3666 out_err:
3667         kfree(reply_buf);
3668         rbd_spec_put(parent_spec);
3669
3670         return ret;
3671 }
3672
3673 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3674 {
3675         struct {
3676                 __le64 stripe_unit;
3677                 __le64 stripe_count;
3678         } __attribute__ ((packed)) striping_info_buf = { 0 };
3679         size_t size = sizeof (striping_info_buf);
3680         void *p;
3681         u64 obj_size;
3682         u64 stripe_unit;
3683         u64 stripe_count;
3684         int ret;
3685
3686         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3687                                 "rbd", "get_stripe_unit_count", NULL, 0,
3688                                 (char *)&striping_info_buf, size);
3689         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3690         if (ret < 0)
3691                 return ret;
3692         if (ret < size)
3693                 return -ERANGE;
3694
3695         /*
3696          * We don't actually support the "fancy striping" feature
3697          * (STRIPINGV2) yet, but if the striping sizes are the
3698          * defaults the behavior is the same as before.  So find
3699          * out, and only fail if the image has non-default values.
3700          */
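             /*
              * Concretely (assuming the common obj_order of 22, i.e.
              * 4 MiB objects), the only reply accepted here is
              * stripe_unit == 4194304 and stripe_count == 1.
              */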
3702         obj_size = (u64)1 << rbd_dev->header.obj_order;
3703         p = &striping_info_buf;
3704         stripe_unit = ceph_decode_64(&p);
3705         if (stripe_unit != obj_size) {
3706                 rbd_warn(rbd_dev, "unsupported stripe unit "
3707                                 "(got %llu want %llu)",
3708                                 stripe_unit, obj_size);
3709                 return -EINVAL;
3710         }
3711         stripe_count = ceph_decode_64(&p);
3712         if (stripe_count != 1) {
3713                 rbd_warn(rbd_dev, "unsupported stripe count "
3714                                 "(got %llu want 1)", stripe_count);
3715                 return -EINVAL;
3716         }
3717         rbd_dev->header.stripe_unit = stripe_unit;
3718         rbd_dev->header.stripe_count = stripe_count;
3719
3720         return 0;
3721 }
3722
3723 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3724 {
3725         size_t image_id_size;
3726         char *image_id;
3727         void *p;
3728         void *end;
3729         size_t size;
3730         void *reply_buf = NULL;
3731         size_t len = 0;
3732         char *image_name = NULL;
3733         int ret;
3734
3735         rbd_assert(!rbd_dev->spec->image_name);
3736
3737         len = strlen(rbd_dev->spec->image_id);
3738         image_id_size = sizeof (__le32) + len;
3739         image_id = kmalloc(image_id_size, GFP_KERNEL);
3740         if (!image_id)
3741                 return NULL;
3742
3743         p = image_id;
3744         end = image_id + image_id_size;
3745         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3746
3747         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3748         reply_buf = kmalloc(size, GFP_KERNEL);
3749         if (!reply_buf)
3750                 goto out;
3751
3752         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3753                                 "rbd", "dir_get_name",
3754                                 image_id, image_id_size,
3755                                 reply_buf, size);
3756         if (ret < 0)
3757                 goto out;
3758         p = reply_buf;
3759         end = reply_buf + ret;
3760
3761         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3762         if (IS_ERR(image_name))
3763                 image_name = NULL;
3764         else
3765                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3766 out:
3767         kfree(reply_buf);
3768         kfree(image_id);
3769
3770         return image_name;
3771 }
3772
3773 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3774 {
3775         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3776         const char *snap_name;
3777         u32 which = 0;
3778
3779         /* Skip over names until we find the one we are looking for */
3780
3781         snap_name = rbd_dev->header.snap_names;
3782         while (which < snapc->num_snaps) {
3783                 if (!strcmp(name, snap_name))
3784                         return snapc->snaps[which];
3785                 snap_name += strlen(snap_name) + 1;
3786                 which++;
3787         }
3788         return CEPH_NOSNAP;
3789 }
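
     /*
      * Worked example for the loop above (names and ids invented for
      * illustration): if snap_names holds "one\0two\0" and snaps[] is
      * { 11, 12 }, looking up "two" fails to match "one", advances
      * snap_name by strlen("one") + 1 = 4 bytes, matches "two", and
      * returns snaps[1] = 12.
      */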
3790
3791 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3792 {
3793         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3794         u32 which;
3795         bool found = false;
3796         u64 snap_id;
3797
3798         for (which = 0; !found && which < snapc->num_snaps; which++) {
3799                 const char *snap_name;
3800
3801                 snap_id = snapc->snaps[which];
3802                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3803                 if (IS_ERR(snap_name))
3804                         break;
3805                 found = !strcmp(name, snap_name);
3806                 kfree(snap_name);
3807         }
3808         return found ? snap_id : CEPH_NOSNAP;
3809 }
3810
3811 /*
3812  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3813  * no snapshot by that name is found, or if an error occurs.
3814  */
3815 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3816 {
3817         if (rbd_dev->image_format == 1)
3818                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3819
3820         return rbd_v2_snap_id_by_name(rbd_dev, name);
3821 }
3822
3823 /*
3824  * When an rbd image has a parent image, it is identified by the
3825  * pool, image, and snapshot ids (not names).  This function fills
3826  * in the names for those ids.  (It's OK if we can't figure out the
3827  * name for an image id, but the pool and snapshot ids should always
3828  * exist and have names.)  All names in an rbd spec are dynamically
3829  * allocated.
3830  *
3831  * When an image being mapped (not a parent) is probed, we have the
3832  * pool name and pool id, image name and image id, and the snapshot
3833  * name.  The only thing we're missing is the snapshot id.
3834  */
3835 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3836 {
3837         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3838         struct rbd_spec *spec = rbd_dev->spec;
3839         const char *pool_name;
3840         const char *image_name;
3841         const char *snap_name;
3842         int ret;
3843
3844         /*
3845          * An image being mapped will have the pool name (etc.), but
3846          * we need to look up the snapshot id.
3847          */
3848         if (spec->pool_name) {
3849                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3850                         u64 snap_id;
3851
3852                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3853                         if (snap_id == CEPH_NOSNAP)
3854                                 return -ENOENT;
3855                         spec->snap_id = snap_id;
3856                 } else {
3857                         spec->snap_id = CEPH_NOSNAP;
3858                 }
3859
3860                 return 0;
3861         }
3862
3863         /* Get the pool name; we have to make our own copy of this */
3864
3865         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3866         if (!pool_name) {
3867                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3868                 return -EIO;
3869         }
3870         pool_name = kstrdup(pool_name, GFP_KERNEL);
3871         if (!pool_name)
3872                 return -ENOMEM;
3873
3874         /* Fetch the image name; tolerate failure here */
3875
3876         image_name = rbd_dev_image_name(rbd_dev);
3877         if (!image_name)
3878                 rbd_warn(rbd_dev, "unable to get image name");
3879
3880         /* Look up the snapshot name, and make a copy */
3881
3882         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3883         if (!snap_name) {
3884                 ret = -ENOMEM;
3885                 goto out_err;
3886         }
3887
3888         spec->pool_name = pool_name;
3889         spec->image_name = image_name;
3890         spec->snap_name = snap_name;
3891
3892         return 0;
3893 out_err:
3894         kfree(image_name);
3895         kfree(pool_name);
3896
3897         return ret;
3898 }
3899
3900 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3901 {
3902         size_t size;
3903         int ret;
3904         void *reply_buf;
3905         void *p;
3906         void *end;
3907         u64 seq;
3908         u32 snap_count;
3909         struct ceph_snap_context *snapc;
3910         u32 i;
3911
3912         /*
3913          * We'll need room for the seq value (maximum snapshot id),
3914          * snapshot count, and array of that many snapshot ids.
3915          * For now we have a fixed upper limit on the number we're
3916          * prepared to receive.
3917          */
3918         size = sizeof (__le64) + sizeof (__le32) +
3919                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
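             /*
              * For illustration: the reply is a __le64 seq followed by a
              * __le32 snap_count and snap_count __le64 ids, so with
              * RBD_MAX_SNAP_COUNT of 510 the size above works out to
              * 8 + 4 + 510 * 8 = 4092 bytes, just under a 4 KiB page.
              */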
3920         reply_buf = kzalloc(size, GFP_KERNEL);
3921         if (!reply_buf)
3922                 return -ENOMEM;
3923
3924         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3925                                 "rbd", "get_snapcontext", NULL, 0,
3926                                 reply_buf, size);
3927         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3928         if (ret < 0)
3929                 goto out;
3930
3931         p = reply_buf;
3932         end = reply_buf + ret;
3933         ret = -ERANGE;
3934         ceph_decode_64_safe(&p, end, seq, out);
3935         ceph_decode_32_safe(&p, end, snap_count, out);
3936
3937         /*
3938          * Make sure the reported number of snapshot ids wouldn't go
3939          * beyond the end of our buffer.  But before checking that,
3940          * make sure the computed size of the snapshot context we
3941          * allocate is representable in a size_t.
3942          */
3943         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3944                                  / sizeof (u64)) {
3945                 ret = -EINVAL;
3946                 goto out;
3947         }
3948         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3949                 goto out;
3950         ret = 0;
3951
3952         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3953         if (!snapc) {
3954                 ret = -ENOMEM;
3955                 goto out;
3956         }
3957         snapc->seq = seq;
3958         for (i = 0; i < snap_count; i++)
3959                 snapc->snaps[i] = ceph_decode_64(&p);
3960
3961         ceph_put_snap_context(rbd_dev->header.snapc);
3962         rbd_dev->header.snapc = snapc;
3963
3964         dout("  snap context seq = %llu, snap_count = %u\n",
3965                 (unsigned long long)seq, (unsigned int)snap_count);
3966 out:
3967         kfree(reply_buf);
3968
3969         return ret;
3970 }
3971
3972 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3973                                         u64 snap_id)
3974 {
3975         size_t size;
3976         void *reply_buf;
3977         __le64 snapid;
3978         int ret;
3979         void *p;
3980         void *end;
3981         char *snap_name;
3982
3983         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3984         reply_buf = kmalloc(size, GFP_KERNEL);
3985         if (!reply_buf)
3986                 return ERR_PTR(-ENOMEM);
3987
3988         snapid = cpu_to_le64(snap_id);
3989         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3990                                 "rbd", "get_snapshot_name",
3991                                 &snapid, sizeof (snapid),
3992                                 reply_buf, size);
3993         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3994         if (ret < 0) {
3995                 snap_name = ERR_PTR(ret);
3996                 goto out;
3997         }
3998
3999         p = reply_buf;
4000         end = reply_buf + ret;
4001         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4002         if (IS_ERR(snap_name))
4003                 goto out;
4004
4005         dout("  snap_id 0x%016llx snap_name = %s\n",
4006                 (unsigned long long)snap_id, snap_name);
4007 out:
4008         kfree(reply_buf);
4009
4010         return snap_name;
4011 }
4012
4013 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4014 {
4015         bool first_time = rbd_dev->header.object_prefix == NULL;
4016         int ret;
4017
4018         down_write(&rbd_dev->header_rwsem);
4019
4020         if (first_time) {
4021                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4022                 if (ret)
4023                         goto out;
4024         }
4025
4026         ret = rbd_dev_v2_image_size(rbd_dev);
4027         if (ret)
4028                 goto out;
4029         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4030             rbd_dev->mapping.size != rbd_dev->header.image_size)
4031                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4032
4033         ret = rbd_dev_v2_snap_context(rbd_dev);
4034         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4037 out:
4038         up_write(&rbd_dev->header_rwsem);
4039
4040         return ret;
4041 }
4042
4043 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4044 {
4045         struct device *dev;
4046         int ret;
4047
4048         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4049
4050         dev = &rbd_dev->dev;
4051         dev->bus = &rbd_bus_type;
4052         dev->type = &rbd_device_type;
4053         dev->parent = &rbd_root_dev;
4054         dev->release = rbd_dev_device_release;
4055         dev_set_name(dev, "%d", rbd_dev->dev_id);
4056         ret = device_register(dev);
4057
4058         mutex_unlock(&ctl_mutex);
4059
4060         return ret;
4061 }
4062
4063 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4064 {
4065         device_unregister(&rbd_dev->dev);
4066 }
4067
4068 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4069
4070 /*
4071  * Get a unique rbd identifier for the given new rbd_dev, and add
4072  * the rbd_dev to the global list.  The minimum rbd id is 1.
4073  */
4074 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4075 {
4076         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4077
4078         spin_lock(&rbd_dev_list_lock);
4079         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4080         spin_unlock(&rbd_dev_list_lock);
4081         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4082                 (unsigned long long) rbd_dev->dev_id);
4083 }
4084
4085 /*
4086  * Remove an rbd_dev from the global list, and record that its
4087  * identifier is no longer in use.
4088  */
4089 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4090 {
4091         struct list_head *tmp;
4092         int rbd_id = rbd_dev->dev_id;
4093         int max_id;
4094
4095         rbd_assert(rbd_id > 0);
4096
4097         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4098                 (unsigned long long) rbd_dev->dev_id);
4099         spin_lock(&rbd_dev_list_lock);
4100         list_del_init(&rbd_dev->node);
4101
4102         /*
4103          * If the id being "put" is not the current maximum, there
4104          * is nothing special we need to do.
4105          */
4106         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4107                 spin_unlock(&rbd_dev_list_lock);
4108                 return;
4109         }
4110
4111         /*
4112          * We need to update the current maximum id.  Search the
4113          * list to find out what it is.  We're more likely to find
4114          * the maximum at the end, so search the list backward.
4115          */
4116         max_id = 0;
4117         list_for_each_prev(tmp, &rbd_dev_list) {
4118                 struct rbd_device *rbd_dev;
4119
4120                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4121                 if (rbd_dev->dev_id > max_id)
4122                         max_id = rbd_dev->dev_id;
4123         }
4124         spin_unlock(&rbd_dev_list_lock);
4125
4126         /*
4127          * The max id could have been updated by rbd_dev_id_get(), in
4128          * which case it now accurately reflects the new maximum.
4129          * Be careful not to overwrite the maximum value in that
4130          * case.
4131          */
4132         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4133         dout("  max dev id has been reset\n");
4134 }
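
     /*
      * For example (ids invented for illustration): with devices 1, 2
      * and 3 in the list, putting id 3 rescans the list and resets the
      * maximum to 2, so the next rbd_dev_id_get() hands out 3 again.
      * Putting id 2 instead is not the current maximum, so nothing
      * needs to be recomputed.
      */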
4135
4136 /*
4137  * Skips over white space at *buf, and updates *buf to point to the
4138  * first found non-space character (if any). Returns the length of
4139  * the token (string of non-white space characters) found.  Note
4140  * that *buf must be terminated with '\0'.
4141  */
4142 static inline size_t next_token(const char **buf)
4143 {
4144         /*
4145          * These are the characters that produce nonzero for
4146          * isspace() in the "C" and "POSIX" locales.
4147          */
4148         const char *spaces = " \f\n\r\t\v";
4149
4150         *buf += strspn(*buf, spaces);   /* Find start of token */
4151
4152         return strcspn(*buf, spaces);   /* Return token length */
4153 }
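
     /*
      * For example (input invented for illustration): with *buf
      * pointing at "  rbd foo", next_token(buf) advances *buf past the
      * two spaces to "rbd foo" and returns 3, the length of "rbd".
      */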
4154
4155 /*
4156  * Finds the next token in *buf, and if the provided token buffer is
4157  * big enough, copies the found token into it.  The result, if
4158  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4159  * must be terminated with '\0' on entry.
4160  *
4161  * Returns the length of the token found (not including the '\0').
4162  * Return value will be 0 if no token is found, and it will be >=
4163  * token_size if the token would not fit.
4164  *
4165  * The *buf pointer will be updated to point beyond the end of the
4166  * found token.  Note that this occurs even if the token buffer is
4167  * too small to hold it.
4168  */
4169 static inline size_t copy_token(const char **buf,
4170                                 char *token,
4171                                 size_t token_size)
4172 {
4173         size_t len;
4174
4175         len = next_token(buf);
4176         if (len < token_size) {
4177                 memcpy(token, *buf, len);
4178                 *(token + len) = '\0';
4179         }
4180         *buf += len;
4181
4182         return len;
4183 }
4184
4185 /*
4186  * Finds the next token in *buf, dynamically allocates a buffer big
4187  * enough to hold a copy of it, and copies the token into the new
4188  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4189  * that a duplicate buffer is created even for a zero-length token.
4190  *
4191  * Returns a pointer to the newly-allocated duplicate, or a null
4192  * pointer if memory for the duplicate was not available.  If
4193  * the lenp argument is a non-null pointer, the length of the token
4194  * (not including the '\0') is returned in *lenp.
4195  *
4196  * If successful, the *buf pointer will be updated to point beyond
4197  * the end of the found token.
4198  *
4199  * Note: uses GFP_KERNEL for allocation.
4200  */
4201 static inline char *dup_token(const char **buf, size_t *lenp)
4202 {
4203         char *dup;
4204         size_t len;
4205
4206         len = next_token(buf);
4207         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4208         if (!dup)
4209                 return NULL;
4210         *(dup + len) = '\0';
4211         *buf += len;
4212
4213         if (lenp)
4214                 *lenp = len;
4215
4216         return dup;
4217 }
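
     /*
      * For example (input invented for illustration): with *buf
      * pointing at "rbd foo", dup_token(&buf, &len) returns a
      * kmalloc()ed copy of "rbd", sets len to 3, and leaves *buf
      * pointing at " foo".
      */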
4218
4219 /*
4220  * Parse the options provided for an "rbd add" (i.e., rbd image
4221  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4222  * and the data written is passed here via a NUL-terminated buffer.
4223  * Returns 0 if successful or an error code otherwise.
4224  *
4225  * The information extracted from these options is recorded in
4226  * the other parameters which return dynamically-allocated
4227  * structures:
4228  *  ceph_opts
4229  *      The address of a pointer that will refer to a ceph options
4230  *      structure.  Caller must release the returned pointer using
4231  *      ceph_destroy_options() when it is no longer needed.
4232  *  rbd_opts
4233  *      Address of an rbd options pointer.  Fully initialized by
4234  *      this function; caller must release with kfree().
4235  *  spec
4236  *      Address of an rbd image specification pointer.  Fully
4237  *      initialized by this function based on parsed options.
4238  *      Caller must release with rbd_spec_put().
4239  *
4240  * The options passed take this form:
4241  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4242  * where:
4243  *  <mon_addrs>
4244  *      A comma-separated list of one or more monitor addresses.
4245  *      A monitor address is an ip address, optionally followed
4246  *      by a port number (separated by a colon).
4247  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4248  *  <options>
4249  *      A comma-separated list of ceph and/or rbd options.
4250  *  <pool_name>
4251  *      The name of the rados pool containing the rbd image.
4252  *  <image_name>
4253  *      The name of the image in that pool to map.
4254  *  <snap_name>
4255  *      An optional snapshot name.  If provided, the mapping will
4256  *      present data from the image at the time that snapshot was
4257  *      created.  The image head is used if no snapshot name is
4258  *      provided.  Snapshot mappings are always read-only.
4259  */
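
     /*
      * For instance, a write like this to /sys/bus/rbd/add (all values
      * invented for illustration):
      *
      *	1.2.3.4:6789 name=admin,secret=<key> mypool myimage mysnap
      *
      * would be parsed as one monitor address, a ceph options string,
      * the pool "mypool", the image "myimage", and the snapshot
      * "mysnap".
      */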
4260 static int rbd_add_parse_args(const char *buf,
4261                                 struct ceph_options **ceph_opts,
4262                                 struct rbd_options **opts,
4263                                 struct rbd_spec **rbd_spec)
4264 {
4265         size_t len;
4266         char *options;
4267         const char *mon_addrs;
4268         char *snap_name;
4269         size_t mon_addrs_size;
4270         struct rbd_spec *spec = NULL;
4271         struct rbd_options *rbd_opts = NULL;
4272         struct ceph_options *copts;
4273         int ret;
4274
4275         /* The first four tokens are required */
4276
4277         len = next_token(&buf);
4278         if (!len) {
4279                 rbd_warn(NULL, "no monitor address(es) provided");
4280                 return -EINVAL;
4281         }
4282         mon_addrs = buf;
4283         mon_addrs_size = len + 1;
4284         buf += len;
4285
4286         ret = -EINVAL;
4287         options = dup_token(&buf, NULL);
4288         if (!options)
4289                 return -ENOMEM;
4290         if (!*options) {
4291                 rbd_warn(NULL, "no options provided");
4292                 goto out_err;
4293         }
4294
4295         spec = rbd_spec_alloc();
4296         if (!spec)
4297                 goto out_mem;
4298
4299         spec->pool_name = dup_token(&buf, NULL);
4300         if (!spec->pool_name)
4301                 goto out_mem;
4302         if (!*spec->pool_name) {
4303                 rbd_warn(NULL, "no pool name provided");
4304                 goto out_err;
4305         }
4306
4307         spec->image_name = dup_token(&buf, NULL);
4308         if (!spec->image_name)
4309                 goto out_mem;
4310         if (!*spec->image_name) {
4311                 rbd_warn(NULL, "no image name provided");
4312                 goto out_err;
4313         }
4314
4315         /*
4316          * Snapshot name is optional; default is to use "-"
4317          * (indicating the head/no snapshot).
4318          */
4319         len = next_token(&buf);
4320         if (!len) {
4321                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4322                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4323         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4324                 ret = -ENAMETOOLONG;
4325                 goto out_err;
4326         }
4327         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4328         if (!snap_name)
4329                 goto out_mem;
4330         *(snap_name + len) = '\0';
4331         spec->snap_name = snap_name;
4332
4333         /* Initialize all rbd options to the defaults */
4334
4335         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4336         if (!rbd_opts)
4337                 goto out_mem;
4338
4339         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4340
4341         copts = ceph_parse_options(options, mon_addrs,
4342                                         mon_addrs + mon_addrs_size - 1,
4343                                         parse_rbd_opts_token, rbd_opts);
4344         if (IS_ERR(copts)) {
4345                 ret = PTR_ERR(copts);
4346                 goto out_err;
4347         }
4348         kfree(options);
4349
4350         *ceph_opts = copts;
4351         *opts = rbd_opts;
4352         *rbd_spec = spec;
4353
4354         return 0;
4355 out_mem:
4356         ret = -ENOMEM;
4357 out_err:
4358         kfree(rbd_opts);
4359         rbd_spec_put(spec);
4360         kfree(options);
4361
4362         return ret;
4363 }
4364
4365 /*
4366  * An rbd format 2 image has a unique identifier, distinct from the
4367  * name given to it by the user.  Internally, that identifier is
4368  * what's used to specify the names of objects related to the image.
4369  *
4370  * A special "rbd id" object is used to map an rbd image name to its
4371  * id.  If that object doesn't exist, then there is no v2 rbd image
4372  * with the supplied name.
4373  *
4374  * This function will record the given rbd_dev's image_id field if
4375  * it can be determined, and in that case will return 0.  If any
4376  * errors occur a negative errno will be returned and the rbd_dev's
4377  * image_id field will be unchanged (and should be NULL).
4378  */
4379 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4380 {
4381         int ret;
4382         size_t size;
4383         char *object_name;
4384         void *response;
4385         char *image_id;
4386
4387         /*
4388          * When probing a parent image, the image id is already
4389          * known (and the image name likely is not).  There's no
4390          * need to fetch the image id again in this case.  We
4391          * do still need to set the image format though.
4392          */
4393         if (rbd_dev->spec->image_id) {
4394                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4395
4396                 return 0;
4397         }
4398
4399         /*
4400          * First, see if the format 2 image id file exists, and if
4401          * so, get the image's persistent id from it.
4402          */
4403         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4404         object_name = kmalloc(size, GFP_NOIO);
4405         if (!object_name)
4406                 return -ENOMEM;
4407         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4408         dout("rbd id object name is %s\n", object_name);
4409
4410         /* Response will be an encoded string, which includes a length */
4411
4412         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4413         response = kzalloc(size, GFP_NOIO);
4414         if (!response) {
4415                 ret = -ENOMEM;
4416                 goto out;
4417         }
4418
4419         /* If it doesn't exist we'll assume it's a format 1 image */
4420
4421         ret = rbd_obj_method_sync(rbd_dev, object_name,
4422                                 "rbd", "get_id", NULL, 0,
4423                                 response, RBD_IMAGE_ID_LEN_MAX);
4424         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4425         if (ret == -ENOENT) {
4426                 image_id = kstrdup("", GFP_KERNEL);
4427                 ret = image_id ? 0 : -ENOMEM;
4428                 if (!ret)
4429                         rbd_dev->image_format = 1;
4430         } else if (ret > (int) sizeof (__le32)) {
4431                 void *p = response;
4432
4433                 image_id = ceph_extract_encoded_string(&p, p + ret,
4434                                                 NULL, GFP_NOIO);
4435                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4436                 if (!ret)
4437                         rbd_dev->image_format = 2;
4438         } else {
4439                 ret = -EINVAL;
4440         }
4441
4442         if (!ret) {
4443                 rbd_dev->spec->image_id = image_id;
4444                 dout("image_id is %s\n", image_id);
4445         }
4446 out:
4447         kfree(response);
4448         kfree(object_name);
4449
4450         return ret;
4451 }
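
     /*
      * To illustrate the probe above (image name invented): for a
      * format 2 image named "myimage" the id object is
      * "rbd_id.myimage", and invoking its "get_id" class method
      * returns the image's id.  If that object does not exist, the
      * image is assumed to be format 1.
      */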
4452
4453 /* Undo whatever state changes are made by v1 or v2 image probe */
4454
4455 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4456 {
4457         struct rbd_image_header *header;
4458
4459         rbd_dev_remove_parent(rbd_dev);
4460         rbd_spec_put(rbd_dev->parent_spec);
4461         rbd_dev->parent_spec = NULL;
4462         rbd_dev->parent_overlap = 0;
4463
4464         /* Free dynamic fields from the header, then zero it out */
4465
4466         header = &rbd_dev->header;
4467         ceph_put_snap_context(header->snapc);
4468         kfree(header->snap_sizes);
4469         kfree(header->snap_names);
4470         kfree(header->object_prefix);
4471         memset(header, 0, sizeof (*header));
4472 }
4473
4474 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4475 {
4476         int ret;
4477
4478         ret = rbd_dev_v2_object_prefix(rbd_dev);
4479         if (ret)
4480                 goto out_err;
4481
4482         /*
4483          * Get and check the features for the image.  Currently the
4484          * features are assumed to never change.
4485          */
4486         ret = rbd_dev_v2_features(rbd_dev);
4487         if (ret)
4488                 goto out_err;
4489
4490         /* If the image supports layering, get the parent info */
4491
4492         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4493                 ret = rbd_dev_v2_parent_info(rbd_dev);
4494                 if (ret)
4495                         goto out_err;
4496                 /*
4497                  * Print a warning if this image has a parent.
4498                  * Don't print it if the image now being probed
4499                  * is itself a parent.  We can tell at this point
4500                  * because we won't know its pool name yet (just its
4501                  * pool id).
4502                  */
4503                 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4504                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4505                                         "is EXPERIMENTAL!");
4506         }
4507
4508         /* If the image supports fancy striping, get its parameters */
4509
4510         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4511                 ret = rbd_dev_v2_striping_info(rbd_dev);
4512                 if (ret < 0)
4513                         goto out_err;
4514         }
4515         /* No support for crypto or compression in format 2 images */
4516
4517         return 0;
4518 out_err:
4519         rbd_dev->parent_overlap = 0;
4520         rbd_spec_put(rbd_dev->parent_spec);
4521         rbd_dev->parent_spec = NULL;
4522         kfree(rbd_dev->header_name);
4523         rbd_dev->header_name = NULL;
4524         kfree(rbd_dev->header.object_prefix);
4525         rbd_dev->header.object_prefix = NULL;
4526
4527         return ret;
4528 }
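
/*
 * Sketch of the feature gating above, assuming the feature bit
 * layout defined earlier in this file (layering in bit 0, striping
 * v2 in bit 1): a layered image with fancy striping has
 *
 *     header.features == RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2
 *
 * and each optional probe step runs only when its bit is set, so a
 * plain format 2 image skips both rbd_dev_v2_parent_info() and
 * rbd_dev_v2_striping_info().
 */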
4529
4530 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4531 {
4532         struct rbd_device *parent = NULL;
4533         struct rbd_spec *parent_spec;
4534         struct rbd_client *rbdc;
4535         int ret;
4536
4537         if (!rbd_dev->parent_spec)
4538                 return 0;
4539         /*
4540          * We need to pass a reference to the client and the parent
4541          * spec when creating the parent rbd_dev.  Images related by
4542          * parent/child relationships always share both.
4543          */
4544         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4545         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4546
4547         ret = -ENOMEM;
4548         parent = rbd_dev_create(rbdc, parent_spec);
4549         if (!parent)
4550                 goto out_err;
4551
4552         ret = rbd_dev_image_probe(parent, true);
4553         if (ret < 0)
4554                 goto out_err;
4555         rbd_dev->parent = parent;
4556
4557         return 0;
4558 out_err:
4559         if (parent) {
4560                 rbd_spec_put(rbd_dev->parent_spec);
4561                 /* header_name is freed by rbd_dev_image_probe()'s error path */
4562                 rbd_dev_destroy(parent);
4563         } else {
4564                 rbd_put_client(rbdc);
4565                 rbd_spec_put(parent_spec);
4566         }
4567
4568         return ret;
4569 }
4570
4571 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4572 {
4573         int ret;
4574
4575         /* generate unique id: find highest unique id, add one */
4576         rbd_dev_id_get(rbd_dev);
4577
4578         /* Fill in the device name, now that we have its id. */
4579         BUILD_BUG_ON(DEV_NAME_LEN
4580                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4581         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4582
4583         /* Get our block major device number. */
4584
4585         ret = register_blkdev(0, rbd_dev->name);
4586         if (ret < 0)
4587                 goto err_out_id;
4588         rbd_dev->major = ret;
4589
4590         /* Set up the blkdev mapping. */
4591
4592         ret = rbd_init_disk(rbd_dev);
4593         if (ret)
4594                 goto err_out_blkdev;
4595
4596         ret = rbd_dev_mapping_set(rbd_dev);
4597         if (ret)
4598                 goto err_out_disk;
4599         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4600
4601         ret = rbd_bus_add_dev(rbd_dev);
4602         if (ret)
4603                 goto err_out_mapping;
4604
4605         /* Everything's ready.  Announce the disk to the world. */
4606
4607         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4608         add_disk(rbd_dev->disk);
4609
4610         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4611                 (unsigned long long) rbd_dev->mapping.size);
4612
4613         return ret;
4614
4615 err_out_mapping:
4616         rbd_dev_mapping_clear(rbd_dev);
4617 err_out_disk:
4618         rbd_free_disk(rbd_dev);
4619 err_out_blkdev:
4620         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4621 err_out_id:
4622         rbd_dev_id_put(rbd_dev);
4624
4625         return ret;
4626 }
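
/*
 * Worked example with hypothetical numbers: the first device mapped
 * gets dev_id 0, hence the name "rbd0", a dynamically allocated
 * block major from register_blkdev(0, ...), and a capacity counted
 * in 512-byte sectors -- a 1 GiB image gives
 *
 *     set_capacity(disk, 0x40000000 / SECTOR_SIZE);    (2097152 sectors)
 *
 * and the pr_info() above would report "rbd0: added with size
 * 0x40000000".
 */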
4627
4628 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4629 {
4630         struct rbd_spec *spec = rbd_dev->spec;
4631         size_t size;
4632
4633         /* Record the header object name for this rbd image. */
4634
4635         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4636
4637         if (rbd_dev->image_format == 1)
4638                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4639         else
4640                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4641
4642         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4643         if (!rbd_dev->header_name)
4644                 return -ENOMEM;
4645
4646         if (rbd_dev->image_format == 1)
4647                 sprintf(rbd_dev->header_name, "%s%s",
4648                         spec->image_name, RBD_SUFFIX);
4649         else
4650                 sprintf(rbd_dev->header_name, "%s%s",
4651                         RBD_HEADER_PREFIX, spec->image_id);
4652         return 0;
4653 }
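
/*
 * Examples, assuming the usual rbd_types.h definitions where
 * RBD_SUFFIX is ".rbd" and RBD_HEADER_PREFIX is "rbd_header.":
 * a format 1 image named "foo" keeps its header in object
 * "foo.rbd", while a format 2 image with id "10052ae8944a"
 * (hypothetical) keeps it in "rbd_header.10052ae8944a".
 */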
4654
4655 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4656 {
4657         int ret;
4658
4659         rbd_dev_unprobe(rbd_dev);
4660         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4661         if (ret)
4662                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4663         kfree(rbd_dev->header_name);
4664         rbd_dev->header_name = NULL;
4665         rbd_dev->image_format = 0;
4666         kfree(rbd_dev->spec->image_id);
4667         rbd_dev->spec->image_id = NULL;
4668
4669         rbd_dev_destroy(rbd_dev);
4670 }
4671
4672 /*
4673  * Probe for the existence of the header object for the given rbd
4674  * device.  For format 2 images this includes determining the image
4675  * id.
4676  */
4677 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
4678 {
4679         int ret;
4680         int tmp;
4681
4682         /*
4683          * Get the id from the image id object.  If it's not a
4684          * format 2 image, we'll get ENOENT back, and we'll assume
4685          * it's a format 1 image.
4686          */
4687         ret = rbd_dev_image_id(rbd_dev);
4688         if (ret)
4689                 return ret;
4690         rbd_assert(rbd_dev->spec->image_id);
4691         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4692
4693         ret = rbd_dev_header_name(rbd_dev);
4694         if (ret)
4695                 goto err_out_format;
4696
4697         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4698         if (ret)
4699                 goto out_header_name;
4700
4701         if (rbd_dev->image_format == 1)
4702                 ret = rbd_dev_v1_header_info(rbd_dev);
4703         else
4704                 ret = rbd_dev_v2_header_info(rbd_dev);
4705         if (ret)
4706                 goto err_out_watch;
4707
4708         ret = rbd_dev_spec_update(rbd_dev);
4709         if (ret)
4710                 goto err_out_probe;
4711
4712         /* If we are mapping a snapshot it must be marked read-only */
4713
4714         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4715                 read_only = true;
4716         rbd_dev->mapping.read_only = read_only;
4717
4718         ret = rbd_dev_probe_parent(rbd_dev);
4719         if (ret)
4720                 goto err_out_probe;
4721
4722         dout("discovered format %u image, header name is %s\n",
4723                 rbd_dev->image_format, rbd_dev->header_name);
4724
4725         return 0;
4726 err_out_probe:
4727         rbd_dev_unprobe(rbd_dev);
4728 err_out_watch:
4729         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4730         if (tmp)
4731                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4732 out_header_name:
4733         kfree(rbd_dev->header_name);
4734         rbd_dev->header_name = NULL;
4735 err_out_format:
4736         rbd_dev->image_format = 0;
4737         kfree(rbd_dev->spec->image_id);
4738         rbd_dev->spec->image_id = NULL;
4739
4740         dout("probe failed, returning %d\n", ret);
4741
4742         return ret;
4743 }
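
/*
 * Sketch of the watch lifecycle used by the probe above: the second
 * argument to rbd_dev_header_watch_sync() starts or stops a watch on
 * the header object, so header changes made elsewhere (a resize, a
 * new snapshot) notify this client:
 *
 *     rbd_dev_header_watch_sync(rbd_dev, 1);     start watching
 *     ...                                        use the image
 *     rbd_dev_header_watch_sync(rbd_dev, 0);     tear the watch down
 *
 * which is why every failure after the watch is established must
 * unwind through err_out_watch.
 */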
4744
4745 static ssize_t rbd_add(struct bus_type *bus,
4746                        const char *buf,
4747                        size_t count)
4748 {
4749         struct rbd_device *rbd_dev = NULL;
4750         struct ceph_options *ceph_opts = NULL;
4751         struct rbd_options *rbd_opts = NULL;
4752         struct rbd_spec *spec = NULL;
4753         struct rbd_client *rbdc;
4754         struct ceph_osd_client *osdc;
4755         bool read_only;
4756         int rc = -ENOMEM;
4757
4758         if (!try_module_get(THIS_MODULE))
4759                 return -ENODEV;
4760
4761         /* parse add command */
4762         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4763         if (rc < 0)
4764                 goto err_out_module;
4765         read_only = rbd_opts->read_only;
4766         kfree(rbd_opts);
4767         rbd_opts = NULL;        /* done with this */
4768
4769         rbdc = rbd_get_client(ceph_opts);
4770         if (IS_ERR(rbdc)) {
4771                 rc = PTR_ERR(rbdc);
4772                 goto err_out_args;
4773         }
4774         ceph_opts = NULL;       /* rbd_dev client now owns this */
4775
4776         /* pick the pool */
4777         osdc = &rbdc->client->osdc;
4778         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4779         if (rc < 0)
4780                 goto err_out_client;
4781         spec->pool_id = (u64)rc;
4782
4783         /* The ceph file layout needs the pool id to fit in 32 bits */
4784
4785         if (spec->pool_id > (u64)U32_MAX) {
4786                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4787                                 (unsigned long long)spec->pool_id, U32_MAX);
4788                 rc = -EIO;
4789                 goto err_out_client;
4790         }
4791
4792         rbd_dev = rbd_dev_create(rbdc, spec);
4793         if (!rbd_dev)
4794                 goto err_out_client;
4795         rbdc = NULL;            /* rbd_dev now owns this */
4796         spec = NULL;            /* rbd_dev now owns this */
4797
4798         rc = rbd_dev_image_probe(rbd_dev, read_only);
4799         if (rc < 0)
4800                 goto err_out_rbd_dev;
4801
4802         rc = rbd_dev_device_setup(rbd_dev);
4803         if (!rc)
4804                 return count;
4805
4806         rbd_dev_image_release(rbd_dev);
4807 err_out_rbd_dev:
4808         rbd_dev_destroy(rbd_dev);
4809 err_out_client:
4810         rbd_put_client(rbdc);
4811 err_out_args:
4812         if (ceph_opts)
4813                 ceph_destroy_options(ceph_opts);
4814         kfree(rbd_opts);
4815         rbd_spec_put(spec);
4816 err_out_module:
4817         module_put(THIS_MODULE);
4818
4819         dout("Error adding device %s\n", buf);
4820
4821         return (ssize_t)rc;
4822 }
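
/*
 * Example usage, a sketch; the authoritative syntax is in
 * Documentation/ABI/testing/sysfs-bus-rbd.  Mapping is requested by
 * writing "<mon addrs> <options> <pool> <image> [<snap>]" to the
 * bus add file:
 *
 *     # echo "192.168.0.1:6789 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * On success the write returns count and a new /dev/rbd<id> appears;
 * on failure the error from the parsing or probing above is returned
 * instead.
 */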
4823
4824 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4825 {
4826         struct list_head *tmp;
4827         struct rbd_device *rbd_dev;
4828
4829         spin_lock(&rbd_dev_list_lock);
4830         list_for_each(tmp, &rbd_dev_list) {
4831                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4832                 if (rbd_dev->dev_id == dev_id) {
4833                         spin_unlock(&rbd_dev_list_lock);
4834                         return rbd_dev;
4835                 }
4836         }
4837         spin_unlock(&rbd_dev_list_lock);
4838         return NULL;
4839 }
4840
4841 static void rbd_dev_device_release(struct device *dev)
4842 {
4843         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4844
4845         rbd_free_disk(rbd_dev);
4846         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4847         rbd_dev_mapping_clear(rbd_dev);
4848         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4849         rbd_dev->major = 0;
4850         rbd_dev_id_put(rbd_dev);
4852 }
4853
4854 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4855 {
4856         while (rbd_dev->parent) {
4857                 struct rbd_device *first = rbd_dev;
4858                 struct rbd_device *second = first->parent;
4859                 struct rbd_device *third;
4860
4861                 /*
4862                  * Walk down to the deepest ancestor, the one with
4863                  * no parent of its own, and remove it.
4864                  */
4865                 while (second && (third = second->parent)) {
4866                         first = second;
4867                         second = third;
4868                 }
4869                 rbd_assert(second);
4870                 rbd_dev_image_release(second);
4871                 first->parent = NULL;
4872                 first->parent_overlap = 0;
4873
4874                 rbd_assert(first->parent_spec);
4875                 rbd_spec_put(first->parent_spec);
4876                 first->parent_spec = NULL;
4877         }
4878 }
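
/*
 * Illustration of the teardown order above: given a chain
 *
 *     rbd_dev -> parent -> grandparent
 *
 * each outer-loop pass walks to the deepest ancestor (the one with
 * no parent of its own) and releases it, so the grandparent goes
 * first, then the parent, leaving rbd_dev itself untouched.
 */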
4879
4880 static ssize_t rbd_remove(struct bus_type *bus,
4881                           const char *buf,
4882                           size_t count)
4883 {
4884         struct rbd_device *rbd_dev = NULL;
4885         int target_id;
4886         unsigned long ul;
4887         int ret;
4888
4889         ret = strict_strtoul(buf, 10, &ul);
4890         if (ret)
4891                 return ret;
4892
4893         /* convert to int; abort if we lost anything in the conversion */
4894         target_id = (int) ul;
4895         if (target_id != ul)
4896                 return -EINVAL;
4897
4898         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4899
4900         rbd_dev = __rbd_get_dev(target_id);
4901         if (!rbd_dev) {
4902                 ret = -ENOENT;
4903                 goto done;
4904         }
4905
4906         spin_lock_irq(&rbd_dev->lock);
4907         if (rbd_dev->open_count)
4908                 ret = -EBUSY;
4909         else
4910                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4911         spin_unlock_irq(&rbd_dev->lock);
4912         if (ret < 0)
4913                 goto done;
4914         ret = count;
4915         rbd_bus_del_dev(rbd_dev);
4916         rbd_dev_image_release(rbd_dev);
4917         module_put(THIS_MODULE);
4918 done:
4919         mutex_unlock(&ctl_mutex);
4920
4921         return ret;
4922 }
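
/*
 * Example usage (sketch): unmapping is requested by writing the
 * device id -- the <id> in /dev/rbd<id> -- to the bus remove file:
 *
 *     # echo 0 > /sys/bus/rbd/remove
 *
 * A device that is still open is refused with -EBUSY; otherwise the
 * REMOVING flag keeps new opens out while teardown proceeds.
 */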
4923
4924 /*
4925  * create control files in sysfs
4926  * /sys/bus/rbd/...
4927  */
4928 static int rbd_sysfs_init(void)
4929 {
4930         int ret;
4931
4932         ret = device_register(&rbd_root_dev);
4933         if (ret < 0)
4934                 return ret;
4935
4936         ret = bus_register(&rbd_bus_type);
4937         if (ret < 0)
4938                 device_unregister(&rbd_root_dev);
4939
4940         return ret;
4941 }
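
/*
 * The resulting control tree, roughly:
 *
 *     /sys/bus/rbd/add              write an image spec to map it
 *     /sys/bus/rbd/remove           write a device id to unmap it
 *     /sys/bus/rbd/devices/<id>/    per-device attributes
 *
 * rbd_root_dev is registered first because the mapped devices are
 * parented beneath it.
 */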
4942
4943 static void rbd_sysfs_cleanup(void)
4944 {
4945         bus_unregister(&rbd_bus_type);
4946         device_unregister(&rbd_root_dev);
4947 }
4948
4949 static int rbd_slab_init(void)
4950 {
4951         rbd_assert(!rbd_img_request_cache);
4952         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4953                                         sizeof (struct rbd_img_request),
4954                                         __alignof__(struct rbd_img_request),
4955                                         0, NULL);
4956         if (!rbd_img_request_cache)
4957                 return -ENOMEM;
4958
4959         rbd_assert(!rbd_obj_request_cache);
4960         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4961                                         sizeof (struct rbd_obj_request),
4962                                         __alignof__(struct rbd_obj_request),
4963                                         0, NULL);
4964         if (!rbd_obj_request_cache)
4965                 goto out_err;
4966
4967         rbd_assert(!rbd_segment_name_cache);
4968         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
4969                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
4970         if (rbd_segment_name_cache)
4971                 return 0;
4972 out_err:
4973         if (rbd_obj_request_cache) {
4974                 kmem_cache_destroy(rbd_obj_request_cache);
4975                 rbd_obj_request_cache = NULL;
4976         }
4977
4978         kmem_cache_destroy(rbd_img_request_cache);
4979         rbd_img_request_cache = NULL;
4980
4981         return -ENOMEM;
4982 }
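
/*
 * These caches back the per-I/O allocations: image requests, their
 * component object requests, and the object name buffers
 * (MAX_OBJ_NAME_SIZE + 1 bytes each) are expected to come from
 * kmem_cache_alloc() and friends on these caches rather than
 * kmalloc(), so all three must exist before any image can be mapped.
 */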
4983
4984 static void rbd_slab_exit(void)
4985 {
4986         rbd_assert(rbd_segment_name_cache);
4987         kmem_cache_destroy(rbd_segment_name_cache);
4988         rbd_segment_name_cache = NULL;
4989
4990         rbd_assert(rbd_obj_request_cache);
4991         kmem_cache_destroy(rbd_obj_request_cache);
4992         rbd_obj_request_cache = NULL;
4993
4994         rbd_assert(rbd_img_request_cache);
4995         kmem_cache_destroy(rbd_img_request_cache);
4996         rbd_img_request_cache = NULL;
4997 }
4998
4999 static int __init rbd_init(void)
5000 {
5001         int rc;
5002
5003         if (!libceph_compatible(NULL)) {
5004                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5005
5006                 return -EINVAL;
5007         }
5008         rc = rbd_slab_init();
5009         if (rc)
5010                 return rc;
5011         rc = rbd_sysfs_init();
5012         if (rc)
5013                 rbd_slab_exit();
5014         else
5015                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5016
5017         return rc;
5018 }
5019
5020 static void __exit rbd_exit(void)
5021 {
5022         rbd_sysfs_cleanup();
5023         rbd_slab_exit();
5024 }
5025
5026 module_init(rbd_init);
5027 module_exit(rbd_exit);
5028
5029 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5030 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5031 MODULE_DESCRIPTION("rados block device");
5032
5033 /* following authorship retained from original osdblk.c */
5034 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5035
5036 MODULE_LICENSE("GPL");