/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
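
/*
 * A worked example (illustrative, not from the original source): with
 * NAME_MAX 255 and the five-byte "snap_" prefix above, a snapshot
 * name may be at most 255 - 5 = 250 bytes.
 */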

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
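
/*
 * Arithmetic behind that limit (a sketch): snapshot ids are u64, so
 * 510 of them occupy 510 * 8 = 4080 bytes, leaving 16 bytes of a 4KB
 * page for the fixed fields of the snapshot context.
 */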

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
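
/*
 * Worked example (illustrative): for a four-byte int this yields
 * (5 * 4) / 2 + 1 = 11 characters, enough for the ten decimal digits
 * of INT_MAX plus a sign; the 5/2 factor over-approximates the
 * log10(256) = ~2.41 decimal digits each byte can contribute.
 */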

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
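
/*
 * Example output (illustrative): once a device is mapped, messages
 * carry its disk name as a prefix, e.g.
 *
 *      rbd: rbd0: <formatted message>
 */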

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
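
/*
 * Illustrative note (not part of the original source): option strings
 * arrive here one token at a time.  libceph tokenizes the options it
 * is handed and passes any token it does not itself recognize (such
 * as "read_only" or "ro") to this callback, with the struct
 * rbd_options as the private pointer.
 */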

/*
 * Get a ceph client with specific addr and configuration; if one does
 * not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * rbd_client_list_lock is acquired here, so the caller must not
 * hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore,
 * release it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}
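
/*
 * For reference, the format 1 on-disk layout consumed above, as a
 * sketch inferred from the parsing code (see rbd_types.h for the
 * authoritative definitions):
 *
 *      rbd_image_header_ondisk            fixed-size header fields
 *      rbd_image_snap_ondisk[snap_count]  one {id, image_size} per snap
 *      char[snap_names_len]               packed NUL-terminated names
 */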

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
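
/*
 * Example (illustrative): with snapc->snaps = { 12, 7, 3 }, which is
 * descending as the osd maintains it, looking up snap_id 7 returns
 * index 1, while an absent id such as 9 returns BAD_SNAP_INDEX.
 */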

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* The name came from the slab cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
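
/*
 * Worked example (illustrative): with the default object order of 22
 * (4MB objects), segment_size is 0x400000.  An image I/O at offset
 * 0x3ff000 with length 0x3000 falls in segment 0 at in-segment
 * offset 0x3ff000, and rbd_segment_length() clips the length to
 * 0x1000 so the request does not cross into segment 1.
 */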

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
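
/*
 * Illustrative use (a sketch, not a quote of any caller): an image
 * request walks its data across object boundaries by cloning one
 * object-sized slice per iteration; bio and offset advance to the
 * first un-cloned byte each time:
 *
 *      struct bio *obj_bio;
 *
 *      obj_bio = bio_chain_clone_range(&bio, &offset, obj_len, GFP_NOIO);
 *      if (!obj_bio)
 *              goto out_err;   (ENOMEM, or the chain was exhausted)
 */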

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}
/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        if (img_request_child_test(img_request))
                kref_put(&img_request->kref, rbd_parent_request_destroy);
        else
                kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
        clear_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
1638         case CEPH_OSD_OP_READ:
1639                 rbd_osd_read_callback(obj_request);
1640                 break;
1641         case CEPH_OSD_OP_WRITE:
1642                 rbd_osd_write_callback(obj_request);
1643                 break;
1644         case CEPH_OSD_OP_STAT:
1645                 rbd_osd_stat_callback(obj_request);
1646                 break;
1647         case CEPH_OSD_OP_CALL:
1648         case CEPH_OSD_OP_NOTIFY_ACK:
1649         case CEPH_OSD_OP_WATCH:
1650                 rbd_osd_trivial_callback(obj_request);
1651                 break;
1652         default:
1653                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1654                         obj_request->object_name, (unsigned short) opcode);
1655                 break;
1656         }
1657
1658         if (obj_request_done_test(obj_request))
1659                 rbd_obj_request_complete(obj_request);
1660 }
1661
1662 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1663 {
1664         struct rbd_img_request *img_request = obj_request->img_request;
1665         struct ceph_osd_request *osd_req = obj_request->osd_req;
1666         u64 snap_id;
1667
1668         rbd_assert(osd_req != NULL);
1669
1670         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1671         ceph_osdc_build_request(osd_req, obj_request->offset,
1672                         NULL, snap_id, NULL);
1673 }
1674
1675 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1676 {
1677         struct rbd_img_request *img_request = obj_request->img_request;
1678         struct ceph_osd_request *osd_req = obj_request->osd_req;
1679         struct ceph_snap_context *snapc;
1680         struct timespec mtime = CURRENT_TIME;
1681
1682         rbd_assert(osd_req != NULL);
1683
1684         snapc = img_request ? img_request->snapc : NULL;
1685         ceph_osdc_build_request(osd_req, obj_request->offset,
1686                         snapc, CEPH_NOSNAP, &mtime);
1687 }
1688
1689 static struct ceph_osd_request *rbd_osd_req_create(
1690                                         struct rbd_device *rbd_dev,
1691                                         bool write_request,
1692                                         struct rbd_obj_request *obj_request)
1693 {
1694         struct ceph_snap_context *snapc = NULL;
1695         struct ceph_osd_client *osdc;
1696         struct ceph_osd_request *osd_req;
1697
1698         if (obj_request_img_data_test(obj_request)) {
1699                 struct rbd_img_request *img_request = obj_request->img_request;
1700
1701                 rbd_assert(write_request ==
1702                                 img_request_write_test(img_request));
1703                 if (write_request)
1704                         snapc = img_request->snapc;
1705         }
1706
1707         /* Allocate and initialize the request, for the single op */
1708
1709         osdc = &rbd_dev->rbd_client->client->osdc;
1710         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1711         if (!osd_req)
1712                 return NULL;    /* ENOMEM */
1713
1714         if (write_request)
1715                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1716         else
1717                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1718
1719         osd_req->r_callback = rbd_osd_req_callback;
1720         osd_req->r_priv = obj_request;
1721
1722         osd_req->r_oid_len = strlen(obj_request->object_name);
1723         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1724         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1725
1726         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1727
1728         return osd_req;
1729 }
1730
1731 /*
1732  * Create a copyup osd request based on the information in the
1733  * object request supplied.  A copyup request has two osd ops,
1734  * a copyup method call, and a "normal" write request.
1735  */
1736 static struct ceph_osd_request *
1737 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1738 {
1739         struct rbd_img_request *img_request;
1740         struct ceph_snap_context *snapc;
1741         struct rbd_device *rbd_dev;
1742         struct ceph_osd_client *osdc;
1743         struct ceph_osd_request *osd_req;
1744
1745         rbd_assert(obj_request_img_data_test(obj_request));
1746         img_request = obj_request->img_request;
1747         rbd_assert(img_request);
1748         rbd_assert(img_request_write_test(img_request));
1749
1750         /* Allocate and initialize the request, for the two ops */
1751
1752         snapc = img_request->snapc;
1753         rbd_dev = img_request->rbd_dev;
1754         osdc = &rbd_dev->rbd_client->client->osdc;
1755         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1756         if (!osd_req)
1757                 return NULL;    /* ENOMEM */
1758
1759         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1760         osd_req->r_callback = rbd_osd_req_callback;
1761         osd_req->r_priv = obj_request;
1762
1763         osd_req->r_oid_len = strlen(obj_request->object_name);
1764         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1765         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1766
1767         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1768
1769         return osd_req;
1770 }
1771
1772
1773 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1774 {
1775         ceph_osdc_put_request(osd_req);
1776 }
1777
1778 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1779
1780 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1781                                                 u64 offset, u64 length,
1782                                                 enum obj_request_type type)
1783 {
1784         struct rbd_obj_request *obj_request;
1785         size_t size;
1786         char *name;
1787
1788         rbd_assert(obj_request_type_valid(type));
1789
1790         size = strlen(object_name) + 1;
1791         name = kmalloc(size, GFP_KERNEL);
1792         if (!name)
1793                 return NULL;
1794
1795         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1796         if (!obj_request) {
1797                 kfree(name);
1798                 return NULL;
1799         }
1800
1801         obj_request->object_name = memcpy(name, object_name, size);
1802         obj_request->offset = offset;
1803         obj_request->length = length;
1804         obj_request->flags = 0;
1805         obj_request->which = BAD_WHICH;
1806         obj_request->type = type;
1807         INIT_LIST_HEAD(&obj_request->links);
1808         init_completion(&obj_request->completion);
1809         kref_init(&obj_request->kref);
1810
1811         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1812                 offset, length, (int)type, obj_request);
1813
1814         return obj_request;
1815 }
1816
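/*
 * Typical lifecycle, for illustration (patterned on callers later in
 * this file such as rbd_obj_notify_ack(); error handling omitted):
 *
 *      obj_request = rbd_obj_request_create(name, 0, 0,
 *                                              OBJ_REQUEST_NODATA);
 *      obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
 *                                              obj_request);
 *      ret = rbd_obj_request_submit(osdc, obj_request);
 *      if (!ret)
 *              ret = rbd_obj_request_wait(obj_request);
 *      rbd_obj_request_put(obj_request);
 *
 * The final put drops the initial kref and ends up here, in
 * rbd_obj_request_destroy().
 */
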
1817 static void rbd_obj_request_destroy(struct kref *kref)
1818 {
1819         struct rbd_obj_request *obj_request;
1820
1821         obj_request = container_of(kref, struct rbd_obj_request, kref);
1822
1823         dout("%s: obj %p\n", __func__, obj_request);
1824
1825         rbd_assert(obj_request->img_request == NULL);
1826         rbd_assert(obj_request->which == BAD_WHICH);
1827
1828         if (obj_request->osd_req)
1829                 rbd_osd_req_destroy(obj_request->osd_req);
1830
1831         rbd_assert(obj_request_type_valid(obj_request->type));
1832         switch (obj_request->type) {
1833         case OBJ_REQUEST_NODATA:
1834                 break;          /* Nothing to do */
1835         case OBJ_REQUEST_BIO:
1836                 if (obj_request->bio_list)
1837                         bio_chain_put(obj_request->bio_list);
1838                 break;
1839         case OBJ_REQUEST_PAGES:
1840                 if (obj_request->pages)
1841                         ceph_release_page_vector(obj_request->pages,
1842                                                 obj_request->page_count);
1843                 break;
1844         }
1845
1846         kfree(obj_request->object_name);
1847         obj_request->object_name = NULL;
1848         kmem_cache_free(rbd_obj_request_cache, obj_request);
1849 }
1850
1851 /* It's OK to call this for a device with no parent */
1852
1853 static void rbd_spec_put(struct rbd_spec *spec);
1854 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1855 {
1856         rbd_dev_remove_parent(rbd_dev);
1857         rbd_spec_put(rbd_dev->parent_spec);
1858         rbd_dev->parent_spec = NULL;
1859         rbd_dev->parent_overlap = 0;
1860 }
1861
1862 /*
1863  * Caller is responsible for filling in the list of object requests
1864  * that comprises the image request, and the Linux request pointer
1865  * (if there is one).
1866  */
1867 static struct rbd_img_request *rbd_img_request_create(
1868                                         struct rbd_device *rbd_dev,
1869                                         u64 offset, u64 length,
1870                                         bool write_request)
1871 {
1872         struct rbd_img_request *img_request;
1873
1874         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1875         if (!img_request)
1876                 return NULL;
1877
1878         if (write_request) {
1879                 down_read(&rbd_dev->header_rwsem);
1880                 ceph_get_snap_context(rbd_dev->header.snapc);
1881                 up_read(&rbd_dev->header_rwsem);
1882         }
1883
1884         img_request->rq = NULL;
1885         img_request->rbd_dev = rbd_dev;
1886         img_request->offset = offset;
1887         img_request->length = length;
1888         img_request->flags = 0;
1889         if (write_request) {
1890                 img_request_write_set(img_request);
1891                 img_request->snapc = rbd_dev->header.snapc;
1892         } else {
1893                 img_request->snap_id = rbd_dev->spec->snap_id;
1894         }
1895         if (rbd_dev->parent_overlap)
1896                 img_request_layered_set(img_request);
1897         spin_lock_init(&img_request->completion_lock);
1898         img_request->next_completion = 0;
1899         img_request->callback = NULL;
1900         img_request->result = 0;
1901         img_request->obj_request_count = 0;
1902         INIT_LIST_HEAD(&img_request->obj_requests);
1903         kref_init(&img_request->kref);
1904
1905         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1906                 write_request ? "write" : "read", offset, length,
1907                 img_request);
1908
1909         return img_request;
1910 }
1911
1912 static void rbd_img_request_destroy(struct kref *kref)
1913 {
1914         struct rbd_img_request *img_request;
1915         struct rbd_obj_request *obj_request;
1916         struct rbd_obj_request *next_obj_request;
1917
1918         img_request = container_of(kref, struct rbd_img_request, kref);
1919
1920         dout("%s: img %p\n", __func__, img_request);
1921
1922         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1923                 rbd_img_obj_request_del(img_request, obj_request);
1924         rbd_assert(img_request->obj_request_count == 0);
1925
1926         if (img_request_write_test(img_request))
1927                 ceph_put_snap_context(img_request->snapc);
1928
1929         kmem_cache_free(rbd_img_request_cache, img_request);
1930 }
1931
1932 static struct rbd_img_request *rbd_parent_request_create(
1933                                         struct rbd_obj_request *obj_request,
1934                                         u64 img_offset, u64 length)
1935 {
1936         struct rbd_img_request *parent_request;
1937         struct rbd_device *rbd_dev;
1938
1939         rbd_assert(obj_request->img_request);
1940         rbd_dev = obj_request->img_request->rbd_dev;
1941
1942         parent_request = rbd_img_request_create(rbd_dev->parent,
1943                                                 img_offset, length, false);
1944         if (!parent_request)
1945                 return NULL;
1946
1947         img_request_child_set(parent_request);
1948         rbd_obj_request_get(obj_request);
1949         parent_request->obj_request = obj_request;
1950
1951         return parent_request;
1952 }
1953
1954 static void rbd_parent_request_destroy(struct kref *kref)
1955 {
1956         struct rbd_img_request *parent_request;
1957         struct rbd_obj_request *orig_request;
1958
1959         parent_request = container_of(kref, struct rbd_img_request, kref);
1960         orig_request = parent_request->obj_request;
1961
1962         parent_request->obj_request = NULL;
1963         rbd_obj_request_put(orig_request);
1964         img_request_child_clear(parent_request);
1965
1966         rbd_img_request_destroy(kref);
1967 }
1968
1969 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1970 {
1971         struct rbd_img_request *img_request;
1972         unsigned int xferred;
1973         int result;
1974         bool more;
1975
1976         rbd_assert(obj_request_img_data_test(obj_request));
1977         img_request = obj_request->img_request;
1978
1979         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1980         xferred = (unsigned int)obj_request->xferred;
1981         result = obj_request->result;
1982         if (result) {
1983                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1984
1985                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1986                         img_request_write_test(img_request) ? "write" : "read",
1987                         obj_request->length, obj_request->img_offset,
1988                         obj_request->offset);
1989                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1990                         result, xferred);
1991                 if (!img_request->result)
1992                         img_request->result = result;
1993         }
1994
1995         /* Image object requests don't own their page array */
1996
1997         if (obj_request->type == OBJ_REQUEST_PAGES) {
1998                 obj_request->pages = NULL;
1999                 obj_request->page_count = 0;
2000         }
2001
2002         if (img_request_child_test(img_request)) {
2003                 rbd_assert(img_request->obj_request != NULL);
2004                 more = obj_request->which < img_request->obj_request_count - 1;
2005         } else {
2006                 rbd_assert(img_request->rq != NULL);
2007                 more = blk_end_request(img_request->rq, result, xferred);
2008         }
2009
2010         return more;
2011 }
2012
2013 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2014 {
2015         struct rbd_img_request *img_request;
2016         u32 which = obj_request->which;
2017         bool more = true;
2018
2019         rbd_assert(obj_request_img_data_test(obj_request));
2020         img_request = obj_request->img_request;
2021
2022         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2023         rbd_assert(img_request != NULL);
2024         rbd_assert(img_request->obj_request_count > 0);
2025         rbd_assert(which != BAD_WHICH);
2026         rbd_assert(which < img_request->obj_request_count);
2027         rbd_assert(which >= img_request->next_completion);
2028
2029         spin_lock_irq(&img_request->completion_lock);
2030         if (which != img_request->next_completion)
2031                 goto out;
2032
2033         for_each_obj_request_from(img_request, obj_request) {
2034                 rbd_assert(more);
2035                 rbd_assert(which < img_request->obj_request_count);
2036
2037                 if (!obj_request_done_test(obj_request))
2038                         break;
2039                 more = rbd_img_obj_end_request(obj_request);
2040                 which++;
2041         }
2042
2043         rbd_assert(more ^ (which == img_request->obj_request_count));
2044         img_request->next_completion = which;
2045 out:
2046         spin_unlock_irq(&img_request->completion_lock);
2047
2048         if (!more)
2049                 rbd_img_request_complete(img_request);
2050 }
2051
2052 /*
2053  * Split up an image request into one or more object requests, each
2054  * to a different object.  The "type" parameter indicates whether
2055  * "data_desc" is the pointer to the head of a list of bio
2056  * structures, or the base of a page array.  In either case this
2057  * function assumes data_desc describes memory sufficient to hold
2058  * all data described by the image request.
2059  */
2060 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2061                                         enum obj_request_type type,
2062                                         void *data_desc)
2063 {
2064         struct rbd_device *rbd_dev = img_request->rbd_dev;
2065         struct rbd_obj_request *obj_request = NULL;
2066         struct rbd_obj_request *next_obj_request;
2067         bool write_request = img_request_write_test(img_request);
2068         struct bio *bio_list;
2069         unsigned int bio_offset = 0;
2070         struct page **pages;
2071         u64 img_offset;
2072         u64 resid;
2073         u16 opcode;
2074
2075         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2076                 (int)type, data_desc);
2077
2078         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2079         img_offset = img_request->offset;
2080         resid = img_request->length;
2081         rbd_assert(resid > 0);
2082
2083         if (type == OBJ_REQUEST_BIO) {
2084                 bio_list = data_desc;
2085                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2086         } else {
2087                 rbd_assert(type == OBJ_REQUEST_PAGES);
2088                 pages = data_desc;
2089         }
2090
2091         while (resid) {
2092                 struct ceph_osd_request *osd_req;
2093                 const char *object_name;
2094                 u64 offset;
2095                 u64 length;
2096
2097                 object_name = rbd_segment_name(rbd_dev, img_offset);
2098                 if (!object_name)
2099                         goto out_unwind;
2100                 offset = rbd_segment_offset(rbd_dev, img_offset);
2101                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2102                 obj_request = rbd_obj_request_create(object_name,
2103                                                 offset, length, type);
2104                 /* object request has its own copy of the object name */
2105                 rbd_segment_name_free(object_name);
2106                 if (!obj_request)
2107                         goto out_unwind;
2108
2109                 if (type == OBJ_REQUEST_BIO) {
2110                         unsigned int clone_size;
2111
2112                         rbd_assert(length <= (u64)UINT_MAX);
2113                         clone_size = (unsigned int)length;
2114                         obj_request->bio_list =
2115                                         bio_chain_clone_range(&bio_list,
2116                                                                 &bio_offset,
2117                                                                 clone_size,
2118                                                                 GFP_ATOMIC);
2119                         if (!obj_request->bio_list)
2120                                 goto out_partial;
2121                 } else {
2122                         unsigned int page_count;
2123
2124                         obj_request->pages = pages;
2125                         page_count = (u32)calc_pages_for(offset, length);
2126                         obj_request->page_count = page_count;
2127                         if ((offset + length) & ~PAGE_MASK)
2128                                 page_count--;   /* more on last page */
2129                         pages += page_count;
2130                 }
2131
2132                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2133                                                 obj_request);
2134                 if (!osd_req)
2135                         goto out_partial;
2136                 obj_request->osd_req = osd_req;
2137                 obj_request->callback = rbd_img_obj_callback;
2138
2139                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2140                                                 0, 0);
2141                 if (type == OBJ_REQUEST_BIO)
2142                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2143                                         obj_request->bio_list, length);
2144                 else
2145                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2146                                         obj_request->pages, length,
2147                                         offset & ~PAGE_MASK, false, false);
2148
2149                 if (write_request)
2150                         rbd_osd_req_format_write(obj_request);
2151                 else
2152                         rbd_osd_req_format_read(obj_request);
2153
2154                 obj_request->img_offset = img_offset;
2155                 rbd_img_obj_request_add(img_request, obj_request);
2156
2157                 img_offset += length;
2158                 resid -= length;
2159         }
2160
2161         return 0;
2162
2163 out_partial:
2164         rbd_obj_request_put(obj_request);
2165 out_unwind:
2166         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2167                 rbd_obj_request_put(obj_request);
2168
2169         return -ENOMEM;
2170 }
2171
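/*
 * Sketch of the segmenting arithmetic driving the loop above.  The
 * bodies below are assumptions written for exposition; the real
 * rbd_segment_offset()/rbd_segment_length() live elsewhere in this
 * file.  Objects are fixed-size, 1 << obj_order bytes each.
 */
#include <stdint.h>

static uint64_t seg_offset(uint64_t img_offset, uint8_t obj_order)
{
        /* byte offset of img_offset within its object */
        return img_offset & (((uint64_t)1 << obj_order) - 1);
}

static uint64_t seg_length(uint64_t img_offset, uint64_t resid,
                                uint8_t obj_order)
{
        uint64_t seg_size = (uint64_t)1 << obj_order;
        uint64_t room = seg_size - seg_offset(img_offset, obj_order);

        /* never cross an object boundary in one object request */
        return resid < room ? resid : room;
}
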
2172 static void
2173 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2174 {
2175         struct rbd_img_request *img_request;
2176         struct rbd_device *rbd_dev;
2177         struct page **pages;
2178         u32 page_count;
2179
2180         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2181         rbd_assert(obj_request_img_data_test(obj_request));
2182         img_request = obj_request->img_request;
2183         rbd_assert(img_request);
2184
2185         rbd_dev = img_request->rbd_dev;
2186         rbd_assert(rbd_dev);
2187
2188         pages = obj_request->copyup_pages;
2189         rbd_assert(pages != NULL);
2190         obj_request->copyup_pages = NULL;
2191         page_count = obj_request->copyup_page_count;
2192         rbd_assert(page_count);
2193         obj_request->copyup_page_count = 0;
2194         ceph_release_page_vector(pages, page_count);
2195
2196         /*
2197          * We want the transfer count to reflect the size of the
2198          * original write request.  There is no such thing as a
2199          * successful short write, so if the request was successful
2200          * we can just set it to the originally-requested length.
2201          */
2202         if (!obj_request->result)
2203                 obj_request->xferred = obj_request->length;
2204
2205         /* Finish up with the normal image object callback */
2206
2207         rbd_img_obj_callback(obj_request);
2208 }
2209
2210 static void
2211 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2212 {
2213         struct rbd_obj_request *orig_request;
2214         struct ceph_osd_request *osd_req;
2215         struct ceph_osd_client *osdc;
2216         struct rbd_device *rbd_dev;
2217         struct page **pages;
2218         u32 page_count;
2219         int result;
2220         u64 parent_length;
2221         u64 offset;
2222         u64 length;
2223
2224         rbd_assert(img_request_child_test(img_request));
2225
2226         /* First get what we need from the image request */
2227
2228         pages = img_request->copyup_pages;
2229         rbd_assert(pages != NULL);
2230         img_request->copyup_pages = NULL;
2231         page_count = img_request->copyup_page_count;
2232         rbd_assert(page_count);
2233         img_request->copyup_page_count = 0;
2234
2235         orig_request = img_request->obj_request;
2236         rbd_assert(orig_request != NULL);
2237         rbd_assert(obj_request_type_valid(orig_request->type));
2238         result = img_request->result;
2239         parent_length = img_request->length;
2240         rbd_assert(parent_length == img_request->xferred);
2241         rbd_img_request_put(img_request);
2242
2243         rbd_assert(orig_request->img_request);
2244         rbd_dev = orig_request->img_request->rbd_dev;
2245         rbd_assert(rbd_dev);
2246
2247         if (result)
2248                 goto out_err;
2249
2250         /*
2251  * The original osd request is of no use to us any more.
2252          * We need a new one that can hold the two ops in a copyup
2253          * request.  Allocate the new copyup osd request for the
2254          * original request, and release the old one.
2255          */
2256         result = -ENOMEM;
2257         osd_req = rbd_osd_req_create_copyup(orig_request);
2258         if (!osd_req)
2259                 goto out_err;
2260         rbd_osd_req_destroy(orig_request->osd_req);
2261         orig_request->osd_req = osd_req;
2262         orig_request->copyup_pages = pages;
2263         orig_request->copyup_page_count = page_count;
2264
2265         /* Initialize the copyup op */
2266
2267         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2268         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2269                                                 false, false);
2270
2271         /* Then the original write request op */
2272
2273         offset = orig_request->offset;
2274         length = orig_request->length;
2275         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2276                                         offset, length, 0, 0);
2277         if (orig_request->type == OBJ_REQUEST_BIO)
2278                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2279                                         orig_request->bio_list, length);
2280         else
2281                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2282                                         orig_request->pages, length,
2283                                         offset & ~PAGE_MASK, false, false);
2284
2285         rbd_osd_req_format_write(orig_request);
2286
2287         /* All set, send it off. */
2288
2289         orig_request->callback = rbd_img_obj_copyup_callback;
2290         osdc = &rbd_dev->rbd_client->client->osdc;
2291         result = rbd_obj_request_submit(osdc, orig_request);
2292         if (!result)
2293                 return;
2294 out_err:
2295         /* Record the error code and complete the request */
2296
2297         orig_request->result = result;
2298         orig_request->xferred = 0;
2299         obj_request_done_set(orig_request);
2300         rbd_obj_request_complete(orig_request);
2301 }
2302
2303 /*
2304  * Read from the parent image the range of data that covers the
2305  * entire target of the given object request.  This is used for
2306  * satisfying a layered image write request when the target of an
2307  * object request from the image request does not exist.
2308  *
2309  * A page array big enough to hold the returned data is allocated
2310  * and supplied to rbd_img_request_fill() as the "data descriptor."
2311  * When the read completes, this page array will be transferred to
2312  * the original object request for the copyup operation.
2313  *
2314  * If an error occurs, record it as the result of the original
2315  * object request and mark it done so it gets completed.
2316  */
2317 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2318 {
2319         struct rbd_img_request *img_request = NULL;
2320         struct rbd_img_request *parent_request = NULL;
2321         struct rbd_device *rbd_dev;
2322         u64 img_offset;
2323         u64 length;
2324         struct page **pages = NULL;
2325         u32 page_count;
2326         int result;
2327
2328         rbd_assert(obj_request_img_data_test(obj_request));
2329         rbd_assert(obj_request_type_valid(obj_request->type));
2330
2331         img_request = obj_request->img_request;
2332         rbd_assert(img_request != NULL);
2333         rbd_dev = img_request->rbd_dev;
2334         rbd_assert(rbd_dev->parent != NULL);
2335
2336         /*
2337          * Determine the byte range covered by the object in the
2338          * child image to which the original request was to be sent.
2339          */
2340         img_offset = obj_request->img_offset - obj_request->offset;
2341         length = (u64)1 << rbd_dev->header.obj_order;
2342
2343         /*
2344          * There is no defined parent data beyond the parent
2345          * overlap, so limit what we read at that boundary if
2346          * necessary.
2347          */
2348         if (img_offset + length > rbd_dev->parent_overlap) {
2349                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2350                 length = rbd_dev->parent_overlap - img_offset;
2351         }
2352
2353         /*
2354          * Allocate a page array big enough to receive the data read
2355          * from the parent.
2356          */
2357         page_count = (u32)calc_pages_for(0, length);
2358         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2359         if (IS_ERR(pages)) {
2360                 result = PTR_ERR(pages);
2361                 pages = NULL;
2362                 goto out_err;
2363         }
2364
2365         result = -ENOMEM;
2366         parent_request = rbd_parent_request_create(obj_request,
2367                                                 img_offset, length);
2368         if (!parent_request)
2369                 goto out_err;
2370
2371         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2372         if (result)
2373                 goto out_err;
2374         parent_request->copyup_pages = pages;
2375         parent_request->copyup_page_count = page_count;
2376
2377         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2378         result = rbd_img_request_submit(parent_request);
2379         if (!result)
2380                 return 0;
2381
2382         parent_request->copyup_pages = NULL;
2383         parent_request->copyup_page_count = 0;
2384         parent_request->obj_request = NULL;
2385         rbd_obj_request_put(obj_request);
2386 out_err:
2387         if (pages)
2388                 ceph_release_page_vector(pages, page_count);
2389         if (parent_request)
2390                 rbd_img_request_put(parent_request);
2391         obj_request->result = result;
2392         obj_request->xferred = 0;
2393         obj_request_done_set(obj_request);
2394
2395         return result;
2396 }
2397
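/*
 * The overlap clamp above, isolated as a sketch (an assumed helper;
 * the caller guarantees img_offset < overlap, as the assertion does):
 */
#include <stdint.h>

static uint64_t clamp_to_overlap(uint64_t img_offset, uint64_t length,
                                uint64_t overlap)
{
        return img_offset + length > overlap ? overlap - img_offset
                                             : length;
}
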
2398 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2399 {
2400         struct rbd_obj_request *orig_request;
2401         int result;
2402
2403         rbd_assert(!obj_request_img_data_test(obj_request));
2404
2405         /*
2406          * All we need from the object request is the original
2407          * request and the result of the STAT op.  Grab those, then
2408          * we're done with the request.
2409          */
2410         orig_request = obj_request->obj_request;
2411         obj_request->obj_request = NULL;
2412         rbd_assert(orig_request);
2413         rbd_assert(orig_request->img_request);
2414
2415         result = obj_request->result;
2416         obj_request->result = 0;
2417
2418         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2419                 obj_request, orig_request, result,
2420                 obj_request->xferred, obj_request->length);
2421         rbd_obj_request_put(obj_request);
2425
2426         /*
2427          * Our only purpose here is to determine whether the object
2428          * exists, and we don't want to treat the non-existence as
2429          * an error.  If something else comes back, transfer the
2430          * error to the original request and complete it now.
2431          */
2432         if (!result) {
2433                 obj_request_existence_set(orig_request, true);
2434         } else if (result == -ENOENT) {
2435                 obj_request_existence_set(orig_request, false);
2436         } else {
2437                 orig_request->result = result;
2438                 goto out;
2439         }
2440
2441         /*
2442          * Resubmit the original request now that we have recorded
2443          * whether the target object exists.
2444          */
2445         orig_request->result = rbd_img_obj_request_submit(orig_request);
2446 out:
2447         if (orig_request->result)
2448                 rbd_obj_request_complete(orig_request);
2449         rbd_obj_request_put(orig_request);
2450 }
2451
2452 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2453 {
2454         struct rbd_obj_request *stat_request;
2455         struct rbd_device *rbd_dev;
2456         struct ceph_osd_client *osdc;
2457         struct page **pages = NULL;
2458         u32 page_count;
2459         size_t size;
2460         int ret;
2461
2462         /*
2463          * The response data for a STAT call consists of:
2464          *     le64 length;
2465          *     struct {
2466          *         le32 tv_sec;
2467          *         le32 tv_nsec;
2468          *     } mtime;
2469          */
2470         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2471         page_count = (u32)calc_pages_for(0, size);
2472         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2473         if (IS_ERR(pages))
2474                 return PTR_ERR(pages);
2475
2476         ret = -ENOMEM;
2477         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2478                                                         OBJ_REQUEST_PAGES);
2479         if (!stat_request)
2480                 goto out;
2481
2482         rbd_obj_request_get(obj_request);
2483         stat_request->obj_request = obj_request;
2484         stat_request->pages = pages;
2485         stat_request->page_count = page_count;
2486
2487         rbd_assert(obj_request->img_request);
2488         rbd_dev = obj_request->img_request->rbd_dev;
2489         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2490                                                 stat_request);
2491         if (!stat_request->osd_req)
2492                 goto out;
2493         stat_request->callback = rbd_img_obj_exists_callback;
2494
2495         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2496         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2497                                         false, false);
2498         rbd_osd_req_format_read(stat_request);
2499
2500         osdc = &rbd_dev->rbd_client->client->osdc;
2501         ret = rbd_obj_request_submit(osdc, stat_request);
2502 out:
2503         if (ret)
2504                 rbd_obj_request_put(obj_request);
2505
2506         return ret;
2507 }
2508
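/*
 * Illustrative decoder for the STAT reply layout described above
 * (userspace sketch under glibc; this driver itself only checks the
 * op result and never unpacks the buffer here):
 */
#include <endian.h>
#include <stdint.h>
#include <string.h>

struct stat_reply {
        uint64_t length;
        uint32_t tv_sec;
        uint32_t tv_nsec;
};

static void decode_stat_reply(const unsigned char *buf,
                                struct stat_reply *r)
{
        uint64_t len;
        uint32_t sec, nsec;

        /* all on-wire fields are little-endian */
        memcpy(&len, buf, sizeof(len));
        memcpy(&sec, buf + 8, sizeof(sec));
        memcpy(&nsec, buf + 12, sizeof(nsec));

        r->length = le64toh(len);
        r->tv_sec = le32toh(sec);
        r->tv_nsec = le32toh(nsec);
}
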
2509 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2510 {
2511         struct rbd_img_request *img_request;
2512         struct rbd_device *rbd_dev;
2513         bool known;
2514
2515         rbd_assert(obj_request_img_data_test(obj_request));
2516
2517         img_request = obj_request->img_request;
2518         rbd_assert(img_request);
2519         rbd_dev = img_request->rbd_dev;
2520
2521         /*
2522          * Only writes to layered images need special handling.
2523          * Reads and non-layered writes are simple object requests.
2524          * Layered writes that start beyond the end of the overlap
2525          * with the parent have no parent data, so they too are
2526          * simple object requests.  Finally, if the target object is
2527          * known to already exist, its parent data has already been
2528          * copied, so a write to the object can also be handled as a
2529          * simple object request.
2530          */
2531         if (!img_request_write_test(img_request) ||
2532                 !img_request_layered_test(img_request) ||
2533                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2534                 ((known = obj_request_known_test(obj_request)) &&
2535                         obj_request_exists_test(obj_request))) {
2536
2537                 struct ceph_osd_client *osdc;
2538
2539                 osdc = &rbd_dev->rbd_client->client->osdc;
2542
2543                 return rbd_obj_request_submit(osdc, obj_request);
2544         }
2545
2546         /*
2547          * It's a layered write.  The target object might exist but
2548          * we may not know that yet.  If we know it doesn't exist,
2549          * start by reading the data for the full target object from
2550          * the parent so we can use it for a copyup to the target.
2551          */
2552         if (known)
2553                 return rbd_img_obj_parent_read_full(obj_request);
2554
2555         /* We don't know whether the target exists.  Go find out. */
2556
2557         return rbd_img_obj_exists_submit(obj_request);
2558 }
2559
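/*
 * The routing decision above, restated as a predicate (sketch only;
 * "known" and "exists" mirror the object request flag tests).  When
 * it is false, the request goes straight to the osd.
 */
#include <stdint.h>

static int needs_layered_write_path(int is_write, int is_layered,
                                uint64_t overlap, uint64_t img_offset,
                                int known, int exists)
{
        return is_write && is_layered && img_offset < overlap &&
               !(known && exists);
}
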
2560 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2561 {
2562         struct rbd_obj_request *obj_request;
2563         struct rbd_obj_request *next_obj_request;
2564
2565         dout("%s: img %p\n", __func__, img_request);
2566         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2567                 int ret;
2568
2569                 ret = rbd_img_obj_request_submit(obj_request);
2570                 if (ret)
2571                         return ret;
2572         }
2573
2574         return 0;
2575 }
2576
2577 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2578 {
2579         struct rbd_obj_request *obj_request;
2580         struct rbd_device *rbd_dev;
2581         u64 obj_end;
2582
2583         rbd_assert(img_request_child_test(img_request));
2584
2585         obj_request = img_request->obj_request;
2586         rbd_assert(obj_request);
2587         rbd_assert(obj_request->img_request);
2588
2589         obj_request->result = img_request->result;
2590         if (obj_request->result)
2591                 goto out;
2592
2593         /*
2594          * We need to zero anything beyond the parent overlap
2595          * boundary.  Since rbd_img_obj_request_read_callback()
2596          * will zero anything beyond the end of a short read, an
2597          * easy way to do this is to pretend the data from the
2598          * parent came up short--ending at the overlap boundary.
2599          */
2600         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2601         obj_end = obj_request->img_offset + obj_request->length;
2602         rbd_dev = obj_request->img_request->rbd_dev;
2603         if (obj_end > rbd_dev->parent_overlap) {
2604                 u64 xferred = 0;
2605
2606                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2607                         xferred = rbd_dev->parent_overlap -
2608                                         obj_request->img_offset;
2609
2610                 obj_request->xferred = min(img_request->xferred, xferred);
2611         } else {
2612                 obj_request->xferred = img_request->xferred;
2613         }
2614 out:
2615         rbd_img_request_put(img_request);
2616         rbd_img_obj_request_read_callback(obj_request);
2617         rbd_obj_request_complete(obj_request);
2618 }
2619
2620 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2621 {
2622         struct rbd_img_request *img_request;
2623         int result;
2624
2625         rbd_assert(obj_request_img_data_test(obj_request));
2626         rbd_assert(obj_request->img_request != NULL);
2627         rbd_assert(obj_request->result == (s32) -ENOENT);
2628         rbd_assert(obj_request_type_valid(obj_request->type));
2629
2630         /* rbd_read_finish(obj_request, obj_request->length); */
2631         img_request = rbd_parent_request_create(obj_request,
2632                                                 obj_request->img_offset,
2633                                                 obj_request->length);
2634         result = -ENOMEM;
2635         if (!img_request)
2636                 goto out_err;
2637
2638         if (obj_request->type == OBJ_REQUEST_BIO)
2639                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2640                                                 obj_request->bio_list);
2641         else
2642                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2643                                                 obj_request->pages);
2644         if (result)
2645                 goto out_err;
2646
2647         img_request->callback = rbd_img_parent_read_callback;
2648         result = rbd_img_request_submit(img_request);
2649         if (result)
2650                 goto out_err;
2651
2652         return;
2653 out_err:
2654         if (img_request)
2655                 rbd_img_request_put(img_request);
2656         obj_request->result = result;
2657         obj_request->xferred = 0;
2658         obj_request_done_set(obj_request);
2659 }
2660
2661 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2662 {
2663         struct rbd_obj_request *obj_request;
2664         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2665         int ret;
2666
2667         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2668                                                         OBJ_REQUEST_NODATA);
2669         if (!obj_request)
2670                 return -ENOMEM;
2671
2672         ret = -ENOMEM;
2673         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2674         if (!obj_request->osd_req)
2675                 goto out;
2676         obj_request->callback = rbd_obj_request_put;
2677
2678         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2679                                         notify_id, 0, 0);
2680         rbd_osd_req_format_read(obj_request);
2681
2682         ret = rbd_obj_request_submit(osdc, obj_request);
2683 out:
2684         if (ret)
2685                 rbd_obj_request_put(obj_request);
2686
2687         return ret;
2688 }
2689
2690 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2691 {
2692         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2693         int ret;
2694
2695         if (!rbd_dev)
2696                 return;
2697
2698         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2699                 rbd_dev->header_name, (unsigned long long)notify_id,
2700                 (unsigned int)opcode);
2701         ret = rbd_dev_refresh(rbd_dev);
2702         if (ret)
2703                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2704
2705         rbd_obj_notify_ack(rbd_dev, notify_id);
2706 }
2707
2708 /*
2709  * Request sync osd watch/unwatch.  The value of "start" determines
2710  * whether a watch request is being initiated or torn down.
2711  */
2712 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2713 {
2714         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2715         struct rbd_obj_request *obj_request;
2716         int ret;
2717
2718         rbd_assert(start ^ !!rbd_dev->watch_event);
2719         rbd_assert(start ^ !!rbd_dev->watch_request);
2720
2721         if (start) {
2722                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2723                                                 &rbd_dev->watch_event);
2724                 if (ret < 0)
2725                         return ret;
2726                 rbd_assert(rbd_dev->watch_event != NULL);
2727         }
2728
2729         ret = -ENOMEM;
2730         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2731                                                         OBJ_REQUEST_NODATA);
2732         if (!obj_request)
2733                 goto out_cancel;
2734
2735         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2736         if (!obj_request->osd_req)
2737                 goto out_cancel;
2738
2739         if (start)
2740                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2741         else
2742                 ceph_osdc_unregister_linger_request(osdc,
2743                                         rbd_dev->watch_request->osd_req);
2744
2745         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2746                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2747         rbd_osd_req_format_write(obj_request);
2748
2749         ret = rbd_obj_request_submit(osdc, obj_request);
2750         if (ret)
2751                 goto out_cancel;
2752         ret = rbd_obj_request_wait(obj_request);
2753         if (ret)
2754                 goto out_cancel;
2755         ret = obj_request->result;
2756         if (ret)
2757                 goto out_cancel;
2758
2759         /*
2760          * A watch request is set to linger, so the underlying osd
2761          * request won't go away until we unregister it.  We retain
2762          * a pointer to the object request during that time (in
2763          * rbd_dev->watch_request), so we'll keep a reference to
2764          * it.  We'll drop that reference (below) after we've
2765          * unregistered it.
2766          */
2767         if (start) {
2768                 rbd_dev->watch_request = obj_request;
2769
2770                 return 0;
2771         }
2772
2773         /* We have successfully torn down the watch request */
2774
2775         rbd_obj_request_put(rbd_dev->watch_request);
2776         rbd_dev->watch_request = NULL;
2777 out_cancel:
2778         /* Cancel the event if we're tearing down, or on error */
2779         ceph_osdc_cancel_event(rbd_dev->watch_event);
2780         rbd_dev->watch_event = NULL;
2781         if (obj_request)
2782                 rbd_obj_request_put(obj_request);
2783
2784         return ret;
2785 }
2786
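/*
 * Usage, for illustration (patterned on the map and unmap paths that
 * call this function):
 *
 *      ret = rbd_dev_header_watch_sync(rbd_dev, true);
 *      ...
 *      ret = rbd_dev_header_watch_sync(rbd_dev, false);
 *
 * The first call registers a lingering watch on the header object;
 * the second unregisters it and drops the retained reference.
 */
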
2787 /*
2788  * Synchronous osd object method call.  Returns the number of bytes
2789  * returned in the outbound buffer, or a negative error code.
2790  */
2791 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2792                              const char *object_name,
2793                              const char *class_name,
2794                              const char *method_name,
2795                              const void *outbound,
2796                              size_t outbound_size,
2797                              void *inbound,
2798                              size_t inbound_size)
2799 {
2800         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2801         struct rbd_obj_request *obj_request;
2802         struct page **pages;
2803         u32 page_count;
2804         int ret;
2805
2806         /*
2807          * Method calls are ultimately read operations.  The result
2808  * should be placed into the inbound buffer provided.  They
2809          * also supply outbound data--parameters for the object
2810          * method.  Currently if this is present it will be a
2811          * snapshot id.
2812          */
2813         page_count = (u32)calc_pages_for(0, inbound_size);
2814         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2815         if (IS_ERR(pages))
2816                 return PTR_ERR(pages);
2817
2818         ret = -ENOMEM;
2819         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2820                                                         OBJ_REQUEST_PAGES);
2821         if (!obj_request)
2822                 goto out;
2823
2824         obj_request->pages = pages;
2825         obj_request->page_count = page_count;
2826
2827         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2828         if (!obj_request->osd_req)
2829                 goto out;
2830
2831         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2832                                         class_name, method_name);
2833         if (outbound_size) {
2834                 struct ceph_pagelist *pagelist;
2835
2836                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2837                 if (!pagelist)
2838                         goto out;
2839
2840                 ceph_pagelist_init(pagelist);
2841                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2842                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2843                                                 pagelist);
2844         }
2845         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2846                                         obj_request->pages, inbound_size,
2847                                         0, false, false);
2848         rbd_osd_req_format_read(obj_request);
2849
2850         ret = rbd_obj_request_submit(osdc, obj_request);
2851         if (ret)
2852                 goto out;
2853         ret = rbd_obj_request_wait(obj_request);
2854         if (ret)
2855                 goto out;
2856
2857         ret = obj_request->result;
2858         if (ret < 0)
2859                 goto out;
2860
2861         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2862         ret = (int)obj_request->xferred;
2863         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2864 out:
2865         if (obj_request)
2866                 rbd_obj_request_put(obj_request);
2867         else
2868                 ceph_release_page_vector(pages, page_count);
2869
2870         return ret;
2871 }
2872
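/*
 * Example call, for illustration only.  This is patterned on how the
 * format 2 image probe code elsewhere in this file queries the "rbd"
 * object class; treat the exact method name and reply layout as an
 * assumption:
 *
 *      __le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *      struct {
 *              u8 order;
 *              __le64 size;
 *      } __attribute__ ((packed)) size_buf = { 0 };
 *
 *      ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                              "rbd", "get_size",
 *                              &snapid, sizeof (snapid),
 *                              &size_buf, sizeof (size_buf));
 */
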
2873 static void rbd_request_fn(struct request_queue *q)
2874                 __releases(q->queue_lock) __acquires(q->queue_lock)
2875 {
2876         struct rbd_device *rbd_dev = q->queuedata;
2877         bool read_only = rbd_dev->mapping.read_only;
2878         struct request *rq;
2879         int result;
2880
2881         while ((rq = blk_fetch_request(q))) {
2882                 bool write_request = rq_data_dir(rq) == WRITE;
2883                 struct rbd_img_request *img_request;
2884                 u64 offset;
2885                 u64 length;
2886
2887                 /* Ignore any non-FS requests that filter through. */
2888
2889                 if (rq->cmd_type != REQ_TYPE_FS) {
2890                         dout("%s: non-fs request type %d\n", __func__,
2891                                 (int) rq->cmd_type);
2892                         __blk_end_request_all(rq, 0);
2893                         continue;
2894                 }
2895
2896                 /* Ignore/skip any zero-length requests */
2897
2898                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2899                 length = (u64) blk_rq_bytes(rq);
2900
2901                 if (!length) {
2902                         dout("%s: zero-length request\n", __func__);
2903                         __blk_end_request_all(rq, 0);
2904                         continue;
2905                 }
2906
2907                 spin_unlock_irq(q->queue_lock);
2908
2909                 /* Disallow writes to a read-only device */
2910
2911                 if (write_request) {
2912                         result = -EROFS;
2913                         if (read_only)
2914                                 goto end_request;
2915                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2916                 }
2917
2918                 /*
2919                  * Quit early if the mapped snapshot no longer
2920                  * exists.  It's still possible the snapshot will
2921                  * have disappeared by the time our request arrives
2922                  * at the osd, but there's no sense in sending it if
2923                  * we already know.
2924                  */
2925                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2926                         dout("request for non-existent snapshot\n");
2927                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2928                         result = -ENXIO;
2929                         goto end_request;
2930                 }
2931
2932                 result = -EINVAL;
2933                 if (offset && length > U64_MAX - offset + 1) {
2934                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2935                                 offset, length);
2936                         goto end_request;       /* Shouldn't happen */
2937                 }
2938
2939                 result = -EIO;
2940                 if (offset + length > rbd_dev->mapping.size) {
2941                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2942                                 offset, length, rbd_dev->mapping.size);
2943                         goto end_request;
2944                 }
2945
2946                 result = -ENOMEM;
2947                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2948                                                         write_request);
2949                 if (!img_request)
2950                         goto end_request;
2951
2952                 img_request->rq = rq;
2953
2954                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2955                                                 rq->bio);
2956                 if (!result)
2957                         result = rbd_img_request_submit(img_request);
2958                 if (result)
2959                         rbd_img_request_put(img_request);
2960 end_request:
2961                 spin_lock_irq(q->queue_lock);
2962                 if (result < 0) {
2963                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2964                                 write_request ? "write" : "read",
2965                                 length, offset, result);
2966
2967                         __blk_end_request_all(rq, result);
2968                 }
2969         }
2970 }
2971
2972 /*
2973  * A queue callback.  Makes sure that we don't create a bio that spans
2974  * multiple osd objects.  One exception would be single-page bios,
2975  * which we handle later in bio_chain_clone_range().
2976  */
2977 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2978                           struct bio_vec *bvec)
2979 {
2980         struct rbd_device *rbd_dev = q->queuedata;
2981         sector_t sector_offset;
2982         sector_t sectors_per_obj;
2983         sector_t obj_sector_offset;
2984         int ret;
2985
2986         /*
2987          * Find how far into its rbd object the bio's start sector
2988          * falls.  The bio's sector is partition-relative, so offset it
2989          * by the partition's start sector within the enclosing device.
2990          */
2991         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2992         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2993         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2994
2995         /*
2996          * Compute the number of bytes from that offset to the end
2997          * of the object.  Account for what's already used by the bio.
2998          */
2999         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3000         if (ret > bmd->bi_size)
3001                 ret -= bmd->bi_size;
3002         else
3003                 ret = 0;
3004
3005         /*
3006          * Don't send back more than was asked for.  And if the bio
3007          * was empty, let the whole thing through because:  "Note
3008          * that a block device *must* allow a single page to be
3009          * added to an empty bio."
3010          */
3011         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3012         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3013                 ret = (int) bvec->bv_len;
3014
3015         return ret;
3016 }
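/*
 * Worked example (illustrative values, not from the source): with the
 * default object order of 22 (4 MiB objects), sectors_per_obj is
 * 1 << (22 - 9) = 8192.  For an empty bio whose device-absolute start
 * sector is 8000, obj_sector_offset is 8000 & 8191 = 8000, so at most
 * (8192 - 8000) << 9 = 98304 bytes can be merged before the bio would
 * cross into the next object.
 */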
3017
3018 static void rbd_free_disk(struct rbd_device *rbd_dev)
3019 {
3020         struct gendisk *disk = rbd_dev->disk;
3021
3022         if (!disk)
3023                 return;
3024
3025         rbd_dev->disk = NULL;
3026         if (disk->flags & GENHD_FL_UP) {
3027                 del_gendisk(disk);
3028                 if (disk->queue)
3029                         blk_cleanup_queue(disk->queue);
3030         }
3031         put_disk(disk);
3032 }
3033
3034 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3035                                 const char *object_name,
3036                                 u64 offset, u64 length, void *buf)
3037 {
3039         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3040         struct rbd_obj_request *obj_request;
3041         struct page **pages = NULL;
3042         u32 page_count;
3043         size_t size;
3044         int ret;
3045
3046         page_count = (u32) calc_pages_for(offset, length);
3047         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3048         if (IS_ERR(pages))
3049                 return PTR_ERR(pages);
3050
3051         ret = -ENOMEM;
3052         obj_request = rbd_obj_request_create(object_name, offset, length,
3053                                                         OBJ_REQUEST_PAGES);
3054         if (!obj_request)
3055                 goto out;
3056
3057         obj_request->pages = pages;
3058         obj_request->page_count = page_count;
3059
3060         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3061         if (!obj_request->osd_req)
3062                 goto out;
3063
3064         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3065                                         offset, length, 0, 0);
3066         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3067                                         obj_request->pages,
3068                                         obj_request->length,
3069                                         obj_request->offset & ~PAGE_MASK,
3070                                         false, false);
3071         rbd_osd_req_format_read(obj_request);
3072
3073         ret = rbd_obj_request_submit(osdc, obj_request);
3074         if (ret)
3075                 goto out;
3076         ret = rbd_obj_request_wait(obj_request);
3077         if (ret)
3078                 goto out;
3079
3080         ret = obj_request->result;
3081         if (ret < 0)
3082                 goto out;
3083
3084         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3085         size = (size_t) obj_request->xferred;
3086         ceph_copy_from_page_vector(pages, buf, 0, size);
3087         rbd_assert(size <= (size_t)INT_MAX);
3088         ret = (int)size;
3089 out:
3090         if (obj_request)
3091                 rbd_obj_request_put(obj_request);
3092         else
3093                 ceph_release_page_vector(pages, page_count);
3094
3095         return ret;
3096 }
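/*
 * Usage sketch: rbd_dev_v1_header_info() below uses this helper to
 * read the format 1 header object synchronously, e.g.:
 *
 *	ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 *				0, size, ondisk);
 *
 * A non-negative return value is the number of bytes actually read,
 * which the caller must compare against the number requested.
 */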
3097
3098 /*
3099  * Read the complete header for the given rbd device.  On successful
3100  * return, the rbd_dev->header field will contain up-to-date
3101  * information about the image.
3102  */
3103 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3104 {
3105         struct rbd_image_header_ondisk *ondisk = NULL;
3106         u32 snap_count = 0;
3107         u64 names_size = 0;
3108         u32 want_count;
3109         int ret;
3110
3111         /*
3112          * The complete header will include an array of its 64-bit
3113          * snapshot ids, followed by the names of those snapshots as
3114          * a contiguous block of NUL-terminated strings.  Note that
3115          * the number of snapshots could change by the time we read
3116          * it in, in which case we re-read it.
3117          */
3118         do {
3119                 size_t size;
3120
3121                 kfree(ondisk);
3122
3123                 size = sizeof (*ondisk);
3124                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3125                 size += names_size;
3126                 ondisk = kmalloc(size, GFP_KERNEL);
3127                 if (!ondisk)
3128                         return -ENOMEM;
3129
3130                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3131                                        0, size, ondisk);
3132                 if (ret < 0)
3133                         goto out;
3134                 if ((size_t)ret < size) {
3135                         ret = -ENXIO;
3136                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3137                                 size, ret);
3138                         goto out;
3139                 }
3140                 if (!rbd_dev_ondisk_valid(ondisk)) {
3141                         ret = -ENXIO;
3142                         rbd_warn(rbd_dev, "invalid header");
3143                         goto out;
3144                 }
3145
3146                 names_size = le64_to_cpu(ondisk->snap_names_len);
3147                 want_count = snap_count;
3148                 snap_count = le32_to_cpu(ondisk->snap_count);
3149         } while (snap_count != want_count);
3150
3151         ret = rbd_header_from_disk(rbd_dev, ondisk);
3152 out:
3153         kfree(ondisk);
3154
3155         return ret;
3156 }
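/*
 * For reference, the complete format 1 header read above is laid out
 * as (sizes taken from the allocation in the loop):
 *
 *	struct rbd_image_header_ondisk              fixed-size prefix
 *	struct rbd_image_snap_ondisk[snap_count]    one entry per snapshot
 *	char snap_names[names_size]                 NUL-terminated names
 *
 * snap_count and names_size are only trusted once a read completes
 * with the same snapshot count that was used to size the buffer.
 */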
3157
3158 /*
3159  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3160  * has disappeared from the (just updated) snapshot context.
3161  */
3162 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3163 {
3164         u64 snap_id;
3165
3166         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3167                 return;
3168
3169         snap_id = rbd_dev->spec->snap_id;
3170         if (snap_id == CEPH_NOSNAP)
3171                 return;
3172
3173         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3174                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3175 }
3176
3177 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3178 {
3179         u64 mapping_size;
3180         int ret;
3181
3182         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3183         mapping_size = rbd_dev->mapping.size;
3184         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3185         if (rbd_dev->image_format == 1)
3186                 ret = rbd_dev_v1_header_info(rbd_dev);
3187         else
3188                 ret = rbd_dev_v2_header_info(rbd_dev);
3189
3190         /* If it's a mapped snapshot, validate its EXISTS flag */
3191
3192         rbd_exists_validate(rbd_dev);
3193         mutex_unlock(&ctl_mutex);
3194         if (mapping_size != rbd_dev->mapping.size) {
3195                 sector_t size;
3196
3197                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3198                 dout("setting size to %llu sectors", (unsigned long long)size);
3199                 set_capacity(rbd_dev->disk, size);
3200                 revalidate_disk(rbd_dev->disk);
3201         }
3202
3203         return ret;
3204 }
3205
3206 static int rbd_init_disk(struct rbd_device *rbd_dev)
3207 {
3208         struct gendisk *disk;
3209         struct request_queue *q;
3210         u64 segment_size;
3211
3212         /* create gendisk info */
3213         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3214         if (!disk)
3215                 return -ENOMEM;
3216
3217         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3218                  rbd_dev->dev_id);
3219         disk->major = rbd_dev->major;
3220         disk->first_minor = 0;
3221         disk->fops = &rbd_bd_ops;
3222         disk->private_data = rbd_dev;
3223
3224         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3225         if (!q)
3226                 goto out_disk;
3227
3228         /* We use the default size, but let's be explicit about it. */
3229         blk_queue_physical_block_size(q, SECTOR_SIZE);
3230
3231         /* set io sizes to object size */
3232         segment_size = rbd_obj_bytes(&rbd_dev->header);
3233         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3234         blk_queue_max_segment_size(q, segment_size);
3235         blk_queue_io_min(q, segment_size);
3236         blk_queue_io_opt(q, segment_size);
3237
3238         blk_queue_merge_bvec(q, rbd_merge_bvec);
3239         disk->queue = q;
3240
3241         q->queuedata = rbd_dev;
3242
3243         rbd_dev->disk = disk;
3244
3245         return 0;
3246 out_disk:
3247         put_disk(disk);
3248
3249         return -ENOMEM;
3250 }
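/*
 * With the default 4 MiB object size, the limits above cap a single
 * request at 8192 sectors and report one object as both the minimum
 * and optimal I/O size, so well-formed requests never have to be
 * split across objects.
 */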
3251
3252 /*
3253   sysfs
3254 */
3255
3256 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3257 {
3258         return container_of(dev, struct rbd_device, dev);
3259 }
3260
3261 static ssize_t rbd_size_show(struct device *dev,
3262                              struct device_attribute *attr, char *buf)
3263 {
3264         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3265
3266         return sprintf(buf, "%llu\n",
3267                 (unsigned long long)rbd_dev->mapping.size);
3268 }
3269
3270 /*
3271  * Note this shows the features for whatever's mapped, which is not
3272  * necessarily the base image.
3273  */
3274 static ssize_t rbd_features_show(struct device *dev,
3275                              struct device_attribute *attr, char *buf)
3276 {
3277         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3278
3279         return sprintf(buf, "0x%016llx\n",
3280                         (unsigned long long)rbd_dev->mapping.features);
3281 }
3282
3283 static ssize_t rbd_major_show(struct device *dev,
3284                               struct device_attribute *attr, char *buf)
3285 {
3286         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3287
3288         if (rbd_dev->major)
3289                 return sprintf(buf, "%d\n", rbd_dev->major);
3290
3291         return sprintf(buf, "(none)\n");
3292 }
3294
3295 static ssize_t rbd_client_id_show(struct device *dev,
3296                                   struct device_attribute *attr, char *buf)
3297 {
3298         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3299
3300         return sprintf(buf, "client%lld\n",
3301                         ceph_client_id(rbd_dev->rbd_client->client));
3302 }
3303
3304 static ssize_t rbd_pool_show(struct device *dev,
3305                              struct device_attribute *attr, char *buf)
3306 {
3307         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3308
3309         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3310 }
3311
3312 static ssize_t rbd_pool_id_show(struct device *dev,
3313                              struct device_attribute *attr, char *buf)
3314 {
3315         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3316
3317         return sprintf(buf, "%llu\n",
3318                         (unsigned long long) rbd_dev->spec->pool_id);
3319 }
3320
3321 static ssize_t rbd_name_show(struct device *dev,
3322                              struct device_attribute *attr, char *buf)
3323 {
3324         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3325
3326         if (rbd_dev->spec->image_name)
3327                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3328
3329         return sprintf(buf, "(unknown)\n");
3330 }
3331
3332 static ssize_t rbd_image_id_show(struct device *dev,
3333                              struct device_attribute *attr, char *buf)
3334 {
3335         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3336
3337         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3338 }
3339
3340 /*
3341  * Shows the name of the currently-mapped snapshot (or
3342  * RBD_SNAP_HEAD_NAME for the base image).
3343  */
3344 static ssize_t rbd_snap_show(struct device *dev,
3345                              struct device_attribute *attr,
3346                              char *buf)
3347 {
3348         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3349
3350         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3351 }
3352
3353 /*
3354  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3355  * for the parent image.  If there is no parent, simply shows
3356  * "(no parent image)".
3357  */
3358 static ssize_t rbd_parent_show(struct device *dev,
3359                              struct device_attribute *attr,
3360                              char *buf)
3361 {
3362         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3363         struct rbd_spec *spec = rbd_dev->parent_spec;
3364         int count;
3365         char *bufp = buf;
3366
3367         if (!spec)
3368                 return sprintf(buf, "(no parent image)\n");
3369
3370         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3371                         (unsigned long long) spec->pool_id, spec->pool_name);
3372         if (count < 0)
3373                 return count;
3374         bufp += count;
3375
3376         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3377                         spec->image_name ? spec->image_name : "(unknown)");
3378         if (count < 0)
3379                 return count;
3380         bufp += count;
3381
3382         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3383                         (unsigned long long) spec->snap_id, spec->snap_name);
3384         if (count < 0)
3385                 return count;
3386         bufp += count;
3387
3388         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3389         if (count < 0)
3390                 return count;
3391         bufp += count;
3392
3393         return (ssize_t) (bufp - buf);
3394 }
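/*
 * Example output for the parent attribute (hypothetical values):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b76b8b4567
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */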
3395
3396 static ssize_t rbd_image_refresh(struct device *dev,
3397                                  struct device_attribute *attr,
3398                                  const char *buf,
3399                                  size_t size)
3400 {
3401         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3402         int ret;
3403
3404         ret = rbd_dev_refresh(rbd_dev);
3405         if (ret)
3406                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3407
3408         return ret < 0 ? ret : size;
3409 }
3410
3411 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3412 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3413 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3414 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3415 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3416 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3417 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3418 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3419 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3420 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3421 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3422
3423 static struct attribute *rbd_attrs[] = {
3424         &dev_attr_size.attr,
3425         &dev_attr_features.attr,
3426         &dev_attr_major.attr,
3427         &dev_attr_client_id.attr,
3428         &dev_attr_pool.attr,
3429         &dev_attr_pool_id.attr,
3430         &dev_attr_name.attr,
3431         &dev_attr_image_id.attr,
3432         &dev_attr_current_snap.attr,
3433         &dev_attr_parent.attr,
3434         &dev_attr_refresh.attr,
3435         NULL
3436 };
3437
3438 static struct attribute_group rbd_attr_group = {
3439         .attrs = rbd_attrs,
3440 };
3441
3442 static const struct attribute_group *rbd_attr_groups[] = {
3443         &rbd_attr_group,
3444         NULL
3445 };
3446
3447 static void rbd_sysfs_dev_release(struct device *dev)
3448 {
3449 }
3450
3451 static struct device_type rbd_device_type = {
3452         .name           = "rbd",
3453         .groups         = rbd_attr_groups,
3454         .release        = rbd_sysfs_dev_release,
3455 };
3456
3457 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3458 {
3459         kref_get(&spec->kref);
3460
3461         return spec;
3462 }
3463
3464 static void rbd_spec_free(struct kref *kref);
3465 static void rbd_spec_put(struct rbd_spec *spec)
3466 {
3467         if (spec)
3468                 kref_put(&spec->kref, rbd_spec_free);
3469 }
3470
3471 static struct rbd_spec *rbd_spec_alloc(void)
3472 {
3473         struct rbd_spec *spec;
3474
3475         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3476         if (!spec)
3477                 return NULL;
3478         kref_init(&spec->kref);
3479
3480         return spec;
3481 }
3482
3483 static void rbd_spec_free(struct kref *kref)
3484 {
3485         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3486
3487         kfree(spec->pool_name);
3488         kfree(spec->image_id);
3489         kfree(spec->image_name);
3490         kfree(spec->snap_name);
3491         kfree(spec);
3492 }
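/*
 * rbd_spec lifecycle (sketch): rbd_spec_alloc() returns a spec holding
 * a single reference.  Additional holders take references with
 * rbd_spec_get(), and every reference is dropped with rbd_spec_put();
 * the last put frees the four name/id strings and the spec itself via
 * rbd_spec_free().
 */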
3493
3494 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3495                                 struct rbd_spec *spec)
3496 {
3497         struct rbd_device *rbd_dev;
3498
3499         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3500         if (!rbd_dev)
3501                 return NULL;
3502
3503         spin_lock_init(&rbd_dev->lock);
3504         rbd_dev->flags = 0;
3505         INIT_LIST_HEAD(&rbd_dev->node);
3506         init_rwsem(&rbd_dev->header_rwsem);
3507
3508         rbd_dev->spec = spec;
3509         rbd_dev->rbd_client = rbdc;
3510
3511         /* Initialize the layout used for all rbd requests */
3512
3513         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3514         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3515         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3516         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3517
3518         return rbd_dev;
3519 }
3520
3521 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3522 {
3523         rbd_put_client(rbd_dev->rbd_client);
3524         rbd_spec_put(rbd_dev->spec);
3525         kfree(rbd_dev);
3526 }
3527
3528 /*
3529  * Get the size and object order for an image snapshot, or if
3530  * snap_id is CEPH_NOSNAP, gets this information for the base
3531  * image.
3532  */
3533 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3534                                 u8 *order, u64 *snap_size)
3535 {
3536         __le64 snapid = cpu_to_le64(snap_id);
3537         int ret;
3538         struct {
3539                 u8 order;
3540                 __le64 size;
3541         } __attribute__ ((packed)) size_buf = { 0 };
3542
3543         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3544                                 "rbd", "get_size",
3545                                 &snapid, sizeof (snapid),
3546                                 &size_buf, sizeof (size_buf));
3547         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3548         if (ret < 0)
3549                 return ret;
3550         if (ret < sizeof (size_buf))
3551                 return -ERANGE;
3552
3553         if (order)
3554                 *order = size_buf.order;
3555         *snap_size = le64_to_cpu(size_buf.size);
3556
3557         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3558                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3559                 (unsigned long long)*snap_size);
3560
3561         return 0;
3562 }
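/*
 * Illustration: for a 1 GiB head image with the default 4 MiB objects,
 * the "get_size" reply decoded above would carry order = 22 and
 * size = cpu_to_le64(1073741824).
 */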
3563
3564 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3565 {
3566         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3567                                         &rbd_dev->header.obj_order,
3568                                         &rbd_dev->header.image_size);
3569 }
3570
3571 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3572 {
3573         void *reply_buf;
3574         int ret;
3575         void *p;
3576
3577         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3578         if (!reply_buf)
3579                 return -ENOMEM;
3580
3581         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3582                                 "rbd", "get_object_prefix", NULL, 0,
3583                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3584         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3585         if (ret < 0)
3586                 goto out;
3587
3588         p = reply_buf;
3589         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3590                                                 p + ret, NULL, GFP_NOIO);
3591         ret = 0;
3592
3593         if (IS_ERR(rbd_dev->header.object_prefix)) {
3594                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3595                 rbd_dev->header.object_prefix = NULL;
3596         } else {
3597                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3598         }
3599 out:
3600         kfree(reply_buf);
3601
3602         return ret;
3603 }
3604
3605 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3606                 u64 *snap_features)
3607 {
3608         __le64 snapid = cpu_to_le64(snap_id);
3609         struct {
3610                 __le64 features;
3611                 __le64 incompat;
3612         } __attribute__ ((packed)) features_buf = { 0 };
3613         u64 incompat;
3614         int ret;
3615
3616         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3617                                 "rbd", "get_features",
3618                                 &snapid, sizeof (snapid),
3619                                 &features_buf, sizeof (features_buf));
3620         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3621         if (ret < 0)
3622                 return ret;
3623         if (ret < sizeof (features_buf))
3624                 return -ERANGE;
3625
3626         incompat = le64_to_cpu(features_buf.incompat);
3627         if (incompat & ~RBD_FEATURES_SUPPORTED)
3628                 return -ENXIO;
3629
3630         *snap_features = le64_to_cpu(features_buf.features);
3631
3632         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3633                 (unsigned long long)snap_id,
3634                 (unsigned long long)*snap_features,
3635                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3636
3637         return 0;
3638 }
3639
3640 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3641 {
3642         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3643                                                 &rbd_dev->header.features);
3644 }
3645
3646 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3647 {
3648         struct rbd_spec *parent_spec;
3649         size_t size;
3650         void *reply_buf = NULL;
3651         __le64 snapid;
3652         void *p;
3653         void *end;
3654         u64 pool_id;
3655         char *image_id;
3656         u64 overlap;
3657         int ret;
3658
3659         parent_spec = rbd_spec_alloc();
3660         if (!parent_spec)
3661                 return -ENOMEM;
3662
3663         size = sizeof (__le64) +                                /* pool_id */
3664                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3665                 sizeof (__le64) +                               /* snap_id */
3666                 sizeof (__le64);                                /* overlap */
3667         reply_buf = kmalloc(size, GFP_KERNEL);
3668         if (!reply_buf) {
3669                 ret = -ENOMEM;
3670                 goto out_err;
3671         }
3672
3673         snapid = cpu_to_le64(CEPH_NOSNAP);
3674         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3675                                 "rbd", "get_parent",
3676                                 &snapid, sizeof (snapid),
3677                                 reply_buf, size);
3678         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3679         if (ret < 0)
3680                 goto out_err;
3681
3682         p = reply_buf;
3683         end = reply_buf + ret;
3684         ret = -ERANGE;
3685         ceph_decode_64_safe(&p, end, pool_id, out_err);
3686         if (pool_id == CEPH_NOPOOL)
3687                 goto out;       /* No parent?  No problem. */
3688
3689         /* The ceph file layout needs to fit pool id in 32 bits */
3690
3691         ret = -EIO;
3692         if (pool_id > (u64)U32_MAX) {
3693                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3694                         (unsigned long long)pool_id, U32_MAX);
3695                 goto out_err;
3696         }
3697         parent_spec->pool_id = pool_id;
3698
3699         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3700         if (IS_ERR(image_id)) {
3701                 ret = PTR_ERR(image_id);
3702                 goto out_err;
3703         }
3704         parent_spec->image_id = image_id;
3705         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3706         ceph_decode_64_safe(&p, end, overlap, out_err);
3707
3708         if (overlap) {
3709                 rbd_spec_put(rbd_dev->parent_spec);
3710                 rbd_dev->parent_spec = parent_spec;
3711                 parent_spec = NULL;     /* rbd_dev now owns this */
3712                 rbd_dev->parent_overlap = overlap;
3713         } else {
3714                 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3715         }
3716 out:
3717         ret = 0;
3718 out_err:
3719         kfree(reply_buf);
3720         rbd_spec_put(parent_spec);
3721
3722         return ret;
3723 }
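/*
 * For reference, the "get_parent" reply decoded above is a packed
 * sequence of:
 *
 *	__le64 pool_id           (CEPH_NOPOOL means no parent)
 *	le32-prefixed string     image_id
 *	__le64 snap_id
 *	__le64 overlap           (bytes shared with the parent)
 */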
3724
3725 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3726 {
3727         struct {
3728                 __le64 stripe_unit;
3729                 __le64 stripe_count;
3730         } __attribute__ ((packed)) striping_info_buf = { 0 };
3731         size_t size = sizeof (striping_info_buf);
3732         void *p;
3733         u64 obj_size;
3734         u64 stripe_unit;
3735         u64 stripe_count;
3736         int ret;
3737
3738         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3739                                 "rbd", "get_stripe_unit_count", NULL, 0,
3740                                 (char *)&striping_info_buf, size);
3741         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3742         if (ret < 0)
3743                 return ret;
3744         if (ret < size)
3745                 return -ERANGE;
3746
3747         /*
3748          * We don't actually support the "fancy striping" feature
3749          * (STRIPINGV2) yet, but if the striping sizes are the
3750          * defaults the behavior is the same as before.  So find
3751          * out, and only fail if the image has non-default values.
3752          */
3753         ret = -EINVAL;
3754         obj_size = (u64)1 << rbd_dev->header.obj_order;
3755         p = &striping_info_buf;
3756         stripe_unit = ceph_decode_64(&p);
3757         if (stripe_unit != obj_size) {
3758                 rbd_warn(rbd_dev, "unsupported stripe unit "
3759                                 "(got %llu want %llu)",
3760                                 stripe_unit, obj_size);
3761                 return -EINVAL;
3762         }
3763         stripe_count = ceph_decode_64(&p);
3764         if (stripe_count != 1) {
3765                 rbd_warn(rbd_dev, "unsupported stripe count "
3766                                 "(got %llu want 1)", stripe_count);
3767                 return -EINVAL;
3768         }
3769         rbd_dev->header.stripe_unit = stripe_unit;
3770         rbd_dev->header.stripe_count = stripe_count;
3771
3772         return 0;
3773 }
3774
3775 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3776 {
3777         size_t image_id_size;
3778         char *image_id;
3779         void *p;
3780         void *end;
3781         size_t size;
3782         void *reply_buf = NULL;
3783         size_t len = 0;
3784         char *image_name = NULL;
3785         int ret;
3786
3787         rbd_assert(!rbd_dev->spec->image_name);
3788
3789         len = strlen(rbd_dev->spec->image_id);
3790         image_id_size = sizeof (__le32) + len;
3791         image_id = kmalloc(image_id_size, GFP_KERNEL);
3792         if (!image_id)
3793                 return NULL;
3794
3795         p = image_id;
3796         end = image_id + image_id_size;
3797         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3798
3799         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3800         reply_buf = kmalloc(size, GFP_KERNEL);
3801         if (!reply_buf)
3802                 goto out;
3803
3804         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3805                                 "rbd", "dir_get_name",
3806                                 image_id, image_id_size,
3807                                 reply_buf, size);
3808         if (ret < 0)
3809                 goto out;
3810         p = reply_buf;
3811         end = reply_buf + ret;
3812
3813         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3814         if (IS_ERR(image_name))
3815                 image_name = NULL;
3816         else
3817                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3818 out:
3819         kfree(reply_buf);
3820         kfree(image_id);
3821
3822         return image_name;
3823 }
3824
3825 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3826 {
3827         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3828         const char *snap_name;
3829         u32 which = 0;
3830
3831         /* Skip over names until we find the one we are looking for */
3832
3833         snap_name = rbd_dev->header.snap_names;
3834         while (which < snapc->num_snaps) {
3835                 if (!strcmp(name, snap_name))
3836                         return snapc->snaps[which];
3837                 snap_name += strlen(snap_name) + 1;
3838                 which++;
3839         }
3840         return CEPH_NOSNAP;
3841 }
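/*
 * Illustration: for a format 1 image with snapshots "mon" and "tue",
 * header.snap_names holds the block "mon\0tue\0" and snapc->snaps[]
 * holds the matching ids in the same order, which is why the walk
 * above advances by strlen() + 1 per name.
 */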
3842
3843 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3844 {
3845         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3846         u32 which;
3847         bool found = false;
3848         u64 snap_id;
3849
3850         for (which = 0; !found && which < snapc->num_snaps; which++) {
3851                 const char *snap_name;
3852
3853                 snap_id = snapc->snaps[which];
3854                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3855                 if (IS_ERR(snap_name))
3856                         break;
3857                 found = !strcmp(name, snap_name);
3858                 kfree(snap_name);
3859         }
3860         return found ? snap_id : CEPH_NOSNAP;
3861 }
3862
3863 /*
3864  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3865  * no snapshot by that name is found, or if an error occurs.
3866  */
3867 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3868 {
3869         if (rbd_dev->image_format == 1)
3870                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3871
3872         return rbd_v2_snap_id_by_name(rbd_dev, name);
3873 }
3874
3875 /*
3876  * When an rbd image has a parent image, it is identified by the
3877  * pool, image, and snapshot ids (not names).  This function fills
3878  * in the names for those ids.  (It's OK if we can't figure out the
3879  * name for an image id, but the pool and snapshot ids should always
3880  * exist and have names.)  All names in an rbd spec are dynamically
3881  * allocated.
3882  *
3883  * When an image being mapped (not a parent) is probed, we have the
3884  * pool name and pool id, image name and image id, and the snapshot
3885  * name.  The only thing we're missing is the snapshot id.
3886  */
3887 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3888 {
3889         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3890         struct rbd_spec *spec = rbd_dev->spec;
3891         const char *pool_name;
3892         const char *image_name;
3893         const char *snap_name;
3894         int ret;
3895
3896         /*
3897          * An image being mapped will have the pool name (etc.), but
3898          * we need to look up the snapshot id.
3899          */
3900         if (spec->pool_name) {
3901                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3902                         u64 snap_id;
3903
3904                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3905                         if (snap_id == CEPH_NOSNAP)
3906                                 return -ENOENT;
3907                         spec->snap_id = snap_id;
3908                 } else {
3909                         spec->snap_id = CEPH_NOSNAP;
3910                 }
3911
3912                 return 0;
3913         }
3914
3915         /* Get the pool name; we have to make our own copy of this */
3916
3917         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3918         if (!pool_name) {
3919                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3920                 return -EIO;
3921         }
3922         pool_name = kstrdup(pool_name, GFP_KERNEL);
3923         if (!pool_name)
3924                 return -ENOMEM;
3925
3926         /* Fetch the image name; tolerate failure here */
3927
3928         image_name = rbd_dev_image_name(rbd_dev);
3929         if (!image_name)
3930                 rbd_warn(rbd_dev, "unable to get image name");
3931
3932         /* Look up the snapshot name, and make a copy */
3933
3934         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3935         if (!snap_name) {
3936                 ret = -ENOMEM;
3937                 goto out_err;
3938         }
3939
3940         spec->pool_name = pool_name;
3941         spec->image_name = image_name;
3942         spec->snap_name = snap_name;
3943
3944         return 0;
3945 out_err:
3946         kfree(image_name);
3947         kfree(pool_name);
3948
3949         return ret;
3950 }
3951
3952 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3953 {
3954         size_t size;
3955         int ret;
3956         void *reply_buf;
3957         void *p;
3958         void *end;
3959         u64 seq;
3960         u32 snap_count;
3961         struct ceph_snap_context *snapc;
3962         u32 i;
3963
3964         /*
3965          * We'll need room for the seq value (maximum snapshot id),
3966          * snapshot count, and array of that many snapshot ids.
3967          * For now we have a fixed upper limit on the number we're
3968          * prepared to receive.
3969          */
3970         size = sizeof (__le64) + sizeof (__le32) +
3971                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3972         reply_buf = kzalloc(size, GFP_KERNEL);
3973         if (!reply_buf)
3974                 return -ENOMEM;
3975
3976         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3977                                 "rbd", "get_snapcontext", NULL, 0,
3978                                 reply_buf, size);
3979         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3980         if (ret < 0)
3981                 goto out;
3982
3983         p = reply_buf;
3984         end = reply_buf + ret;
3985         ret = -ERANGE;
3986         ceph_decode_64_safe(&p, end, seq, out);
3987         ceph_decode_32_safe(&p, end, snap_count, out);
3988
3989         /*
3990          * Make sure the reported number of snapshot ids wouldn't go
3991          * beyond the end of our buffer.  But before checking that,
3992          * make sure the computed size of the snapshot context we
3993          * allocate is representable in a size_t.
3994          */
3995         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3996                                  / sizeof (u64)) {
3997                 ret = -EINVAL;
3998                 goto out;
3999         }
4000         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4001                 goto out;
4002         ret = 0;
4003
4004         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4005         if (!snapc) {
4006                 ret = -ENOMEM;
4007                 goto out;
4008         }
4009         snapc->seq = seq;
4010         for (i = 0; i < snap_count; i++)
4011                 snapc->snaps[i] = ceph_decode_64(&p);
4012
4013         ceph_put_snap_context(rbd_dev->header.snapc);
4014         rbd_dev->header.snapc = snapc;
4015
4016         dout("  snap context seq = %llu, snap_count = %u\n",
4017                 (unsigned long long)seq, (unsigned int)snap_count);
4018 out:
4019         kfree(reply_buf);
4020
4021         return ret;
4022 }
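/*
 * For reference, the "get_snapcontext" reply decoded above is:
 *
 *	__le64 seq                   highest snapshot id issued so far
 *	__le32 snap_count
 *	__le64 snaps[snap_count]     snapshot ids
 *
 * Only seq and the id array are used here; any ordering of the ids
 * is preserved as received.
 */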
4023
4024 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4025                                         u64 snap_id)
4026 {
4027         size_t size;
4028         void *reply_buf;
4029         __le64 snapid;
4030         int ret;
4031         void *p;
4032         void *end;
4033         char *snap_name;
4034
4035         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4036         reply_buf = kmalloc(size, GFP_KERNEL);
4037         if (!reply_buf)
4038                 return ERR_PTR(-ENOMEM);
4039
4040         snapid = cpu_to_le64(snap_id);
4041         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4042                                 "rbd", "get_snapshot_name",
4043                                 &snapid, sizeof (snapid),
4044                                 reply_buf, size);
4045         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4046         if (ret < 0) {
4047                 snap_name = ERR_PTR(ret);
4048                 goto out;
4049         }
4050
4051         p = reply_buf;
4052         end = reply_buf + ret;
4053         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4054         if (IS_ERR(snap_name))
4055                 goto out;
4056
4057         dout("  snap_id 0x%016llx snap_name = %s\n",
4058                 (unsigned long long)snap_id, snap_name);
4059 out:
4060         kfree(reply_buf);
4061
4062         return snap_name;
4063 }
4064
4065 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4066 {
4067         bool first_time = rbd_dev->header.object_prefix == NULL;
4068         int ret;
4069
4070         down_write(&rbd_dev->header_rwsem);
4071
4072         if (first_time) {
4073                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4074                 if (ret)
4075                         goto out;
4076         }
4077
4078         /*
4079          * If the image supports layering, get the parent info.  We
4080          * need to probe the first time regardless.  Thereafter we
4081  * only need to do so if there's a parent, to see if it has
4082          * disappeared due to the mapped image getting flattened.
4083          */
4084         if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4085                         (first_time || rbd_dev->parent_spec)) {
4086                 bool warn;
4087
4088                 ret = rbd_dev_v2_parent_info(rbd_dev);
4089                 if (ret)
4090                         goto out;
4091
4092                 /*
4093                  * Print a warning if this is the initial probe and
4094                  * the image has a parent.  Don't print it if the
4095                  * image now being probed is itself a parent.  We
4096                  * can tell at this point because we won't know its
4097                  * pool name yet (just its pool id).
4098                  */
4099                 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4100                 if (first_time && warn)
4101                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4102                                         "is EXPERIMENTAL!");
4103         }
4104
4105         ret = rbd_dev_v2_image_size(rbd_dev);
4106         if (ret)
4107                 goto out;
4108
4109         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4110             rbd_dev->mapping.size != rbd_dev->header.image_size)
4111                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4112
4113         ret = rbd_dev_v2_snap_context(rbd_dev);
4114         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4115 out:
4116         up_write(&rbd_dev->header_rwsem);
4117
4118         return ret;
4119 }
4120
4121 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4122 {
4123         struct device *dev;
4124         int ret;
4125
4126         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4127
4128         dev = &rbd_dev->dev;
4129         dev->bus = &rbd_bus_type;
4130         dev->type = &rbd_device_type;
4131         dev->parent = &rbd_root_dev;
4132         dev->release = rbd_dev_device_release;
4133         dev_set_name(dev, "%d", rbd_dev->dev_id);
4134         ret = device_register(dev);
4135
4136         mutex_unlock(&ctl_mutex);
4137
4138         return ret;
4139 }
4140
4141 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4142 {
4143         device_unregister(&rbd_dev->dev);
4144 }
4145
4146 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4147
4148 /*
4149  * Get a unique rbd identifier for the given new rbd_dev, and add
4150  * the rbd_dev to the global list.  The minimum rbd id is 1.
4151  */
4152 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4153 {
4154         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4155
4156         spin_lock(&rbd_dev_list_lock);
4157         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4158         spin_unlock(&rbd_dev_list_lock);
4159         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4160                 (unsigned long long) rbd_dev->dev_id);
4161 }
4162
4163 /*
4164  * Remove an rbd_dev from the global list, and record that its
4165  * identifier is no longer in use.
4166  */
4167 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4168 {
4169         struct list_head *tmp;
4170         int rbd_id = rbd_dev->dev_id;
4171         int max_id;
4172
4173         rbd_assert(rbd_id > 0);
4174
4175         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4176                 (unsigned long long) rbd_dev->dev_id);
4177         spin_lock(&rbd_dev_list_lock);
4178         list_del_init(&rbd_dev->node);
4179
4180         /*
4181          * If the id being "put" is not the current maximum, there
4182          * is nothing special we need to do.
4183          */
4184         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4185                 spin_unlock(&rbd_dev_list_lock);
4186                 return;
4187         }
4188
4189         /*
4190          * We need to update the current maximum id.  Search the
4191          * list to find out what it is.  We're more likely to find
4192          * the maximum at the end, so search the list backward.
4193          */
4194         max_id = 0;
4195         list_for_each_prev(tmp, &rbd_dev_list) {
4196                 struct rbd_device *rbd_dev;
4197
4198                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4199                 if (rbd_dev->dev_id > max_id)
4200                         max_id = rbd_dev->dev_id;
4201         }
4202         spin_unlock(&rbd_dev_list_lock);
4203
4204         /*
4205          * The max id could have been updated by rbd_dev_id_get(), in
4206          * which case it now accurately reflects the new maximum.
4207          * Be careful not to overwrite the maximum value in that
4208          * case.
4209          */
4210         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4211         dout("  max dev id has been reset\n");
4212 }
4213
4214 /*
4215  * Skips over white space at *buf, and updates *buf to point to the
4216  * first found non-space character (if any). Returns the length of
4217  * the token (string of non-white space characters) found.  Note
4218  * that *buf must be terminated with '\0'.
4219  */
4220 static inline size_t next_token(const char **buf)
4221 {
4222         /*
4223          * These are the characters that produce nonzero for
4224          * isspace() in the "C" and "POSIX" locales.
4225          */
4226         const char *spaces = " \f\n\r\t\v";
4227
4228         *buf += strspn(*buf, spaces);   /* Find start of token */
4229
4230         return strcspn(*buf, spaces);   /* Return token length */
4231 }
4232
4233 /*
4234  * Finds the next token in *buf, and if the provided token buffer is
4235  * big enough, copies the found token into it.  The result, if
4236  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4237  * must be terminated with '\0' on entry.
4238  *
4239  * Returns the length of the token found (not including the '\0').
4240  * Return value will be 0 if no token is found, and it will be >=
4241  * token_size if the token would not fit.
4242  *
4243  * The *buf pointer will be updated to point beyond the end of the
4244  * found token.  Note that this occurs even if the token buffer is
4245  * too small to hold it.
4246  */
4247 static inline size_t copy_token(const char **buf,
4248                                 char *token,
4249                                 size_t token_size)
4250 {
4251         size_t len;
4252
4253         len = next_token(buf);
4254         if (len < token_size) {
4255                 memcpy(token, *buf, len);
4256                 *(token + len) = '\0';
4257         }
4258         *buf += len;
4259
4260         return len;
4261 }
4262
4263 /*
4264  * Finds the next token in *buf, dynamically allocates a buffer big
4265  * enough to hold a copy of it, and copies the token into the new
4266  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4267  * that a duplicate buffer is created even for a zero-length token.
4268  *
4269  * Returns a pointer to the newly-allocated duplicate, or a null
4270  * pointer if memory for the duplicate was not available.  If
4271  * the lenp argument is a non-null pointer, the length of the token
4272  * (not including the '\0') is returned in *lenp.
4273  *
4274  * If successful, the *buf pointer will be updated to point beyond
4275  * the end of the found token.
4276  *
4277  * Note: uses GFP_KERNEL for allocation.
4278  */
4279 static inline char *dup_token(const char **buf, size_t *lenp)
4280 {
4281         char *dup;
4282         size_t len;
4283
4284         len = next_token(buf);
4285         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4286         if (!dup)
4287                 return NULL;
4288         *(dup + len) = '\0';
4289         *buf += len;
4290
4291         if (lenp)
4292                 *lenp = len;
4293
4294         return dup;
4295 }
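/*
 * Example (illustrative): given buf pointing at "  rbd foo",
 * next_token() advances past the leading spaces and returns 3;
 * dup_token() would then return a kmalloc'd copy of "rbd" and leave
 * *buf pointing at " foo".
 */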
4296
4297 /*
4298  * Parse the options provided for an "rbd add" (i.e., rbd image
4299  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4300  * and the data written is passed here via a NUL-terminated buffer.
4301  * Returns 0 if successful or an error code otherwise.
4302  *
4303  * The information extracted from these options is recorded in
4304  * the other parameters which return dynamically-allocated
4305  * structures:
4306  *  ceph_opts
4307  *      The address of a pointer that will refer to a ceph options
4308  *      structure.  Caller must release the returned pointer using
4309  *      ceph_destroy_options() when it is no longer needed.
4310  *  rbd_opts
4311  *      Address of an rbd options pointer.  Fully initialized by
4312  *      this function; caller must release with kfree().
4313  *  spec
4314  *      Address of an rbd image specification pointer.  Fully
4315  *      initialized by this function based on parsed options.
4316  *      Caller must release with rbd_spec_put().
4317  *
4318  * The options passed take this form:
4319  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4320  * where:
4321  *  <mon_addrs>
4322  *      A comma-separated list of one or more monitor addresses.
4323  *      A monitor address is an ip address, optionally followed
4324  *      by a port number (separated by a colon).
4325  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4326  *  <options>
4327  *      A comma-separated list of ceph and/or rbd options.
4328  *  <pool_name>
4329  *      The name of the rados pool containing the rbd image.
4330  *  <image_name>
4331  *      The name of the image in that pool to map.
4332  *  <snap_name>
4333  *      An optional snapshot name.  If provided, the mapping will
4334  *      present data from the image as of the time that snapshot
4335  *      was created.  The image head is used if no snapshot name
4336  *      is provided.  Snapshot mappings are always read-only.
4337  */
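/*
 * Example (hypothetical names and key): writing
 *
 *	1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap
 *
 * to /sys/bus/rbd/add would map snapshot "mysnap" of image "myimage"
 * in pool "rbd", read-only, using the monitor at 1.2.3.4:6789.
 */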
4338 static int rbd_add_parse_args(const char *buf,
4339                                 struct ceph_options **ceph_opts,
4340                                 struct rbd_options **opts,
4341                                 struct rbd_spec **rbd_spec)
4342 {
4343         size_t len;
4344         char *options;
4345         const char *mon_addrs;
4346         char *snap_name;
4347         size_t mon_addrs_size;
4348         struct rbd_spec *spec = NULL;
4349         struct rbd_options *rbd_opts = NULL;
4350         struct ceph_options *copts;
4351         int ret;
4352
4353         /* The first four tokens are required */
4354
4355         len = next_token(&buf);
4356         if (!len) {
4357                 rbd_warn(NULL, "no monitor address(es) provided");
4358                 return -EINVAL;
4359         }
4360         mon_addrs = buf;
4361         mon_addrs_size = len + 1;
4362         buf += len;
4363
4364         ret = -EINVAL;
4365         options = dup_token(&buf, NULL);
4366         if (!options)
4367                 return -ENOMEM;
4368         if (!*options) {
4369                 rbd_warn(NULL, "no options provided");
4370                 goto out_err;
4371         }
4372
4373         spec = rbd_spec_alloc();
4374         if (!spec)
4375                 goto out_mem;
4376
4377         spec->pool_name = dup_token(&buf, NULL);
4378         if (!spec->pool_name)
4379                 goto out_mem;
4380         if (!*spec->pool_name) {
4381                 rbd_warn(NULL, "no pool name provided");
4382                 goto out_err;
4383         }
4384
4385         spec->image_name = dup_token(&buf, NULL);
4386         if (!spec->image_name)
4387                 goto out_mem;
4388         if (!*spec->image_name) {
4389                 rbd_warn(NULL, "no image name provided");
4390                 goto out_err;
4391         }
4392
4393         /*
4394          * Snapshot name is optional; default is to use "-"
4395          * (indicating the head/no snapshot).
4396          */
4397         len = next_token(&buf);
4398         if (!len) {
4399                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4400                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4401         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4402                 ret = -ENAMETOOLONG;
4403                 goto out_err;
4404         }
4405         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4406         if (!snap_name)
4407                 goto out_mem;
4408         *(snap_name + len) = '\0';
4409         spec->snap_name = snap_name;
4410
4411         /* Initialize all rbd options to the defaults */
4412
4413         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4414         if (!rbd_opts)
4415                 goto out_mem;
4416
4417         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4418
4419         copts = ceph_parse_options(options, mon_addrs,
4420                                         mon_addrs + mon_addrs_size - 1,
4421                                         parse_rbd_opts_token, rbd_opts);
4422         if (IS_ERR(copts)) {
4423                 ret = PTR_ERR(copts);
4424                 goto out_err;
4425         }
4426         kfree(options);
4427
4428         *ceph_opts = copts;
4429         *opts = rbd_opts;
4430         *rbd_spec = spec;
4431
4432         return 0;
4433 out_mem:
4434         ret = -ENOMEM;
4435 out_err:
4436         kfree(rbd_opts);
4437         rbd_spec_put(spec);
4438         kfree(options);
4439
4440         return ret;
4441 }

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will set the given rbd_dev's image_id field if the
 * id can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        char *image_id;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.  We
         * do still need to set the image format though.
         */
        if (rbd_dev->spec->image_id) {
                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

                return 0;
        }

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* If it doesn't exist we'll assume it's a format 1 image */

        ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id", NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret == -ENOENT) {
                image_id = kstrdup("", GFP_KERNEL);
                ret = image_id ? 0 : -ENOMEM;
                if (!ret)
                        rbd_dev->image_format = 1;
        } else if (ret > sizeof (__le32)) {
                void *p = response;

                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
                ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
                ret = -EINVAL;
        }

        if (!ret) {
                rbd_dev->spec->image_id = image_id;
                dout("image_id is %s\n", image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}
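
/*
 * To make the name-to-id mapping concrete: for a format 2 image
 * named "foo", the id object consulted above is "rbd_id.foo"
 * (RBD_ID_PREFIX followed by the image name), whose data is a
 * single length-prefixed string holding the image id.  A sketch of
 * the same lookup from userspace, assuming the standard rados CLI
 * and a pool named "rbd":
 *
 *     $ rados -p rbd get rbd_id.foo - | strings
 *     10076b8b4567
 *
 * (the id shown is a made-up example).  For a format 1 image no
 * such object exists, the class method call returns -ENOENT, and
 * the image id is recorded as the empty string.
 */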

/* Undo whatever state changes are made by v1 or v2 image probe */

static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
        struct rbd_image_header *header;

        rbd_dev_unparent(rbd_dev);

        /* Free dynamic fields from the header, then zero it out */

        header = &rbd_dev->header;
        ceph_put_snap_context(header->snapc);
        kfree(header->snap_sizes);
        kfree(header->snap_names);
        kfree(header->object_prefix);
        memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;

        /*
         * Get and check the features for the image.  Currently the
         * features are assumed to never change.
         */
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;

        /* If the image supports fancy striping, get its parameters */

        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
                ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }
        /* No support for crypto and compression type format 2 images */

        return 0;
out_err:
        rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}
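
/*
 * Feature-bit note: the features fetched above are validated
 * against RBD_FEATURES_SUPPORTED when they are read, so an image
 * created with a feature bit this client does not implement
 * (anything beyond LAYERING and STRIPINGV2) fails the probe rather
 * than being mapped with incorrect semantics.
 */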

static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        struct rbd_spec *parent_spec;
        struct rbd_client *rbdc;
        int ret;

        if (!rbd_dev->parent_spec)
                return 0;
        /*
         * We need to pass a reference to the client and the parent
         * spec when creating the parent rbd_dev.  Images related by
         * parent/child relationships always share both.
         */
        parent_spec = rbd_spec_get(rbd_dev->parent_spec);
        rbdc = __rbd_get_client(rbd_dev->rbd_client);

        ret = -ENOMEM;
        parent = rbd_dev_create(rbdc, parent_spec);
        if (!parent)
                goto out_err;

        ret = rbd_dev_image_probe(parent, false);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;

        return 0;
out_err:
        if (parent) {
                rbd_dev_unparent(rbd_dev);
                /*
                 * Do not free the child's header_name here: it is
                 * owned (and freed on error) by our caller, so
                 * freeing it here would lead to a double free.
                 */
                rbd_dev_destroy(parent);
        } else {
                rbd_put_client(rbdc);
                rbd_spec_put(parent_spec);
        }

        return ret;
}
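
/*
 * Because rbd_dev_image_probe() calls back into this function,
 * layered images are probed recursively.  For a purely illustrative
 * clone hierarchy
 *
 *     base <- child <- mapped image
 *
 * mapping the image builds the rbd_dev->parent chain in the same
 * order, each rbd_device holding its own reference to the shared
 * client and to its parent's spec.
 */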

static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
        int ret;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_disk;
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_mapping;

        /* Everything's ready.  Announce the disk to the world. */

        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_mapping:
        rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);

        return ret;
}
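
/*
 * The name filled in above is what appears under /dev: the first
 * mapped image becomes rbd0, the next rbd1, and so on.  The
 * announcement is visible in the kernel log; an illustrative
 * example for a 10 GiB image (sizes are printed in hex bytes):
 *
 *     rbd0: added with size 0x280000000
 */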

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
        struct rbd_spec *spec = rbd_dev->spec;
        size_t size;

        /* Record the header object name for this rbd image. */

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
        else
                size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;

        if (rbd_dev->image_format == 1)
                sprintf(rbd_dev->header_name, "%s%s",
                        spec->image_name, RBD_SUFFIX);
        else
                sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, spec->image_id);
        return 0;
}
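
/*
 * Given the usual values of RBD_SUFFIX (".rbd") and
 * RBD_HEADER_PREFIX ("rbd_header."), an image named "foo" with id
 * "10076b8b4567" would get header object names of:
 *
 *     format 1:  foo.rbd
 *     format 2:  rbd_header.10076b8b4567
 *
 * Format 1 names are derived from the (mutable) image name, format
 * 2 names from the permanent image id.
 */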

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        rbd_dev_unprobe(rbd_dev);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
        int ret;
        int tmp;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;
        rbd_assert(rbd_dev->spec->image_id);
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        if (mapping) {
                ret = rbd_dev_header_watch_sync(rbd_dev, true);
                if (ret)
                        goto out_header_name;
        }

        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_header_info(rbd_dev);
        else
                ret = rbd_dev_v2_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;

        ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_probe;

        ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_probe;

        dout("discovered format %u image, header name is %s\n",
                rbd_dev->image_format, rbd_dev->header_name);

        return 0;
err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        if (mapping) {
                tmp = rbd_dev_header_watch_sync(rbd_dev, false);
                if (tmp)
                        rbd_warn(rbd_dev, "unable to tear down "
                                        "watch request (%d)\n", tmp);
        }
out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        dout("probe failed, returning %d\n", ret);

        return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        bool read_only;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;
        read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;

        /* The ceph file layout needs to fit pool id in 32 bits */

        if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)\n",
                                (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev) {
                /*
                 * rc still holds the (non-negative) pool id here,
                 * so reset it before taking the error path.
                 */
                rc = -ENOMEM;
                goto err_out_client;
        }
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rc = rbd_dev_image_probe(rbd_dev, true);
        if (rc < 0)
                goto err_out_rbd_dev;

        /* If we are mapping a snapshot it must be marked read-only */

        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                read_only = true;
        rbd_dev->mapping.read_only = read_only;

        rc = rbd_dev_device_setup(rbd_dev);
        if (!rc)
                return count;

        /*
         * rbd_dev_image_release() drops the final reference and
         * frees rbd_dev, so skip the labels that would touch it
         * (or its former client/spec references) a second time.
         */
        rbd_dev_image_release(rbd_dev);
        goto err_out_module;
err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t)rc;
}
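
/*
 * Putting rbd_add() together, mapping is driven entirely through
 * sysfs.  An illustrative session (the monitor address and key are
 * placeholders; see Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *     # echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo" \
 *           > /sys/bus/rbd/add
 *     # blockdev --getsize64 /dev/rbd0
 *     10737418240
 *
 * On success the write consumes the whole buffer (count is
 * returned); on failure the negative errno chosen above is
 * returned to the writer instead.
 */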

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}
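
/*
 * Teardown order matters here.  For the illustrative chain
 *
 *     mapped image -> parent -> grandparent
 *
 * the inner loop walks to the ancestor that has no parent of its
 * own and releases that image first, so each outer iteration peels
 * one layer off the far end of the chain until the mapped device
 * has no parent left.
 */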

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id;
        unsigned long ul;
        int ret;

        ret = strict_strtoul(buf, 10, &ul);
        if (ret)
                return ret;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
        rbd_bus_del_dev(rbd_dev);
        ret = rbd_dev_header_watch_sync(rbd_dev, false);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
        ret = count;
done:
        mutex_unlock(&ctl_mutex);

        return ret;
}
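
/*
 * The remove path is the mirror of rbd_add(): the buffer written to
 * /sys/bus/rbd/remove is just the device id.  Illustrative use:
 *
 *     # echo 0 > /sys/bus/rbd/remove
 *
 * which fails with -EBUSY as long as /dev/rbd0 is held open, since
 * open_count is checked under the device lock above.
 */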

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}
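
/*
 * The resulting layout under /sys/bus/rbd (documented in
 * Documentation/ABI/testing/sysfs-bus-rbd) holds the "add" and
 * "remove" control files, plus one directory per mapped image at
 * /sys/bus/rbd/devices/<dev_id>/ exposing attributes such as the
 * pool, image name, size, and current snapshot.
 */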

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = kmem_cache_create("rbd_img_request",
                                        sizeof (struct rbd_img_request),
                                        __alignof__(struct rbd_img_request),
                                        0, NULL);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
                                        sizeof (struct rbd_obj_request),
                                        __alignof__(struct rbd_obj_request),
                                        0, NULL);
        if (!rbd_obj_request_cache)
                goto out_err;

        rbd_assert(!rbd_segment_name_cache);
        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
                                        MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
        if (rbd_segment_name_cache)
                return 0;
out_err:
        if (rbd_obj_request_cache) {
                kmem_cache_destroy(rbd_obj_request_cache);
                rbd_obj_request_cache = NULL;
        }

        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;

        return -ENOMEM;
}

static void rbd_slab_exit(void)
{
        rbd_assert(rbd_segment_name_cache);
        kmem_cache_destroy(rbd_segment_name_cache);
        rbd_segment_name_cache = NULL;

        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");

                return -EINVAL;
        }
        rc = rbd_slab_init();
        if (rc)
                return rc;
        rc = rbd_sysfs_init();
        if (rc)
                rbd_slab_exit();
        else
                pr_info("loaded " RBD_DRV_NAME_LONG "\n");

        return rc;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
        rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");