/* rbd: fix an incorrect assertion condition */

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
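
/*
 * Worked example: assuming the usual NAME_MAX of 255, the "snap_"
 * prefix is 5 characters, so RBD_MAX_SNAP_NAME_LEN works out to
 * 255 - 5 = 250.
 */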

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
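
/*
 * For example, an image with only layering enabled reports feature
 * bits 0x1; one with layering and striping v2 reports 0x3, which is
 * exactly RBD_FEATURES_ALL.
 */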

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
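
/*
 * Worked example: each byte of an int needs at most 5/2 decimal
 * digits (log10(256) is about 2.41), so a 4-byte int needs
 * (5 * 4) / 2 + 1 = 11 characters, enough for "-2147483648"
 * (10 digits plus a sign).
 */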

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
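
/*
 * Illustration: once a gendisk is attached (named e.g. "rbd0"),
 * rbd_warn(rbd_dev, "%s", "msg") logs "rbd: rbd0: msg"; with a null
 * rbd_dev the same call logs just "rbd: msg".
 */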

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
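
/*
 * For example, a map request whose option string contains "ro" (or
 * its long form "read_only") reaches the Opt_read_only case above
 * and sets rbd_opts->read_only to true, while an unrecognized token
 * makes match_token() return a negative value and the parse fails
 * with -EINVAL.
 */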

/*
 * Get a ceph client with specific addr and configuration.  If one
 * doesn't exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.
 *
 * The list lock is taken here, so the caller must not hold
 * rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
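
/*
 * Aside: with a 64-bit size_t the snap_count bound above allows
 * about 2^61 snapshot ids (SIZE_MAX / sizeof(__le64)), so in
 * practice the check only bites on 32-bit kernels, where the limit
 * is about 2^29.
 */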

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more; fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
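
/*
 * Worked example: with snapc->snaps[] = { 12, 7, 3 } (descending,
 * as the osd keeps it), rbd_dev_snap_index() returns 1 for snap_id
 * 7 and BAD_SNAP_INDEX for snap_id 5; snapid_compare_reverse()
 * inverts the usual comparison so bsearch() works on the
 * descending array.
 */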

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* name came from the slab cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
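
/*
 * Worked example (assuming the common obj_order of 22, i.e. 4 MiB
 * objects): image offset 0x400123 falls in segment 1 with segment
 * offset 0x123, and rbd_segment_length() clips any I/O starting
 * there to at most 0x3ffedd bytes, the remainder of that segment.
 */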

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
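
/*
 * Worked example (4 KiB pages): zero_pages(pages, 5000, 9000)
 * starts at pages[1] (5000 >> 12 == 1), zeroes bytes 904..4095 of
 * that page and then bytes 0..807 of pages[2], 4000 bytes in all.
 */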

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
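
/*
 * Worked example: for a chain of two 4096-byte bios, a call with
 * *offset == 3072 and len == 2048 clones the last 1024 bytes of the
 * first bio and the first 1024 bytes of the second.  On return
 * *bio_src points at the second bio and *offset is 1024, the
 * position of the first un-cloned byte.
 */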

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better offhand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
1471  */
1472 static void img_request_write_set(struct rbd_img_request *img_request)
1473 {
1474         set_bit(IMG_REQ_WRITE, &img_request->flags);
1475         smp_mb();
1476 }
1477
1478 static bool img_request_write_test(struct rbd_img_request *img_request)
1479 {
1480         smp_mb();
1481         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1482 }
1483
1484 static void img_request_child_set(struct rbd_img_request *img_request)
1485 {
1486         set_bit(IMG_REQ_CHILD, &img_request->flags);
1487         smp_mb();
1488 }
1489
1490 static bool img_request_child_test(struct rbd_img_request *img_request)
1491 {
1492         smp_mb();
1493         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1494 }
1495
1496 static void img_request_layered_set(struct rbd_img_request *img_request)
1497 {
1498         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1499         smp_mb();
1500 }
1501
1502 static bool img_request_layered_test(struct rbd_img_request *img_request)
1503 {
1504         smp_mb();
1505         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1506 }
1507
1508 static void
1509 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1510 {
1511         u64 xferred = obj_request->xferred;
1512         u64 length = obj_request->length;
1513
1514         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1515                 obj_request, obj_request->img_request, obj_request->result,
1516                 xferred, length);
1517         /*
1518          * ENOENT means a hole in the image.  We zero-fill the
1519          * entire length of the request.  A short read also implies
1520          * zero-fill to the end of the request.  Either way we
1521          * update the xferred count to indicate the whole request
1522          * was satisfied.
1523          */
1524         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1525         if (obj_request->result == -ENOENT) {
1526                 if (obj_request->type == OBJ_REQUEST_BIO)
1527                         zero_bio_chain(obj_request->bio_list, 0);
1528                 else
1529                         zero_pages(obj_request->pages, 0, length);
1530                 obj_request->result = 0;
1531                 obj_request->xferred = length;
1532         } else if (xferred < length && !obj_request->result) {
1533                 if (obj_request->type == OBJ_REQUEST_BIO)
1534                         zero_bio_chain(obj_request->bio_list, xferred);
1535                 else
1536                         zero_pages(obj_request->pages, xferred, length);
1537                 obj_request->xferred = length;
1538         }
1539         obj_request_done_set(obj_request);
1540 }
1541
1542 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1543 {
1544         dout("%s: obj %p cb %p\n", __func__, obj_request,
1545                 obj_request->callback);
1546         if (obj_request->callback)
1547                 obj_request->callback(obj_request);
1548         else
1549                 complete_all(&obj_request->completion);
1550 }
1551
1552 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1553 {
1554         dout("%s: obj %p\n", __func__, obj_request);
1555         obj_request_done_set(obj_request);
1556 }
1557
1558 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1559 {
1560         struct rbd_img_request *img_request = NULL;
1561         struct rbd_device *rbd_dev = NULL;
1562         bool layered = false;
1563
        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                rbd_assert(img_request != NULL);
                layered = img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }
1569
1570         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1571                 obj_request, img_request, obj_request->result,
1572                 obj_request->xferred, obj_request->length);
1573         if (layered && obj_request->result == -ENOENT &&
1574                         obj_request->img_offset < rbd_dev->parent_overlap)
1575                 rbd_img_parent_read(obj_request);
1576         else if (img_request)
1577                 rbd_img_obj_request_read_callback(obj_request);
1578         else
1579                 obj_request_done_set(obj_request);
1580 }
1581
1582 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1583 {
1584         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1585                 obj_request->result, obj_request->length);
1586         /*
1587          * There is no such thing as a successful short write.  Set
1588          * it to our originally-requested length.
1589          */
1590         obj_request->xferred = obj_request->length;
1591         obj_request_done_set(obj_request);
1592 }
1593
1594 /*
1595  * For a simple stat call there's nothing to do.  We'll do more if
1596  * this is part of a write sequence for a layered image.
1597  */
1598 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1599 {
1600         dout("%s: obj %p\n", __func__, obj_request);
1601         obj_request_done_set(obj_request);
1602 }
1603
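/*
 * Completion callback for an osd request.  Records any error in the
 * object request, captures the byte count transferred by the first
 * op, and dispatches on that op's opcode to the handlers above.
 */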
1604 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1605                                 struct ceph_msg *msg)
1606 {
1607         struct rbd_obj_request *obj_request = osd_req->r_priv;
1608         u16 opcode;
1609
1610         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1611         rbd_assert(osd_req == obj_request->osd_req);
1612         if (obj_request_img_data_test(obj_request)) {
1613                 rbd_assert(obj_request->img_request);
1614                 rbd_assert(obj_request->which != BAD_WHICH);
1615         } else {
1616                 rbd_assert(obj_request->which == BAD_WHICH);
1617         }
1618
1619         if (osd_req->r_result < 0)
1620                 obj_request->result = osd_req->r_result;
1621
1622         BUG_ON(osd_req->r_num_ops > 2);
1623
1624         /*
1625          * We support a 64-bit length, but ultimately it has to be
1626          * passed to blk_end_request(), which takes an unsigned int.
1627          */
1628         obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1630         opcode = osd_req->r_ops[0].op;
1631         switch (opcode) {
1632         case CEPH_OSD_OP_READ:
1633                 rbd_osd_read_callback(obj_request);
1634                 break;
1635         case CEPH_OSD_OP_WRITE:
1636                 rbd_osd_write_callback(obj_request);
1637                 break;
1638         case CEPH_OSD_OP_STAT:
1639                 rbd_osd_stat_callback(obj_request);
1640                 break;
1641         case CEPH_OSD_OP_CALL:
1642         case CEPH_OSD_OP_NOTIFY_ACK:
1643         case CEPH_OSD_OP_WATCH:
1644                 rbd_osd_trivial_callback(obj_request);
1645                 break;
1646         default:
1647                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1648                         obj_request->object_name, (unsigned short) opcode);
1649                 break;
1650         }
1651
1652         if (obj_request_done_test(obj_request))
1653                 rbd_obj_request_complete(obj_request);
1654 }
1655
1656 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1657 {
1658         struct rbd_img_request *img_request = obj_request->img_request;
1659         struct ceph_osd_request *osd_req = obj_request->osd_req;
1660         u64 snap_id;
1661
1662         rbd_assert(osd_req != NULL);
1663
1664         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1665         ceph_osdc_build_request(osd_req, obj_request->offset,
1666                         NULL, snap_id, NULL);
1667 }
1668
1669 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1670 {
1671         struct rbd_img_request *img_request = obj_request->img_request;
1672         struct ceph_osd_request *osd_req = obj_request->osd_req;
1673         struct ceph_snap_context *snapc;
1674         struct timespec mtime = CURRENT_TIME;
1675
1676         rbd_assert(osd_req != NULL);
1677
1678         snapc = img_request ? img_request->snapc : NULL;
1679         ceph_osdc_build_request(osd_req, obj_request->offset,
1680                         snapc, CEPH_NOSNAP, &mtime);
1681 }
1682
1683 static struct ceph_osd_request *rbd_osd_req_create(
1684                                         struct rbd_device *rbd_dev,
1685                                         bool write_request,
1686                                         struct rbd_obj_request *obj_request)
1687 {
1688         struct ceph_snap_context *snapc = NULL;
1689         struct ceph_osd_client *osdc;
1690         struct ceph_osd_request *osd_req;
1691
1692         if (obj_request_img_data_test(obj_request)) {
1693                 struct rbd_img_request *img_request = obj_request->img_request;
1694
1695                 rbd_assert(write_request ==
1696                                 img_request_write_test(img_request));
1697                 if (write_request)
1698                         snapc = img_request->snapc;
1699         }
1700
1701         /* Allocate and initialize the request, for the single op */
1702
1703         osdc = &rbd_dev->rbd_client->client->osdc;
1704         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1705         if (!osd_req)
1706                 return NULL;    /* ENOMEM */
1707
1708         if (write_request)
1709                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1710         else
1711                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1712
1713         osd_req->r_callback = rbd_osd_req_callback;
1714         osd_req->r_priv = obj_request;
1715
1716         osd_req->r_oid_len = strlen(obj_request->object_name);
1717         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1718         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1719
1720         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1721
1722         return osd_req;
1723 }
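
/*
 * A sketch of the intended calling sequence for the helper above,
 * modeled on its use in rbd_img_request_fill() below (error
 * handling elided):
 *
 *	osd_req = rbd_osd_req_create(rbd_dev, write_request, obj_request);
 *	obj_request->osd_req = osd_req;
 *	osd_req_op_extent_init(osd_req, 0, opcode, offset, length, 0, 0);
 *	(attach bio or page data to op 0)
 *	if (write_request)
 *		rbd_osd_req_format_write(obj_request);
 *	else
 *		rbd_osd_req_format_read(obj_request);
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 */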
1724
1725 /*
1726  * Create a copyup osd request based on the information in the
1727  * object request supplied.  A copyup request has two osd ops,
1728  * a copyup method call, and a "normal" write request.
1729  */
1730 static struct ceph_osd_request *
1731 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1732 {
1733         struct rbd_img_request *img_request;
1734         struct ceph_snap_context *snapc;
1735         struct rbd_device *rbd_dev;
1736         struct ceph_osd_client *osdc;
1737         struct ceph_osd_request *osd_req;
1738
1739         rbd_assert(obj_request_img_data_test(obj_request));
1740         img_request = obj_request->img_request;
1741         rbd_assert(img_request);
1742         rbd_assert(img_request_write_test(img_request));
1743
1744         /* Allocate and initialize the request, for the two ops */
1745
1746         snapc = img_request->snapc;
1747         rbd_dev = img_request->rbd_dev;
1748         osdc = &rbd_dev->rbd_client->client->osdc;
1749         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1750         if (!osd_req)
1751                 return NULL;    /* ENOMEM */
1752
1753         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1754         osd_req->r_callback = rbd_osd_req_callback;
1755         osd_req->r_priv = obj_request;
1756
1757         osd_req->r_oid_len = strlen(obj_request->object_name);
1758         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1759         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1760
1761         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1762
1763         return osd_req;
1764 }
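
/*
 * The two ops allocated above get filled in later (see
 * rbd_img_obj_parent_read_full_callback()):  op 0 becomes the
 * "rbd" class "copyup" method call carrying the parent data, and
 * op 1 becomes the original write.
 */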
1765
1766
1767 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1768 {
1769         ceph_osdc_put_request(osd_req);
1770 }
1771
1772 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1773
1774 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1775                                                 u64 offset, u64 length,
1776                                                 enum obj_request_type type)
1777 {
1778         struct rbd_obj_request *obj_request;
1779         size_t size;
1780         char *name;
1781
1782         rbd_assert(obj_request_type_valid(type));
1783
1784         size = strlen(object_name) + 1;
1785         name = kmalloc(size, GFP_KERNEL);
1786         if (!name)
1787                 return NULL;
1788
1789         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1790         if (!obj_request) {
1791                 kfree(name);
1792                 return NULL;
1793         }
1794
1795         obj_request->object_name = memcpy(name, object_name, size);
1796         obj_request->offset = offset;
1797         obj_request->length = length;
1798         obj_request->flags = 0;
1799         obj_request->which = BAD_WHICH;
1800         obj_request->type = type;
1801         INIT_LIST_HEAD(&obj_request->links);
1802         init_completion(&obj_request->completion);
1803         kref_init(&obj_request->kref);
1804
1805         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1806                 offset, length, (int)type, obj_request);
1807
1808         return obj_request;
1809 }
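
/*
 * The returned request owns a private copy of the object name and
 * holds a single reference; callers release it with
 * rbd_obj_request_put(), which drops the kref and ends up in
 * rbd_obj_request_destroy() below.
 */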
1810
1811 static void rbd_obj_request_destroy(struct kref *kref)
1812 {
1813         struct rbd_obj_request *obj_request;
1814
1815         obj_request = container_of(kref, struct rbd_obj_request, kref);
1816
1817         dout("%s: obj %p\n", __func__, obj_request);
1818
1819         rbd_assert(obj_request->img_request == NULL);
1820         rbd_assert(obj_request->which == BAD_WHICH);
1821
1822         if (obj_request->osd_req)
1823                 rbd_osd_req_destroy(obj_request->osd_req);
1824
1825         rbd_assert(obj_request_type_valid(obj_request->type));
1826         switch (obj_request->type) {
1827         case OBJ_REQUEST_NODATA:
1828                 break;          /* Nothing to do */
1829         case OBJ_REQUEST_BIO:
1830                 if (obj_request->bio_list)
1831                         bio_chain_put(obj_request->bio_list);
1832                 break;
1833         case OBJ_REQUEST_PAGES:
1834                 if (obj_request->pages)
1835                         ceph_release_page_vector(obj_request->pages,
1836                                                 obj_request->page_count);
1837                 break;
1838         }
1839
1840         kfree(obj_request->object_name);
1841         obj_request->object_name = NULL;
1842         kmem_cache_free(rbd_obj_request_cache, obj_request);
1843 }
1844
1845 /*
1846  * Caller is responsible for filling in the list of object requests
1847  * that comprises the image request, and the Linux request pointer
1848  * (if there is one).
1849  */
1850 static struct rbd_img_request *rbd_img_request_create(
1851                                         struct rbd_device *rbd_dev,
1852                                         u64 offset, u64 length,
1853                                         bool write_request,
1854                                         bool child_request)
{
        struct rbd_img_request *img_request;
        struct ceph_snap_context *snapc = NULL;

        img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
        if (!img_request)
                return NULL;

        /*
         * Capture the snap context pointer while still holding the
         * header semaphore, so the reference taken here is on the
         * same context that gets recorded in the request below.
         */
        if (write_request) {
                down_read(&rbd_dev->header_rwsem);
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);
                up_read(&rbd_dev->header_rwsem);
        }
1867
1868         img_request->rq = NULL;
1869         img_request->rbd_dev = rbd_dev;
1870         img_request->offset = offset;
1871         img_request->length = length;
1872         img_request->flags = 0;
        if (write_request) {
                img_request_write_set(img_request);
                img_request->snapc = snapc;
1876         } else {
1877                 img_request->snap_id = rbd_dev->spec->snap_id;
1878         }
1879         if (child_request)
1880                 img_request_child_set(img_request);
1881         if (rbd_dev->parent_spec)
1882                 img_request_layered_set(img_request);
1883         spin_lock_init(&img_request->completion_lock);
1884         img_request->next_completion = 0;
1885         img_request->callback = NULL;
1886         img_request->result = 0;
1887         img_request->obj_request_count = 0;
1888         INIT_LIST_HEAD(&img_request->obj_requests);
1889         kref_init(&img_request->kref);
1890
1891         rbd_img_request_get(img_request);       /* Avoid a warning */
1892         rbd_img_request_put(img_request);       /* TEMPORARY */
1893
1894         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1895                 write_request ? "write" : "read", offset, length,
1896                 img_request);
1897
1898         return img_request;
1899 }
1900
1901 static void rbd_img_request_destroy(struct kref *kref)
1902 {
1903         struct rbd_img_request *img_request;
1904         struct rbd_obj_request *obj_request;
1905         struct rbd_obj_request *next_obj_request;
1906
1907         img_request = container_of(kref, struct rbd_img_request, kref);
1908
1909         dout("%s: img %p\n", __func__, img_request);
1910
1911         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1912                 rbd_img_obj_request_del(img_request, obj_request);
1913         rbd_assert(img_request->obj_request_count == 0);
1914
1915         if (img_request_write_test(img_request))
1916                 ceph_put_snap_context(img_request->snapc);
1917
1918         if (img_request_child_test(img_request))
1919                 rbd_obj_request_put(img_request->obj_request);
1920
1921         kmem_cache_free(rbd_img_request_cache, img_request);
1922 }
1923
1924 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1925 {
1926         struct rbd_img_request *img_request;
1927         unsigned int xferred;
1928         int result;
1929         bool more;
1930
1931         rbd_assert(obj_request_img_data_test(obj_request));
1932         img_request = obj_request->img_request;
1933
1934         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1935         xferred = (unsigned int)obj_request->xferred;
1936         result = obj_request->result;
1937         if (result) {
1938                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1939
1940                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1941                         img_request_write_test(img_request) ? "write" : "read",
1942                         obj_request->length, obj_request->img_offset,
1943                         obj_request->offset);
1944                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1945                         result, xferred);
1946                 if (!img_request->result)
1947                         img_request->result = result;
1948         }
1949
1950         /* Image object requests don't own their page array */
1951
1952         if (obj_request->type == OBJ_REQUEST_PAGES) {
1953                 obj_request->pages = NULL;
1954                 obj_request->page_count = 0;
1955         }
1956
1957         if (img_request_child_test(img_request)) {
1958                 rbd_assert(img_request->obj_request != NULL);
1959                 more = obj_request->which < img_request->obj_request_count - 1;
1960         } else {
1961                 rbd_assert(img_request->rq != NULL);
1962                 more = blk_end_request(img_request->rq, result, xferred);
1963         }
1964
1965         return more;
1966 }
1967
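/*
 * Completion callback for object requests that are part of an image
 * request.  Object requests may complete in any order, but the
 * enclosing block layer request must be completed in order, so
 * completions are consumed in sequence starting at next_completion,
 * under the completion lock.
 */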
1968 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1969 {
1970         struct rbd_img_request *img_request;
1971         u32 which = obj_request->which;
1972         bool more = true;
1973
1974         rbd_assert(obj_request_img_data_test(obj_request));
1975         img_request = obj_request->img_request;
1976
1977         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1978         rbd_assert(img_request != NULL);
1979         rbd_assert(img_request->obj_request_count > 0);
1980         rbd_assert(which != BAD_WHICH);
1981         rbd_assert(which < img_request->obj_request_count);
1982         rbd_assert(which >= img_request->next_completion);
1983
1984         spin_lock_irq(&img_request->completion_lock);
1985         if (which != img_request->next_completion)
1986                 goto out;
1987
1988         for_each_obj_request_from(img_request, obj_request) {
1989                 rbd_assert(more);
1990                 rbd_assert(which < img_request->obj_request_count);
1991
1992                 if (!obj_request_done_test(obj_request))
1993                         break;
1994                 more = rbd_img_obj_end_request(obj_request);
1995                 which++;
1996         }
1997
1998         rbd_assert(more ^ (which == img_request->obj_request_count));
1999         img_request->next_completion = which;
2000 out:
2001         spin_unlock_irq(&img_request->completion_lock);
2002
2003         if (!more)
2004                 rbd_img_request_complete(img_request);
2005 }
2006
2007 /*
2008  * Split up an image request into one or more object requests, each
2009  * to a different object.  The "type" parameter indicates whether
2010  * "data_desc" is the pointer to the head of a list of bio
2011  * structures, or the base of a page array.  In either case this
2012  * function assumes data_desc describes memory sufficient to hold
2013  * all data described by the image request.
2014  */
2015 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2016                                         enum obj_request_type type,
2017                                         void *data_desc)
2018 {
2019         struct rbd_device *rbd_dev = img_request->rbd_dev;
2020         struct rbd_obj_request *obj_request = NULL;
2021         struct rbd_obj_request *next_obj_request;
2022         bool write_request = img_request_write_test(img_request);
2023         struct bio *bio_list;
2024         unsigned int bio_offset = 0;
2025         struct page **pages;
2026         u64 img_offset;
2027         u64 resid;
2028         u16 opcode;
2029
2030         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2031                 (int)type, data_desc);
2032
2033         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2034         img_offset = img_request->offset;
2035         resid = img_request->length;
2036         rbd_assert(resid > 0);
2037
2038         if (type == OBJ_REQUEST_BIO) {
2039                 bio_list = data_desc;
2040                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2041         } else {
2042                 rbd_assert(type == OBJ_REQUEST_PAGES);
2043                 pages = data_desc;
2044         }
2045
2046         while (resid) {
2047                 struct ceph_osd_request *osd_req;
2048                 const char *object_name;
2049                 u64 offset;
2050                 u64 length;
2051
2052                 object_name = rbd_segment_name(rbd_dev, img_offset);
2053                 if (!object_name)
2054                         goto out_unwind;
2055                 offset = rbd_segment_offset(rbd_dev, img_offset);
2056                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2057                 obj_request = rbd_obj_request_create(object_name,
2058                                                 offset, length, type);
2059                 /* object request has its own copy of the object name */
2060                 rbd_segment_name_free(object_name);
2061                 if (!obj_request)
2062                         goto out_unwind;
2063
2064                 if (type == OBJ_REQUEST_BIO) {
2065                         unsigned int clone_size;
2066
2067                         rbd_assert(length <= (u64)UINT_MAX);
2068                         clone_size = (unsigned int)length;
2069                         obj_request->bio_list =
2070                                         bio_chain_clone_range(&bio_list,
2071                                                                 &bio_offset,
2072                                                                 clone_size,
2073                                                                 GFP_ATOMIC);
2074                         if (!obj_request->bio_list)
2075                                 goto out_partial;
2076                 } else {
2077                         unsigned int page_count;
2078
2079                         obj_request->pages = pages;
2080                         page_count = (u32)calc_pages_for(offset, length);
2081                         obj_request->page_count = page_count;
2082                         if ((offset + length) & ~PAGE_MASK)
2083                                 page_count--;   /* more on last page */
2084                         pages += page_count;
2085                 }
2086
2087                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2088                                                 obj_request);
2089                 if (!osd_req)
2090                         goto out_partial;
2091                 obj_request->osd_req = osd_req;
2092                 obj_request->callback = rbd_img_obj_callback;
2093
2094                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2095                                                 0, 0);
2096                 if (type == OBJ_REQUEST_BIO)
2097                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2098                                         obj_request->bio_list, length);
2099                 else
2100                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2101                                         obj_request->pages, length,
2102                                         offset & ~PAGE_MASK, false, false);
2103
2104                 if (write_request)
2105                         rbd_osd_req_format_write(obj_request);
2106                 else
2107                         rbd_osd_req_format_read(obj_request);
2108
2109                 obj_request->img_offset = img_offset;
2110                 rbd_img_obj_request_add(img_request, obj_request);
2111
2112                 img_offset += length;
2113                 resid -= length;
2114         }
2115
2116         return 0;
2117
2118 out_partial:
2119         rbd_obj_request_put(obj_request);
2120 out_unwind:
2121         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2122                 rbd_obj_request_put(obj_request);
2123
2124         return -ENOMEM;
2125 }
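
/*
 * A worked example of the split performed above (sizes are
 * illustrative):  with the default 4 MiB (order 22) objects, an
 * 8 MiB request at image offset 6 MiB becomes three object
 * requests:  2 MiB starting at offset 2 MiB within the first
 * object, the full 4 MiB of the second, and the first 2 MiB of
 * the third.
 */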
2126
2127 static void
2128 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2129 {
2130         struct rbd_img_request *img_request;
2131         struct rbd_device *rbd_dev;
2132         u64 length;
2133         u32 page_count;
2134
2135         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2136         rbd_assert(obj_request_img_data_test(obj_request));
2137         img_request = obj_request->img_request;
2138         rbd_assert(img_request);
2139
2140         rbd_dev = img_request->rbd_dev;
2141         rbd_assert(rbd_dev);
2142         length = (u64)1 << rbd_dev->header.obj_order;
2143         page_count = (u32)calc_pages_for(0, length);
2144
2145         rbd_assert(obj_request->copyup_pages);
2146         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2147         obj_request->copyup_pages = NULL;
2148
2149         /*
2150          * We want the transfer count to reflect the size of the
2151          * original write request.  There is no such thing as a
2152          * successful short write, so if the request was successful
2153          * we can just set it to the originally-requested length.
2154          */
2155         if (!obj_request->result)
2156                 obj_request->xferred = obj_request->length;
2157
2158         /* Finish up with the normal image object callback */
2159
2160         rbd_img_obj_callback(obj_request);
2161 }
2162
2163 static void
2164 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2165 {
2166         struct rbd_obj_request *orig_request;
2167         struct ceph_osd_request *osd_req;
2168         struct ceph_osd_client *osdc;
2169         struct rbd_device *rbd_dev;
2170         struct page **pages;
2171         int result;
2172         u64 obj_size;
2173         u64 xferred;
2174
2175         rbd_assert(img_request_child_test(img_request));
2176
2177         /* First get what we need from the image request */
2178
2179         pages = img_request->copyup_pages;
2180         rbd_assert(pages != NULL);
2181         img_request->copyup_pages = NULL;
2182
2183         orig_request = img_request->obj_request;
2184         rbd_assert(orig_request != NULL);
2185         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2186         result = img_request->result;
2187         obj_size = img_request->length;
2188         xferred = img_request->xferred;
2189         rbd_img_request_put(img_request);
2190
2191         rbd_assert(orig_request->img_request);
2192         rbd_dev = orig_request->img_request->rbd_dev;
2193         rbd_assert(rbd_dev);
2194         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2195
2196         if (result)
2197                 goto out_err;
2198
2199         /* Allocate the new copyup osd request for the original request */
2200
2201         result = -ENOMEM;
2202         rbd_assert(!orig_request->osd_req);
2203         osd_req = rbd_osd_req_create_copyup(orig_request);
2204         if (!osd_req)
2205                 goto out_err;
2206         orig_request->osd_req = osd_req;
2207         orig_request->copyup_pages = pages;
2208
2209         /* Initialize the copyup op */
2210
2211         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2212         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2213                                                 false, false);
2214
2215         /* Then the original write request op */
2216
2217         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2218                                         orig_request->offset,
2219                                         orig_request->length, 0, 0);
2220         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2221                                         orig_request->length);
2222
2223         rbd_osd_req_format_write(orig_request);
2224
2225         /* All set, send it off. */
2226
2227         orig_request->callback = rbd_img_obj_copyup_callback;
2228         osdc = &rbd_dev->rbd_client->client->osdc;
2229         result = rbd_obj_request_submit(osdc, orig_request);
2230         if (!result)
2231                 return;
2232 out_err:
2233         /* Record the error code and complete the request */
2234
2235         orig_request->result = result;
2236         orig_request->xferred = 0;
2237         obj_request_done_set(orig_request);
2238         rbd_obj_request_complete(orig_request);
2239 }
2240
2241 /*
2242  * Read from the parent image the range of data that covers the
2243  * entire target of the given object request.  This is used for
2244  * satisfying a layered image write request when the target of an
2245  * object request from the image request does not exist.
2246  *
2247  * A page array big enough to hold the returned data is allocated
2248  * and supplied to rbd_img_request_fill() as the "data descriptor."
2249  * When the read completes, this page array will be transferred to
2250  * the original object request for the copyup operation.
2251  *
2252  * If an error occurs, record it as the result of the original
2253  * object request and mark it done so it gets completed.
2254  */
2255 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2256 {
2257         struct rbd_img_request *img_request = NULL;
2258         struct rbd_img_request *parent_request = NULL;
2259         struct rbd_device *rbd_dev;
2260         u64 img_offset;
2261         u64 length;
2262         struct page **pages = NULL;
2263         u32 page_count;
2264         int result;
2265
2266         rbd_assert(obj_request_img_data_test(obj_request));
2267         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2268
2269         img_request = obj_request->img_request;
2270         rbd_assert(img_request != NULL);
2271         rbd_dev = img_request->rbd_dev;
2272         rbd_assert(rbd_dev->parent != NULL);
2273
2274         /*
2275          * First things first.  The original osd request is of no
 * use to us any more; we'll need a new one that can hold
2277          * the two ops in a copyup request.  We'll get that later,
2278          * but for now we can release the old one.
2279          */
2280         rbd_osd_req_destroy(obj_request->osd_req);
2281         obj_request->osd_req = NULL;
2282
2283         /*
2284          * Determine the byte range covered by the object in the
2285          * child image to which the original request was to be sent.
2286          */
2287         img_offset = obj_request->img_offset - obj_request->offset;
2288         length = (u64)1 << rbd_dev->header.obj_order;
2289
2290         /*
2291          * There is no defined parent data beyond the parent
2292          * overlap, so limit what we read at that boundary if
2293          * necessary.
2294          */
2295         if (img_offset + length > rbd_dev->parent_overlap) {
2296                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2297                 length = rbd_dev->parent_overlap - img_offset;
2298         }
2299
2300         /*
2301          * Allocate a page array big enough to receive the data read
2302          * from the parent.
2303          */
2304         page_count = (u32)calc_pages_for(0, length);
2305         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2306         if (IS_ERR(pages)) {
2307                 result = PTR_ERR(pages);
2308                 pages = NULL;
2309                 goto out_err;
2310         }
2311
2312         result = -ENOMEM;
2313         parent_request = rbd_img_request_create(rbd_dev->parent,
2314                                                 img_offset, length,
2315                                                 false, true);
2316         if (!parent_request)
2317                 goto out_err;
2318         rbd_obj_request_get(obj_request);
2319         parent_request->obj_request = obj_request;
2320
2321         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2322         if (result)
2323                 goto out_err;
2324         parent_request->copyup_pages = pages;
2325
2326         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2327         result = rbd_img_request_submit(parent_request);
2328         if (!result)
2329                 return 0;
2330
2331         parent_request->copyup_pages = NULL;
2332         parent_request->obj_request = NULL;
2333         rbd_obj_request_put(obj_request);
2334 out_err:
2335         if (pages)
2336                 ceph_release_page_vector(pages, page_count);
2337         if (parent_request)
2338                 rbd_img_request_put(parent_request);
2339         obj_request->result = result;
2340         obj_request->xferred = 0;
2341         obj_request_done_set(obj_request);
2342
2343         return result;
2344 }
2345
2346 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2347 {
2348         struct rbd_obj_request *orig_request;
2349         int result;
2350
2351         rbd_assert(!obj_request_img_data_test(obj_request));
2352
2353         /*
2354          * All we need from the object request is the original
2355          * request and the result of the STAT op.  Grab those, then
2356          * we're done with the request.
2357          */
2358         orig_request = obj_request->obj_request;
2359         obj_request->obj_request = NULL;
2360         rbd_assert(orig_request);
2361         rbd_assert(orig_request->img_request);
2362
2363         result = obj_request->result;
2364         obj_request->result = 0;
2365
2366         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2367                 obj_request, orig_request, result,
2368                 obj_request->xferred, obj_request->length);
2369         rbd_obj_request_put(obj_request);
2370
2371         rbd_assert(orig_request);
2372         rbd_assert(orig_request->img_request);
2373
2374         /*
2375          * Our only purpose here is to determine whether the object
2376          * exists, and we don't want to treat the non-existence as
2377          * an error.  If something else comes back, transfer the
2378          * error to the original request and complete it now.
2379          */
2380         if (!result) {
2381                 obj_request_existence_set(orig_request, true);
2382         } else if (result == -ENOENT) {
2383                 obj_request_existence_set(orig_request, false);
2384         } else if (result) {
2385                 orig_request->result = result;
2386                 goto out;
2387         }
2388
2389         /*
2390          * Resubmit the original request now that we have recorded
2391          * whether the target object exists.
2392          */
2393         orig_request->result = rbd_img_obj_request_submit(orig_request);
2394 out:
2395         if (orig_request->result)
2396                 rbd_obj_request_complete(orig_request);
2397         rbd_obj_request_put(orig_request);
2398 }
2399
2400 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2401 {
2402         struct rbd_obj_request *stat_request;
2403         struct rbd_device *rbd_dev;
2404         struct ceph_osd_client *osdc;
2405         struct page **pages = NULL;
2406         u32 page_count;
2407         size_t size;
2408         int ret;
2409
2410         /*
2411          * The response data for a STAT call consists of:
2412          *     le64 length;
2413          *     struct {
2414          *         le32 tv_sec;
2415          *         le32 tv_nsec;
2416          *     } mtime;
2417          */
2418         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2419         page_count = (u32)calc_pages_for(0, size);
2420         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2421         if (IS_ERR(pages))
2422                 return PTR_ERR(pages);
2423
2424         ret = -ENOMEM;
2425         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2426                                                         OBJ_REQUEST_PAGES);
2427         if (!stat_request)
2428                 goto out;
2429
2430         rbd_obj_request_get(obj_request);
2431         stat_request->obj_request = obj_request;
2432         stat_request->pages = pages;
2433         stat_request->page_count = page_count;
2434
2435         rbd_assert(obj_request->img_request);
2436         rbd_dev = obj_request->img_request->rbd_dev;
2437         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2438                                                 stat_request);
2439         if (!stat_request->osd_req)
2440                 goto out;
2441         stat_request->callback = rbd_img_obj_exists_callback;
2442
2443         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2444         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2445                                         false, false);
2446         rbd_osd_req_format_read(stat_request);
2447
2448         osdc = &rbd_dev->rbd_client->client->osdc;
2449         ret = rbd_obj_request_submit(osdc, stat_request);
2450 out:
2451         if (ret)
2452                 rbd_obj_request_put(obj_request);
2453
2454         return ret;
2455 }
2456
2457 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2458 {
2459         struct rbd_img_request *img_request;
2460         struct rbd_device *rbd_dev;
2461         bool known;
2462
2463         rbd_assert(obj_request_img_data_test(obj_request));
2464
2465         img_request = obj_request->img_request;
2466         rbd_assert(img_request);
2467         rbd_dev = img_request->rbd_dev;
2468
2469         /*
2470          * Only writes to layered images need special handling.
2471          * Reads and non-layered writes are simple object requests.
2472          * Layered writes that start beyond the end of the overlap
2473          * with the parent have no parent data, so they too are
2474          * simple object requests.  Finally, if the target object is
2475          * known to already exist, its parent data has already been
2476          * copied, so a write to the object can also be handled as a
2477          * simple object request.
2478          */
2479         if (!img_request_write_test(img_request) ||
2480                 !img_request_layered_test(img_request) ||
2481                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2482                 ((known = obj_request_known_test(obj_request)) &&
2483                         obj_request_exists_test(obj_request))) {
2484
2485                 struct rbd_device *rbd_dev;
2486                 struct ceph_osd_client *osdc;
2487
2488                 rbd_dev = obj_request->img_request->rbd_dev;
2489                 osdc = &rbd_dev->rbd_client->client->osdc;
2490
2491                 return rbd_obj_request_submit(osdc, obj_request);
2492         }
2493
2494         /*
 * It's a layered write.  The target object might exist but
 * we may not know that yet.  If its existence is known it must
 * not exist (the known-to-exist case was handled above), so
 * start by reading the data for the full target object from
 * the parent so we can use it for a copyup to the target.
2499          */
2500         if (known)
2501                 return rbd_img_obj_parent_read_full(obj_request);
2502
2503         /* We don't know whether the target exists.  Go find out. */
2504
2505         return rbd_img_obj_exists_submit(obj_request);
2506 }
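
/*
 * Summary of the dispatch in rbd_img_obj_request_submit() above
 * (derived directly from its tests):
 *
 *	read, or non-layered write            -> submit to the osd
 *	layered write beyond parent overlap   -> submit to the osd
 *	layered write, target known to exist  -> submit to the osd
 *	layered write, target known missing   -> full parent read (copyup)
 *	layered write, existence unknown      -> STAT the target first
 */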
2507
2508 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2509 {
2510         struct rbd_obj_request *obj_request;
2511         struct rbd_obj_request *next_obj_request;
2512
2513         dout("%s: img %p\n", __func__, img_request);
2514         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2515                 int ret;
2516
2517                 ret = rbd_img_obj_request_submit(obj_request);
2518                 if (ret)
2519                         return ret;
2520         }
2521
2522         return 0;
2523 }
2524
2525 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2526 {
2527         struct rbd_obj_request *obj_request;
2528         struct rbd_device *rbd_dev;
2529         u64 obj_end;
2530
2531         rbd_assert(img_request_child_test(img_request));
2532
2533         obj_request = img_request->obj_request;
2534         rbd_assert(obj_request);
2535         rbd_assert(obj_request->img_request);
2536
2537         obj_request->result = img_request->result;
2538         if (obj_request->result)
2539                 goto out;
2540
2541         /*
2542          * We need to zero anything beyond the parent overlap
2543          * boundary.  Since rbd_img_obj_request_read_callback()
2544          * will zero anything beyond the end of a short read, an
2545          * easy way to do this is to pretend the data from the
2546          * parent came up short--ending at the overlap boundary.
2547          */
2548         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2549         obj_end = obj_request->img_offset + obj_request->length;
2550         rbd_dev = obj_request->img_request->rbd_dev;
2551         if (obj_end > rbd_dev->parent_overlap) {
2552                 u64 xferred = 0;
2553
2554                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2555                         xferred = rbd_dev->parent_overlap -
2556                                         obj_request->img_offset;
2557
2558                 obj_request->xferred = min(img_request->xferred, xferred);
2559         } else {
2560                 obj_request->xferred = img_request->xferred;
2561         }
2562 out:
2563         rbd_img_request_put(img_request);
2564         rbd_img_obj_request_read_callback(obj_request);
2565         rbd_obj_request_complete(obj_request);
2566 }
2567
2568 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2569 {
2570         struct rbd_device *rbd_dev;
2571         struct rbd_img_request *img_request;
2572         int result;
2573
2574         rbd_assert(obj_request_img_data_test(obj_request));
2575         rbd_assert(obj_request->img_request != NULL);
2576         rbd_assert(obj_request->result == (s32) -ENOENT);
2577         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2578
2579         rbd_dev = obj_request->img_request->rbd_dev;
2580         rbd_assert(rbd_dev->parent != NULL);
2581         /* rbd_read_finish(obj_request, obj_request->length); */
2582         img_request = rbd_img_request_create(rbd_dev->parent,
2583                                                 obj_request->img_offset,
2584                                                 obj_request->length,
2585                                                 false, true);
2586         result = -ENOMEM;
2587         if (!img_request)
2588                 goto out_err;
2589
2590         rbd_obj_request_get(obj_request);
2591         img_request->obj_request = obj_request;
2592
2593         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2594                                         obj_request->bio_list);
2595         if (result)
2596                 goto out_err;
2597
2598         img_request->callback = rbd_img_parent_read_callback;
2599         result = rbd_img_request_submit(img_request);
2600         if (result)
2601                 goto out_err;
2602
2603         return;
2604 out_err:
2605         if (img_request)
2606                 rbd_img_request_put(img_request);
2607         obj_request->result = result;
2608         obj_request->xferred = 0;
2609         obj_request_done_set(obj_request);
2610 }
2611
2612 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2613 {
2614         struct rbd_obj_request *obj_request;
2615         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2616         int ret;
2617
2618         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2619                                                         OBJ_REQUEST_NODATA);
2620         if (!obj_request)
2621                 return -ENOMEM;
2622
2623         ret = -ENOMEM;
2624         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2625         if (!obj_request->osd_req)
2626                 goto out;
2627         obj_request->callback = rbd_obj_request_put;
2628
2629         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2630                                         notify_id, 0, 0);
2631         rbd_osd_req_format_read(obj_request);
2632
2633         ret = rbd_obj_request_submit(osdc, obj_request);
2634 out:
2635         if (ret)
2636                 rbd_obj_request_put(obj_request);
2637
2638         return ret;
2639 }
2640
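/*
 * Called when the osd delivers a watch notification for the header
 * object:  refresh the image metadata (which has presumably
 * changed), then acknowledge the notification so the osd stops
 * resending it.
 */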
2641 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2642 {
2643         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2644         int ret;
2645
2646         if (!rbd_dev)
2647                 return;
2648
2649         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2650                 rbd_dev->header_name, (unsigned long long)notify_id,
2651                 (unsigned int)opcode);
2652         ret = rbd_dev_refresh(rbd_dev);
2653         if (ret)
2654                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2655
2656         rbd_obj_notify_ack(rbd_dev, notify_id);
2657 }
2658
2659 /*
2660  * Request sync osd watch/unwatch.  The value of "start" determines
2661  * whether a watch request is being initiated or torn down.
2662  */
2663 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2664 {
2665         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2666         struct rbd_obj_request *obj_request;
2667         int ret;
2668
2669         rbd_assert(start ^ !!rbd_dev->watch_event);
2670         rbd_assert(start ^ !!rbd_dev->watch_request);
2671
2672         if (start) {
2673                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2674                                                 &rbd_dev->watch_event);
2675                 if (ret < 0)
2676                         return ret;
2677                 rbd_assert(rbd_dev->watch_event != NULL);
2678         }
2679
2680         ret = -ENOMEM;
2681         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2682                                                         OBJ_REQUEST_NODATA);
2683         if (!obj_request)
2684                 goto out_cancel;
2685
2686         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2687         if (!obj_request->osd_req)
2688                 goto out_cancel;
2689
2690         if (start)
2691                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2692         else
2693                 ceph_osdc_unregister_linger_request(osdc,
2694                                         rbd_dev->watch_request->osd_req);
2695
2696         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2697                                 rbd_dev->watch_event->cookie, 0, start);
2698         rbd_osd_req_format_write(obj_request);
2699
2700         ret = rbd_obj_request_submit(osdc, obj_request);
2701         if (ret)
2702                 goto out_cancel;
2703         ret = rbd_obj_request_wait(obj_request);
2704         if (ret)
2705                 goto out_cancel;
2706         ret = obj_request->result;
2707         if (ret)
2708                 goto out_cancel;
2709
2710         /*
2711          * A watch request is set to linger, so the underlying osd
2712          * request won't go away until we unregister it.  We retain
2713          * a pointer to the object request during that time (in
2714          * rbd_dev->watch_request), so we'll keep a reference to
2715          * it.  We'll drop that reference (below) after we've
2716          * unregistered it.
2717          */
2718         if (start) {
2719                 rbd_dev->watch_request = obj_request;
2720
2721                 return 0;
2722         }
2723
2724         /* We have successfully torn down the watch request */
2725
2726         rbd_obj_request_put(rbd_dev->watch_request);
2727         rbd_dev->watch_request = NULL;
2728 out_cancel:
2729         /* Cancel the event if we're tearing down, or on error */
2730         ceph_osdc_cancel_event(rbd_dev->watch_event);
2731         rbd_dev->watch_event = NULL;
2732         if (obj_request)
2733                 rbd_obj_request_put(obj_request);
2734
2735         return ret;
2736 }
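
/*
 * A minimal usage sketch for the helper above (the call sites shown
 * are hypothetical):  a nonzero "start" registers the lingering
 * watch, and a zero "start" later tears it down:
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	(set up watch)
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	(tear down)
 */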
2737
2738 /*
2739  * Synchronous osd object method call.  Returns the number of bytes
2740  * returned in the outbound buffer, or a negative error code.
2741  */
2742 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2743                              const char *object_name,
2744                              const char *class_name,
2745                              const char *method_name,
2746                              const void *outbound,
2747                              size_t outbound_size,
2748                              void *inbound,
2749                              size_t inbound_size)
2750 {
2751         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2752         struct rbd_obj_request *obj_request;
2753         struct page **pages;
2754         u32 page_count;
2755         int ret;
2756
2757         /*
         * Method calls are ultimately read operations.  The result
         * should be placed into the inbound buffer provided.  They
         * may also supply outbound data--parameters for the object
         * method.  Currently, if this is present, it will be a
         * snapshot id.
2763          */
2764         page_count = (u32)calc_pages_for(0, inbound_size);
2765         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2766         if (IS_ERR(pages))
2767                 return PTR_ERR(pages);
2768
2769         ret = -ENOMEM;
2770         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2771                                                         OBJ_REQUEST_PAGES);
2772         if (!obj_request)
2773                 goto out;
2774
2775         obj_request->pages = pages;
2776         obj_request->page_count = page_count;
2777
2778         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2779         if (!obj_request->osd_req)
2780                 goto out;
2781
2782         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2783                                         class_name, method_name);
2784         if (outbound_size) {
2785                 struct ceph_pagelist *pagelist;
2786
2787                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2788                 if (!pagelist)
2789                         goto out;
2790
2791                 ceph_pagelist_init(pagelist);
2792                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2793                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2794                                                 pagelist);
2795         }
2796         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2797                                         obj_request->pages, inbound_size,
2798                                         0, false, false);
2799         rbd_osd_req_format_read(obj_request);
2800
2801         ret = rbd_obj_request_submit(osdc, obj_request);
2802         if (ret)
2803                 goto out;
2804         ret = rbd_obj_request_wait(obj_request);
2805         if (ret)
2806                 goto out;
2807
2808         ret = obj_request->result;
2809         if (ret < 0)
2810                 goto out;
2811
2812         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2813         ret = (int)obj_request->xferred;
2814         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2815 out:
2816         if (obj_request)
2817                 rbd_obj_request_put(obj_request);
2818         else
2819                 ceph_release_page_vector(pages, page_count);
2820
2821         return ret;
2822 }
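
/*
 * A sketch of a typical call to rbd_obj_method_sync() (the values
 * shown are illustrative rather than taken from a specific caller):
 * query an image's size via the "rbd" class "get_size" method,
 * supplying a snapshot id as outbound data and decoding the reply
 * from the inbound buffer:
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 */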
2823
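/*
 * The block layer request_fn for rbd devices.  Drains the queue,
 * validating each request and turning it into an image request
 * filled from the request's bio chain.  The queue lock is dropped
 * while each request is built and submitted, then reacquired.
 */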
2824 static void rbd_request_fn(struct request_queue *q)
2825                 __releases(q->queue_lock) __acquires(q->queue_lock)
2826 {
2827         struct rbd_device *rbd_dev = q->queuedata;
2828         bool read_only = rbd_dev->mapping.read_only;
2829         struct request *rq;
2830         int result;
2831
2832         while ((rq = blk_fetch_request(q))) {
2833                 bool write_request = rq_data_dir(rq) == WRITE;
2834                 struct rbd_img_request *img_request;
2835                 u64 offset;
2836                 u64 length;
2837
2838                 /* Ignore any non-FS requests that filter through. */
2839
2840                 if (rq->cmd_type != REQ_TYPE_FS) {
2841                         dout("%s: non-fs request type %d\n", __func__,
2842                                 (int) rq->cmd_type);
2843                         __blk_end_request_all(rq, 0);
2844                         continue;
2845                 }
2846
2847                 /* Ignore/skip any zero-length requests */
2848
2849                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2850                 length = (u64) blk_rq_bytes(rq);
2851
2852                 if (!length) {
2853                         dout("%s: zero-length request\n", __func__);
2854                         __blk_end_request_all(rq, 0);
2855                         continue;
2856                 }
2857
2858                 spin_unlock_irq(q->queue_lock);
2859
2860                 /* Disallow writes to a read-only device */
2861
2862                 if (write_request) {
2863                         result = -EROFS;
2864                         if (read_only)
2865                                 goto end_request;
2866                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2867                 }
2868
2869                 /*
2870                  * Quit early if the mapped snapshot no longer
2871                  * exists.  It's still possible the snapshot will
2872                  * have disappeared by the time our request arrives
2873                  * at the osd, but there's no sense in sending it if
2874                  * we already know.
2875                  */
2876                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
                        dout("request for non-existent snapshot\n");
2878                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2879                         result = -ENXIO;
2880                         goto end_request;
2881                 }
2882
2883                 result = -EINVAL;
2884                 if (offset && length > U64_MAX - offset + 1) {
2885                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2886                                 offset, length);
2887                         goto end_request;       /* Shouldn't happen */
2888                 }
2889
2890                 result = -EIO;
2891                 if (offset + length > rbd_dev->mapping.size) {
2892                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2893                                 offset, length, rbd_dev->mapping.size);
2894                         goto end_request;
2895                 }
2896
2897                 result = -ENOMEM;
2898                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2899                                                         write_request, false);
2900                 if (!img_request)
2901                         goto end_request;
2902
2903                 img_request->rq = rq;
2904
2905                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2906                                                 rq->bio);
2907                 if (!result)
2908                         result = rbd_img_request_submit(img_request);
2909                 if (result)
2910                         rbd_img_request_put(img_request);
2911 end_request:
2912                 spin_lock_irq(q->queue_lock);
2913                 if (result < 0) {
2914                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2915                                 write_request ? "write" : "read",
2916                                 length, offset, result);
2917
2918                         __blk_end_request_all(rq, result);
2919                 }
2920         }
2921 }
2922
2923 /*
2924  * a queue callback. Makes sure that we don't create a bio that spans across
2925  * multiple osd objects. One exception would be with a single page bios,
2926  * which we handle later at bio_chain_clone_range()
2927  */
2928 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2929                           struct bio_vec *bvec)
2930 {
2931         struct rbd_device *rbd_dev = q->queuedata;
2932         sector_t sector_offset;
2933         sector_t sectors_per_obj;
2934         sector_t obj_sector_offset;
2935         int ret;
2936
2937         /*
2938          * Find how far into its rbd object the bio's start sector
2939          * lies.  The bio start sector is partition-relative, so it
2940          * is first converted to an offset within the whole device.
2941          */
2942         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2943         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2944         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2945
2946         /*
2947          * Compute the number of bytes from that offset to the end
2948          * of the object.  Account for what's already used by the bio.
2949          */
2950         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2951         if (ret > bmd->bi_size)
2952                 ret -= bmd->bi_size;
2953         else
2954                 ret = 0;
2955
2956         /*
2957          * Don't send back more than was asked for.  And if the bio
2958          * was empty, let the whole thing through because:  "Note
2959          * that a block device *must* allow a single page to be
2960          * added to an empty bio."
2961          */
2962         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2963         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2964                 ret = (int) bvec->bv_len;
2965
2966         return ret;
2967 }
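
/*
 * A worked example of the arithmetic above (numbers illustrative,
 * assuming the common 4 MiB object size, i.e. obj_order == 22):
 * sectors_per_obj = 1 << (22 - 9) = 8192.  A bio starting at device
 * sector 8000 has obj_sector_offset == 8000, leaving
 * (8192 - 8000) << 9 = 98304 bytes to the object boundary; a bio that
 * has already accumulated bi_size bytes may grow by at most the
 * remainder, and never by more than the bio_vec being added.
 */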
2968
2969 static void rbd_free_disk(struct rbd_device *rbd_dev)
2970 {
2971         struct gendisk *disk = rbd_dev->disk;
2972
2973         if (!disk)
2974                 return;
2975
2976         rbd_dev->disk = NULL;
2977         if (disk->flags & GENHD_FL_UP) {
2978                 del_gendisk(disk);
2979                 if (disk->queue)
2980                         blk_cleanup_queue(disk->queue);
2981         }
2982         put_disk(disk);
2983 }
2984
2985 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2986                                 const char *object_name,
2987                                 u64 offset, u64 length, void *buf)
2988
2989 {
2990         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2991         struct rbd_obj_request *obj_request;
2992         struct page **pages = NULL;
2993         u32 page_count;
2994         size_t size;
2995         int ret;
2996
2997         page_count = (u32) calc_pages_for(offset, length);
2998         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2999         if (IS_ERR(pages))
3000                 return PTR_ERR(pages);
3001
3002         ret = -ENOMEM;
3003         obj_request = rbd_obj_request_create(object_name, offset, length,
3004                                                         OBJ_REQUEST_PAGES);
3005         if (!obj_request)
3006                 goto out;
3007
3008         obj_request->pages = pages;
3009         obj_request->page_count = page_count;
3010
3011         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3012         if (!obj_request->osd_req)
3013                 goto out;
3014
3015         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3016                                         offset, length, 0, 0);
3017         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3018                                         obj_request->pages,
3019                                         obj_request->length,
3020                                         obj_request->offset & ~PAGE_MASK,
3021                                         false, false);
3022         rbd_osd_req_format_read(obj_request);
3023
3024         ret = rbd_obj_request_submit(osdc, obj_request);
3025         if (ret)
3026                 goto out;
3027         ret = rbd_obj_request_wait(obj_request);
3028         if (ret)
3029                 goto out;
3030
3031         ret = obj_request->result;
3032         if (ret < 0)
3033                 goto out;
3034
3035         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3036         size = (size_t) obj_request->xferred;
3037         ceph_copy_from_page_vector(pages, buf, 0, size);
3038         rbd_assert(size <= (size_t)INT_MAX);
3039         ret = (int)size;
3040 out:
3041         if (obj_request)
3042                 rbd_obj_request_put(obj_request);
3043         else
3044                 ceph_release_page_vector(pages, page_count);
3045
3046         return ret;
3047 }
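
/*
 * A sizing sketch for the page vector above (values illustrative,
 * assuming 4 KiB pages): a read of length 8192 at offset 512 touches
 * bytes 512..8703, so calc_pages_for(512, 8192) returns 3 pages even
 * though only two pages' worth of data is actually requested.
 */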
3048
3049 /*
3050  * Read the complete header for the given rbd device.  On successful
3051  * return, the rbd_dev->header field will contain up-to-date
3052  * information about the image.
3053  */
3054 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3055 {
3056         struct rbd_image_header_ondisk *ondisk = NULL;
3057         u32 snap_count = 0;
3058         u64 names_size = 0;
3059         u32 want_count;
3060         int ret;
3061
3062         /*
3063          * The complete header will include an array of its 64-bit
3064          * snapshot ids, followed by the names of those snapshots as
3065          * a contiguous block of NUL-terminated strings.  Note that
3066          * the number of snapshots could change by the time we read
3067          * it in, in which case we re-read it.
3068          */
3069         do {
3070                 size_t size;
3071
3072                 kfree(ondisk);
3073
3074                 size = sizeof (*ondisk);
3075                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3076                 size += names_size;
3077                 ondisk = kmalloc(size, GFP_KERNEL);
3078                 if (!ondisk)
3079                         return -ENOMEM;
3080
3081                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3082                                        0, size, ondisk);
3083                 if (ret < 0)
3084                         goto out;
3085                 if ((size_t)ret < size) {
3086                         ret = -ENXIO;
3087                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3088                                 size, ret);
3089                         goto out;
3090                 }
3091                 if (!rbd_dev_ondisk_valid(ondisk)) {
3092                         ret = -ENXIO;
3093                         rbd_warn(rbd_dev, "invalid header");
3094                         goto out;
3095                 }
3096
3097                 names_size = le64_to_cpu(ondisk->snap_names_len);
3098                 want_count = snap_count;
3099                 snap_count = le32_to_cpu(ondisk->snap_count);
3100         } while (snap_count != want_count);
3101
3102         ret = rbd_header_from_disk(rbd_dev, ondisk);
3103 out:
3104         kfree(ondisk);
3105
3106         return ret;
3107 }
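
/*
 * A sketch of how the loop above converges (snapshot counts are
 * illustrative): the first pass reads just sizeof (*ondisk), with
 * snap_count and names_size both zero.  If that header reports, say,
 * two snapshots, the buffer is reallocated with room for two
 * rbd_image_snap_ondisk entries plus the reported name block and the
 * object is read again; the loop exits once the snapshot count seen
 * matches the count the buffer was sized for.
 */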
3108
3109 /*
3110  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3111  * has disappeared from the (just updated) snapshot context.
3112  */
3113 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3114 {
3115         u64 snap_id;
3116
3117         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3118                 return;
3119
3120         snap_id = rbd_dev->spec->snap_id;
3121         if (snap_id == CEPH_NOSNAP)
3122                 return;
3123
3124         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3125                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3126 }
3127
3128 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3129 {
3130         u64 mapping_size;
3131         int ret;
3132
3133         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3134         mapping_size = rbd_dev->mapping.size;
3135         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3136         if (rbd_dev->image_format == 1)
3137                 ret = rbd_dev_v1_header_info(rbd_dev);
3138         else
3139                 ret = rbd_dev_v2_header_info(rbd_dev);
3140
3141         /* If it's a mapped snapshot, validate its EXISTS flag */
3142
3143         rbd_exists_validate(rbd_dev);
3144         mutex_unlock(&ctl_mutex);
3145         if (mapping_size != rbd_dev->mapping.size) {
3146                 sector_t size;
3147
3148                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3149                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3150                 set_capacity(rbd_dev->disk, size);
3151                 revalidate_disk(rbd_dev->disk);
3152         }
3153
3154         return ret;
3155 }
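
/*
 * For example (sizes illustrative): if a refresh finds the mapping
 * grew from 1 GiB to 2 GiB, the capacity above is set to
 * 2 GiB / 512 = 4194304 sectors before revalidate_disk() notifies
 * the block layer of the change.
 */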
3156
3157 static int rbd_init_disk(struct rbd_device *rbd_dev)
3158 {
3159         struct gendisk *disk;
3160         struct request_queue *q;
3161         u64 segment_size;
3162
3163         /* create gendisk info */
3164         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3165         if (!disk)
3166                 return -ENOMEM;
3167
3168         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3169                  rbd_dev->dev_id);
3170         disk->major = rbd_dev->major;
3171         disk->first_minor = 0;
3172         disk->fops = &rbd_bd_ops;
3173         disk->private_data = rbd_dev;
3174
3175         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3176         if (!q)
3177                 goto out_disk;
3178
3179         /* We use the default size, but let's be explicit about it. */
3180         blk_queue_physical_block_size(q, SECTOR_SIZE);
3181
3182         /* set io sizes to object size */
3183         segment_size = rbd_obj_bytes(&rbd_dev->header);
3184         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3185         blk_queue_max_segment_size(q, segment_size);
3186         blk_queue_io_min(q, segment_size);
3187         blk_queue_io_opt(q, segment_size);
3188
3189         blk_queue_merge_bvec(q, rbd_merge_bvec);
3190         disk->queue = q;
3191
3192         q->queuedata = rbd_dev;
3193
3194         rbd_dev->disk = disk;
3195
3196         return 0;
3197 out_disk:
3198         put_disk(disk);
3199
3200         return -ENOMEM;
3201 }
3202
3203 /*
3204   sysfs
3205 */
3206
3207 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3208 {
3209         return container_of(dev, struct rbd_device, dev);
3210 }
3211
3212 static ssize_t rbd_size_show(struct device *dev,
3213                              struct device_attribute *attr, char *buf)
3214 {
3215         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3216
3217         return sprintf(buf, "%llu\n",
3218                 (unsigned long long)rbd_dev->mapping.size);
3219 }
3220
3221 /*
3222  * Note this shows the features for whatever's mapped, which is not
3223  * necessarily the base image.
3224  */
3225 static ssize_t rbd_features_show(struct device *dev,
3226                              struct device_attribute *attr, char *buf)
3227 {
3228         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3229
3230         return sprintf(buf, "0x%016llx\n",
3231                         (unsigned long long)rbd_dev->mapping.features);
3232 }
3233
3234 static ssize_t rbd_major_show(struct device *dev,
3235                               struct device_attribute *attr, char *buf)
3236 {
3237         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3238
3239         if (rbd_dev->major)
3240                 return sprintf(buf, "%d\n", rbd_dev->major);
3241
3242         return sprintf(buf, "(none)\n");
3243
3244 }
3245
3246 static ssize_t rbd_client_id_show(struct device *dev,
3247                                   struct device_attribute *attr, char *buf)
3248 {
3249         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3250
3251         return sprintf(buf, "client%lld\n",
3252                         ceph_client_id(rbd_dev->rbd_client->client));
3253 }
3254
3255 static ssize_t rbd_pool_show(struct device *dev,
3256                              struct device_attribute *attr, char *buf)
3257 {
3258         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3259
3260         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3261 }
3262
3263 static ssize_t rbd_pool_id_show(struct device *dev,
3264                              struct device_attribute *attr, char *buf)
3265 {
3266         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3267
3268         return sprintf(buf, "%llu\n",
3269                         (unsigned long long) rbd_dev->spec->pool_id);
3270 }
3271
3272 static ssize_t rbd_name_show(struct device *dev,
3273                              struct device_attribute *attr, char *buf)
3274 {
3275         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3276
3277         if (rbd_dev->spec->image_name)
3278                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3279
3280         return sprintf(buf, "(unknown)\n");
3281 }
3282
3283 static ssize_t rbd_image_id_show(struct device *dev,
3284                              struct device_attribute *attr, char *buf)
3285 {
3286         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3287
3288         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3289 }
3290
3291 /*
3292  * Shows the name of the currently-mapped snapshot (or
3293  * RBD_SNAP_HEAD_NAME for the base image).
3294  */
3295 static ssize_t rbd_snap_show(struct device *dev,
3296                              struct device_attribute *attr,
3297                              char *buf)
3298 {
3299         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3300
3301         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3302 }
3303
3304 /*
3305  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3306  * for the parent image.  If there is no parent, simply shows
3307  * "(no parent image)".
3308  */
3309 static ssize_t rbd_parent_show(struct device *dev,
3310                              struct device_attribute *attr,
3311                              char *buf)
3312 {
3313         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3314         struct rbd_spec *spec = rbd_dev->parent_spec;
3315         int count;
3316         char *bufp = buf;
3317
3318         if (!spec)
3319                 return sprintf(buf, "(no parent image)\n");
3320
3321         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3322                         (unsigned long long) spec->pool_id, spec->pool_name);
3323         if (count < 0)
3324                 return count;
3325         bufp += count;
3326
3327         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3328                         spec->image_name ? spec->image_name : "(unknown)");
3329         if (count < 0)
3330                 return count;
3331         bufp += count;
3332
3333         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3334                         (unsigned long long) spec->snap_id, spec->snap_name);
3335         if (count < 0)
3336                 return count;
3337         bufp += count;
3338
3339         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3340         if (count < 0)
3341                 return count;
3342         bufp += count;
3343
3344         return (ssize_t) (bufp - buf);
3345 }
3346
3347 static ssize_t rbd_image_refresh(struct device *dev,
3348                                  struct device_attribute *attr,
3349                                  const char *buf,
3350                                  size_t size)
3351 {
3352         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3353         int ret;
3354
3355         ret = rbd_dev_refresh(rbd_dev);
3356         if (ret)
3357                 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3358
3359         return ret < 0 ? ret : size;
3360 }
3361
3362 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3363 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3364 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3365 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3366 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3367 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3368 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3369 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3370 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3371 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3372 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3373
3374 static struct attribute *rbd_attrs[] = {
3375         &dev_attr_size.attr,
3376         &dev_attr_features.attr,
3377         &dev_attr_major.attr,
3378         &dev_attr_client_id.attr,
3379         &dev_attr_pool.attr,
3380         &dev_attr_pool_id.attr,
3381         &dev_attr_name.attr,
3382         &dev_attr_image_id.attr,
3383         &dev_attr_current_snap.attr,
3384         &dev_attr_parent.attr,
3385         &dev_attr_refresh.attr,
3386         NULL
3387 };
3388
3389 static struct attribute_group rbd_attr_group = {
3390         .attrs = rbd_attrs,
3391 };
3392
3393 static const struct attribute_group *rbd_attr_groups[] = {
3394         &rbd_attr_group,
3395         NULL
3396 };
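
/*
 * A minimal usage sketch for these attributes (the device id 0 below
 * is illustrative; actual ids are assigned at map time):
 *
 *   # cat /sys/bus/rbd/devices/0/size
 *   # cat /sys/bus/rbd/devices/0/current_snap
 *   # echo 1 > /sys/bus/rbd/devices/0/refresh
 *
 * All attributes are read-only (S_IRUGO) except "refresh", which is
 * write-only (S_IWUSR) and triggers rbd_image_refresh().
 */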
3397
3398 static void rbd_sysfs_dev_release(struct device *dev)
3399 {
3400 }
3401
3402 static struct device_type rbd_device_type = {
3403         .name           = "rbd",
3404         .groups         = rbd_attr_groups,
3405         .release        = rbd_sysfs_dev_release,
3406 };
3407
3408 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3409 {
3410         kref_get(&spec->kref);
3411
3412         return spec;
3413 }
3414
3415 static void rbd_spec_free(struct kref *kref);
3416 static void rbd_spec_put(struct rbd_spec *spec)
3417 {
3418         if (spec)
3419                 kref_put(&spec->kref, rbd_spec_free);
3420 }
3421
3422 static struct rbd_spec *rbd_spec_alloc(void)
3423 {
3424         struct rbd_spec *spec;
3425
3426         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3427         if (!spec)
3428                 return NULL;
3429         kref_init(&spec->kref);
3430
3431         return spec;
3432 }
3433
3434 static void rbd_spec_free(struct kref *kref)
3435 {
3436         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3437
3438         kfree(spec->pool_name);
3439         kfree(spec->image_id);
3440         kfree(spec->image_name);
3441         kfree(spec->snap_name);
3442         kfree(spec);
3443 }
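
/*
 * Lifecycle sketch for an rbd spec: rbd_spec_alloc() hands back a
 * spec holding one reference; each additional holder takes its own
 * with rbd_spec_get(), and every reference is dropped with
 * rbd_spec_put().  The final put runs rbd_spec_free() via the kref,
 * releasing the dynamically-allocated names along with the spec.
 */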
3444
3445 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3446                                 struct rbd_spec *spec)
3447 {
3448         struct rbd_device *rbd_dev;
3449
3450         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3451         if (!rbd_dev)
3452                 return NULL;
3453
3454         spin_lock_init(&rbd_dev->lock);
3455         rbd_dev->flags = 0;
3456         INIT_LIST_HEAD(&rbd_dev->node);
3457         init_rwsem(&rbd_dev->header_rwsem);
3458
3459         rbd_dev->spec = spec;
3460         rbd_dev->rbd_client = rbdc;
3461
3462         /* Initialize the layout used for all rbd requests */
3463
3464         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3465         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3466         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3467         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3468
3469         return rbd_dev;
3470 }
3471
3472 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3473 {
3474         rbd_put_client(rbd_dev->rbd_client);
3475         rbd_spec_put(rbd_dev->spec);
3476         kfree(rbd_dev);
3477 }
3478
3479 /*
3480  * Get the size and object order for an image snapshot, or if
3481  * snap_id is CEPH_NOSNAP, gets this information for the base
3482  * image.
3483  */
3484 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3485                                 u8 *order, u64 *snap_size)
3486 {
3487         __le64 snapid = cpu_to_le64(snap_id);
3488         int ret;
3489         struct {
3490                 u8 order;
3491                 __le64 size;
3492         } __attribute__ ((packed)) size_buf = { 0 };
3493
3494         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3495                                 "rbd", "get_size",
3496                                 &snapid, sizeof (snapid),
3497                                 &size_buf, sizeof (size_buf));
3498         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3499         if (ret < 0)
3500                 return ret;
3501         if (ret < sizeof (size_buf))
3502                 return -ERANGE;
3503
3504         if (order)
3505                 *order = size_buf.order;
3506         *snap_size = le64_to_cpu(size_buf.size);
3507
3508         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3509                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3510                 (unsigned long long)*snap_size);
3511
3512         return 0;
3513 }
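
/*
 * The "get_size" reply is just the 9-byte packed struct above.  As an
 * illustration (values hypothetical): order 22 with size 0x40000000
 * describes a 1 GiB image carved into 1 GiB / 4 MiB = 256 objects.
 */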
3514
3515 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3516 {
3517         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3518                                         &rbd_dev->header.obj_order,
3519                                         &rbd_dev->header.image_size);
3520 }
3521
3522 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3523 {
3524         void *reply_buf;
3525         int ret;
3526         void *p;
3527
3528         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3529         if (!reply_buf)
3530                 return -ENOMEM;
3531
3532         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3533                                 "rbd", "get_object_prefix", NULL, 0,
3534                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3535         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3536         if (ret < 0)
3537                 goto out;
3538
3539         p = reply_buf;
3540         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3541                                                 p + ret, NULL, GFP_NOIO);
3542         ret = 0;
3543
3544         if (IS_ERR(rbd_dev->header.object_prefix)) {
3545                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3546                 rbd_dev->header.object_prefix = NULL;
3547         } else {
3548                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3549         }
3550 out:
3551         kfree(reply_buf);
3552
3553         return ret;
3554 }
3555
3556 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3557                 u64 *snap_features)
3558 {
3559         __le64 snapid = cpu_to_le64(snap_id);
3560         struct {
3561                 __le64 features;
3562                 __le64 incompat;
3563         } __attribute__ ((packed)) features_buf = { 0 };
3564         u64 incompat;
3565         int ret;
3566
3567         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3568                                 "rbd", "get_features",
3569                                 &snapid, sizeof (snapid),
3570                                 &features_buf, sizeof (features_buf));
3571         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3572         if (ret < 0)
3573                 return ret;
3574         if (ret < sizeof (features_buf))
3575                 return -ERANGE;
3576
3577         incompat = le64_to_cpu(features_buf.incompat);
3578         if (incompat & ~RBD_FEATURES_SUPPORTED)
3579                 return -ENXIO;
3580
3581         *snap_features = le64_to_cpu(features_buf.features);
3582
3583         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3584                 (unsigned long long)snap_id,
3585                 (unsigned long long)*snap_features,
3586                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3587
3588         return 0;
3589 }
3590
3591 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3592 {
3593         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3594                                                 &rbd_dev->header.features);
3595 }
3596
3597 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3598 {
3599         struct rbd_spec *parent_spec;
3600         size_t size;
3601         void *reply_buf = NULL;
3602         __le64 snapid;
3603         void *p;
3604         void *end;
3605         char *image_id;
3606         u64 overlap;
3607         int ret;
3608
3609         parent_spec = rbd_spec_alloc();
3610         if (!parent_spec)
3611                 return -ENOMEM;
3612
3613         size = sizeof (__le64) +                                /* pool_id */
3614                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3615                 sizeof (__le64) +                               /* snap_id */
3616                 sizeof (__le64);                                /* overlap */
3617         reply_buf = kmalloc(size, GFP_KERNEL);
3618         if (!reply_buf) {
3619                 ret = -ENOMEM;
3620                 goto out_err;
3621         }
3622
3623         snapid = cpu_to_le64(CEPH_NOSNAP);
3624         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3625                                 "rbd", "get_parent",
3626                                 &snapid, sizeof (snapid),
3627                                 reply_buf, size);
3628         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3629         if (ret < 0)
3630                 goto out_err;
3631
3632         p = reply_buf;
3633         end = reply_buf + ret;
3634         ret = -ERANGE;
3635         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3636         if (parent_spec->pool_id == CEPH_NOPOOL)
3637                 goto out;       /* No parent?  No problem. */
3638
3639         /* The ceph file layout needs to fit pool id in 32 bits */
3640
3641         ret = -EIO;
3642         if (parent_spec->pool_id > (u64)U32_MAX) {
3643                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3644                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3645                 goto out_err;
3646         }
3647
3648         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3649         if (IS_ERR(image_id)) {
3650                 ret = PTR_ERR(image_id);
3651                 goto out_err;
3652         }
3653         parent_spec->image_id = image_id;
3654         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3655         ceph_decode_64_safe(&p, end, overlap, out_err);
3656
3657         rbd_dev->parent_overlap = overlap;
3658         rbd_dev->parent_spec = parent_spec;
3659         parent_spec = NULL;     /* rbd_dev now owns this */
3660 out:
3661         ret = 0;
3662 out_err:
3663         kfree(reply_buf);
3664         rbd_spec_put(parent_spec);
3665
3666         return ret;
3667 }
3668
3669 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3670 {
3671         struct {
3672                 __le64 stripe_unit;
3673                 __le64 stripe_count;
3674         } __attribute__ ((packed)) striping_info_buf = { 0 };
3675         size_t size = sizeof (striping_info_buf);
3676         void *p;
3677         u64 obj_size;
3678         u64 stripe_unit;
3679         u64 stripe_count;
3680         int ret;
3681
3682         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3683                                 "rbd", "get_stripe_unit_count", NULL, 0,
3684                                 (char *)&striping_info_buf, size);
3685         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3686         if (ret < 0)
3687                 return ret;
3688         if (ret < size)
3689                 return -ERANGE;
3690
3691         /*
3692          * We don't actually support the "fancy striping" feature
3693          * (STRIPINGV2) yet, but if the striping sizes are the
3694          * defaults the behavior is the same as before.  So find
3695          * out, and only fail if the image has non-default values.
3696          */
3698         obj_size = (u64)1 << rbd_dev->header.obj_order;
3699         p = &striping_info_buf;
3700         stripe_unit = ceph_decode_64(&p);
3701         if (stripe_unit != obj_size) {
3702                 rbd_warn(rbd_dev, "unsupported stripe unit "
3703                                 "(got %llu want %llu)",
3704                                 stripe_unit, obj_size);
3705                 return -EINVAL;
3706         }
3707         stripe_count = ceph_decode_64(&p);
3708         if (stripe_count != 1) {
3709                 rbd_warn(rbd_dev, "unsupported stripe count "
3710                                 "(got %llu want 1)", stripe_count);
3711                 return -EINVAL;
3712         }
3713         rbd_dev->header.stripe_unit = stripe_unit;
3714         rbd_dev->header.stripe_count = stripe_count;
3715
3716         return 0;
3717 }
3718
3719 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3720 {
3721         size_t image_id_size;
3722         char *image_id;
3723         void *p;
3724         void *end;
3725         size_t size;
3726         void *reply_buf = NULL;
3727         size_t len = 0;
3728         char *image_name = NULL;
3729         int ret;
3730
3731         rbd_assert(!rbd_dev->spec->image_name);
3732
3733         len = strlen(rbd_dev->spec->image_id);
3734         image_id_size = sizeof (__le32) + len;
3735         image_id = kmalloc(image_id_size, GFP_KERNEL);
3736         if (!image_id)
3737                 return NULL;
3738
3739         p = image_id;
3740         end = image_id + image_id_size;
3741         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3742
3743         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3744         reply_buf = kmalloc(size, GFP_KERNEL);
3745         if (!reply_buf)
3746                 goto out;
3747
3748         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3749                                 "rbd", "dir_get_name",
3750                                 image_id, image_id_size,
3751                                 reply_buf, size);
3752         if (ret < 0)
3753                 goto out;
3754         p = reply_buf;
3755         end = reply_buf + ret;
3756
3757         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3758         if (IS_ERR(image_name))
3759                 image_name = NULL;
3760         else
3761                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3762 out:
3763         kfree(reply_buf);
3764         kfree(image_id);
3765
3766         return image_name;
3767 }
3768
3769 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3770 {
3771         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3772         const char *snap_name;
3773         u32 which = 0;
3774
3775         /* Skip over names until we find the one we are looking for */
3776
3777         snap_name = rbd_dev->header.snap_names;
3778         while (which < snapc->num_snaps) {
3779                 if (!strcmp(name, snap_name))
3780                         return snapc->snaps[which];
3781                 snap_name += strlen(snap_name) + 1;
3782                 which++;
3783         }
3784         return CEPH_NOSNAP;
3785 }
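
/*
 * Illustrative layout for the scan above (names hypothetical): with
 * two snapshots the name block is "snap1\0snap2\0", stored in the
 * same order as the ids in snapc->snaps[], so matching "snap2" on
 * the second iteration returns snapc->snaps[1].
 */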
3786
3787 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3788 {
3789         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3790         u32 which;
3791         bool found = false;
3792         u64 snap_id;
3793
3794         for (which = 0; !found && which < snapc->num_snaps; which++) {
3795                 const char *snap_name;
3796
3797                 snap_id = snapc->snaps[which];
3798                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3799                 if (IS_ERR(snap_name))
3800                         break;
3801                 found = !strcmp(name, snap_name);
3802                 kfree(snap_name);
3803         }
3804         return found ? snap_id : CEPH_NOSNAP;
3805 }
3806
3807 /*
3808  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3809  * no snapshot by that name is found, or if an error occurs.
3810  */
3811 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3812 {
3813         if (rbd_dev->image_format == 1)
3814                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3815
3816         return rbd_v2_snap_id_by_name(rbd_dev, name);
3817 }
3818
3819 /*
3820  * When an rbd image has a parent image, it is identified by the
3821  * pool, image, and snapshot ids (not names).  This function fills
3822  * in the names for those ids.  (It's OK if we can't figure out the
3823  * name for an image id, but the pool and snapshot ids should always
3824  * exist and have names.)  All names in an rbd spec are dynamically
3825  * allocated.
3826  *
3827  * When an image being mapped (not a parent) is probed, we have the
3828  * pool name and pool id, image name and image id, and the snapshot
3829  * name.  The only thing we're missing is the snapshot id.
3830  */
3831 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3832 {
3833         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3834         struct rbd_spec *spec = rbd_dev->spec;
3835         const char *pool_name;
3836         const char *image_name;
3837         const char *snap_name;
3838         int ret;
3839
3840         /*
3841          * An image being mapped will have the pool name (etc.), but
3842          * we need to look up the snapshot id.
3843          */
3844         if (spec->pool_name) {
3845                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3846                         u64 snap_id;
3847
3848                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3849                         if (snap_id == CEPH_NOSNAP)
3850                                 return -ENOENT;
3851                         spec->snap_id = snap_id;
3852                 } else {
3853                         spec->snap_id = CEPH_NOSNAP;
3854                 }
3855
3856                 return 0;
3857         }
3858
3859         /* Get the pool name; we have to make our own copy of this */
3860
3861         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3862         if (!pool_name) {
3863                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3864                 return -EIO;
3865         }
3866         pool_name = kstrdup(pool_name, GFP_KERNEL);
3867         if (!pool_name)
3868                 return -ENOMEM;
3869
3870         /* Fetch the image name; tolerate failure here */
3871
3872         image_name = rbd_dev_image_name(rbd_dev);
3873         if (!image_name)
3874                 rbd_warn(rbd_dev, "unable to get image name");
3875
3876         /* Look up the snapshot name, and make a copy */
3877
3878         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3879         if (!snap_name) {
3880                 ret = -ENOMEM;
3881                 goto out_err;
3882         }
3883
3884         spec->pool_name = pool_name;
3885         spec->image_name = image_name;
3886         spec->snap_name = snap_name;
3887
3888         return 0;
3889 out_err:
3890         kfree(image_name);
3891         kfree(pool_name);
3892
3893         return ret;
3894 }
3895
3896 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3897 {
3898         size_t size;
3899         int ret;
3900         void *reply_buf;
3901         void *p;
3902         void *end;
3903         u64 seq;
3904         u32 snap_count;
3905         struct ceph_snap_context *snapc;
3906         u32 i;
3907
3908         /*
3909          * We'll need room for the seq value (maximum snapshot id),
3910          * snapshot count, and array of that many snapshot ids.
3911          * For now we have a fixed upper limit on the number we're
3912          * prepared to receive.
3913          */
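        /*
         * With RBD_MAX_SNAP_COUNT at 510, that upper bound works out
         * to 8 + 4 + 510 * 8 = 4092 bytes, which is why the largest
         * possible reply still fits within a single 4 KB buffer.
         */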
3914         size = sizeof (__le64) + sizeof (__le32) +
3915                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3916         reply_buf = kzalloc(size, GFP_KERNEL);
3917         if (!reply_buf)
3918                 return -ENOMEM;
3919
3920         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3921                                 "rbd", "get_snapcontext", NULL, 0,
3922                                 reply_buf, size);
3923         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3924         if (ret < 0)
3925                 goto out;
3926
3927         p = reply_buf;
3928         end = reply_buf + ret;
3929         ret = -ERANGE;
3930         ceph_decode_64_safe(&p, end, seq, out);
3931         ceph_decode_32_safe(&p, end, snap_count, out);
3932
3933         /*
3934          * Make sure the reported number of snapshot ids wouldn't go
3935          * beyond the end of our buffer.  But before checking that,
3936          * make sure the computed size of the snapshot context we
3937          * allocate is representable in a size_t.
3938          */
3939         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3940                                  / sizeof (u64)) {
3941                 ret = -EINVAL;
3942                 goto out;
3943         }
3944         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3945                 goto out;
3946         ret = 0;
3947
3948         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3949         if (!snapc) {
3950                 ret = -ENOMEM;
3951                 goto out;
3952         }
3953         snapc->seq = seq;
3954         for (i = 0; i < snap_count; i++)
3955                 snapc->snaps[i] = ceph_decode_64(&p);
3956
3957         ceph_put_snap_context(rbd_dev->header.snapc);
3958         rbd_dev->header.snapc = snapc;
3959
3960         dout("  snap context seq = %llu, snap_count = %u\n",
3961                 (unsigned long long)seq, (unsigned int)snap_count);
3962 out:
3963         kfree(reply_buf);
3964
3965         return ret;
3966 }
3967
3968 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3969                                         u64 snap_id)
3970 {
3971         size_t size;
3972         void *reply_buf;
3973         __le64 snapid;
3974         int ret;
3975         void *p;
3976         void *end;
3977         char *snap_name;
3978
3979         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3980         reply_buf = kmalloc(size, GFP_KERNEL);
3981         if (!reply_buf)
3982                 return ERR_PTR(-ENOMEM);
3983
3984         snapid = cpu_to_le64(snap_id);
3985         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3986                                 "rbd", "get_snapshot_name",
3987                                 &snapid, sizeof (snapid),
3988                                 reply_buf, size);
3989         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3990         if (ret < 0) {
3991                 snap_name = ERR_PTR(ret);
3992                 goto out;
3993         }
3994
3995         p = reply_buf;
3996         end = reply_buf + ret;
3997         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3998         if (IS_ERR(snap_name))
3999                 goto out;
4000
4001         dout("  snap_id 0x%016llx snap_name = %s\n",
4002                 (unsigned long long)snap_id, snap_name);
4003 out:
4004         kfree(reply_buf);
4005
4006         return snap_name;
4007 }
4008
4009 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4010 {
4011         bool first_time = rbd_dev->header.object_prefix == NULL;
4012         int ret;
4013
4014         down_write(&rbd_dev->header_rwsem);
4015
4016         if (first_time) {
4017                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4018                 if (ret)
4019                         goto out;
4020         }
4021
4022         ret = rbd_dev_v2_image_size(rbd_dev);
4023         if (ret)
4024                 goto out;
4025         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4026             rbd_dev->mapping.size != rbd_dev->header.image_size)
4027                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4028
4029         ret = rbd_dev_v2_snap_context(rbd_dev);
4030         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4033 out:
4034         up_write(&rbd_dev->header_rwsem);
4035
4036         return ret;
4037 }
4038
4039 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4040 {
4041         struct device *dev;
4042         int ret;
4043
4044         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4045
4046         dev = &rbd_dev->dev;
4047         dev->bus = &rbd_bus_type;
4048         dev->type = &rbd_device_type;
4049         dev->parent = &rbd_root_dev;
4050         dev->release = rbd_dev_device_release;
4051         dev_set_name(dev, "%d", rbd_dev->dev_id);
4052         ret = device_register(dev);
4053
4054         mutex_unlock(&ctl_mutex);
4055
4056         return ret;
4057 }
4058
4059 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4060 {
4061         device_unregister(&rbd_dev->dev);
4062 }
4063
4064 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4065
4066 /*
4067  * Get a unique rbd identifier for the given new rbd_dev, and add
4068  * the rbd_dev to the global list.  The minimum rbd id is 1.
4069  */
4070 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4071 {
4072         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4073
4074         spin_lock(&rbd_dev_list_lock);
4075         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4076         spin_unlock(&rbd_dev_list_lock);
4077         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4078                 (unsigned long long) rbd_dev->dev_id);
4079 }
4080
4081 /*
4082  * Remove an rbd_dev from the global list, and record that its
4083  * identifier is no longer in use.
4084  */
4085 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4086 {
4087         struct list_head *tmp;
4088         int rbd_id = rbd_dev->dev_id;
4089         int max_id;
4090
4091         rbd_assert(rbd_id > 0);
4092
4093         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4094                 (unsigned long long) rbd_dev->dev_id);
4095         spin_lock(&rbd_dev_list_lock);
4096         list_del_init(&rbd_dev->node);
4097
4098         /*
4099          * If the id being "put" is not the current maximum, there
4100          * is nothing special we need to do.
4101          */
4102         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4103                 spin_unlock(&rbd_dev_list_lock);
4104                 return;
4105         }
4106
4107         /*
4108          * We need to update the current maximum id.  Search the
4109          * list to find out what it is.  We're more likely to find
4110          * the maximum at the end, so search the list backward.
4111          */
4112         max_id = 0;
4113         list_for_each_prev(tmp, &rbd_dev_list) {
4114                 struct rbd_device *rbd_dev;
4115
4116                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4117                 if (rbd_dev->dev_id > max_id)
4118                         max_id = rbd_dev->dev_id;
4119         }
4120         spin_unlock(&rbd_dev_list_lock);
4121
4122         /*
4123          * The max id could have been updated by rbd_dev_id_get(), in
4124          * which case it now accurately reflects the new maximum.
4125          * Be careful not to overwrite the maximum value in that
4126          * case.
4127          */
4128         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4129         dout("  max dev id has been reset\n");
4130 }
4131
4132 /*
4133  * Skips over white space at *buf, and updates *buf to point to the
4134  * first found non-space character (if any). Returns the length of
4135  * the token (string of non-white space characters) found.  Note
4136  * that *buf must be terminated with '\0'.
4137  */
4138 static inline size_t next_token(const char **buf)
4139 {
4140         /*
4141          * These are the characters that produce nonzero for
4142          * isspace() in the "C" and "POSIX" locales.
4143          */
4144         const char *spaces = " \f\n\r\t\v";
4145
4146         *buf += strspn(*buf, spaces);   /* Find start of token */
4147
4148         return strcspn(*buf, spaces);   /* Return token length */
4149 }
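
/*
 * For example (buffer contents illustrative): with *buf pointing at
 * "  rbd myimage", next_token() advances *buf past the leading
 * spaces to the 'r' and returns 3; a following dup_token() would
 * duplicate "rbd" and leave *buf at " myimage" for the next call.
 */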
4150
4151 /*
4152  * Finds the next token in *buf, and if the provided token buffer is
4153  * big enough, copies the found token into it.  The result, if
4154  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4155  * must be terminated with '\0' on entry.
4156  *
4157  * Returns the length of the token found (not including the '\0').
4158  * Return value will be 0 if no token is found, and it will be >=
4159  * token_size if the token would not fit.
4160  *
4161  * The *buf pointer will be updated to point beyond the end of the
4162  * found token.  Note that this occurs even if the token buffer is
4163  * too small to hold it.
4164  */
4165 static inline size_t copy_token(const char **buf,
4166                                 char *token,
4167                                 size_t token_size)
4168 {
4169         size_t len;
4170
4171         len = next_token(buf);
4172         if (len < token_size) {
4173                 memcpy(token, *buf, len);
4174                 *(token + len) = '\0';
4175         }
4176         *buf += len;
4177
4178         return len;
4179 }
4180
4181 /*
4182  * Finds the next token in *buf, dynamically allocates a buffer big
4183  * enough to hold a copy of it, and copies the token into the new
4184  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4185  * that a duplicate buffer is created even for a zero-length token.
4186  *
4187  * Returns a pointer to the newly-allocated duplicate, or a null
4188  * pointer if memory for the duplicate was not available.  If
4189  * the lenp argument is a non-null pointer, the length of the token
4190  * (not including the '\0') is returned in *lenp.
4191  *
4192  * If successful, the *buf pointer will be updated to point beyond
4193  * the end of the found token.
4194  *
4195  * Note: uses GFP_KERNEL for allocation.
4196  */
4197 static inline char *dup_token(const char **buf, size_t *lenp)
4198 {
4199         char *dup;
4200         size_t len;
4201
4202         len = next_token(buf);
4203         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4204         if (!dup)
4205                 return NULL;
4206         *(dup + len) = '\0';
4207         *buf += len;
4208
4209         if (lenp)
4210                 *lenp = len;
4211
4212         return dup;
4213 }
4214
4215 /*
4216  * Parse the options provided for an "rbd add" (i.e., rbd image
4217  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4218  * and the data written is passed here via a NUL-terminated buffer.
4219  * Returns 0 if successful or an error code otherwise.
4220  *
4221  * The information extracted from these options is recorded in
4222  * the other parameters which return dynamically-allocated
4223  * structures:
4224  *  ceph_opts
4225  *      The address of a pointer that will refer to a ceph options
4226  *      structure.  Caller must release the returned pointer using
4227  *      ceph_destroy_options() when it is no longer needed.
4228  *  rbd_opts
4229  *      Address of an rbd options pointer.  Fully initialized by
4230  *      this function; caller must release with kfree().
4231  *  spec
4232  *      Address of an rbd image specification pointer.  Fully
4233  *      initialized by this function based on parsed options.
4234  *      Caller must release with rbd_spec_put().
4235  *
4236  * The options passed take this form:
4237  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4238  * where:
4239  *  <mon_addrs>
4240  *      A comma-separated list of one or more monitor addresses.
4241  *      A monitor address is an ip address, optionally followed
4242  *      by a port number (separated by a colon).
4243  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4244  *  <options>
4245  *      A comma-separated list of ceph and/or rbd options.
4246  *  <pool_name>
4247  *      The name of the rados pool containing the rbd image.
4248  *  <image_name>
4249  *      The name of the image in that pool to map.
4250  *  <snap_name>
4251  *      An optional snapshot name.  If provided, the mapping will
4252  *      present data from the image at the time that snapshot was
4253  *      created.  The image head is used if no snapshot name is
4254  *      provided.  Snapshot mappings are always read-only.
4255  */
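
/*
 * A concrete sketch of such a buffer (every value below is purely
 * illustrative), as written to /sys/bus/rbd/add:
 *
 *   1.2.3.4:6789 name=admin,secret=AQD9example rbd myimage mysnap
 *
 * Here "1.2.3.4:6789" is mon_addrs, "name=admin,secret=AQD9example"
 * is the options token, "rbd" is the pool, "myimage" is the image,
 * and "mysnap" is the optional snapshot name; leaving the last token
 * off maps the image head instead.
 */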
4256 static int rbd_add_parse_args(const char *buf,
4257                                 struct ceph_options **ceph_opts,
4258                                 struct rbd_options **opts,
4259                                 struct rbd_spec **rbd_spec)
4260 {
4261         size_t len;
4262         char *options;
4263         const char *mon_addrs;
4264         char *snap_name;
4265         size_t mon_addrs_size;
4266         struct rbd_spec *spec = NULL;
4267         struct rbd_options *rbd_opts = NULL;
4268         struct ceph_options *copts;
4269         int ret;
4270
4271         /* The first four tokens are required */
4272
4273         len = next_token(&buf);
4274         if (!len) {
4275                 rbd_warn(NULL, "no monitor address(es) provided");
4276                 return -EINVAL;
4277         }
4278         mon_addrs = buf;
4279         mon_addrs_size = len + 1;
4280         buf += len;
4281
4282         ret = -EINVAL;
4283         options = dup_token(&buf, NULL);
4284         if (!options)
4285                 return -ENOMEM;
4286         if (!*options) {
4287                 rbd_warn(NULL, "no options provided");
4288                 goto out_err;
4289         }
4290
4291         spec = rbd_spec_alloc();
4292         if (!spec)
4293                 goto out_mem;
4294
4295         spec->pool_name = dup_token(&buf, NULL);
4296         if (!spec->pool_name)
4297                 goto out_mem;
4298         if (!*spec->pool_name) {
4299                 rbd_warn(NULL, "no pool name provided");
4300                 goto out_err;
4301         }
4302
4303         spec->image_name = dup_token(&buf, NULL);
4304         if (!spec->image_name)
4305                 goto out_mem;
4306         if (!*spec->image_name) {
4307                 rbd_warn(NULL, "no image name provided");
4308                 goto out_err;
4309         }
4310
4311         /*
4312          * Snapshot name is optional; default is to use "-"
4313          * (indicating the head/no snapshot).
4314          */
4315         len = next_token(&buf);
4316         if (!len) {
4317                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4318                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4319         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4320                 ret = -ENAMETOOLONG;
4321                 goto out_err;
4322         }
4323         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4324         if (!snap_name)
4325                 goto out_mem;
4326         *(snap_name + len) = '\0';
4327         spec->snap_name = snap_name;
4328
4329         /* Initialize all rbd options to the defaults */
4330
4331         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4332         if (!rbd_opts)
4333                 goto out_mem;
4334
4335         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4336
4337         copts = ceph_parse_options(options, mon_addrs,
4338                                         mon_addrs + mon_addrs_size - 1,
4339                                         parse_rbd_opts_token, rbd_opts);
4340         if (IS_ERR(copts)) {
4341                 ret = PTR_ERR(copts);
4342                 goto out_err;
4343         }
4344         kfree(options);
4345
4346         *ceph_opts = copts;
4347         *opts = rbd_opts;
4348         *rbd_spec = spec;
4349
4350         return 0;
4351 out_mem:
4352         ret = -ENOMEM;
4353 out_err:
4354         kfree(rbd_opts);
4355         rbd_spec_put(spec);
4356         kfree(options);
4357
4358         return ret;
4359 }
4360
4361 /*
4362  * An rbd format 2 image has a unique identifier, distinct from the
4363  * name given to it by the user.  Internally, that identifier is
4364  * what's used to specify the names of objects related to the image.
4365  *
4366  * A special "rbd id" object is used to map an rbd image name to its
4367  * id.  If that object doesn't exist, then there is no v2 rbd image
4368  * with the supplied name.
4369  *
4370  * This function will record the given rbd_dev's image_id field if
4371  * it can be determined, and in that case will return 0.  If any
4372  * errors occur a negative errno will be returned and the rbd_dev's
4373  * image_id field will be unchanged (and should be NULL).
4374  */
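
/*
 * Naming sketch (the image name is hypothetical, and RBD_ID_PREFIX
 * is assumed to be "rbd_id." as defined in rbd_types.h): a format 2
 * image named "myimage" keeps its persistent id in an object called
 * "rbd_id.myimage", whose "get_id" method returns the id string.
 */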
4375 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4376 {
4377         int ret;
4378         size_t size;
4379         char *object_name;
4380         void *response;
4381         char *image_id;
4382
4383         /*
4384          * When probing a parent image, the image id is already
4385          * known (and the image name likely is not).  There's no
4386          * need to fetch the image id again in this case.  We
4387          * do still need to set the image format though.
4388          */
4389         if (rbd_dev->spec->image_id) {
4390                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4391
4392                 return 0;
4393         }
4394
4395         /*
4396          * First, see if the format 2 image id file exists, and if
4397          * so, get the image's persistent id from it.
4398          */
4399         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4400         object_name = kmalloc(size, GFP_NOIO);
4401         if (!object_name)
4402                 return -ENOMEM;
4403         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4404         dout("rbd id object name is %s\n", object_name);
4405
4406         /* Response will be an encoded string, which includes a length */
4407
4408         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4409         response = kzalloc(size, GFP_NOIO);
4410         if (!response) {
4411                 ret = -ENOMEM;
4412                 goto out;
4413         }
4414
4415         /* If it doesn't exist we'll assume it's a format 1 image */
4416
4417         ret = rbd_obj_method_sync(rbd_dev, object_name,
4418                                 "rbd", "get_id", NULL, 0,
4419                                 response, RBD_IMAGE_ID_LEN_MAX);
4420         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4421         if (ret == -ENOENT) {
4422                 image_id = kstrdup("", GFP_KERNEL);
4423                 ret = image_id ? 0 : -ENOMEM;
4424                 if (!ret)
4425                         rbd_dev->image_format = 1;
4426         } else if (ret > sizeof (__le32)) {
4427                 void *p = response;
4428
4429                 image_id = ceph_extract_encoded_string(&p, p + ret,
4430                                                 NULL, GFP_NOIO);
4431                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4432                 if (!ret)
4433                         rbd_dev->image_format = 2;
4434         } else {
4435                 ret = -EINVAL;
4436         }
4437
4438         if (!ret) {
4439                 rbd_dev->spec->image_id = image_id;
4440                 dout("image_id is %s\n", image_id);
4441         }
4442 out:
4443         kfree(response);
4444         kfree(object_name);
4445
4446         return ret;
4447 }
4448
4449 /* Undo whatever state changes are made by v1 or v2 image probe */
4450
4451 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4452 {
4453         struct rbd_image_header *header;
4454
4455         rbd_dev_remove_parent(rbd_dev);
4456         rbd_spec_put(rbd_dev->parent_spec);
4457         rbd_dev->parent_spec = NULL;
4458         rbd_dev->parent_overlap = 0;
4459
4460         /* Free dynamic fields from the header, then zero it out */
4461
4462         header = &rbd_dev->header;
4463         ceph_put_snap_context(header->snapc);
4464         kfree(header->snap_sizes);
4465         kfree(header->snap_names);
4466         kfree(header->object_prefix);
4467         memset(header, 0, sizeof (*header));
4468 }
4469
4470 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4471 {
4472         int ret;
4473
4474         ret = rbd_dev_v2_object_prefix(rbd_dev);
4475         if (ret)
4476                 goto out_err;
4477
4478         /*
4479          * Get and check the features for the image.  Currently the
4480          * features are assumed to never change.
4481          */
4482         ret = rbd_dev_v2_features(rbd_dev);
4483         if (ret)
4484                 goto out_err;
4485
4486         /* If the image supports layering, get the parent info */
4487
4488         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4489                 ret = rbd_dev_v2_parent_info(rbd_dev);
4490                 if (ret)
4491                         goto out_err;
4492                 /*
4493                  * Print a warning if this image has a parent.
4494                  * Don't print it if the image now being probed
4495                  * is itself a parent.  We can tell at this point
4496                  * because we won't know its pool name yet (just its
4497                  * pool id).
4498                  */
4499                 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
			rbd_warn(rbd_dev,
				"WARNING: kernel layering is EXPERIMENTAL!");
4502         }
4503
4504         /* If the image supports fancy striping, get its parameters */
4505
4506         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4507                 ret = rbd_dev_v2_striping_info(rbd_dev);
4508                 if (ret < 0)
4509                         goto out_err;
4510         }

	/* No support for format 2 images using crypto or compression */
4512
4513         return 0;
4514 out_err:
4515         rbd_dev->parent_overlap = 0;
4516         rbd_spec_put(rbd_dev->parent_spec);
4517         rbd_dev->parent_spec = NULL;
4518         kfree(rbd_dev->header_name);
4519         rbd_dev->header_name = NULL;
4520         kfree(rbd_dev->header.object_prefix);
4521         rbd_dev->header.object_prefix = NULL;
4522
4523         return ret;
4524 }
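
/*
 * For example, a format 2 image created with both layering and fancy
 * striping enabled reports features 0x3 (RBD_FEATURE_LAYERING |
 * RBD_FEATURE_STRIPINGV2), so the probe above fetches both the parent
 * info and the striping parameters; an image with no feature bits set
 * skips both steps.
 */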
4525
4526 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4527 {
4528         struct rbd_device *parent = NULL;
4529         struct rbd_spec *parent_spec;
4530         struct rbd_client *rbdc;
4531         int ret;
4532
4533         if (!rbd_dev->parent_spec)
4534                 return 0;
4535         /*
4536          * We need to pass a reference to the client and the parent
4537          * spec when creating the parent rbd_dev.  Images related by
4538          * parent/child relationships always share both.
4539          */
4540         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4541         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4542
4543         ret = -ENOMEM;
4544         parent = rbd_dev_create(rbdc, parent_spec);
4545         if (!parent)
4546                 goto out_err;
4547
4548         ret = rbd_dev_image_probe(parent, true);
4549         if (ret < 0)
4550                 goto out_err;
4551         rbd_dev->parent = parent;
4552
4553         return 0;
4554 out_err:
4555         if (parent) {
4556                 rbd_spec_put(rbd_dev->parent_spec);
4557                 kfree(rbd_dev->header_name);
4558                 rbd_dev_destroy(parent);
4559         } else {
4560                 rbd_put_client(rbdc);
4561                 rbd_spec_put(parent_spec);
4562         }
4563
4564         return ret;
4565 }
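
/*
 * Note that rbd_dev_image_probe() in turn calls back into
 * rbd_dev_probe_parent(), so a chain of clones is probed recursively:
 * mapping a clone of a clone builds child -> parent -> grandparent
 * rbd_device structures, each level taking its own references to the
 * shared client and to its parent's spec as described above.
 */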
4566
4567 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4568 {
4569         int ret;
4570
4571         /* generate unique id: find highest unique id, add one */
4572         rbd_dev_id_get(rbd_dev);
4573
4574         /* Fill in the device name, now that we have its id. */
4575         BUILD_BUG_ON(DEV_NAME_LEN
4576                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4577         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4578
4579         /* Get our block major device number. */
4580
4581         ret = register_blkdev(0, rbd_dev->name);
4582         if (ret < 0)
4583                 goto err_out_id;
4584         rbd_dev->major = ret;
4585
4586         /* Set up the blkdev mapping. */
4587
4588         ret = rbd_init_disk(rbd_dev);
4589         if (ret)
4590                 goto err_out_blkdev;
4591
4592         ret = rbd_dev_mapping_set(rbd_dev);
4593         if (ret)
4594                 goto err_out_disk;
4595         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4596
4597         ret = rbd_bus_add_dev(rbd_dev);
4598         if (ret)
4599                 goto err_out_mapping;
4600
4601         /* Everything's ready.  Announce the disk to the world. */
4602
4603         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4604         add_disk(rbd_dev->disk);
4605
4606         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4607                 (unsigned long long) rbd_dev->mapping.size);
4608
4609         return ret;
4610
4611 err_out_mapping:
4612         rbd_dev_mapping_clear(rbd_dev);
4613 err_out_disk:
4614         rbd_free_disk(rbd_dev);
4615 err_out_blkdev:
4616         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4617 err_out_id:
	rbd_dev_id_put(rbd_dev);
4620
4621         return ret;
4622 }
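
/*
 * The capacity set above is expressed in 512-byte sectors.  For a
 * hypothetical 1 GiB mapping, mapping.size is 0x40000000 bytes, so
 * set_capacity() is handed 0x40000000 / 512 = 2097152 sectors, and
 * the announcement for the first mapped device would read something
 * like "rbd0: added with size 0x40000000".
 */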
4623
4624 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4625 {
4626         struct rbd_spec *spec = rbd_dev->spec;
4627         size_t size;
4628
4629         /* Record the header object name for this rbd image. */
4630
4631         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4632
4633         if (rbd_dev->image_format == 1)
4634                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4635         else
4636                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4637
4638         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4639         if (!rbd_dev->header_name)
4640                 return -ENOMEM;
4641
4642         if (rbd_dev->image_format == 1)
4643                 sprintf(rbd_dev->header_name, "%s%s",
4644                         spec->image_name, RBD_SUFFIX);
4645         else
4646                 sprintf(rbd_dev->header_name, "%s%s",
4647                         RBD_HEADER_PREFIX, spec->image_id);
4648         return 0;
4649 }
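
/*
 * Concretely, assuming the usual values of RBD_SUFFIX (".rbd") and
 * RBD_HEADER_PREFIX ("rbd_header.") from rbd_types.h: a format 1
 * image named "foo" gets the header object "foo.rbd", while a
 * format 2 image whose id is "abc123" gets "rbd_header.abc123".
 */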
4650
4651 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4652 {
4653         int ret;
4654
4655         rbd_dev_unprobe(rbd_dev);
4656         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4657         if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4659         kfree(rbd_dev->header_name);
4660         rbd_dev->header_name = NULL;
4661         rbd_dev->image_format = 0;
4662         kfree(rbd_dev->spec->image_id);
4663         rbd_dev->spec->image_id = NULL;
4664
4665         rbd_dev_destroy(rbd_dev);
4666 }
4667
4668 /*
4669  * Probe for the existence of the header object for the given rbd
4670  * device.  For format 2 images this includes determining the image
4671  * id.
4672  */
4673 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
4674 {
4675         int ret;
4676         int tmp;
4677
4678         /*
4679          * Get the id from the image id object.  If it's not a
4680          * format 2 image, we'll get ENOENT back, and we'll assume
4681          * it's a format 1 image.
4682          */
4683         ret = rbd_dev_image_id(rbd_dev);
4684         if (ret)
4685                 return ret;
4686         rbd_assert(rbd_dev->spec->image_id);
4687         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4688
4689         ret = rbd_dev_header_name(rbd_dev);
4690         if (ret)
4691                 goto err_out_format;
4692
4693         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4694         if (ret)
4695                 goto out_header_name;
4696
4697         if (rbd_dev->image_format == 1)
4698                 ret = rbd_dev_v1_header_info(rbd_dev);
4699         else
4700                 ret = rbd_dev_v2_header_info(rbd_dev);
4701         if (ret)
4702                 goto err_out_watch;
4703
4704         ret = rbd_dev_spec_update(rbd_dev);
4705         if (ret)
4706                 goto err_out_probe;
4707
4708         /* If we are mapping a snapshot it must be marked read-only */
4709
4710         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4711                 read_only = true;
4712         rbd_dev->mapping.read_only = read_only;
4713
4714         ret = rbd_dev_probe_parent(rbd_dev);
4715         if (ret)
4716                 goto err_out_probe;
4717
4718         dout("discovered format %u image, header name is %s\n",
4719                 rbd_dev->image_format, rbd_dev->header_name);
4720
4721         return 0;
4722 err_out_probe:
4723         rbd_dev_unprobe(rbd_dev);
4724 err_out_watch:
4725         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4726         if (tmp)
		rbd_warn(rbd_dev, "unable to tear down watch request");
4728 out_header_name:
4729         kfree(rbd_dev->header_name);
4730         rbd_dev->header_name = NULL;
4731 err_out_format:
4732         rbd_dev->image_format = 0;
4733         kfree(rbd_dev->spec->image_id);
4734         rbd_dev->spec->image_id = NULL;
4735
4736         dout("probe failed, returning %d\n", ret);
4737
4738         return ret;
4739 }
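
/*
 * The error labels above unwind in strict reverse order of setup: a
 * failure in rbd_dev_probe_parent() undoes the header probe, then the
 * watch, then the header object name, and finally the recorded format
 * and image id, leaving the device as rbd_dev_image_id() found it.
 */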
4740
4741 static ssize_t rbd_add(struct bus_type *bus,
4742                        const char *buf,
4743                        size_t count)
4744 {
4745         struct rbd_device *rbd_dev = NULL;
4746         struct ceph_options *ceph_opts = NULL;
4747         struct rbd_options *rbd_opts = NULL;
4748         struct rbd_spec *spec = NULL;
4749         struct rbd_client *rbdc;
4750         struct ceph_osd_client *osdc;
4751         bool read_only;
4752         int rc = -ENOMEM;
4753
4754         if (!try_module_get(THIS_MODULE))
4755                 return -ENODEV;
4756
4757         /* parse add command */
4758         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4759         if (rc < 0)
4760                 goto err_out_module;
4761         read_only = rbd_opts->read_only;
4762         kfree(rbd_opts);
4763         rbd_opts = NULL;        /* done with this */
4764
4765         rbdc = rbd_get_client(ceph_opts);
4766         if (IS_ERR(rbdc)) {
4767                 rc = PTR_ERR(rbdc);
4768                 goto err_out_args;
4769         }
4770         ceph_opts = NULL;       /* rbd_dev client now owns this */
4771
4772         /* pick the pool */
4773         osdc = &rbdc->client->osdc;
4774         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4775         if (rc < 0)
4776                 goto err_out_client;
4777         spec->pool_id = (u64)rc;
4778
4779         /* The ceph file layout needs to fit pool id in 32 bits */
4780
4781         if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)",
				(unsigned long long)spec->pool_id, U32_MAX);
4784                 rc = -EIO;
4785                 goto err_out_client;
4786         }
4787
4788         rbd_dev = rbd_dev_create(rbdc, spec);
4789         if (!rbd_dev)
4790                 goto err_out_client;
4791         rbdc = NULL;            /* rbd_dev now owns this */
4792         spec = NULL;            /* rbd_dev now owns this */
4793
4794         rc = rbd_dev_image_probe(rbd_dev, read_only);
4795         if (rc < 0)
4796                 goto err_out_rbd_dev;
4797
4798         rc = rbd_dev_device_setup(rbd_dev);
4799         if (!rc)
4800                 return count;
4801
4802         rbd_dev_image_release(rbd_dev);
4803 err_out_rbd_dev:
4804         rbd_dev_destroy(rbd_dev);
4805 err_out_client:
4806         rbd_put_client(rbdc);
4807 err_out_args:
4808         if (ceph_opts)
4809                 ceph_destroy_options(ceph_opts);
4810         kfree(rbd_opts);
4811         rbd_spec_put(spec);
4812 err_out_module:
4813         module_put(THIS_MODULE);
4814
4815         dout("Error adding device %s\n", buf);
4816
4817         return (ssize_t)rc;
4818 }
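
/*
 * A sketch of the add interface (the monitor address, key and names
 * below are made up; Documentation/ABI/testing/sysfs-bus-rbd is the
 * authoritative reference):
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo" \
 *		> /sys/bus/rbd/add
 *
 * i.e. "<mon addrs> <options> <pool name> <image name> [<snap name>]".
 * On success a block device such as /dev/rbd0 appears, and rbd_add()
 * returns the full write count to indicate the whole buffer was
 * consumed.
 */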
4819
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each_entry(rbd_dev, &rbd_dev_list, node) {
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}
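
/*
 * Note that rbd_dev_list_lock is dropped before the device found is
 * returned, so the pointer remains valid only because the remove path
 * below serializes lookup and teardown under ctl_mutex.
 */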
4836
4837 static void rbd_dev_device_release(struct device *dev)
4838 {
4839         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4840
4841         rbd_free_disk(rbd_dev);
4842         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4843         rbd_dev_mapping_clear(rbd_dev);
4844         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4845         rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
4848 }
4849
4850 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4851 {
4852         while (rbd_dev->parent) {
4853                 struct rbd_device *first = rbd_dev;
4854                 struct rbd_device *second = first->parent;
4855                 struct rbd_device *third;
4856
4857                 /*
4858                  * Follow to the parent with no grandparent and
4859                  * remove it.
4860                  */
4861                 while (second && (third = second->parent)) {
4862                         first = second;
4863                         second = third;
4864                 }
4865                 rbd_assert(second);
4866                 rbd_dev_image_release(second);
4867                 first->parent = NULL;
4868                 first->parent_overlap = 0;
4869
4870                 rbd_assert(first->parent_spec);
4871                 rbd_spec_put(first->parent_spec);
4872                 first->parent_spec = NULL;
4873         }
4874 }
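
/*
 * For example, with a chain dev -> parent -> grandparent, the first
 * pass of the outer loop above walks down to the grandparent (the
 * only image with no parent of its own), releases it and detaches it
 * from the parent; the second pass releases the parent; the third
 * finds rbd_dev->parent NULL and stops.
 */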
4875
4876 static ssize_t rbd_remove(struct bus_type *bus,
4877                           const char *buf,
4878                           size_t count)
4879 {
4880         struct rbd_device *rbd_dev = NULL;
4881         int target_id;
4882         unsigned long ul;
4883         int ret;
4884
	ret = kstrtoul(buf, 10, &ul);
4886         if (ret)
4887                 return ret;
4888
4889         /* convert to int; abort if we lost anything in the conversion */
4890         target_id = (int) ul;
4891         if (target_id != ul)
4892                 return -EINVAL;
4893
4894         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4895
4896         rbd_dev = __rbd_get_dev(target_id);
4897         if (!rbd_dev) {
4898                 ret = -ENOENT;
4899                 goto done;
4900         }
4901
4902         spin_lock_irq(&rbd_dev->lock);
4903         if (rbd_dev->open_count)
4904                 ret = -EBUSY;
4905         else
4906                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4907         spin_unlock_irq(&rbd_dev->lock);
4908         if (ret < 0)
4909                 goto done;
4910         ret = count;
4911         rbd_bus_del_dev(rbd_dev);
4912         rbd_dev_image_release(rbd_dev);
4913         module_put(THIS_MODULE);
4914 done:
4915         mutex_unlock(&ctl_mutex);
4916
4917         return ret;
4918 }
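
/*
 * The remove interface is symmetric with rbd_add() (device id 0 below
 * is just an example):
 *
 *	# echo 0 > /sys/bus/rbd/remove
 *
 * This fails with -EBUSY while the device is held open; otherwise it
 * tears the mapping down and drops the module reference taken in
 * rbd_add().
 */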
4919
4920 /*
4921  * create control files in sysfs
4922  * /sys/bus/rbd/...
4923  */
4924 static int rbd_sysfs_init(void)
4925 {
4926         int ret;
4927
4928         ret = device_register(&rbd_root_dev);
4929         if (ret < 0)
4930                 return ret;
4931
4932         ret = bus_register(&rbd_bus_type);
4933         if (ret < 0)
4934                 device_unregister(&rbd_root_dev);
4935
4936         return ret;
4937 }
4938
4939 static void rbd_sysfs_cleanup(void)
4940 {
4941         bus_unregister(&rbd_bus_type);
4942         device_unregister(&rbd_root_dev);
4943 }
4944
4945 static int rbd_slab_init(void)
4946 {
4947         rbd_assert(!rbd_img_request_cache);
4948         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4949                                         sizeof (struct rbd_img_request),
4950                                         __alignof__(struct rbd_img_request),
4951                                         0, NULL);
4952         if (!rbd_img_request_cache)
4953                 return -ENOMEM;
4954
4955         rbd_assert(!rbd_obj_request_cache);
4956         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4957                                         sizeof (struct rbd_obj_request),
4958                                         __alignof__(struct rbd_obj_request),
4959                                         0, NULL);
4960         if (!rbd_obj_request_cache)
4961                 goto out_err;
4962
4963         rbd_assert(!rbd_segment_name_cache);
4964         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
4965                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
4966         if (rbd_segment_name_cache)
4967                 return 0;
4968 out_err:
4969         if (rbd_obj_request_cache) {
4970                 kmem_cache_destroy(rbd_obj_request_cache);
4971                 rbd_obj_request_cache = NULL;
4972         }
4973
4974         kmem_cache_destroy(rbd_img_request_cache);
4975         rbd_img_request_cache = NULL;
4976
4977         return -ENOMEM;
4978 }
4979
4980 static void rbd_slab_exit(void)
4981 {
4982         rbd_assert(rbd_segment_name_cache);
4983         kmem_cache_destroy(rbd_segment_name_cache);
4984         rbd_segment_name_cache = NULL;
4985
4986         rbd_assert(rbd_obj_request_cache);
4987         kmem_cache_destroy(rbd_obj_request_cache);
4988         rbd_obj_request_cache = NULL;
4989
4990         rbd_assert(rbd_img_request_cache);
4991         kmem_cache_destroy(rbd_img_request_cache);
4992         rbd_img_request_cache = NULL;
4993 }
4994
4995 static int __init rbd_init(void)
4996 {
4997         int rc;
4998
4999         if (!libceph_compatible(NULL)) {
5000                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5001
5002                 return -EINVAL;
5003         }
5004         rc = rbd_slab_init();
5005         if (rc)
5006                 return rc;
5007         rc = rbd_sysfs_init();
5008         if (rc)
5009                 rbd_slab_exit();
5010         else
5011                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5012
5013         return rc;
5014 }
5015
5016 static void __exit rbd_exit(void)
5017 {
5018         rbd_sysfs_cleanup();
5019         rbd_slab_exit();
5020 }
5021
5022 module_init(rbd_init);
5023 module_exit(rbd_exit);
5024
5025 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5026 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5027 MODULE_DESCRIPTION("rados block device");
5028
5029 /* following authorship retained from original osdblk.c */
5030 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5031
5032 MODULE_LICENSE("GPL");