/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
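/*
 * A quick sanity check on that arithmetic: an on-wire snapshot context
 * is (roughly) a u64 seq plus a u32 count plus one u64 id per snapshot,
 * so 510 snapshots need about 12 + 510 * 8 = 4092 bytes, just under a
 * 4KB page; 511 would not fit.
 */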

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
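/*
 * A worked example: for a 4-byte int, (5 * 4) / 2 + 1 = 11, which is
 * exactly enough for the 10 digits and sign of INT_MIN (-2147483648).
 * The ratio 5/2 slightly overestimates log10(256) ~= 2.41 digits per
 * byte, so the bound holds for any int size.
 */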

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
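
/*
 * Typical usage (see rbd_img_request_complete() below, for example):
 *
 *      struct rbd_obj_request *obj_request;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 *
 * The _safe variant walks the list in reverse and tolerates removal
 * of the current entry, which suits teardown paths.
 */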

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
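
/*
 * Sample usage: rbd_warn(rbd_dev, "failed to map image: %d", ret)
 * would print, e.g., "rbd: rbd3: failed to map image: -5" once the
 * gendisk exists, falling back to the image name, the image id, or
 * the raw pointer as the best available identification otherwise.
 */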

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
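
/*
 * For example, a map request whose option string includes "ro" (or
 * "read_only") ends up here via the generic option parser and marks
 * the resulting mapping read-only; see
 * Documentation/ABI/testing/sysfs-bus-rbd for the full add syntax.
 */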

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.
 *
 * Called when the last reference is dropped; takes
 * rbd_client_list_lock itself to unlink the client, so the caller
 * must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}
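
/*
 * The format 1 snap_names buffer is a packed sequence of NUL-terminated
 * strings, e.g. "monday\0tuesday\0wednesday\0"; with which == 1 the loop
 * above skips "monday" and returns a copy of "tuesday".
 */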

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}
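
/*
 * For example, bsearch()ing for 7 in the descending array { 12, 7, 3 }
 * works because this comparator inverts the usual ascending-order
 * convention: it returns 1 when the key is *less* than the element,
 * steering the search toward the tail where smaller ids live.
 */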

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* Free back to the slab cache it came from, not kfree() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}
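
/*
 * For instance, with a (hypothetical) object_prefix of "rb.0.abc.def"
 * and obj_order 22 (4 MiB objects), image offset 0x1234567 falls in
 * segment 0x1234567 >> 22 = 4, yielding the RADOS object name
 * "rb.0.abc.def.000000000004".
 */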

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
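
/*
 * Worked example (obj_order 22, i.e. 4 MiB segments): an 8 KiB request
 * starting 1 KiB before a segment boundary is clamped to 1 KiB, so the
 * caller must issue the remaining 7 KiB against the next segment.
 */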

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        /* Shift as u64 so an obj_order of 31 can't overflow an int */
        return (u64) 1 << header->obj_order;
}
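
/*
 * With the common default obj_order of 22, this is 1 << 22 = 4 MiB
 * per backing object.
 */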

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
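
/*
 * Example: given a chain of two 8 KiB bios, *offset == 4096 and
 * len == 8192, the result is a two-bio clone chain covering the last
 * 4 KiB of the first source bio and the first 4 KiB of the second;
 * on return *bio_src points at the second source bio and *offset is
 * 4096, ready for a subsequent call to continue from there.
 */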

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, so
 * that the first ("doesn't exist") response arrives *after* the
 * second ("does exist").  In that case the stale "doesn't exist"
 * response is effectively ignored, since EXISTS is never cleared.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
1640                 break;
1641         default:
1642                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1643                         obj_request->object_name, (unsigned short) opcode);
1644                 break;
1645         }
1646
1647         if (obj_request_done_test(obj_request))
1648                 rbd_obj_request_complete(obj_request);
1649 }
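
/*
 * Note that the dispatch above keys off op 0 alone.  For the
 * two-op copyup request created below, op 0 is CEPH_OSD_OP_CALL,
 * so it takes the trivial callback path; the real completion work
 * then happens in rbd_img_obj_copyup_callback() by way of
 * obj_request->callback.
 */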
1650
1651 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1652 {
1653         struct rbd_img_request *img_request = obj_request->img_request;
1654         struct ceph_osd_request *osd_req = obj_request->osd_req;
1655         u64 snap_id;
1656
1657         rbd_assert(osd_req != NULL);
1658
1659         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1660         ceph_osdc_build_request(osd_req, obj_request->offset,
1661                         NULL, snap_id, NULL);
1662 }
1663
1664 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1665 {
1666         struct rbd_img_request *img_request = obj_request->img_request;
1667         struct ceph_osd_request *osd_req = obj_request->osd_req;
1668         struct ceph_snap_context *snapc;
1669         struct timespec mtime = CURRENT_TIME;
1670
1671         rbd_assert(osd_req != NULL);
1672
1673         snapc = img_request ? img_request->snapc : NULL;
1674         ceph_osdc_build_request(osd_req, obj_request->offset,
1675                         snapc, CEPH_NOSNAP, &mtime);
1676 }
1677
1678 static struct ceph_osd_request *rbd_osd_req_create(
1679                                         struct rbd_device *rbd_dev,
1680                                         bool write_request,
1681                                         struct rbd_obj_request *obj_request)
1682 {
1683         struct ceph_snap_context *snapc = NULL;
1684         struct ceph_osd_client *osdc;
1685         struct ceph_osd_request *osd_req;
1686
1687         if (obj_request_img_data_test(obj_request)) {
1688                 struct rbd_img_request *img_request = obj_request->img_request;
1689
1690                 rbd_assert(write_request ==
1691                                 img_request_write_test(img_request));
1692                 if (write_request)
1693                         snapc = img_request->snapc;
1694         }
1695
1696         /* Allocate and initialize the request, for the single op */
1697
1698         osdc = &rbd_dev->rbd_client->client->osdc;
1699         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1700         if (!osd_req)
1701                 return NULL;    /* ENOMEM */
1702
1703         if (write_request)
1704                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1705         else
1706                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1707
1708         osd_req->r_callback = rbd_osd_req_callback;
1709         osd_req->r_priv = obj_request;
1710
1711         osd_req->r_oid_len = strlen(obj_request->object_name);
1712         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1713         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1714
1715         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1716
1717         return osd_req;
1718 }
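
/*
 * Typical use, as in rbd_img_request_fill() below: create the osd
 * request, attach its single extent op and data, then format it
 * for the I/O direction:
 *
 *      osd_req = rbd_osd_req_create(rbd_dev, write_request, obj_request);
 *      osd_req_op_extent_init(osd_req, 0, opcode, offset, length, 0, 0);
 *      if (write_request)
 *              rbd_osd_req_format_write(obj_request);
 *      else
 *              rbd_osd_req_format_read(obj_request);
 */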
1719
1720 /*
1721  * Create a copyup osd request based on the information in the
1722  * object request supplied.  A copyup request has two osd ops,
1723  * a copyup method call, and a "normal" write request.
1724  */
1725 static struct ceph_osd_request *
1726 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1727 {
1728         struct rbd_img_request *img_request;
1729         struct ceph_snap_context *snapc;
1730         struct rbd_device *rbd_dev;
1731         struct ceph_osd_client *osdc;
1732         struct ceph_osd_request *osd_req;
1733
1734         rbd_assert(obj_request_img_data_test(obj_request));
1735         img_request = obj_request->img_request;
1736         rbd_assert(img_request);
1737         rbd_assert(img_request_write_test(img_request));
1738
1739         /* Allocate and initialize the request, for the two ops */
1740
1741         snapc = img_request->snapc;
1742         rbd_dev = img_request->rbd_dev;
1743         osdc = &rbd_dev->rbd_client->client->osdc;
1744         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1745         if (!osd_req)
1746                 return NULL;    /* ENOMEM */
1747
1748         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1749         osd_req->r_callback = rbd_osd_req_callback;
1750         osd_req->r_priv = obj_request;
1751
1752         osd_req->r_oid_len = strlen(obj_request->object_name);
1753         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1754         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1755
1756         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1757
1758         return osd_req;
1759 }
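
/*
 * The two ops allocated here are filled in later by
 * rbd_img_obj_parent_read_full_callback(): op 0 becomes the
 * "rbd"/"copyup" class method call carrying the data read from
 * the parent, and op 1 becomes the original write.
 */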
1760
1761
1762 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1763 {
1764         ceph_osdc_put_request(osd_req);
1765 }
1766
1767 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1768
1769 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1770                                                 u64 offset, u64 length,
1771                                                 enum obj_request_type type)
1772 {
1773         struct rbd_obj_request *obj_request;
1774         size_t size;
1775         char *name;
1776
1777         rbd_assert(obj_request_type_valid(type));
1778
1779         size = strlen(object_name) + 1;
1780         name = kmalloc(size, GFP_KERNEL);
1781         if (!name)
1782                 return NULL;
1783
1784         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1785         if (!obj_request) {
1786                 kfree(name);
1787                 return NULL;
1788         }
1789
1790         obj_request->object_name = memcpy(name, object_name, size);
1791         obj_request->offset = offset;
1792         obj_request->length = length;
1793         obj_request->flags = 0;
1794         obj_request->which = BAD_WHICH;
1795         obj_request->type = type;
1796         INIT_LIST_HEAD(&obj_request->links);
1797         init_completion(&obj_request->completion);
1798         kref_init(&obj_request->kref);
1799
1800         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1801                 offset, length, (int)type, obj_request);
1802
1803         return obj_request;
1804 }
1805
1806 static void rbd_obj_request_destroy(struct kref *kref)
1807 {
1808         struct rbd_obj_request *obj_request;
1809
1810         obj_request = container_of(kref, struct rbd_obj_request, kref);
1811
1812         dout("%s: obj %p\n", __func__, obj_request);
1813
1814         rbd_assert(obj_request->img_request == NULL);
1815         rbd_assert(obj_request->which == BAD_WHICH);
1816
1817         if (obj_request->osd_req)
1818                 rbd_osd_req_destroy(obj_request->osd_req);
1819
1820         rbd_assert(obj_request_type_valid(obj_request->type));
1821         switch (obj_request->type) {
1822         case OBJ_REQUEST_NODATA:
1823                 break;          /* Nothing to do */
1824         case OBJ_REQUEST_BIO:
1825                 if (obj_request->bio_list)
1826                         bio_chain_put(obj_request->bio_list);
1827                 break;
1828         case OBJ_REQUEST_PAGES:
1829                 if (obj_request->pages)
1830                         ceph_release_page_vector(obj_request->pages,
1831                                                 obj_request->page_count);
1832                 break;
1833         }
1834
1835         kfree(obj_request->object_name);
1836         obj_request->object_name = NULL;
1837         kmem_cache_free(rbd_obj_request_cache, obj_request);
1838 }
1839
1840 /* It's OK to call this for a device with no parent */
1841
1842 static void rbd_spec_put(struct rbd_spec *spec);
1843 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1844 {
1845         rbd_dev_remove_parent(rbd_dev);
1846         rbd_spec_put(rbd_dev->parent_spec);
1847         rbd_dev->parent_spec = NULL;
1848         rbd_dev->parent_overlap = 0;
1849 }
1850
1851 /*
1852  * Caller is responsible for filling in the list of object requests
1853  * that comprises the image request, and the Linux request pointer
1854  * (if there is one).
1855  */
1856 static struct rbd_img_request *rbd_img_request_create(
1857                                         struct rbd_device *rbd_dev,
1858                                         u64 offset, u64 length,
1859                                         bool write_request,
1860                                         bool child_request)
1861 {
1862         struct rbd_img_request *img_request;
1863
1864         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1865         if (!img_request)
1866                 return NULL;
1867
1868         if (write_request) {
1869                 down_read(&rbd_dev->header_rwsem);
1870                 ceph_get_snap_context(rbd_dev->header.snapc);
1871                 up_read(&rbd_dev->header_rwsem);
1872         }
1873
1874         img_request->rq = NULL;
1875         img_request->rbd_dev = rbd_dev;
1876         img_request->offset = offset;
1877         img_request->length = length;
1878         img_request->flags = 0;
1879         if (write_request) {
1880                 img_request_write_set(img_request);
1881                 img_request->snapc = rbd_dev->header.snapc;
1882         } else {
1883                 img_request->snap_id = rbd_dev->spec->snap_id;
1884         }
1885         if (child_request)
1886                 img_request_child_set(img_request);
1887         if (rbd_dev->parent_overlap)
1888                 img_request_layered_set(img_request);
1889         spin_lock_init(&img_request->completion_lock);
1890         img_request->next_completion = 0;
1891         img_request->callback = NULL;
1892         img_request->result = 0;
1893         img_request->obj_request_count = 0;
1894         INIT_LIST_HEAD(&img_request->obj_requests);
1895         kref_init(&img_request->kref);
1896
1897         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1898                 write_request ? "write" : "read", offset, length,
1899                 img_request);
1900
1901         return img_request;
1902 }
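
/*
 * Lifecycle sketch, mirroring rbd_request_fn() below: create the
 * image request, fill it with object requests, then submit it; on
 * any failure the creator's reference is dropped:
 *
 *      img_request = rbd_img_request_create(rbd_dev, offset, length,
 *                                              write_request, false);
 *      result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *                                              rq->bio);
 *      if (!result)
 *              result = rbd_img_request_submit(img_request);
 *      if (result)
 *              rbd_img_request_put(img_request);
 */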
1903
1904 static void rbd_img_request_destroy(struct kref *kref)
1905 {
1906         struct rbd_img_request *img_request;
1907         struct rbd_obj_request *obj_request;
1908         struct rbd_obj_request *next_obj_request;
1909
1910         img_request = container_of(kref, struct rbd_img_request, kref);
1911
1912         dout("%s: img %p\n", __func__, img_request);
1913
1914         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1915                 rbd_img_obj_request_del(img_request, obj_request);
1916         rbd_assert(img_request->obj_request_count == 0);
1917
1918         if (img_request_write_test(img_request))
1919                 ceph_put_snap_context(img_request->snapc);
1920
1921         if (img_request_child_test(img_request))
1922                 rbd_obj_request_put(img_request->obj_request);
1923
1924         kmem_cache_free(rbd_img_request_cache, img_request);
1925 }
1926
1927 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1928 {
1929         struct rbd_img_request *img_request;
1930         unsigned int xferred;
1931         int result;
1932         bool more;
1933
1934         rbd_assert(obj_request_img_data_test(obj_request));
1935         img_request = obj_request->img_request;
1936
1937         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1938         xferred = (unsigned int)obj_request->xferred;
1939         result = obj_request->result;
1940         if (result) {
1941                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1942
1943                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1944                         img_request_write_test(img_request) ? "write" : "read",
1945                         obj_request->length, obj_request->img_offset,
1946                         obj_request->offset);
1947                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1948                         result, xferred);
1949                 if (!img_request->result)
1950                         img_request->result = result;
1951         }
1952
1953         /* Image object requests don't own their page array */
1954
1955         if (obj_request->type == OBJ_REQUEST_PAGES) {
1956                 obj_request->pages = NULL;
1957                 obj_request->page_count = 0;
1958         }
1959
1960         if (img_request_child_test(img_request)) {
1961                 rbd_assert(img_request->obj_request != NULL);
1962                 more = obj_request->which < img_request->obj_request_count - 1;
1963         } else {
1964                 rbd_assert(img_request->rq != NULL);
1965                 more = blk_end_request(img_request->rq, result, xferred);
1966         }
1967
1968         return more;
1969 }
1970
1971 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1972 {
1973         struct rbd_img_request *img_request;
1974         u32 which = obj_request->which;
1975         bool more = true;
1976
1977         rbd_assert(obj_request_img_data_test(obj_request));
1978         img_request = obj_request->img_request;
1979
1980         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1981         rbd_assert(img_request != NULL);
1982         rbd_assert(img_request->obj_request_count > 0);
1983         rbd_assert(which != BAD_WHICH);
1984         rbd_assert(which < img_request->obj_request_count);
1985         rbd_assert(which >= img_request->next_completion);
1986
1987         spin_lock_irq(&img_request->completion_lock);
1988         if (which != img_request->next_completion)
1989                 goto out;
1990
1991         for_each_obj_request_from(img_request, obj_request) {
1992                 rbd_assert(more);
1993                 rbd_assert(which < img_request->obj_request_count);
1994
1995                 if (!obj_request_done_test(obj_request))
1996                         break;
1997                 more = rbd_img_obj_end_request(obj_request);
1998                 which++;
1999         }
2000
2001         rbd_assert(more ^ (which == img_request->obj_request_count));
2002         img_request->next_completion = which;
2003 out:
2004         spin_unlock_irq(&img_request->completion_lock);
2005
2006         if (!more)
2007                 rbd_img_request_complete(img_request);
2008 }
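
/*
 * Ordering note for the callback above: object requests may
 * complete in any order, but their results must be handed to
 * blk_end_request() in offset order.  next_completion therefore
 * only advances across a contiguous run of completed requests;
 * e.g. if requests 0..3 are outstanding and 2 finishes first,
 * nothing is ended until 0 and 1 have also finished.
 */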
2009
2010 /*
2011  * Split up an image request into one or more object requests, each
2012  * to a different object.  The "type" parameter indicates whether
2013  * "data_desc" is the pointer to the head of a list of bio
2014  * structures, or the base of a page array.  In either case this
2015  * function assumes data_desc describes memory sufficient to hold
2016  * all data described by the image request.
2017  */
2018 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2019                                         enum obj_request_type type,
2020                                         void *data_desc)
2021 {
2022         struct rbd_device *rbd_dev = img_request->rbd_dev;
2023         struct rbd_obj_request *obj_request = NULL;
2024         struct rbd_obj_request *next_obj_request;
2025         bool write_request = img_request_write_test(img_request);
2026         struct bio *bio_list;
2027         unsigned int bio_offset = 0;
2028         struct page **pages;
2029         u64 img_offset;
2030         u64 resid;
2031         u16 opcode;
2032
2033         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2034                 (int)type, data_desc);
2035
2036         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2037         img_offset = img_request->offset;
2038         resid = img_request->length;
2039         rbd_assert(resid > 0);
2040
2041         if (type == OBJ_REQUEST_BIO) {
2042                 bio_list = data_desc;
2043                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2044         } else {
2045                 rbd_assert(type == OBJ_REQUEST_PAGES);
2046                 pages = data_desc;
2047         }
2048
2049         while (resid) {
2050                 struct ceph_osd_request *osd_req;
2051                 const char *object_name;
2052                 u64 offset;
2053                 u64 length;
2054
2055                 object_name = rbd_segment_name(rbd_dev, img_offset);
2056                 if (!object_name)
2057                         goto out_unwind;
2058                 offset = rbd_segment_offset(rbd_dev, img_offset);
2059                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2060                 obj_request = rbd_obj_request_create(object_name,
2061                                                 offset, length, type);
2062                 /* object request has its own copy of the object name */
2063                 rbd_segment_name_free(object_name);
2064                 if (!obj_request)
2065                         goto out_unwind;
2066
2067                 if (type == OBJ_REQUEST_BIO) {
2068                         unsigned int clone_size;
2069
2070                         rbd_assert(length <= (u64)UINT_MAX);
2071                         clone_size = (unsigned int)length;
2072                         obj_request->bio_list =
2073                                         bio_chain_clone_range(&bio_list,
2074                                                                 &bio_offset,
2075                                                                 clone_size,
2076                                                                 GFP_ATOMIC);
2077                         if (!obj_request->bio_list)
2078                                 goto out_partial;
2079                 } else {
2080                         unsigned int page_count;
2081
2082                         obj_request->pages = pages;
2083                         page_count = (u32)calc_pages_for(offset, length);
2084                         obj_request->page_count = page_count;
2085                         if ((offset + length) & ~PAGE_MASK)
2086                                 page_count--;   /* more on last page */
2087                         pages += page_count;
2088                 }
2089
2090                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2091                                                 obj_request);
2092                 if (!osd_req)
2093                         goto out_partial;
2094                 obj_request->osd_req = osd_req;
2095                 obj_request->callback = rbd_img_obj_callback;
2096
2097                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2098                                                 0, 0);
2099                 if (type == OBJ_REQUEST_BIO)
2100                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2101                                         obj_request->bio_list, length);
2102                 else
2103                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2104                                         obj_request->pages, length,
2105                                         offset & ~PAGE_MASK, false, false);
2106
2107                 if (write_request)
2108                         rbd_osd_req_format_write(obj_request);
2109                 else
2110                         rbd_osd_req_format_read(obj_request);
2111
2112                 obj_request->img_offset = img_offset;
2113                 rbd_img_obj_request_add(img_request, obj_request);
2114
2115                 img_offset += length;
2116                 resid -= length;
2117         }
2118
2119         return 0;
2120
2121 out_partial:
2122         rbd_obj_request_put(obj_request);
2123 out_unwind:
2124         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2125                 rbd_obj_request_put(obj_request);
2126
2127         return -ENOMEM;
2128 }
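
/*
 * Hypothetical split, assuming the common 4 MiB object size
 * (obj_order 22): an image request covering offset 6 MiB, length
 * 4 MiB becomes two object requests, one for the last 2 MiB of
 * the image's second object and one for the first 2 MiB of its
 * third.  Each gets its own cloned bio chain (or slice of the
 * page array) and completes via rbd_img_obj_callback().
 */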
2129
2130 static void
2131 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2132 {
2133         struct rbd_img_request *img_request;
2134         struct rbd_device *rbd_dev;
2135         struct page **pages;
2136         u32 page_count;
2137
2138         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2139         rbd_assert(obj_request_img_data_test(obj_request));
2140         img_request = obj_request->img_request;
2141         rbd_assert(img_request);
2142
2143         rbd_dev = img_request->rbd_dev;
2144         rbd_assert(rbd_dev);
2145
2146         pages = obj_request->copyup_pages;
2147         rbd_assert(pages != NULL);
2148         obj_request->copyup_pages = NULL;
2149         page_count = obj_request->copyup_page_count;
2150         rbd_assert(page_count);
2151         obj_request->copyup_page_count = 0;
2152         ceph_release_page_vector(pages, page_count);
2153
2154         /*
2155          * We want the transfer count to reflect the size of the
2156          * original write request.  There is no such thing as a
2157          * successful short write, so if the request was successful
2158          * we can just set it to the originally-requested length.
2159          */
2160         if (!obj_request->result)
2161                 obj_request->xferred = obj_request->length;
2162
2163         /* Finish up with the normal image object callback */
2164
2165         rbd_img_obj_callback(obj_request);
2166 }
2167
2168 static void
2169 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2170 {
2171         struct rbd_obj_request *orig_request;
2172         struct ceph_osd_request *osd_req;
2173         struct ceph_osd_client *osdc;
2174         struct rbd_device *rbd_dev;
2175         struct page **pages;
2176         u32 page_count;
2177         int result;
2178         u64 parent_length;
2179         u64 offset;
2180         u64 length;
2181
2182         rbd_assert(img_request_child_test(img_request));
2183
2184         /* First get what we need from the image request */
2185
2186         pages = img_request->copyup_pages;
2187         rbd_assert(pages != NULL);
2188         img_request->copyup_pages = NULL;
2189         page_count = img_request->copyup_page_count;
2190         rbd_assert(page_count);
2191         img_request->copyup_page_count = 0;
2192
2193         orig_request = img_request->obj_request;
2194         rbd_assert(orig_request != NULL);
2195         rbd_assert(obj_request_type_valid(orig_request->type));
2196         result = img_request->result;
2197         parent_length = img_request->length;
2198         rbd_assert(parent_length == img_request->xferred);
2199         rbd_img_request_put(img_request);
2200
2201         rbd_assert(orig_request->img_request);
2202         rbd_dev = orig_request->img_request->rbd_dev;
2203         rbd_assert(rbd_dev);
2204
2205         if (result)
2206                 goto out_err;
2207
2208         /*
2209          * The original osd request is of no use to us any more.
2210          * We need a new one that can hold the two ops in a copyup
2211          * request.  Allocate the new copyup osd request for the
2212          * original request, and release the old one.
2213          */
2214         result = -ENOMEM;
2215         osd_req = rbd_osd_req_create_copyup(orig_request);
2216         if (!osd_req)
2217                 goto out_err;
2218         rbd_osd_req_destroy(orig_request->osd_req);
2219         orig_request->osd_req = osd_req;
2220         orig_request->copyup_pages = pages;
2221         orig_request->copyup_page_count = page_count;
2222
2223         /* Initialize the copyup op */
2224
2225         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2226         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2227                                                 false, false);
2228
2229         /* Then the original write request op */
2230
2231         offset = orig_request->offset;
2232         length = orig_request->length;
2233         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2234                                         offset, length, 0, 0);
2235         if (orig_request->type == OBJ_REQUEST_BIO)
2236                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2237                                         orig_request->bio_list, length);
2238         else
2239                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2240                                         orig_request->pages, length,
2241                                         offset & ~PAGE_MASK, false, false);
2242
2243         rbd_osd_req_format_write(orig_request);
2244
2245         /* All set, send it off. */
2246
2247         orig_request->callback = rbd_img_obj_copyup_callback;
2248         osdc = &rbd_dev->rbd_client->client->osdc;
2249         result = rbd_obj_request_submit(osdc, orig_request);
2250         if (!result)
2251                 return;
2252 out_err:
2253         /* Record the error code and complete the request */
2254
2255         orig_request->result = result;
2256         orig_request->xferred = 0;
2257         obj_request_done_set(orig_request);
2258         rbd_obj_request_complete(orig_request);
2259 }
2260
2261 /*
2262  * Read from the parent image the range of data that covers the
2263  * entire target of the given object request.  This is used for
2264  * satisfying a layered image write request when the target of an
2265  * object request from the image request does not exist.
2266  *
2267  * A page array big enough to hold the returned data is allocated
2268  * and supplied to rbd_img_request_fill() as the "data descriptor."
2269  * When the read completes, this page array will be transferred to
2270  * the original object request for the copyup operation.
2271  *
2272  * If an error occurs, record it as the result of the original
2273  * object request and mark it done so it gets completed.
2274  */
2275 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2276 {
2277         struct rbd_img_request *img_request = NULL;
2278         struct rbd_img_request *parent_request = NULL;
2279         struct rbd_device *rbd_dev;
2280         u64 img_offset;
2281         u64 length;
2282         struct page **pages = NULL;
2283         u32 page_count;
2284         int result;
2285
2286         rbd_assert(obj_request_img_data_test(obj_request));
2287         rbd_assert(obj_request_type_valid(obj_request->type));
2288
2289         img_request = obj_request->img_request;
2290         rbd_assert(img_request != NULL);
2291         rbd_dev = img_request->rbd_dev;
2292         rbd_assert(rbd_dev->parent != NULL);
2293
2294         /*
2295          * Determine the byte range covered by the object in the
2296          * child image to which the original request was to be sent.
2297          */
2298         img_offset = obj_request->img_offset - obj_request->offset;
2299         length = (u64)1 << rbd_dev->header.obj_order;
2300
2301         /*
2302          * There is no defined parent data beyond the parent
2303          * overlap, so limit what we read at that boundary if
2304          * necessary.
2305          */
2306         if (img_offset + length > rbd_dev->parent_overlap) {
2307                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2308                 length = rbd_dev->parent_overlap - img_offset;
2309         }
2310
2311         /*
2312          * Allocate a page array big enough to receive the data read
2313          * from the parent.
2314          */
2315         page_count = (u32)calc_pages_for(0, length);
2316         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2317         if (IS_ERR(pages)) {
2318                 result = PTR_ERR(pages);
2319                 pages = NULL;
2320                 goto out_err;
2321         }
2322
2323         result = -ENOMEM;
2324         parent_request = rbd_img_request_create(rbd_dev->parent,
2325                                                 img_offset, length,
2326                                                 false, true);
2327         if (!parent_request)
2328                 goto out_err;
2329         rbd_obj_request_get(obj_request);
2330         parent_request->obj_request = obj_request;
2331
2332         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2333         if (result)
2334                 goto out_err;
2335         parent_request->copyup_pages = pages;
2336         parent_request->copyup_page_count = page_count;
2337
2338         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2339         result = rbd_img_request_submit(parent_request);
2340         if (!result)
2341                 return 0;
2342
2343         parent_request->copyup_pages = NULL;
2344         parent_request->copyup_page_count = 0;
2345         parent_request->obj_request = NULL;
2346         rbd_obj_request_put(obj_request);
2347 out_err:
2348         if (pages)
2349                 ceph_release_page_vector(pages, page_count);
2350         if (parent_request)
2351                 rbd_img_request_put(parent_request);
2352         obj_request->result = result;
2353         obj_request->xferred = 0;
2354         obj_request_done_set(obj_request);
2355
2356         return result;
2357 }
2358
2359 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2360 {
2361         struct rbd_obj_request *orig_request;
2362         int result;
2363
2364         rbd_assert(!obj_request_img_data_test(obj_request));
2365
2366         /*
2367          * All we need from the object request is the original
2368          * request and the result of the STAT op.  Grab those, then
2369          * we're done with the request.
2370          */
2371         orig_request = obj_request->obj_request;
2372         obj_request->obj_request = NULL;
2373         rbd_assert(orig_request);
2374         rbd_assert(orig_request->img_request);
2375
2376         result = obj_request->result;
2377         obj_request->result = 0;
2378
2379         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2380                 obj_request, orig_request, result,
2381                 obj_request->xferred, obj_request->length);
2382         rbd_obj_request_put(obj_request);
2383
2384         rbd_assert(orig_request);
2385         rbd_assert(orig_request->img_request);
2386
2387         /*
2388          * Our only purpose here is to determine whether the object
2389          * exists, and we don't want to treat the non-existence as
2390          * an error.  If something else comes back, transfer the
2391          * error to the original request and complete it now.
2392          */
2393         if (!result) {
2394                 obj_request_existence_set(orig_request, true);
2395         } else if (result == -ENOENT) {
2396                 obj_request_existence_set(orig_request, false);
2397         } else if (result) {
2398                 orig_request->result = result;
2399                 goto out;
2400         }
2401
2402         /*
2403          * Resubmit the original request now that we have recorded
2404          * whether the target object exists.
2405          */
2406         orig_request->result = rbd_img_obj_request_submit(orig_request);
2407 out:
2408         if (orig_request->result)
2409                 rbd_obj_request_complete(orig_request);
2410         rbd_obj_request_put(orig_request);
2411 }
2412
2413 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2414 {
2415         struct rbd_obj_request *stat_request;
2416         struct rbd_device *rbd_dev;
2417         struct ceph_osd_client *osdc;
2418         struct page **pages = NULL;
2419         u32 page_count;
2420         size_t size;
2421         int ret;
2422
2423         /*
2424          * The response data for a STAT call consists of:
2425          *     le64 length;
2426          *     struct {
2427          *         le32 tv_sec;
2428          *         le32 tv_nsec;
2429          *     } mtime;
2430          */
2431         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2432         page_count = (u32)calc_pages_for(0, size);
2433         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2434         if (IS_ERR(pages))
2435                 return PTR_ERR(pages);
2436
2437         ret = -ENOMEM;
2438         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2439                                                         OBJ_REQUEST_PAGES);
2440         if (!stat_request)
2441                 goto out;
2442
2443         rbd_obj_request_get(obj_request);
2444         stat_request->obj_request = obj_request;
2445         stat_request->pages = pages;
2446         stat_request->page_count = page_count;
2447
2448         rbd_assert(obj_request->img_request);
2449         rbd_dev = obj_request->img_request->rbd_dev;
2450         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2451                                                 stat_request);
2452         if (!stat_request->osd_req)
2453                 goto out;
2454         stat_request->callback = rbd_img_obj_exists_callback;
2455
2456         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2457         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2458                                         false, false);
2459         rbd_osd_req_format_read(stat_request);
2460
2461         osdc = &rbd_dev->rbd_client->client->osdc;
2462         ret = rbd_obj_request_submit(osdc, stat_request);
2463 out:
2464         if (ret)
2465                 rbd_obj_request_put(obj_request);
2466
2467         return ret;
2468 }
2469
2470 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2471 {
2472         struct rbd_img_request *img_request;
2473         struct rbd_device *rbd_dev;
2474         bool known;
2475
2476         rbd_assert(obj_request_img_data_test(obj_request));
2477
2478         img_request = obj_request->img_request;
2479         rbd_assert(img_request);
2480         rbd_dev = img_request->rbd_dev;
2481
2482         /*
2483          * Only writes to layered images need special handling.
2484          * Reads and non-layered writes are simple object requests.
2485          * Layered writes that start beyond the end of the overlap
2486          * with the parent have no parent data, so they too are
2487          * simple object requests.  Finally, if the target object is
2488          * known to already exist, its parent data has already been
2489          * copied, so a write to the object can also be handled as a
2490          * simple object request.
2491          */
2492         if (!img_request_write_test(img_request) ||
2493                 !img_request_layered_test(img_request) ||
2494                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2495                 ((known = obj_request_known_test(obj_request)) &&
2496                         obj_request_exists_test(obj_request))) {
2497
2498                 struct rbd_device *rbd_dev;
2499                 struct ceph_osd_client *osdc;
2500
2501                 rbd_dev = obj_request->img_request->rbd_dev;
2502                 osdc = &rbd_dev->rbd_client->client->osdc;
2503
2504                 return rbd_obj_request_submit(osdc, obj_request);
2505         }
2506
2507         /*
2508          * It's a layered write.  The target object might exist but
2509          * we may not know that yet.  If we know it doesn't exist,
2510          * start by reading the data for the full target object from
2511          * the parent so we can use it for a copyup to the target.
2512          */
2513         if (known)
2514                 return rbd_img_obj_parent_read_full(obj_request);
2515
2516         /* We don't know whether the target exists.  Go find out. */
2517
2518         return rbd_img_obj_exists_submit(obj_request);
2519 }
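
/*
 * Summary of the dispatch above: reads, non-layered writes, writes
 * beyond the parent overlap, and writes to objects known to exist
 * are submitted directly.  A layered write to an object known not
 * to exist triggers a full read of the parent's data for a copyup,
 * and one whose existence is unknown issues a STAT and resubmits.
 */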
2520
2521 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2522 {
2523         struct rbd_obj_request *obj_request;
2524         struct rbd_obj_request *next_obj_request;
2525
2526         dout("%s: img %p\n", __func__, img_request);
2527         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2528                 int ret;
2529
2530                 ret = rbd_img_obj_request_submit(obj_request);
2531                 if (ret)
2532                         return ret;
2533         }
2534
2535         return 0;
2536 }
2537
2538 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2539 {
2540         struct rbd_obj_request *obj_request;
2541         struct rbd_device *rbd_dev;
2542         u64 obj_end;
2543
2544         rbd_assert(img_request_child_test(img_request));
2545
2546         obj_request = img_request->obj_request;
2547         rbd_assert(obj_request);
2548         rbd_assert(obj_request->img_request);
2549
2550         obj_request->result = img_request->result;
2551         if (obj_request->result)
2552                 goto out;
2553
2554         /*
2555          * We need to zero anything beyond the parent overlap
2556          * boundary.  Since rbd_img_obj_request_read_callback()
2557          * will zero anything beyond the end of a short read, an
2558          * easy way to do this is to pretend the data from the
2559          * parent came up short--ending at the overlap boundary.
2560          */
2561         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2562         obj_end = obj_request->img_offset + obj_request->length;
2563         rbd_dev = obj_request->img_request->rbd_dev;
2564         if (obj_end > rbd_dev->parent_overlap) {
2565                 u64 xferred = 0;
2566
2567                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2568                         xferred = rbd_dev->parent_overlap -
2569                                         obj_request->img_offset;
2570
2571                 obj_request->xferred = min(img_request->xferred, xferred);
2572         } else {
2573                 obj_request->xferred = img_request->xferred;
2574         }
2575 out:
2576         rbd_img_request_put(img_request);
2577         rbd_img_obj_request_read_callback(obj_request);
2578         rbd_obj_request_complete(obj_request);
2579 }
2580
2581 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2582 {
2583         struct rbd_device *rbd_dev;
2584         struct rbd_img_request *img_request;
2585         int result;
2586
2587         rbd_assert(obj_request_img_data_test(obj_request));
2588         rbd_assert(obj_request->img_request != NULL);
2589         rbd_assert(obj_request->result == (s32) -ENOENT);
2590         rbd_assert(obj_request_type_valid(obj_request->type));
2591
2592         rbd_dev = obj_request->img_request->rbd_dev;
2593         rbd_assert(rbd_dev->parent != NULL);
2594         /* rbd_read_finish(obj_request, obj_request->length); */
2595         img_request = rbd_img_request_create(rbd_dev->parent,
2596                                                 obj_request->img_offset,
2597                                                 obj_request->length,
2598                                                 false, true);
2599         result = -ENOMEM;
2600         if (!img_request)
2601                 goto out_err;
2602
2603         rbd_obj_request_get(obj_request);
2604         img_request->obj_request = obj_request;
2605
2606         if (obj_request->type == OBJ_REQUEST_BIO)
2607                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2608                                                 obj_request->bio_list);
2609         else
2610                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2611                                                 obj_request->pages);
2612         if (result)
2613                 goto out_err;
2614
2615         img_request->callback = rbd_img_parent_read_callback;
2616         result = rbd_img_request_submit(img_request);
2617         if (result)
2618                 goto out_err;
2619
2620         return;
2621 out_err:
2622         if (img_request)
2623                 rbd_img_request_put(img_request);
2624         obj_request->result = result;
2625         obj_request->xferred = 0;
2626         obj_request_done_set(obj_request);
2627 }
2628
2629 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2630 {
2631         struct rbd_obj_request *obj_request;
2632         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2633         int ret;
2634
2635         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2636                                                         OBJ_REQUEST_NODATA);
2637         if (!obj_request)
2638                 return -ENOMEM;
2639
2640         ret = -ENOMEM;
2641         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2642         if (!obj_request->osd_req)
2643                 goto out;
2644         obj_request->callback = rbd_obj_request_put;
2645
2646         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2647                                         notify_id, 0, 0);
2648         rbd_osd_req_format_read(obj_request);
2649
2650         ret = rbd_obj_request_submit(osdc, obj_request);
2651 out:
2652         if (ret)
2653                 rbd_obj_request_put(obj_request);
2654
2655         return ret;
2656 }
2657
2658 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2659 {
2660         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2661         int ret;
2662
2663         if (!rbd_dev)
2664                 return;
2665
2666         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2667                 rbd_dev->header_name, (unsigned long long)notify_id,
2668                 (unsigned int)opcode);
2669         ret = rbd_dev_refresh(rbd_dev);
2670         if (ret)
2671                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2672
2673         rbd_obj_notify_ack(rbd_dev, notify_id);
2674 }
2675
2676 /*
2677  * Request sync osd watch/unwatch.  The value of "start" determines
2678  * whether a watch request is being initiated or torn down.
2679  */
2680 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2681 {
2682         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2683         struct rbd_obj_request *obj_request;
2684         int ret;
2685
2686         rbd_assert(start ^ !!rbd_dev->watch_event);
2687         rbd_assert(start ^ !!rbd_dev->watch_request);
2688
2689         if (start) {
2690                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2691                                                 &rbd_dev->watch_event);
2692                 if (ret < 0)
2693                         return ret;
2694                 rbd_assert(rbd_dev->watch_event != NULL);
2695         }
2696
2697         ret = -ENOMEM;
2698         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2699                                                         OBJ_REQUEST_NODATA);
2700         if (!obj_request)
2701                 goto out_cancel;
2702
2703         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2704         if (!obj_request->osd_req)
2705                 goto out_cancel;
2706
2707         if (start)
2708                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2709         else
2710                 ceph_osdc_unregister_linger_request(osdc,
2711                                         rbd_dev->watch_request->osd_req);
2712
2713         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2714                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2715         rbd_osd_req_format_write(obj_request);
2716
2717         ret = rbd_obj_request_submit(osdc, obj_request);
2718         if (ret)
2719                 goto out_cancel;
2720         ret = rbd_obj_request_wait(obj_request);
2721         if (ret)
2722                 goto out_cancel;
2723         ret = obj_request->result;
2724         if (ret)
2725                 goto out_cancel;
2726
2727         /*
2728          * A watch request is set to linger, so the underlying osd
2729          * request won't go away until we unregister it.  We retain
2730          * a pointer to the object request during that time (in
2731          * rbd_dev->watch_request), so we'll keep a reference to
2732          * it.  We'll drop that reference (below) after we've
2733          * unregistered it.
2734          */
2735         if (start) {
2736                 rbd_dev->watch_request = obj_request;
2737
2738                 return 0;
2739         }
2740
2741         /* We have successfully torn down the watch request */
2742
2743         rbd_obj_request_put(rbd_dev->watch_request);
2744         rbd_dev->watch_request = NULL;
2745 out_cancel:
2746         /* Cancel the event if we're tearing down, or on error */
2747         ceph_osdc_cancel_event(rbd_dev->watch_event);
2748         rbd_dev->watch_event = NULL;
2749         if (obj_request)
2750                 rbd_obj_request_put(obj_request);
2751
2752         return ret;
2753 }
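
/*
 * Usage note: this is called with start == true when a device is
 * being set up and start == false at teardown.  In between, the
 * lingering osd request held in rbd_dev->watch_request is what
 * keeps header-change notifications arriving at rbd_watch_cb().
 */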
2754
2755 /*
2756  * Synchronous osd object method call.  Returns the number of bytes
2757  * returned in the outbound buffer, or a negative error code.
2758  */
2759 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2760                              const char *object_name,
2761                              const char *class_name,
2762                              const char *method_name,
2763                              const void *outbound,
2764                              size_t outbound_size,
2765                              void *inbound,
2766                              size_t inbound_size)
2767 {
2768         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2769         struct rbd_obj_request *obj_request;
2770         struct page **pages;
2771         u32 page_count;
2772         int ret;
2773
2774         /*
2775          * Method calls are ultimately read operations.  The result
2776          * should be placed into the inbound buffer provided.  They
2777          * also supply outbound data--parameters for the object
2778          * method.  Currently if this is present it will be a
2779          * snapshot id.
2780          */
2781         page_count = (u32)calc_pages_for(0, inbound_size);
2782         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2783         if (IS_ERR(pages))
2784                 return PTR_ERR(pages);
2785
2786         ret = -ENOMEM;
2787         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2788                                                         OBJ_REQUEST_PAGES);
2789         if (!obj_request)
2790                 goto out;
2791
2792         obj_request->pages = pages;
2793         obj_request->page_count = page_count;
2794
2795         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2796         if (!obj_request->osd_req)
2797                 goto out;
2798
2799         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2800                                         class_name, method_name);
2801         if (outbound_size) {
2802                 struct ceph_pagelist *pagelist;
2803
2804                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2805                 if (!pagelist)
2806                         goto out;
2807
2808                 ceph_pagelist_init(pagelist);
2809                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2810                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2811                                                 pagelist);
2812         }
2813         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2814                                         obj_request->pages, inbound_size,
2815                                         0, false, false);
2816         rbd_osd_req_format_read(obj_request);
2817
2818         ret = rbd_obj_request_submit(osdc, obj_request);
2819         if (ret)
2820                 goto out;
2821         ret = rbd_obj_request_wait(obj_request);
2822         if (ret)
2823                 goto out;
2824
2825         ret = obj_request->result;
2826         if (ret < 0)
2827                 goto out;
2828
2829         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2830         ret = (int)obj_request->xferred;
2831         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2832 out:
2833         if (obj_request)
2834                 rbd_obj_request_put(obj_request);
2835         else
2836                 ceph_release_page_vector(pages, page_count);
2837
2838         return ret;
2839 }
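
/*
 * Hedged usage sketch (the actual call sites appear later in this
 * file): the v2 metadata helpers use this to invoke methods of the
 * "rbd" object class on the header object, along the lines of:
 *
 *      ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                              "rbd", "get_size",
 *                              &snapid, sizeof (snapid),
 *                              &size_buf, sizeof (size_buf));
 *
 * A negative return is an error; otherwise it is the number of
 * bytes placed into the inbound buffer.
 */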
2840
2841 static void rbd_request_fn(struct request_queue *q)
2842                 __releases(q->queue_lock) __acquires(q->queue_lock)
2843 {
2844         struct rbd_device *rbd_dev = q->queuedata;
2845         bool read_only = rbd_dev->mapping.read_only;
2846         struct request *rq;
2847         int result;
2848
2849         while ((rq = blk_fetch_request(q))) {
2850                 bool write_request = rq_data_dir(rq) == WRITE;
2851                 struct rbd_img_request *img_request;
2852                 u64 offset;
2853                 u64 length;
2854
2855                 /* Ignore any non-FS requests that filter through. */
2856
2857                 if (rq->cmd_type != REQ_TYPE_FS) {
2858                         dout("%s: non-fs request type %d\n", __func__,
2859                                 (int) rq->cmd_type);
2860                         __blk_end_request_all(rq, 0);
2861                         continue;
2862                 }
2863
2864                 /* Ignore/skip any zero-length requests */
2865
2866                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2867                 length = (u64) blk_rq_bytes(rq);
2868
2869                 if (!length) {
2870                         dout("%s: zero-length request\n", __func__);
2871                         __blk_end_request_all(rq, 0);
2872                         continue;
2873                 }
2874
2875                 spin_unlock_irq(q->queue_lock);
2876
2877                 /* Disallow writes to a read-only device */
2878
2879                 if (write_request) {
2880                         result = -EROFS;
2881                         if (read_only)
2882                                 goto end_request;
2883                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2884                 }
2885
2886                 /*
2887                  * Quit early if the mapped snapshot no longer
2888                  * exists.  It's still possible the snapshot will
2889                  * have disappeared by the time our request arrives
2890                  * at the osd, but there's no sense in sending it if
2891                  * we already know.
2892                  */
2893                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2894                         dout("request for non-existent snapshot\n");
2895                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2896                         result = -ENXIO;
2897                         goto end_request;
2898                 }
2899
2900                 result = -EINVAL;
2901                 if (offset && length > U64_MAX - offset + 1) {
2902                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2903                                 offset, length);
2904                         goto end_request;       /* Shouldn't happen */
2905                 }
2906
2907                 result = -EIO;
2908                 if (offset + length > rbd_dev->mapping.size) {
2909                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2910                                 offset, length, rbd_dev->mapping.size);
2911                         goto end_request;
2912                 }
2913
2914                 result = -ENOMEM;
2915                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2916                                                         write_request, false);
2917                 if (!img_request)
2918                         goto end_request;
2919
2920                 img_request->rq = rq;
2921
2922                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2923                                                 rq->bio);
2924                 if (!result)
2925                         result = rbd_img_request_submit(img_request);
2926                 if (result)
2927                         rbd_img_request_put(img_request);
2928 end_request:
2929                 spin_lock_irq(q->queue_lock);
2930                 if (result < 0) {
2931                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2932                                 write_request ? "write" : "read",
2933                                 length, offset, result);
2934
2935                         __blk_end_request_all(rq, result);
2936                 }
2937         }
2938 }
2939
/*
 * A queue callback.  Makes sure we don't create a bio that spans
 * multiple osd objects.  One exception is single-page bios, which
 * are handled later by bio_chain_clone_range().
 */
2945 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2946                           struct bio_vec *bvec)
2947 {
2948         struct rbd_device *rbd_dev = q->queuedata;
2949         sector_t sector_offset;
2950         sector_t sectors_per_obj;
2951         sector_t obj_sector_offset;
2952         int ret;
2953
        /*
         * Convert the partition-relative bio start sector to an
         * offset relative to the enclosing device, then find how far
         * into its rbd object that sector falls.
         */
2959         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2960         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2961         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2962
2963         /*
2964          * Compute the number of bytes from that offset to the end
2965          * of the object.  Account for what's already used by the bio.
2966          */
2967         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2968         if (ret > bmd->bi_size)
2969                 ret -= bmd->bi_size;
2970         else
2971                 ret = 0;
2972
2973         /*
2974          * Don't send back more than was asked for.  And if the bio
2975          * was empty, let the whole thing through because:  "Note
2976          * that a block device *must* allow a single page to be
2977          * added to an empty bio."
2978          */
2979         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2980         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2981                 ret = (int) bvec->bv_len;
2982
2983         return ret;
2984 }
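
/*
 * Worked example (illustrative, assuming the default 4 MiB objects,
 * i.e. obj_order 22): sectors_per_obj is 8192.  For a bio starting
 * 8190 sectors into its object, (8192 - 8190) << 9 = 1024 bytes
 * remain in the object.  If the bio already holds 512 bytes, at most
 * 1024 - 512 = 512 more may be added; if the bio is empty, the full
 * bv_len is allowed regardless, per the rule quoted above.
 */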
2985
2986 static void rbd_free_disk(struct rbd_device *rbd_dev)
2987 {
2988         struct gendisk *disk = rbd_dev->disk;
2989
2990         if (!disk)
2991                 return;
2992
2993         rbd_dev->disk = NULL;
2994         if (disk->flags & GENHD_FL_UP) {
2995                 del_gendisk(disk);
2996                 if (disk->queue)
2997                         blk_cleanup_queue(disk->queue);
2998         }
2999         put_disk(disk);
3000 }
3001
3002 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3003                                 const char *object_name,
3004                                 u64 offset, u64 length, void *buf)
{
3007         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3008         struct rbd_obj_request *obj_request;
3009         struct page **pages = NULL;
3010         u32 page_count;
3011         size_t size;
3012         int ret;
3013
3014         page_count = (u32) calc_pages_for(offset, length);
3015         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);
3018
3019         ret = -ENOMEM;
3020         obj_request = rbd_obj_request_create(object_name, offset, length,
3021                                                         OBJ_REQUEST_PAGES);
3022         if (!obj_request)
3023                 goto out;
3024
3025         obj_request->pages = pages;
3026         obj_request->page_count = page_count;
3027
3028         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3029         if (!obj_request->osd_req)
3030                 goto out;
3031
3032         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3033                                         offset, length, 0, 0);
3034         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3035                                         obj_request->pages,
3036                                         obj_request->length,
3037                                         obj_request->offset & ~PAGE_MASK,
3038                                         false, false);
3039         rbd_osd_req_format_read(obj_request);
3040
3041         ret = rbd_obj_request_submit(osdc, obj_request);
3042         if (ret)
3043                 goto out;
3044         ret = rbd_obj_request_wait(obj_request);
3045         if (ret)
3046                 goto out;
3047
3048         ret = obj_request->result;
3049         if (ret < 0)
3050                 goto out;
3051
3052         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3053         size = (size_t) obj_request->xferred;
3054         ceph_copy_from_page_vector(pages, buf, 0, size);
3055         rbd_assert(size <= (size_t)INT_MAX);
3056         ret = (int)size;
3057 out:
3058         if (obj_request)
3059                 rbd_obj_request_put(obj_request);
3060         else
3061                 ceph_release_page_vector(pages, page_count);
3062
3063         return ret;
3064 }
3065
3066 /*
3067  * Read the complete header for the given rbd device.  On successful
3068  * return, the rbd_dev->header field will contain up-to-date
3069  * information about the image.
3070  */
3071 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3072 {
3073         struct rbd_image_header_ondisk *ondisk = NULL;
3074         u32 snap_count = 0;
3075         u64 names_size = 0;
3076         u32 want_count;
3077         int ret;
3078
3079         /*
3080          * The complete header will include an array of its 64-bit
3081          * snapshot ids, followed by the names of those snapshots as
3082          * a contiguous block of NUL-terminated strings.  Note that
3083          * the number of snapshots could change by the time we read
3084          * it in, in which case we re-read it.
3085          */
3086         do {
3087                 size_t size;
3088
3089                 kfree(ondisk);
3090
3091                 size = sizeof (*ondisk);
3092                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3093                 size += names_size;
3094                 ondisk = kmalloc(size, GFP_KERNEL);
3095                 if (!ondisk)
3096                         return -ENOMEM;
3097
3098                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3099                                        0, size, ondisk);
3100                 if (ret < 0)
3101                         goto out;
                if ((size_t)ret < size) {
                        rbd_warn(rbd_dev, "short header read (want %zu got %d)",
                                size, ret);
                        ret = -ENXIO;
                        goto out;
                }
3108                 if (!rbd_dev_ondisk_valid(ondisk)) {
3109                         ret = -ENXIO;
3110                         rbd_warn(rbd_dev, "invalid header");
3111                         goto out;
3112                 }
3113
3114                 names_size = le64_to_cpu(ondisk->snap_names_len);
3115                 want_count = snap_count;
3116                 snap_count = le32_to_cpu(ondisk->snap_count);
3117         } while (snap_count != want_count);
3118
3119         ret = rbd_header_from_disk(rbd_dev, ondisk);
3120 out:
3121         kfree(ondisk);
3122
3123         return ret;
3124 }
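
/*
 * Illustrative layout of what rbd_dev_v1_header_info() reads: the
 * fixed-size rbd_image_header_ondisk struct, then snap_count
 * rbd_image_snap_ondisk entries, then the snapshot names packed as
 * consecutive NUL-terminated strings.  E.g. two snapshots named "a"
 * and "bb" contribute two snapshot entries plus a 5-byte name block
 * ("a\0bb\0", so names_size = 5).
 */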
3125
3126 /*
3127  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3128  * has disappeared from the (just updated) snapshot context.
3129  */
3130 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3131 {
3132         u64 snap_id;
3133
3134         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3135                 return;
3136
3137         snap_id = rbd_dev->spec->snap_id;
3138         if (snap_id == CEPH_NOSNAP)
3139                 return;
3140
3141         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3142                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3143 }
3144
3145 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3146 {
3147         u64 mapping_size;
3148         int ret;
3149
3150         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3151         mapping_size = rbd_dev->mapping.size;
3152         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3153         if (rbd_dev->image_format == 1)
3154                 ret = rbd_dev_v1_header_info(rbd_dev);
3155         else
3156                 ret = rbd_dev_v2_header_info(rbd_dev);
3157
3158         /* If it's a mapped snapshot, validate its EXISTS flag */
3159
3160         rbd_exists_validate(rbd_dev);
3161         mutex_unlock(&ctl_mutex);
3162         if (mapping_size != rbd_dev->mapping.size) {
3163                 sector_t size;
3164
3165                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3166                 dout("setting size to %llu sectors", (unsigned long long)size);
3167                 set_capacity(rbd_dev->disk, size);
3168                 revalidate_disk(rbd_dev->disk);
3169         }
3170
3171         return ret;
3172 }
3173
3174 static int rbd_init_disk(struct rbd_device *rbd_dev)
3175 {
3176         struct gendisk *disk;
3177         struct request_queue *q;
3178         u64 segment_size;
3179
3180         /* create gendisk info */
3181         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3182         if (!disk)
3183                 return -ENOMEM;
3184
3185         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3186                  rbd_dev->dev_id);
3187         disk->major = rbd_dev->major;
3188         disk->first_minor = 0;
3189         disk->fops = &rbd_bd_ops;
3190         disk->private_data = rbd_dev;
3191
3192         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3193         if (!q)
3194                 goto out_disk;
3195
3196         /* We use the default size, but let's be explicit about it. */
3197         blk_queue_physical_block_size(q, SECTOR_SIZE);
3198
3199         /* set io sizes to object size */
3200         segment_size = rbd_obj_bytes(&rbd_dev->header);
3201         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3202         blk_queue_max_segment_size(q, segment_size);
3203         blk_queue_io_min(q, segment_size);
3204         blk_queue_io_opt(q, segment_size);
3205
3206         blk_queue_merge_bvec(q, rbd_merge_bvec);
3207         disk->queue = q;
3208
3209         q->queuedata = rbd_dev;
3210
3211         rbd_dev->disk = disk;
3212
3213         return 0;
3214 out_disk:
3215         put_disk(disk);
3216
3217         return -ENOMEM;
3218 }
3219
3220 /*
3221   sysfs
3222 */
3223
3224 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3225 {
3226         return container_of(dev, struct rbd_device, dev);
3227 }
3228
3229 static ssize_t rbd_size_show(struct device *dev,
3230                              struct device_attribute *attr, char *buf)
3231 {
3232         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3233
3234         return sprintf(buf, "%llu\n",
3235                 (unsigned long long)rbd_dev->mapping.size);
3236 }
3237
3238 /*
3239  * Note this shows the features for whatever's mapped, which is not
3240  * necessarily the base image.
3241  */
3242 static ssize_t rbd_features_show(struct device *dev,
3243                              struct device_attribute *attr, char *buf)
3244 {
3245         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3246
3247         return sprintf(buf, "0x%016llx\n",
3248                         (unsigned long long)rbd_dev->mapping.features);
3249 }
3250
3251 static ssize_t rbd_major_show(struct device *dev,
3252                               struct device_attribute *attr, char *buf)
3253 {
3254         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3255
3256         if (rbd_dev->major)
3257                 return sprintf(buf, "%d\n", rbd_dev->major);
3258
        return sprintf(buf, "(none)\n");
}
3262
3263 static ssize_t rbd_client_id_show(struct device *dev,
3264                                   struct device_attribute *attr, char *buf)
3265 {
3266         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3267
3268         return sprintf(buf, "client%lld\n",
3269                         ceph_client_id(rbd_dev->rbd_client->client));
3270 }
3271
3272 static ssize_t rbd_pool_show(struct device *dev,
3273                              struct device_attribute *attr, char *buf)
3274 {
3275         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3276
3277         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3278 }
3279
3280 static ssize_t rbd_pool_id_show(struct device *dev,
3281                              struct device_attribute *attr, char *buf)
3282 {
3283         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3284
3285         return sprintf(buf, "%llu\n",
3286                         (unsigned long long) rbd_dev->spec->pool_id);
3287 }
3288
3289 static ssize_t rbd_name_show(struct device *dev,
3290                              struct device_attribute *attr, char *buf)
3291 {
3292         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3293
3294         if (rbd_dev->spec->image_name)
3295                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3296
3297         return sprintf(buf, "(unknown)\n");
3298 }
3299
3300 static ssize_t rbd_image_id_show(struct device *dev,
3301                              struct device_attribute *attr, char *buf)
3302 {
3303         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3304
3305         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3306 }
3307
3308 /*
3309  * Shows the name of the currently-mapped snapshot (or
3310  * RBD_SNAP_HEAD_NAME for the base image).
3311  */
3312 static ssize_t rbd_snap_show(struct device *dev,
3313                              struct device_attribute *attr,
3314                              char *buf)
3315 {
3316         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3317
3318         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3319 }
3320
3321 /*
3322  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3323  * for the parent image.  If there is no parent, simply shows
3324  * "(no parent image)".
3325  */
3326 static ssize_t rbd_parent_show(struct device *dev,
3327                              struct device_attribute *attr,
3328                              char *buf)
3329 {
3330         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3331         struct rbd_spec *spec = rbd_dev->parent_spec;
3332         int count;
3333         char *bufp = buf;
3334
3335         if (!spec)
3336                 return sprintf(buf, "(no parent image)\n");
3337
3338         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3339                         (unsigned long long) spec->pool_id, spec->pool_name);
3340         if (count < 0)
3341                 return count;
3342         bufp += count;
3343
3344         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3345                         spec->image_name ? spec->image_name : "(unknown)");
3346         if (count < 0)
3347                 return count;
3348         bufp += count;
3349
3350         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3351                         (unsigned long long) spec->snap_id, spec->snap_name);
3352         if (count < 0)
3353                 return count;
3354         bufp += count;
3355
3356         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3357         if (count < 0)
3358                 return count;
3359         bufp += count;
3360
3361         return (ssize_t) (bufp - buf);
3362 }
3363
3364 static ssize_t rbd_image_refresh(struct device *dev,
3365                                  struct device_attribute *attr,
3366                                  const char *buf,
3367                                  size_t size)
3368 {
3369         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3370         int ret;
3371
3372         ret = rbd_dev_refresh(rbd_dev);
3373         if (ret)
                rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3375
3376         return ret < 0 ? ret : size;
3377 }
3378
3379 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3380 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3381 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3382 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3383 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3384 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3385 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3386 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3387 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3388 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3389 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3390
3391 static struct attribute *rbd_attrs[] = {
3392         &dev_attr_size.attr,
3393         &dev_attr_features.attr,
3394         &dev_attr_major.attr,
3395         &dev_attr_client_id.attr,
3396         &dev_attr_pool.attr,
3397         &dev_attr_pool_id.attr,
3398         &dev_attr_name.attr,
3399         &dev_attr_image_id.attr,
3400         &dev_attr_current_snap.attr,
3401         &dev_attr_parent.attr,
3402         &dev_attr_refresh.attr,
3403         NULL
3404 };
3405
3406 static struct attribute_group rbd_attr_group = {
3407         .attrs = rbd_attrs,
3408 };
3409
3410 static const struct attribute_group *rbd_attr_groups[] = {
3411         &rbd_attr_group,
3412         NULL
3413 };
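
/*
 * Example usage from user space (illustrative; assumes the image was
 * assigned device id 0, and the values shown are samples):
 *
 *   $ cat /sys/bus/rbd/devices/0/size
 *   1073741824
 *   $ cat /sys/bus/rbd/devices/0/current_snap
 *   -
 *   $ echo 1 > /sys/bus/rbd/devices/0/refresh
 *
 * The value written to "refresh" is ignored; any write triggers
 * rbd_dev_refresh().
 */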
3414
3415 static void rbd_sysfs_dev_release(struct device *dev)
3416 {
3417 }
3418
3419 static struct device_type rbd_device_type = {
3420         .name           = "rbd",
3421         .groups         = rbd_attr_groups,
3422         .release        = rbd_sysfs_dev_release,
3423 };
3424
3425 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3426 {
3427         kref_get(&spec->kref);
3428
3429         return spec;
3430 }
3431
3432 static void rbd_spec_free(struct kref *kref);
3433 static void rbd_spec_put(struct rbd_spec *spec)
3434 {
3435         if (spec)
3436                 kref_put(&spec->kref, rbd_spec_free);
3437 }
3438
3439 static struct rbd_spec *rbd_spec_alloc(void)
3440 {
3441         struct rbd_spec *spec;
3442
3443         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3444         if (!spec)
3445                 return NULL;
3446         kref_init(&spec->kref);
3447
3448         return spec;
3449 }
3450
3451 static void rbd_spec_free(struct kref *kref)
3452 {
3453         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3454
3455         kfree(spec->pool_name);
3456         kfree(spec->image_id);
3457         kfree(spec->image_name);
3458         kfree(spec->snap_name);
3459         kfree(spec);
3460 }
3461
3462 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3463                                 struct rbd_spec *spec)
3464 {
3465         struct rbd_device *rbd_dev;
3466
3467         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3468         if (!rbd_dev)
3469                 return NULL;
3470
3471         spin_lock_init(&rbd_dev->lock);
3472         rbd_dev->flags = 0;
3473         INIT_LIST_HEAD(&rbd_dev->node);
3474         init_rwsem(&rbd_dev->header_rwsem);
3475
3476         rbd_dev->spec = spec;
3477         rbd_dev->rbd_client = rbdc;
3478
3479         /* Initialize the layout used for all rbd requests */
3480
3481         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3482         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3483         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3484         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3485
3486         return rbd_dev;
3487 }
3488
3489 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3490 {
3491         rbd_put_client(rbd_dev->rbd_client);
3492         rbd_spec_put(rbd_dev->spec);
3493         kfree(rbd_dev);
3494 }
3495
3496 /*
3497  * Get the size and object order for an image snapshot, or if
3498  * snap_id is CEPH_NOSNAP, gets this information for the base
3499  * image.
3500  */
3501 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3502                                 u8 *order, u64 *snap_size)
3503 {
3504         __le64 snapid = cpu_to_le64(snap_id);
3505         int ret;
3506         struct {
3507                 u8 order;
3508                 __le64 size;
3509         } __attribute__ ((packed)) size_buf = { 0 };
3510
3511         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3512                                 "rbd", "get_size",
3513                                 &snapid, sizeof (snapid),
3514                                 &size_buf, sizeof (size_buf));
3515         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3516         if (ret < 0)
3517                 return ret;
3518         if (ret < sizeof (size_buf))
3519                 return -ERANGE;
3520
        if (order) {
                *order = size_buf.order;
                dout("  order %u\n", (unsigned int)*order);
        }
        *snap_size = le64_to_cpu(size_buf.size);

        dout("  snap_id 0x%016llx snap_size = %llu\n",
                (unsigned long long)snap_id,
                (unsigned long long)*snap_size);
3528
3529         return 0;
3530 }
3531
3532 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3533 {
3534         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3535                                         &rbd_dev->header.obj_order,
3536                                         &rbd_dev->header.image_size);
3537 }
3538
3539 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3540 {
3541         void *reply_buf;
3542         int ret;
3543         void *p;
3544
3545         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3546         if (!reply_buf)
3547                 return -ENOMEM;
3548
3549         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3550                                 "rbd", "get_object_prefix", NULL, 0,
3551                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3552         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3553         if (ret < 0)
3554                 goto out;
3555
3556         p = reply_buf;
3557         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3558                                                 p + ret, NULL, GFP_NOIO);
3559         ret = 0;
3560
3561         if (IS_ERR(rbd_dev->header.object_prefix)) {
3562                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3563                 rbd_dev->header.object_prefix = NULL;
3564         } else {
3565                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3566         }
3567 out:
3568         kfree(reply_buf);
3569
3570         return ret;
3571 }
3572
3573 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3574                 u64 *snap_features)
3575 {
3576         __le64 snapid = cpu_to_le64(snap_id);
3577         struct {
3578                 __le64 features;
3579                 __le64 incompat;
3580         } __attribute__ ((packed)) features_buf = { 0 };
3581         u64 incompat;
3582         int ret;
3583
3584         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3585                                 "rbd", "get_features",
3586                                 &snapid, sizeof (snapid),
3587                                 &features_buf, sizeof (features_buf));
3588         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3589         if (ret < 0)
3590                 return ret;
3591         if (ret < sizeof (features_buf))
3592                 return -ERANGE;
3593
3594         incompat = le64_to_cpu(features_buf.incompat);
3595         if (incompat & ~RBD_FEATURES_SUPPORTED)
3596                 return -ENXIO;
3597
3598         *snap_features = le64_to_cpu(features_buf.features);
3599
3600         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3601                 (unsigned long long)snap_id,
3602                 (unsigned long long)*snap_features,
3603                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3604
3605         return 0;
3606 }
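
/*
 * Example: an image created with layering enabled may report
 * RBD_FEATURE_LAYERING (0x1) in both masks, which this client
 * handles.  Were the incompat mask to carry a bit unknown to this
 * driver, such as (1 << 2), the check above would refuse the
 * mapping with -ENXIO rather than risk misreading the image.
 */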
3607
3608 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3609 {
3610         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3611                                                 &rbd_dev->header.features);
3612 }
3613
3614 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3615 {
3616         struct rbd_spec *parent_spec;
3617         size_t size;
3618         void *reply_buf = NULL;
3619         __le64 snapid;
3620         void *p;
3621         void *end;
3622         u64 pool_id;
3623         char *image_id;
3624         u64 overlap;
3625         int ret;
3626
3627         parent_spec = rbd_spec_alloc();
3628         if (!parent_spec)
3629                 return -ENOMEM;
3630
3631         size = sizeof (__le64) +                                /* pool_id */
3632                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3633                 sizeof (__le64) +                               /* snap_id */
3634                 sizeof (__le64);                                /* overlap */
3635         reply_buf = kmalloc(size, GFP_KERNEL);
3636         if (!reply_buf) {
3637                 ret = -ENOMEM;
3638                 goto out_err;
3639         }
3640
3641         snapid = cpu_to_le64(CEPH_NOSNAP);
3642         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3643                                 "rbd", "get_parent",
3644                                 &snapid, sizeof (snapid),
3645                                 reply_buf, size);
3646         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3647         if (ret < 0)
3648                 goto out_err;
3649
3650         p = reply_buf;
3651         end = reply_buf + ret;
3652         ret = -ERANGE;
3653         ceph_decode_64_safe(&p, end, pool_id, out_err);
3654         if (pool_id == CEPH_NOPOOL)
3655                 goto out;       /* No parent?  No problem. */
3656
3657         /* The ceph file layout needs to fit pool id in 32 bits */
3658
3659         ret = -EIO;
3660         if (pool_id > (u64)U32_MAX) {
3661                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3662                         (unsigned long long)pool_id, U32_MAX);
3663                 goto out_err;
3664         }
3665         parent_spec->pool_id = pool_id;
3666
3667         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3668         if (IS_ERR(image_id)) {
3669                 ret = PTR_ERR(image_id);
3670                 goto out_err;
3671         }
3672         parent_spec->image_id = image_id;
3673         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3674         ceph_decode_64_safe(&p, end, overlap, out_err);
3675
3676         if (overlap) {
3677                 rbd_spec_put(rbd_dev->parent_spec);
3678                 rbd_dev->parent_spec = parent_spec;
3679                 parent_spec = NULL;     /* rbd_dev now owns this */
3680                 rbd_dev->parent_overlap = overlap;
3681         } else {
3682                 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3683         }
3684 out:
3685         ret = 0;
3686 out_err:
3687         kfree(reply_buf);
3688         rbd_spec_put(parent_spec);
3689
3690         return ret;
3691 }
3692
3693 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3694 {
3695         struct {
3696                 __le64 stripe_unit;
3697                 __le64 stripe_count;
3698         } __attribute__ ((packed)) striping_info_buf = { 0 };
3699         size_t size = sizeof (striping_info_buf);
3700         void *p;
3701         u64 obj_size;
3702         u64 stripe_unit;
3703         u64 stripe_count;
3704         int ret;
3705
3706         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3707                                 "rbd", "get_stripe_unit_count", NULL, 0,
3708                                 (char *)&striping_info_buf, size);
3709         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3710         if (ret < 0)
3711                 return ret;
3712         if (ret < size)
3713                 return -ERANGE;
3714
3715         /*
3716          * We don't actually support the "fancy striping" feature
3717          * (STRIPINGV2) yet, but if the striping sizes are the
3718          * defaults the behavior is the same as before.  So find
3719          * out, and only fail if the image has non-default values.
3720          */
3721         ret = -EINVAL;
3722         obj_size = (u64)1 << rbd_dev->header.obj_order;
3723         p = &striping_info_buf;
3724         stripe_unit = ceph_decode_64(&p);
3725         if (stripe_unit != obj_size) {
3726                 rbd_warn(rbd_dev, "unsupported stripe unit "
3727                                 "(got %llu want %llu)",
3728                                 stripe_unit, obj_size);
3729                 return -EINVAL;
3730         }
3731         stripe_count = ceph_decode_64(&p);
3732         if (stripe_count != 1) {
3733                 rbd_warn(rbd_dev, "unsupported stripe count "
3734                                 "(got %llu want 1)", stripe_count);
3735                 return -EINVAL;
3736         }
3737         rbd_dev->header.stripe_unit = stripe_unit;
3738         rbd_dev->header.stripe_count = stripe_count;
3739
3740         return 0;
3741 }
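
/*
 * Worked example: with the default object order of 22 the object
 * size is 4194304 bytes, so the only values accepted above are
 * stripe_unit = 4194304 and stripe_count = 1; a STRIPINGV2 image
 * is mappable only when it uses the default striping layout.
 */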
3742
3743 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3744 {
3745         size_t image_id_size;
3746         char *image_id;
3747         void *p;
3748         void *end;
3749         size_t size;
3750         void *reply_buf = NULL;
3751         size_t len = 0;
3752         char *image_name = NULL;
3753         int ret;
3754
3755         rbd_assert(!rbd_dev->spec->image_name);
3756
3757         len = strlen(rbd_dev->spec->image_id);
3758         image_id_size = sizeof (__le32) + len;
3759         image_id = kmalloc(image_id_size, GFP_KERNEL);
3760         if (!image_id)
3761                 return NULL;
3762
3763         p = image_id;
3764         end = image_id + image_id_size;
3765         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3766
3767         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3768         reply_buf = kmalloc(size, GFP_KERNEL);
3769         if (!reply_buf)
3770                 goto out;
3771
3772         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3773                                 "rbd", "dir_get_name",
3774                                 image_id, image_id_size,
3775                                 reply_buf, size);
3776         if (ret < 0)
3777                 goto out;
3778         p = reply_buf;
3779         end = reply_buf + ret;
3780
3781         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3782         if (IS_ERR(image_name))
3783                 image_name = NULL;
3784         else
3785                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3786 out:
3787         kfree(reply_buf);
3788         kfree(image_id);
3789
3790         return image_name;
3791 }
3792
3793 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3794 {
3795         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3796         const char *snap_name;
3797         u32 which = 0;
3798
3799         /* Skip over names until we find the one we are looking for */
3800
3801         snap_name = rbd_dev->header.snap_names;
3802         while (which < snapc->num_snaps) {
3803                 if (!strcmp(name, snap_name))
3804                         return snapc->snaps[which];
3805                 snap_name += strlen(snap_name) + 1;
3806                 which++;
3807         }
3808         return CEPH_NOSNAP;
3809 }
3810
3811 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3812 {
3813         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3814         u32 which;
3815         bool found = false;
3816         u64 snap_id;
3817
3818         for (which = 0; !found && which < snapc->num_snaps; which++) {
3819                 const char *snap_name;
3820
3821                 snap_id = snapc->snaps[which];
3822                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3823                 if (IS_ERR(snap_name))
3824                         break;
3825                 found = !strcmp(name, snap_name);
3826                 kfree(snap_name);
3827         }
3828         return found ? snap_id : CEPH_NOSNAP;
3829 }
3830
3831 /*
3832  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3833  * no snapshot by that name is found, or if an error occurs.
3834  */
3835 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3836 {
3837         if (rbd_dev->image_format == 1)
3838                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3839
3840         return rbd_v2_snap_id_by_name(rbd_dev, name);
3841 }
3842
3843 /*
3844  * When an rbd image has a parent image, it is identified by the
3845  * pool, image, and snapshot ids (not names).  This function fills
3846  * in the names for those ids.  (It's OK if we can't figure out the
3847  * name for an image id, but the pool and snapshot ids should always
3848  * exist and have names.)  All names in an rbd spec are dynamically
3849  * allocated.
3850  *
3851  * When an image being mapped (not a parent) is probed, we have the
3852  * pool name and pool id, image name and image id, and the snapshot
3853  * name.  The only thing we're missing is the snapshot id.
3854  */
3855 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3856 {
3857         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3858         struct rbd_spec *spec = rbd_dev->spec;
3859         const char *pool_name;
3860         const char *image_name;
3861         const char *snap_name;
3862         int ret;
3863
3864         /*
3865          * An image being mapped will have the pool name (etc.), but
3866          * we need to look up the snapshot id.
3867          */
3868         if (spec->pool_name) {
3869                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3870                         u64 snap_id;
3871
3872                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3873                         if (snap_id == CEPH_NOSNAP)
3874                                 return -ENOENT;
3875                         spec->snap_id = snap_id;
3876                 } else {
3877                         spec->snap_id = CEPH_NOSNAP;
3878                 }
3879
3880                 return 0;
3881         }
3882
3883         /* Get the pool name; we have to make our own copy of this */
3884
3885         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3886         if (!pool_name) {
3887                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3888                 return -EIO;
3889         }
3890         pool_name = kstrdup(pool_name, GFP_KERNEL);
3891         if (!pool_name)
3892                 return -ENOMEM;
3893
3894         /* Fetch the image name; tolerate failure here */
3895
3896         image_name = rbd_dev_image_name(rbd_dev);
3897         if (!image_name)
3898                 rbd_warn(rbd_dev, "unable to get image name");
3899
3900         /* Look up the snapshot name, and make a copy */
3901
3902         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3903         if (!snap_name) {
3904                 ret = -ENOMEM;
3905                 goto out_err;
3906         }
3907
3908         spec->pool_name = pool_name;
3909         spec->image_name = image_name;
3910         spec->snap_name = snap_name;
3911
3912         return 0;
3913 out_err:
3914         kfree(image_name);
3915         kfree(pool_name);
3916
3917         return ret;
3918 }
3919
3920 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3921 {
3922         size_t size;
3923         int ret;
3924         void *reply_buf;
3925         void *p;
3926         void *end;
3927         u64 seq;
3928         u32 snap_count;
3929         struct ceph_snap_context *snapc;
3930         u32 i;
3931
3932         /*
3933          * We'll need room for the seq value (maximum snapshot id),
3934          * snapshot count, and array of that many snapshot ids.
3935          * For now we have a fixed upper limit on the number we're
3936          * prepared to receive.
3937          */
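        /*
         * Illustratively, with RBD_MAX_SNAP_COUNT = 510 this works
         * out to 8 + 4 + 510 * 8 = 4092 bytes, so the largest
         * possible reply fits within a single 4 KB page.
         */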
3938         size = sizeof (__le64) + sizeof (__le32) +
3939                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3940         reply_buf = kzalloc(size, GFP_KERNEL);
3941         if (!reply_buf)
3942                 return -ENOMEM;
3943
3944         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3945                                 "rbd", "get_snapcontext", NULL, 0,
3946                                 reply_buf, size);
3947         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3948         if (ret < 0)
3949                 goto out;
3950
3951         p = reply_buf;
3952         end = reply_buf + ret;
3953         ret = -ERANGE;
3954         ceph_decode_64_safe(&p, end, seq, out);
3955         ceph_decode_32_safe(&p, end, snap_count, out);
3956
3957         /*
3958          * Make sure the reported number of snapshot ids wouldn't go
3959          * beyond the end of our buffer.  But before checking that,
3960          * make sure the computed size of the snapshot context we
3961          * allocate is representable in a size_t.
3962          */
3963         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3964                                  / sizeof (u64)) {
3965                 ret = -EINVAL;
3966                 goto out;
3967         }
3968         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3969                 goto out;
3970         ret = 0;
3971
3972         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3973         if (!snapc) {
3974                 ret = -ENOMEM;
3975                 goto out;
3976         }
3977         snapc->seq = seq;
3978         for (i = 0; i < snap_count; i++)
3979                 snapc->snaps[i] = ceph_decode_64(&p);
3980
3981         ceph_put_snap_context(rbd_dev->header.snapc);
3982         rbd_dev->header.snapc = snapc;
3983
3984         dout("  snap context seq = %llu, snap_count = %u\n",
3985                 (unsigned long long)seq, (unsigned int)snap_count);
3986 out:
3987         kfree(reply_buf);
3988
3989         return ret;
3990 }
3991
3992 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3993                                         u64 snap_id)
3994 {
3995         size_t size;
3996         void *reply_buf;
3997         __le64 snapid;
3998         int ret;
3999         void *p;
4000         void *end;
4001         char *snap_name;
4002
4003         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4004         reply_buf = kmalloc(size, GFP_KERNEL);
4005         if (!reply_buf)
4006                 return ERR_PTR(-ENOMEM);
4007
4008         snapid = cpu_to_le64(snap_id);
4009         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4010                                 "rbd", "get_snapshot_name",
4011                                 &snapid, sizeof (snapid),
4012                                 reply_buf, size);
4013         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4014         if (ret < 0) {
4015                 snap_name = ERR_PTR(ret);
4016                 goto out;
4017         }
4018
4019         p = reply_buf;
4020         end = reply_buf + ret;
4021         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4022         if (IS_ERR(snap_name))
4023                 goto out;
4024
4025         dout("  snap_id 0x%016llx snap_name = %s\n",
4026                 (unsigned long long)snap_id, snap_name);
4027 out:
4028         kfree(reply_buf);
4029
4030         return snap_name;
4031 }
4032
4033 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4034 {
4035         bool first_time = rbd_dev->header.object_prefix == NULL;
4036         int ret;
4037
4038         down_write(&rbd_dev->header_rwsem);
4039
4040         if (first_time) {
4041                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4042                 if (ret)
4043                         goto out;
4044         }
4045
4046         /*
4047          * If the image supports layering, get the parent info.  We
4048          * need to probe the first time regardless.  Thereafter we
4049          * only need to if there's a parent, to see if it has
4050          * disappeared due to the mapped image getting flattened.
4051          */
4052         if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4053                         (first_time || rbd_dev->parent_spec)) {
4054                 bool warn;
4055
4056                 ret = rbd_dev_v2_parent_info(rbd_dev);
4057                 if (ret)
4058                         goto out;
4059
4060                 /*
4061                  * Print a warning if this is the initial probe and
4062                  * the image has a parent.  Don't print it if the
4063                  * image now being probed is itself a parent.  We
4064                  * can tell at this point because we won't know its
4065                  * pool name yet (just its pool id).
4066                  */
4067                 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4068                 if (first_time && warn)
4069                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4070                                         "is EXPERIMENTAL!");
4071         }
4072
4073         ret = rbd_dev_v2_image_size(rbd_dev);
4074         if (ret)
4075                 goto out;
4076
        if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
            rbd_dev->mapping.size != rbd_dev->header.image_size)
                rbd_dev->mapping.size = rbd_dev->header.image_size;
4080
4081         ret = rbd_dev_v2_snap_context(rbd_dev);
4082         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4083 out:
4084         up_write(&rbd_dev->header_rwsem);
4085
4086         return ret;
4087 }
4088
4089 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4090 {
4091         struct device *dev;
4092         int ret;
4093
4094         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4095
4096         dev = &rbd_dev->dev;
4097         dev->bus = &rbd_bus_type;
4098         dev->type = &rbd_device_type;
4099         dev->parent = &rbd_root_dev;
4100         dev->release = rbd_dev_device_release;
4101         dev_set_name(dev, "%d", rbd_dev->dev_id);
4102         ret = device_register(dev);
4103
4104         mutex_unlock(&ctl_mutex);
4105
4106         return ret;
4107 }
4108
4109 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4110 {
4111         device_unregister(&rbd_dev->dev);
4112 }
4113
4114 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4115
4116 /*
4117  * Get a unique rbd identifier for the given new rbd_dev, and add
4118  * the rbd_dev to the global list.  The minimum rbd id is 1.
4119  */
4120 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4121 {
4122         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4123
4124         spin_lock(&rbd_dev_list_lock);
4125         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4126         spin_unlock(&rbd_dev_list_lock);
4127         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4128                 (unsigned long long) rbd_dev->dev_id);
4129 }
4130
4131 /*
4132  * Remove an rbd_dev from the global list, and record that its
4133  * identifier is no longer in use.
4134  */
4135 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4136 {
4137         struct list_head *tmp;
4138         int rbd_id = rbd_dev->dev_id;
4139         int max_id;
4140
4141         rbd_assert(rbd_id > 0);
4142
4143         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4144                 (unsigned long long) rbd_dev->dev_id);
4145         spin_lock(&rbd_dev_list_lock);
4146         list_del_init(&rbd_dev->node);
4147
4148         /*
4149          * If the id being "put" is not the current maximum, there
4150          * is nothing special we need to do.
4151          */
4152         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4153                 spin_unlock(&rbd_dev_list_lock);
4154                 return;
4155         }
4156
4157         /*
4158          * We need to update the current maximum id.  Search the
4159          * list to find out what it is.  We're more likely to find
4160          * the maximum at the end, so search the list backward.
4161          */
4162         max_id = 0;
4163         list_for_each_prev(tmp, &rbd_dev_list) {
4164                 struct rbd_device *rbd_dev;
4165
4166                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4167                 if (rbd_dev->dev_id > max_id)
4168                         max_id = rbd_dev->dev_id;
4169         }
4170         spin_unlock(&rbd_dev_list_lock);
4171
4172         /*
4173          * The max id could have been updated by rbd_dev_id_get(), in
4174          * which case it now accurately reflects the new maximum.
4175          * Be careful not to overwrite the maximum value in that
4176          * case.
4177          */
4178         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4179         dout("  max dev id has been reset\n");
4180 }
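
/*
 * Example of the id bookkeeping above: with devices 1, 2 and 3
 * mapped, unmapping 3 rescans the list and resets the maximum to 2,
 * so the next mapping reuses id 3.  Unmapping 2 instead leaves the
 * maximum at 3 and the next mapping gets id 4; ids are reclaimed
 * only from the top.
 */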
4181
4182 /*
4183  * Skips over white space at *buf, and updates *buf to point to the
4184  * first found non-space character (if any). Returns the length of
4185  * the token (string of non-white space characters) found.  Note
4186  * that *buf must be terminated with '\0'.
4187  */
4188 static inline size_t next_token(const char **buf)
4189 {
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
4194         const char *spaces = " \f\n\r\t\v";
4195
4196         *buf += strspn(*buf, spaces);   /* Find start of token */
4197
4198         return strcspn(*buf, spaces);   /* Return token length */
4199 }
4200
4201 /*
4202  * Finds the next token in *buf, and if the provided token buffer is
4203  * big enough, copies the found token into it.  The result, if
4204  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4205  * must be terminated with '\0' on entry.
4206  *
4207  * Returns the length of the token found (not including the '\0').
4208  * Return value will be 0 if no token is found, and it will be >=
4209  * token_size if the token would not fit.
4210  *
4211  * The *buf pointer will be updated to point beyond the end of the
4212  * found token.  Note that this occurs even if the token buffer is
4213  * too small to hold it.
4214  */
4215 static inline size_t copy_token(const char **buf,
4216                                 char *token,
4217                                 size_t token_size)
4218 {
4219         size_t len;
4220
4221         len = next_token(buf);
4222         if (len < token_size) {
4223                 memcpy(token, *buf, len);
4224                 *(token + len) = '\0';
4225         }
4226         *buf += len;
4227
4228         return len;
4229 }
4230
4231 /*
4232  * Finds the next token in *buf, dynamically allocates a buffer big
4233  * enough to hold a copy of it, and copies the token into the new
4234  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4235  * that a duplicate buffer is created even for a zero-length token.
4236  *
4237  * Returns a pointer to the newly-allocated duplicate, or a null
4238  * pointer if memory for the duplicate was not available.  If
4239  * the lenp argument is a non-null pointer, the length of the token
4240  * (not including the '\0') is returned in *lenp.
4241  *
4242  * If successful, the *buf pointer will be updated to point beyond
4243  * the end of the found token.
4244  *
4245  * Note: uses GFP_KERNEL for allocation.
4246  */
4247 static inline char *dup_token(const char **buf, size_t *lenp)
4248 {
4249         char *dup;
4250         size_t len;
4251
4252         len = next_token(buf);
4253         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4254         if (!dup)
4255                 return NULL;
4256         *(dup + len) = '\0';
4257         *buf += len;
4258
4259         if (lenp)
4260                 *lenp = len;
4261
4262         return dup;
4263 }
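
/*
 * Illustrative walk-through: with *buf pointing at "  rbd foo",
 * next_token() advances *buf past the leading spaces and returns 3
 * (the length of "rbd"); dup_token() then hands back an allocated
 * "rbd" and leaves *buf pointing at " foo", ready for the next call.
 */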
4264
4265 /*
4266  * Parse the options provided for an "rbd add" (i.e., rbd image
4267  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4268  * and the data written is passed here via a NUL-terminated buffer.
4269  * Returns 0 if successful or an error code otherwise.
4270  *
4271  * The information extracted from these options is recorded in
4272  * the other parameters which return dynamically-allocated
4273  * structures:
4274  *  ceph_opts
4275  *      The address of a pointer that will refer to a ceph options
4276  *      structure.  Caller must release the returned pointer using
4277  *      ceph_destroy_options() when it is no longer needed.
4278  *  rbd_opts
4279  *      Address of an rbd options pointer.  Fully initialized by
4280  *      this function; caller must release with kfree().
4281  *  spec
4282  *      Address of an rbd image specification pointer.  Fully
4283  *      initialized by this function based on parsed options.
4284  *      Caller must release with rbd_spec_put().
4285  *
4286  * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4288  * where:
4289  *  <mon_addrs>
4290  *      A comma-separated list of one or more monitor addresses.
4291  *      A monitor address is an ip address, optionally followed
4292  *      by a port number (separated by a colon).
4293  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4294  *  <options>
4295  *      A comma-separated list of ceph and/or rbd options.
4296  *  <pool_name>
4297  *      The name of the rados pool containing the rbd image.
4298  *  <image_name>
4299  *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
4305  */
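
/*
 * For illustration, a complete string written to /sys/bus/rbd/add
 * might look like (the monitor address and credentials are examples,
 * and <key> stands for a real secret):
 *
 *   1.2.3.4:6789 name=admin,secret=<key> rbd foo -
 *
 * which maps the head of image "foo" in pool "rbd"; the trailing
 * "-" (RBD_SNAP_HEAD_NAME) may simply be omitted.
 */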
4306 static int rbd_add_parse_args(const char *buf,
4307                                 struct ceph_options **ceph_opts,
4308                                 struct rbd_options **opts,
4309                                 struct rbd_spec **rbd_spec)
4310 {
4311         size_t len;
4312         char *options;
4313         const char *mon_addrs;
4314         char *snap_name;
4315         size_t mon_addrs_size;
4316         struct rbd_spec *spec = NULL;
4317         struct rbd_options *rbd_opts = NULL;
4318         struct ceph_options *copts;
4319         int ret;
4320
4321         /* The first four tokens are required */
4322
4323         len = next_token(&buf);
4324         if (!len) {
4325                 rbd_warn(NULL, "no monitor address(es) provided");
4326                 return -EINVAL;
4327         }
4328         mon_addrs = buf;
4329         mon_addrs_size = len + 1;
4330         buf += len;
4331
4332         ret = -EINVAL;
4333         options = dup_token(&buf, NULL);
4334         if (!options)
4335                 return -ENOMEM;
4336         if (!*options) {
4337                 rbd_warn(NULL, "no options provided");
4338                 goto out_err;
4339         }
4340
4341         spec = rbd_spec_alloc();
4342         if (!spec)
4343                 goto out_mem;
4344
4345         spec->pool_name = dup_token(&buf, NULL);
4346         if (!spec->pool_name)
4347                 goto out_mem;
4348         if (!*spec->pool_name) {
4349                 rbd_warn(NULL, "no pool name provided");
4350                 goto out_err;
4351         }
4352
4353         spec->image_name = dup_token(&buf, NULL);
4354         if (!spec->image_name)
4355                 goto out_mem;
4356         if (!*spec->image_name) {
4357                 rbd_warn(NULL, "no image name provided");
4358                 goto out_err;
4359         }
4360
4361         /*
4362          * Snapshot name is optional; default is to use "-"
4363          * (indicating the head/no snapshot).
4364          */
4365         len = next_token(&buf);
4366         if (!len) {
4367                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4368                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4369         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4370                 ret = -ENAMETOOLONG;
4371                 goto out_err;
4372         }
4373         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4374         if (!snap_name)
4375                 goto out_mem;
4376         *(snap_name + len) = '\0';
4377         spec->snap_name = snap_name;
4378
4379         /* Initialize all rbd options to the defaults */
4380
4381         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4382         if (!rbd_opts)
4383                 goto out_mem;
4384
4385         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4386
4387         copts = ceph_parse_options(options, mon_addrs,
4388                                         mon_addrs + mon_addrs_size - 1,
4389                                         parse_rbd_opts_token, rbd_opts);
4390         if (IS_ERR(copts)) {
4391                 ret = PTR_ERR(copts);
4392                 goto out_err;
4393         }
4394         kfree(options);
4395
4396         *ceph_opts = copts;
4397         *opts = rbd_opts;
4398         *rbd_spec = spec;
4399
4400         return 0;
4401 out_mem:
4402         ret = -ENOMEM;
4403 out_err:
4404         kfree(rbd_opts);
4405         rbd_spec_put(spec);
4406         kfree(options);
4407
4408         return ret;
4409 }
4410
4411 /*
4412  * An rbd format 2 image has a unique identifier, distinct from the
4413  * name given to it by the user.  Internally, that identifier is
4414  * what's used to specify the names of objects related to the image.
4415  *
4416  * A special "rbd id" object is used to map an rbd image name to its
4417  * id.  If that object doesn't exist, then there is no v2 rbd image
4418  * with the supplied name.
4419  *
4420  * This function will record the given rbd_dev's image_id field if
4421  * it can be determined, and in that case will return 0.  If any
4422  * errors occur a negative errno will be returned and the rbd_dev's
4423  * image_id field will be unchanged (and should be NULL).
4424  */
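
/*
 * For illustration: for an image named "foo" the id object is named
 * RBD_ID_PREFIX "foo" (i.e. "rbd_id.foo", with the prefix defined in
 * rbd_types.h).  Format 1 images have no such object, which is how
 * the image format is inferred below.
 */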
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        char *image_id;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.  We
         * do still need to set the image format though.
         */
        if (rbd_dev->spec->image_id) {
                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

                return 0;
        }

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* If it doesn't exist we'll assume it's a format 1 image */

        ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id", NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret == -ENOENT) {
                image_id = kstrdup("", GFP_KERNEL);
                ret = image_id ? 0 : -ENOMEM;
                if (!ret)
                        rbd_dev->image_format = 1;
        } else if (ret > sizeof (__le32)) {
                void *p = response;

                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
                ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
                ret = -EINVAL;
        }

        if (!ret) {
                rbd_dev->spec->image_id = image_id;
                dout("image_id is %s\n", image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}

/* Undo whatever state changes are made by v1 or v2 image probe */

static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
        struct rbd_image_header *header;

        rbd_dev_unparent(rbd_dev);

        /* Free dynamic fields from the header, then zero it out */

        header = &rbd_dev->header;
        ceph_put_snap_context(header->snapc);
        kfree(header->snap_sizes);
        kfree(header->snap_names);
        kfree(header->object_prefix);
        memset(header, 0, sizeof (*header));
}

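/*
 * Fetch the pieces of a format 2 image's metadata that never change:
 * the object prefix, the feature bits, and (when the striping v2
 * feature is present) the stripe parameters.  Called once when the
 * image is first probed.
 */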
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;

        /*
         * Get and check the features for the image.  Currently the
         * features are assumed to never change.
         */
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;

        /* If the image supports fancy striping, get its parameters */

        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
                ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }

        /* No support for crypto and compression type format 2 images */

        return 0;
out_err:
        rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}

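/*
 * If the image is a clone, create and probe an rbd_device for its
 * parent image.  Parent and child always share the same client and
 * the parent's spec, so references to both are taken before they
 * are handed to the new device.
 */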
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        struct rbd_spec *parent_spec;
        struct rbd_client *rbdc;
        int ret;

        if (!rbd_dev->parent_spec)
                return 0;
        /*
         * We need to pass a reference to the client and the parent
         * spec when creating the parent rbd_dev.  Images related by
         * parent/child relationships always share both.
         */
        parent_spec = rbd_spec_get(rbd_dev->parent_spec);
        rbdc = __rbd_get_client(rbd_dev->rbd_client);

        ret = -ENOMEM;
        parent = rbd_dev_create(rbdc, parent_spec);
        if (!parent)
                goto out_err;

        ret = rbd_dev_image_probe(parent, false);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;

        return 0;
out_err:
        if (parent) {
                /*
                 * The child's header_name is owned by the child and
                 * freed when its own probe unwinds; freeing it here
                 * would cause a double free.
                 */
                rbd_dev_unparent(rbd_dev);
                rbd_dev_destroy(parent);
        } else {
                rbd_put_client(rbdc);
                rbd_spec_put(parent_spec);
        }

        return ret;
}

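/*
 * Set up the Linux side of the device: assign a device id and name,
 * register a block major, create the disk and its mapping, and add
 * the device to sysfs before announcing the disk.
 */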
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
        int ret;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_disk;
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_mapping;

        /* Everything's ready.  Announce the disk to the world. */

        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_mapping:
        rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);

        return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
        struct rbd_spec *spec = rbd_dev->spec;
        size_t size;

        /* Record the header object name for this rbd image. */

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
        else
                size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;

        if (rbd_dev->image_format == 1)
                sprintf(rbd_dev->header_name, "%s%s",
                        spec->image_name, RBD_SUFFIX);
        else
                sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, spec->image_id);
        return 0;
}
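
/*
 * For example (affix values assumed from rbd_types.h, image id made
 * up for illustration): a format 1 image "foo" gets header object
 * "foo.rbd" (RBD_SUFFIX appended), while a format 2 image with id
 * "1fc2" gets header object "rbd_header.1fc2" (RBD_HEADER_PREFIX
 * prepended to the id).
 */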

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        rbd_dev_unprobe(rbd_dev);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
        int ret;
        int tmp;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;
        rbd_assert(rbd_dev->spec->image_id);
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        if (mapping) {
                ret = rbd_dev_header_watch_sync(rbd_dev, true);
                if (ret)
                        goto out_header_name;
        }

        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_header_info(rbd_dev);
        else
                ret = rbd_dev_v2_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;

        ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_probe;

        ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_probe;

        dout("discovered format %u image, header name is %s\n",
                rbd_dev->image_format, rbd_dev->header_name);

        return 0;
err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        if (mapping) {
                tmp = rbd_dev_header_watch_sync(rbd_dev, false);
                if (tmp)
                        rbd_warn(rbd_dev, "unable to tear down "
                                        "watch request (%d)\n", tmp);
        }
out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        dout("probe failed, returning %d\n", ret);

        return ret;
}

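/*
 * Handle a write to /sys/bus/rbd/add.  The buffer names the
 * monitors, options, pool, image, and (optionally) the snapshot to
 * map; see Documentation/ABI/testing/sysfs-bus-rbd.  Illustrative
 * example (addresses and names assumed):
 *
 *   # echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 */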
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        bool read_only;
        int rc;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;
        read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;

        /* The ceph file layout needs to fit pool id in 32 bits */

        if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)\n",
                                (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev) {
                /* rc still holds the (non-negative) pool id here */
                rc = -ENOMEM;
                goto err_out_client;
        }
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rc = rbd_dev_image_probe(rbd_dev, true);
        if (rc < 0)
                goto err_out_rbd_dev;

        /* If we are mapping a snapshot it must be marked read-only */

        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                read_only = true;
        rbd_dev->mapping.read_only = read_only;

        rc = rbd_dev_device_setup(rbd_dev);
        if (!rc)
                return count;

        /* rbd_dev_image_release() also destroys the device */
        rbd_dev_image_release(rbd_dev);
        goto err_out_module;
err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t)rc;
}

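/*
 * Look up the rbd device with the given id on the global device
 * list, returning NULL if no such device exists.  The list lock is
 * dropped before returning, so the caller must otherwise ensure the
 * device doesn't go away.
 */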
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
}

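/*
 * Tear down an image's chain of parents.  Each pass descends to the
 * deepest parent (the one with no grandparent), releases it, and
 * detaches it from its child, until no parents remain.
 */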
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}

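/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the
 * decimal id of the device to unmap.  Illustrative example
 * (device id assumed):
 *
 *   # echo 0 > /sys/bus/rbd/remove
 */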
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id;
        unsigned long ul;
        int ret;

        ret = strict_strtoul(buf, 10, &ul);
        if (ret)
                return ret;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
        rbd_bus_del_dev(rbd_dev);
        ret = rbd_dev_header_watch_sync(rbd_dev, false);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
        ret = count;
done:
        mutex_unlock(&ctl_mutex);

        return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

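/*
 * Create the slab caches used for image requests, object requests,
 * and segment names.  On any failure, destroy whatever caches were
 * already created and return -ENOMEM.
 */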
static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = kmem_cache_create("rbd_img_request",
                                        sizeof (struct rbd_img_request),
                                        __alignof__(struct rbd_img_request),
                                        0, NULL);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
                                        sizeof (struct rbd_obj_request),
                                        __alignof__(struct rbd_obj_request),
                                        0, NULL);
        if (!rbd_obj_request_cache)
                goto out_err;

        rbd_assert(!rbd_segment_name_cache);
        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
                                        MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
        if (rbd_segment_name_cache)
                return 0;
out_err:
        if (rbd_obj_request_cache) {
                kmem_cache_destroy(rbd_obj_request_cache);
                rbd_obj_request_cache = NULL;
        }

        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;

        return -ENOMEM;
}

static void rbd_slab_exit(void)
{
        rbd_assert(rbd_segment_name_cache);
        kmem_cache_destroy(rbd_segment_name_cache);
        rbd_segment_name_cache = NULL;

        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}

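/*
 * Module entry point: verify that the libceph we are linked against
 * is compatible, then set up the slab caches and the sysfs bus
 * interface.  Unwind the caches if sysfs setup fails.
 */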
static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");

                return -EINVAL;
        }
        rc = rbd_slab_init();
        if (rc)
                return rc;
        rc = rbd_sysfs_init();
        if (rc)
                rbd_slab_exit();
        else
                pr_info("loaded " RBD_DRV_NAME_LONG "\n");

        return rc;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
        rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");