]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/s390/linux
[~andy/linux] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44
45 #include "rbd_types.h"
46
#define RBD_DEBUG       /* Compile in the rbd_assert() checks */

/*
 * Block I/O is expressed in sectors.  Several Linux layers (blk, bio,
 * genhd) each interpret the term, but the default sector size is
 * always 512 bytes.  These names just read better than a bare 9 or
 * 512 would.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
57
58 /*
59  * Increment the given counter and return its updated value.
60  * If the counter is already 0 it will not be incremented.
61  * If the counter is already at its maximum value returns
62  * -EINVAL without updating it.
63  */
64 static int atomic_inc_return_safe(atomic_t *v)
65 {
66         unsigned int counter;
67
68         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
69         if (counter <= (unsigned int)INT_MAX)
70                 return (int)counter;
71
72         atomic_dec(v);
73
74         return -EINVAL;
75 }
76
77 /* Decrement the counter.  Return the resulting value, or -EINVAL */
78 static int atomic_dec_return_safe(atomic_t *v)
79 {
80         int counter;
81
82         counter = atomic_dec_return(v);
83         if (counter >= 0)
84                 return counter;
85
86         atomic_inc(v);
87
88         return -EINVAL;
89 }
90
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* A snapshot name must still fit in NAME_MAX once the prefix is added */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof(RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof(__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1 << 0)
#define RBD_FEATURE_STRIPINGV2  (1 << 1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device is named "rbd#": the "rbd" part is RBD_DRV_NAME
 * above and # is a unique integer identifier.  MAX_INT_FORMAT_WIDTH
 * is used when checking that DEV_NAME_LEN is large enough to hold
 * every possible device name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof(int)) / 2 + 1)
131
132 /*
133  * block device image metadata (in-memory version)
134  */
135 struct rbd_image_header {
136         /* These six fields never change for a given rbd image */
137         char *object_prefix;
138         __u8 obj_order;
139         __u8 crypt_type;
140         __u8 comp_type;
141         u64 stripe_unit;
142         u64 stripe_count;
143         u64 features;           /* Might be changeable someday? */
144
145         /* The remaining fields need to be updated occasionally */
146         u64 image_size;
147         struct ceph_snap_context *snapc;
148         char *snap_names;       /* format 1 only */
149         u64 *snap_sizes;        /* format 1 only */
150 };
151
152 /*
153  * An rbd image specification.
154  *
155  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
156  * identify an image.  Each rbd_dev structure includes a pointer to
157  * an rbd_spec structure that encapsulates this identity.
158  *
159  * Each of the id's in an rbd_spec has an associated name.  For a
160  * user-mapped image, the names are supplied and the id's associated
161  * with them are looked up.  For a layered image, a parent image is
162  * defined by the tuple, and the names are looked up.
163  *
164  * An rbd_dev structure contains a parent_spec pointer which is
165  * non-null if the image it represents is a child in a layered
166  * image.  This pointer will refer to the rbd_spec structure used
167  * by the parent rbd_dev for its own identity (i.e., the structure
168  * is shared between the parent and child).
169  *
170  * Since these structures are populated once, during the discovery
171  * phase of image construction, they are effectively immutable so
172  * we make no effort to synchronize access to them.
173  *
174  * Note that code herein does not assume the image name is known (it
175  * could be a null pointer).
176  */
177 struct rbd_spec {
178         u64             pool_id;
179         const char      *pool_name;
180
181         const char      *image_id;
182         const char      *image_name;
183
184         u64             snap_id;
185         const char      *snap_name;
186
187         struct kref     kref;
188 };
189
190 /*
191  * an instance of the client.  multiple devices may share an rbd client.
192  */
193 struct rbd_client {
194         struct ceph_client      *client;
195         struct kref             kref;
196         struct list_head        node;
197 };
198
/* Completion callback invoked with the image request it belongs to */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

/* Completion callback invoked with the object request it belongs to */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
206
/* Kind of data buffer attached to an object request */
enum obj_request_type {
        OBJ_REQUEST_NODATA,
        OBJ_REQUEST_BIO,
        OBJ_REQUEST_PAGES
};
210
/* Bit numbers used in rbd_obj_request->flags */
enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};
217
218 struct rbd_obj_request {
219         const char              *object_name;
220         u64                     offset;         /* object start byte */
221         u64                     length;         /* bytes from offset */
222         unsigned long           flags;
223
224         /*
225          * An object request associated with an image will have its
226          * img_data flag set; a standalone object request will not.
227          *
228          * A standalone object request will have which == BAD_WHICH
229          * and a null obj_request pointer.
230          *
231          * An object request initiated in support of a layered image
232          * object (to check for its existence before a write) will
233          * have which == BAD_WHICH and a non-null obj_request pointer.
234          *
235          * Finally, an object request for rbd image data will have
236          * which != BAD_WHICH, and will have a non-null img_request
237          * pointer.  The value of which will be in the range
238          * 0..(img_request->obj_request_count-1).
239          */
240         union {
241                 struct rbd_obj_request  *obj_request;   /* STAT op */
242                 struct {
243                         struct rbd_img_request  *img_request;
244                         u64                     img_offset;
245                         /* links for img_request->obj_requests list */
246                         struct list_head        links;
247                 };
248         };
249         u32                     which;          /* posn image request list */
250
251         enum obj_request_type   type;
252         union {
253                 struct bio      *bio_list;
254                 struct {
255                         struct page     **pages;
256                         u32             page_count;
257                 };
258         };
259         struct page             **copyup_pages;
260         u32                     copyup_page_count;
261
262         struct ceph_osd_request *osd_req;
263
264         u64                     xferred;        /* bytes transferred */
265         int                     result;
266
267         rbd_obj_callback_t      callback;
268         struct completion       completion;
269
270         struct kref             kref;
271 };
272
/* Bit numbers used in rbd_img_request->flags */
enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};
278
279 struct rbd_img_request {
280         struct rbd_device       *rbd_dev;
281         u64                     offset; /* starting image byte offset */
282         u64                     length; /* byte count from offset */
283         unsigned long           flags;
284         union {
285                 u64                     snap_id;        /* for reads */
286                 struct ceph_snap_context *snapc;        /* for writes */
287         };
288         union {
289                 struct request          *rq;            /* block request */
290                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
291         };
292         struct page             **copyup_pages;
293         u32                     copyup_page_count;
294         spinlock_t              completion_lock;/* protects next_completion */
295         u32                     next_completion;
296         rbd_img_callback_t      callback;
297         u64                     xferred;/* aggregate bytes transferred */
298         int                     result; /* first nonzero obj_request result */
299
300         u32                     obj_request_count;
301         struct list_head        obj_requests;   /* rbd_obj_request structs */
302
303         struct kref             kref;
304 };
305
/*
 * Iterators over the object requests linked on an image request.
 * Note that the _safe variant walks the list in reverse order
 * (it is built on list_for_each_entry_safe_reverse()).
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
312
313 struct rbd_mapping {
314         u64                     size;
315         u64                     features;
316         bool                    read_only;
317 };
318
319 /*
320  * a single device
321  */
322 struct rbd_device {
323         int                     dev_id;         /* blkdev unique id */
324
325         int                     major;          /* blkdev assigned major */
326         struct gendisk          *disk;          /* blkdev's gendisk and rq */
327
328         u32                     image_format;   /* Either 1 or 2 */
329         struct rbd_client       *rbd_client;
330
331         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
332
333         spinlock_t              lock;           /* queue, flags, open_count */
334
335         struct rbd_image_header header;
336         unsigned long           flags;          /* possibly lock protected */
337         struct rbd_spec         *spec;
338
339         char                    *header_name;
340
341         struct ceph_file_layout layout;
342
343         struct ceph_osd_event   *watch_event;
344         struct rbd_obj_request  *watch_request;
345
346         struct rbd_spec         *parent_spec;
347         u64                     parent_overlap;
348         atomic_t                parent_ref;
349         struct rbd_device       *parent;
350
351         /* protects updating the header */
352         struct rw_semaphore     header_rwsem;
353
354         struct rbd_mapping      mapping;
355
356         struct list_head        node;
357
358         /* sysfs related */
359         struct device           dev;
360         unsigned long           open_count;     /* protected by lock */
361 };
362
/*
 * Bit numbers for rbd_dev->flags.  Where atomic access is needed,
 * rbd_dev->lock provides the protection.
 *
 * At present only the "removing" flag needs that; it is coupled with
 * the "open_count" field.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
374
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

/* All rbd devices, with the spinlock that goes with the list */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All rbd clients, with the spinlock that goes with the list */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for structures that are allocated frequently */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;
388
389 static int rbd_img_request_submit(struct rbd_img_request *img_request);
390
391 static void rbd_dev_device_release(struct device *dev);
392
393 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
394                        size_t count);
395 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
396                           size_t count);
397 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
398 static void rbd_spec_put(struct rbd_spec *spec);
399
400 static struct bus_attribute rbd_bus_attrs[] = {
401         __ATTR(add, S_IWUSR, NULL, rbd_add),
402         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
403         __ATTR_NULL
404 };
405
406 static struct bus_type rbd_bus_type = {
407         .name           = "rbd",
408         .bus_attrs      = rbd_bus_attrs,
409 };
410
411 static void rbd_root_dev_release(struct device *dev)
412 {
413 }
414
415 static struct device rbd_root_dev = {
416         .init_name =    "rbd",
417         .release =      rbd_root_dev_release,
418 };
419
420 static __printf(2, 3)
421 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
422 {
423         struct va_format vaf;
424         va_list args;
425
426         va_start(args, fmt);
427         vaf.fmt = fmt;
428         vaf.va = &args;
429
430         if (!rbd_dev)
431                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
432         else if (rbd_dev->disk)
433                 printk(KERN_WARNING "%s: %s: %pV\n",
434                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
435         else if (rbd_dev->spec && rbd_dev->spec->image_name)
436                 printk(KERN_WARNING "%s: image %s: %pV\n",
437                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
438         else if (rbd_dev->spec && rbd_dev->spec->image_id)
439                 printk(KERN_WARNING "%s: id %s: %pV\n",
440                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
441         else    /* punt */
442                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
443                         RBD_DRV_NAME, rbd_dev, &vaf);
444         va_end(args);
445 }
446
/*
 * rbd_assert(expr) - sanity check that is compiled in only when
 * RBD_DEBUG is defined.
 *
 * When active, a false expr logs the enclosing function, the line
 * number and the text of the expression at KERN_ERR and then calls
 * BUG().  When RBD_DEBUG is not defined the macro expands to a no-op
 * and expr is not evaluated.
 *
 * The active form is wrapped in do { } while (0) so that
 * "rbd_assert(x);" is exactly one statement: it can be used as the
 * body of an unbraced if/else without capturing the else and without
 * leaving a stray empty statement behind.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
459
460 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
461 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
462 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
463
464 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
465 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
466 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
467 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
468                                         u64 snap_id);
469 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
470                                 u8 *order, u64 *snap_size);
471 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
472                 u64 *snap_features);
473 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
474
475 static int rbd_open(struct block_device *bdev, fmode_t mode)
476 {
477         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
478         bool removing = false;
479
480         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
481                 return -EROFS;
482
483         spin_lock_irq(&rbd_dev->lock);
484         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
485                 removing = true;
486         else
487                 rbd_dev->open_count++;
488         spin_unlock_irq(&rbd_dev->lock);
489         if (removing)
490                 return -ENOENT;
491
492         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
493         (void) get_device(&rbd_dev->dev);
494         set_device_ro(bdev, rbd_dev->mapping.read_only);
495         mutex_unlock(&ctl_mutex);
496
497         return 0;
498 }
499
500 static void rbd_release(struct gendisk *disk, fmode_t mode)
501 {
502         struct rbd_device *rbd_dev = disk->private_data;
503         unsigned long open_count_before;
504
505         spin_lock_irq(&rbd_dev->lock);
506         open_count_before = rbd_dev->open_count--;
507         spin_unlock_irq(&rbd_dev->lock);
508         rbd_assert(open_count_before > 0);
509
510         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
511         put_device(&rbd_dev->dev);
512         mutex_unlock(&ctl_mutex);
513 }
514
515 static const struct block_device_operations rbd_bd_ops = {
516         .owner                  = THIS_MODULE,
517         .open                   = rbd_open,
518         .release                = rbd_release,
519 };
520
521 /*
522  * Initialize an rbd client instance.  Success or not, this function
523  * consumes ceph_opts.
524  */
525 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
526 {
527         struct rbd_client *rbdc;
528         int ret = -ENOMEM;
529
530         dout("%s:\n", __func__);
531         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
532         if (!rbdc)
533                 goto out_opt;
534
535         kref_init(&rbdc->kref);
536         INIT_LIST_HEAD(&rbdc->node);
537
538         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
539
540         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
541         if (IS_ERR(rbdc->client))
542                 goto out_mutex;
543         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
544
545         ret = ceph_open_session(rbdc->client);
546         if (ret < 0)
547                 goto out_err;
548
549         spin_lock(&rbd_client_list_lock);
550         list_add_tail(&rbdc->node, &rbd_client_list);
551         spin_unlock(&rbd_client_list_lock);
552
553         mutex_unlock(&ctl_mutex);
554         dout("%s: rbdc %p\n", __func__, rbdc);
555
556         return rbdc;
557
558 out_err:
559         ceph_destroy_client(rbdc->client);
560 out_mutex:
561         mutex_unlock(&ctl_mutex);
562         kfree(rbdc);
563 out_opt:
564         if (ceph_opts)
565                 ceph_destroy_options(ceph_opts);
566         dout("%s: error %d\n", __func__, ret);
567
568         return ERR_PTR(ret);
569 }
570
571 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
572 {
573         kref_get(&rbdc->kref);
574
575         return rbdc;
576 }
577
578 /*
579  * Find a ceph client with specific addr and configuration.  If
580  * found, bump its reference count.
581  */
582 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
583 {
584         struct rbd_client *client_node;
585         bool found = false;
586
587         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
588                 return NULL;
589
590         spin_lock(&rbd_client_list_lock);
591         list_for_each_entry(client_node, &rbd_client_list, node) {
592                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
593                         __rbd_get_client(client_node);
594
595                         found = true;
596                         break;
597                 }
598         }
599         spin_unlock(&rbd_client_list_lock);
600
601         return found ? client_node : NULL;
602 }
603
604 /*
605  * mount options
606  */
607 enum {
608         Opt_last_int,
609         /* int args above */
610         Opt_last_string,
611         /* string args above */
612         Opt_read_only,
613         Opt_read_write,
614         /* Boolean args above */
615         Opt_last_bool,
616 };
617
618 static match_table_t rbd_opts_tokens = {
619         /* int args above */
620         /* string args above */
621         {Opt_read_only, "read_only"},
622         {Opt_read_only, "ro"},          /* Alternate spelling */
623         {Opt_read_write, "read_write"},
624         {Opt_read_write, "rw"},         /* Alternate spelling */
625         /* Boolean args above */
626         {-1, NULL}
627 };
628
629 struct rbd_options {
630         bool    read_only;
631 };
632
633 #define RBD_READ_ONLY_DEFAULT   false
634
635 static int parse_rbd_opts_token(char *c, void *private)
636 {
637         struct rbd_options *rbd_opts = private;
638         substring_t argstr[MAX_OPT_ARGS];
639         int token, intval, ret;
640
641         token = match_token(c, rbd_opts_tokens, argstr);
642         if (token < 0)
643                 return -EINVAL;
644
645         if (token < Opt_last_int) {
646                 ret = match_int(&argstr[0], &intval);
647                 if (ret < 0) {
648                         pr_err("bad mount option arg (not int) "
649                                "at '%s'\n", c);
650                         return ret;
651                 }
652                 dout("got int token %d val %d\n", token, intval);
653         } else if (token > Opt_last_int && token < Opt_last_string) {
654                 dout("got string token %d val %s\n", token,
655                      argstr[0].from);
656         } else if (token > Opt_last_string && token < Opt_last_bool) {
657                 dout("got Boolean token %d\n", token);
658         } else {
659                 dout("got token %d\n", token);
660         }
661
662         switch (token) {
663         case Opt_read_only:
664                 rbd_opts->read_only = true;
665                 break;
666         case Opt_read_write:
667                 rbd_opts->read_only = false;
668                 break;
669         default:
670                 rbd_assert(false);
671                 break;
672         }
673         return 0;
674 }
675
/*
 * Return a ceph client matching ceph_opts: an existing one if
 * rbd_client_find() locates it, otherwise a newly created one.
 * ceph_opts is consumed in both cases.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Sharing an existing client, so these options are not needed */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
693
694 /*
695  * Destroy ceph client
696  *
697  * Caller must hold rbd_client_list_lock.
698  */
699 static void rbd_client_release(struct kref *kref)
700 {
701         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
702
703         dout("%s: rbdc %p\n", __func__, rbdc);
704         spin_lock(&rbd_client_list_lock);
705         list_del(&rbdc->node);
706         spin_unlock(&rbd_client_list_lock);
707
708         ceph_destroy_client(rbdc->client);
709         kfree(rbdc);
710 }
711
712 /*
713  * Drop reference to ceph client node. If it's not referenced anymore, release
714  * it.
715  */
716 static void rbd_put_client(struct rbd_client *rbdc)
717 {
718         if (rbdc)
719                 kref_put(&rbdc->kref, rbd_client_release);
720 }
721
722 static bool rbd_image_format_valid(u32 image_format)
723 {
724         return image_format == 1 || image_format == 2;
725 }
726
727 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
728 {
729         size_t size;
730         u32 snap_count;
731
732         /* The header has to start with the magic rbd header text */
733         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
734                 return false;
735
736         /* The bio layer requires at least sector-sized I/O */
737
738         if (ondisk->options.order < SECTOR_SHIFT)
739                 return false;
740
741         /* If we use u64 in a few spots we may be able to loosen this */
742
743         if (ondisk->options.order > 8 * sizeof (int) - 1)
744                 return false;
745
746         /*
747          * The size of a snapshot header has to fit in a size_t, and
748          * that limits the number of snapshots.
749          */
750         snap_count = le32_to_cpu(ondisk->snap_count);
751         size = SIZE_MAX - sizeof (struct ceph_snap_context);
752         if (snap_count > size / sizeof (__le64))
753                 return false;
754
755         /*
756          * Not only that, but the size of the entire the snapshot
757          * header must also be representable in a size_t.
758          */
759         size -= snap_count * sizeof (__le64);
760         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
761                 return false;
762
763         return true;
764 }
765
766 /*
767  * Fill an rbd image header with information from the given format 1
768  * on-disk header.
769  */
770 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
771                                  struct rbd_image_header_ondisk *ondisk)
772 {
773         struct rbd_image_header *header = &rbd_dev->header;
774         bool first_time = header->object_prefix == NULL;
775         struct ceph_snap_context *snapc;
776         char *object_prefix = NULL;
777         char *snap_names = NULL;
778         u64 *snap_sizes = NULL;
779         u32 snap_count;
780         size_t size;
781         int ret = -ENOMEM;
782         u32 i;
783
784         /* Allocate this now to avoid having to handle failure below */
785
786         if (first_time) {
787                 size_t len;
788
789                 len = strnlen(ondisk->object_prefix,
790                                 sizeof (ondisk->object_prefix));
791                 object_prefix = kmalloc(len + 1, GFP_KERNEL);
792                 if (!object_prefix)
793                         return -ENOMEM;
794                 memcpy(object_prefix, ondisk->object_prefix, len);
795                 object_prefix[len] = '\0';
796         }
797
798         /* Allocate the snapshot context and fill it in */
799
800         snap_count = le32_to_cpu(ondisk->snap_count);
801         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
802         if (!snapc)
803                 goto out_err;
804         snapc->seq = le64_to_cpu(ondisk->snap_seq);
805         if (snap_count) {
806                 struct rbd_image_snap_ondisk *snaps;
807                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
808
809                 /* We'll keep a copy of the snapshot names... */
810
811                 if (snap_names_len > (u64)SIZE_MAX)
812                         goto out_2big;
813                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
814                 if (!snap_names)
815                         goto out_err;
816
817                 /* ...as well as the array of their sizes. */
818
819                 size = snap_count * sizeof (*header->snap_sizes);
820                 snap_sizes = kmalloc(size, GFP_KERNEL);
821                 if (!snap_sizes)
822                         goto out_err;
823
824                 /*
825                  * Copy the names, and fill in each snapshot's id
826                  * and size.
827                  *
828                  * Note that rbd_dev_v1_header_info() guarantees the
829                  * ondisk buffer we're working with has
830                  * snap_names_len bytes beyond the end of the
831                  * snapshot id array, this memcpy() is safe.
832                  */
833                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
834                 snaps = ondisk->snaps;
835                 for (i = 0; i < snap_count; i++) {
836                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
837                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
838                 }
839         }
840
841         /* We won't fail any more, fill in the header */
842
843         down_write(&rbd_dev->header_rwsem);
844         if (first_time) {
845                 header->object_prefix = object_prefix;
846                 header->obj_order = ondisk->options.order;
847                 header->crypt_type = ondisk->options.crypt_type;
848                 header->comp_type = ondisk->options.comp_type;
849                 /* The rest aren't used for format 1 images */
850                 header->stripe_unit = 0;
851                 header->stripe_count = 0;
852                 header->features = 0;
853         } else {
854                 ceph_put_snap_context(header->snapc);
855                 kfree(header->snap_names);
856                 kfree(header->snap_sizes);
857         }
858
859         /* The remaining fields always get updated (when we refresh) */
860
861         header->image_size = le64_to_cpu(ondisk->image_size);
862         header->snapc = snapc;
863         header->snap_names = snap_names;
864         header->snap_sizes = snap_sizes;
865
866         /* Make sure mapping size is consistent with header info */
867
868         if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
869                 if (rbd_dev->mapping.size != header->image_size)
870                         rbd_dev->mapping.size = header->image_size;
871
872         up_write(&rbd_dev->header_rwsem);
873
874         return 0;
875 out_2big:
876         ret = -EIO;
877 out_err:
878         kfree(snap_sizes);
879         kfree(snap_names);
880         ceph_put_snap_context(snapc);
881         kfree(object_prefix);
882
883         return ret;
884 }
885
886 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
887 {
888         const char *snap_name;
889
890         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
891
892         /* Skip over names until we find the one we are looking for */
893
894         snap_name = rbd_dev->header.snap_names;
895         while (which--)
896                 snap_name += strlen(snap_name) + 1;
897
898         return kstrdup(snap_name, GFP_KERNEL);
899 }
900
901 /*
902  * Snapshot id comparison function for use with qsort()/bsearch().
903  * Note that result is for snapshots in *descending* order.
904  */
905 static int snapid_compare_reverse(const void *s1, const void *s2)
906 {
907         u64 snap_id1 = *(u64 *)s1;
908         u64 snap_id2 = *(u64 *)s2;
909
910         if (snap_id1 < snap_id2)
911                 return 1;
912         return snap_id1 == snap_id2 ? 0 : -1;
913 }
914
915 /*
916  * Search a snapshot context to see if the given snapshot id is
917  * present.
918  *
919  * Returns the position of the snapshot id in the array if it's found,
920  * or BAD_SNAP_INDEX otherwise.
921  *
922  * Note: The snapshot array is in kept sorted (by the osd) in
923  * reverse order, highest snapshot id first.
924  */
925 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
926 {
927         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
928         u64 *found;
929
930         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
931                                 sizeof (snap_id), snapid_compare_reverse);
932
933         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
934 }
935
936 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
937                                         u64 snap_id)
938 {
939         u32 which;
940
941         which = rbd_dev_snap_index(rbd_dev, snap_id);
942         if (which == BAD_SNAP_INDEX)
943                 return NULL;
944
945         return _rbd_dev_v1_snap_name(rbd_dev, which);
946 }
947
948 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
949 {
950         if (snap_id == CEPH_NOSNAP)
951                 return RBD_SNAP_HEAD_NAME;
952
953         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
954         if (rbd_dev->image_format == 1)
955                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
956
957         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
958 }
959
960 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
961                                 u64 *snap_size)
962 {
963         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
964         if (snap_id == CEPH_NOSNAP) {
965                 *snap_size = rbd_dev->header.image_size;
966         } else if (rbd_dev->image_format == 1) {
967                 u32 which;
968
969                 which = rbd_dev_snap_index(rbd_dev, snap_id);
970                 if (which == BAD_SNAP_INDEX)
971                         return -ENOENT;
972
973                 *snap_size = rbd_dev->header.snap_sizes[which];
974         } else {
975                 u64 size = 0;
976                 int ret;
977
978                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
979                 if (ret)
980                         return ret;
981
982                 *snap_size = size;
983         }
984         return 0;
985 }
986
987 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
988                         u64 *snap_features)
989 {
990         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
991         if (snap_id == CEPH_NOSNAP) {
992                 *snap_features = rbd_dev->header.features;
993         } else if (rbd_dev->image_format == 1) {
994                 *snap_features = 0;     /* No features for format 1 */
995         } else {
996                 u64 features = 0;
997                 int ret;
998
999                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1000                 if (ret)
1001                         return ret;
1002
1003                 *snap_features = features;
1004         }
1005         return 0;
1006 }
1007
1008 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1009 {
1010         u64 snap_id = rbd_dev->spec->snap_id;
1011         u64 size = 0;
1012         u64 features = 0;
1013         int ret;
1014
1015         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1016         if (ret)
1017                 return ret;
1018         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1019         if (ret)
1020                 return ret;
1021
1022         rbd_dev->mapping.size = size;
1023         rbd_dev->mapping.features = features;
1024
1025         return 0;
1026 }
1027
1028 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1029 {
1030         rbd_dev->mapping.size = 0;
1031         rbd_dev->mapping.features = 0;
1032 }
1033
1034 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1035 {
1036         char *name;
1037         u64 segment;
1038         int ret;
1039
1040         name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1041         if (!name)
1042                 return NULL;
1043         segment = offset >> rbd_dev->header.obj_order;
1044         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
1045                         rbd_dev->header.object_prefix, segment);
1046         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
1047                 pr_err("error formatting segment name for #%llu (%d)\n",
1048                         segment, ret);
1049                 kfree(name);
1050                 name = NULL;
1051         }
1052
1053         return name;
1054 }
1055
1056 static void rbd_segment_name_free(const char *name)
1057 {
1058         /* The explicit cast here is needed to drop the const qualifier */
1059
1060         kmem_cache_free(rbd_segment_name_cache, (void *)name);
1061 }
1062
1063 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1064 {
1065         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1066
1067         return offset & (segment_size - 1);
1068 }
1069
1070 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1071                                 u64 offset, u64 length)
1072 {
1073         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1074
1075         offset &= segment_size - 1;
1076
1077         rbd_assert(length <= U64_MAX - offset);
1078         if (offset + length > segment_size)
1079                 length = segment_size - offset;
1080
1081         return length;
1082 }
1083
1084 /*
1085  * returns the size of an object in the image
1086  */
1087 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1088 {
1089         return 1 << header->obj_order;
1090 }
1091
1092 /*
1093  * bio helpers
1094  */
1095
1096 static void bio_chain_put(struct bio *chain)
1097 {
1098         struct bio *tmp;
1099
1100         while (chain) {
1101                 tmp = chain;
1102                 chain = chain->bi_next;
1103                 bio_put(tmp);
1104         }
1105 }
1106
1107 /*
1108  * zeros a bio chain, starting at specific offset
1109  */
1110 static void zero_bio_chain(struct bio *chain, int start_ofs)
1111 {
1112         struct bio_vec *bv;
1113         unsigned long flags;
1114         void *buf;
1115         int i;
1116         int pos = 0;
1117
1118         while (chain) {
1119                 bio_for_each_segment(bv, chain, i) {
1120                         if (pos + bv->bv_len > start_ofs) {
1121                                 int remainder = max(start_ofs - pos, 0);
1122                                 buf = bvec_kmap_irq(bv, &flags);
1123                                 memset(buf + remainder, 0,
1124                                        bv->bv_len - remainder);
1125                                 bvec_kunmap_irq(buf, &flags);
1126                         }
1127                         pos += bv->bv_len;
1128                 }
1129
1130                 chain = chain->bi_next;
1131         }
1132 }
1133
1134 /*
1135  * similar to zero_bio_chain(), zeros data defined by a page array,
1136  * starting at the given byte offset from the start of the array and
1137  * continuing up to the given end offset.  The pages array is
1138  * assumed to be big enough to hold all bytes up to the end.
1139  */
1140 static void zero_pages(struct page **pages, u64 offset, u64 end)
1141 {
1142         struct page **page = &pages[offset >> PAGE_SHIFT];
1143
1144         rbd_assert(end > offset);
1145         rbd_assert(end - offset <= (u64)SIZE_MAX);
1146         while (offset < end) {
1147                 size_t page_offset;
1148                 size_t length;
1149                 unsigned long flags;
1150                 void *kaddr;
1151
1152                 page_offset = (size_t)(offset & ~PAGE_MASK);
1153                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1154                 local_irq_save(flags);
1155                 kaddr = kmap_atomic(*page);
1156                 memset(kaddr + page_offset, 0, length);
1157                 kunmap_atomic(kaddr);
1158                 local_irq_restore(flags);
1159
1160                 offset += length;
1161                 page++;
1162         }
1163 }
1164
1165 /*
1166  * Clone a portion of a bio, starting at the given byte offset
1167  * and continuing for the number of bytes indicated.
1168  */
1169 static struct bio *bio_clone_range(struct bio *bio_src,
1170                                         unsigned int offset,
1171                                         unsigned int len,
1172                                         gfp_t gfpmask)
1173 {
1174         struct bio_vec *bv;
1175         unsigned int resid;
1176         unsigned short idx;
1177         unsigned int voff;
1178         unsigned short end_idx;
1179         unsigned short vcnt;
1180         struct bio *bio;
1181
1182         /* Handle the easy case for the caller */
1183
1184         if (!offset && len == bio_src->bi_size)
1185                 return bio_clone(bio_src, gfpmask);
1186
1187         if (WARN_ON_ONCE(!len))
1188                 return NULL;
1189         if (WARN_ON_ONCE(len > bio_src->bi_size))
1190                 return NULL;
1191         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1192                 return NULL;
1193
1194         /* Find first affected segment... */
1195
1196         resid = offset;
1197         bio_for_each_segment(bv, bio_src, idx) {
1198                 if (resid < bv->bv_len)
1199                         break;
1200                 resid -= bv->bv_len;
1201         }
1202         voff = resid;
1203
1204         /* ...and the last affected segment */
1205
1206         resid += len;
1207         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1208                 if (resid <= bv->bv_len)
1209                         break;
1210                 resid -= bv->bv_len;
1211         }
1212         vcnt = end_idx - idx + 1;
1213
1214         /* Build the clone */
1215
1216         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1217         if (!bio)
1218                 return NULL;    /* ENOMEM */
1219
1220         bio->bi_bdev = bio_src->bi_bdev;
1221         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1222         bio->bi_rw = bio_src->bi_rw;
1223         bio->bi_flags |= 1 << BIO_CLONED;
1224
1225         /*
1226          * Copy over our part of the bio_vec, then update the first
1227          * and last (or only) entries.
1228          */
1229         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1230                         vcnt * sizeof (struct bio_vec));
1231         bio->bi_io_vec[0].bv_offset += voff;
1232         if (vcnt > 1) {
1233                 bio->bi_io_vec[0].bv_len -= voff;
1234                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1235         } else {
1236                 bio->bi_io_vec[0].bv_len = len;
1237         }
1238
1239         bio->bi_vcnt = vcnt;
1240         bio->bi_size = len;
1241         bio->bi_idx = 0;
1242
1243         return bio;
1244 }
1245
1246 /*
1247  * Clone a portion of a bio chain, starting at the given byte offset
1248  * into the first bio in the source chain and continuing for the
1249  * number of bytes indicated.  The result is another bio chain of
1250  * exactly the given length, or a null pointer on error.
1251  *
1252  * The bio_src and offset parameters are both in-out.  On entry they
1253  * refer to the first source bio and the offset into that bio where
1254  * the start of data to be cloned is located.
1255  *
1256  * On return, bio_src is updated to refer to the bio in the source
1257  * chain that contains first un-cloned byte, and *offset will
1258  * contain the offset of that byte within that bio.
1259  */
1260 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1261                                         unsigned int *offset,
1262                                         unsigned int len,
1263                                         gfp_t gfpmask)
1264 {
1265         struct bio *bi = *bio_src;
1266         unsigned int off = *offset;
1267         struct bio *chain = NULL;
1268         struct bio **end;
1269
1270         /* Build up a chain of clone bios up to the limit */
1271
1272         if (!bi || off >= bi->bi_size || !len)
1273                 return NULL;            /* Nothing to clone */
1274
1275         end = &chain;
1276         while (len) {
1277                 unsigned int bi_size;
1278                 struct bio *bio;
1279
1280                 if (!bi) {
1281                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1282                         goto out_err;   /* EINVAL; ran out of bio's */
1283                 }
1284                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1285                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1286                 if (!bio)
1287                         goto out_err;   /* ENOMEM */
1288
1289                 *end = bio;
1290                 end = &bio->bi_next;
1291
1292                 off += bi_size;
1293                 if (off == bi->bi_size) {
1294                         bi = bi->bi_next;
1295                         off = 0;
1296                 }
1297                 len -= bi_size;
1298         }
1299         *bio_src = bi;
1300         *offset = off;
1301
1302         return chain;
1303 out_err:
1304         bio_chain_put(chain);
1305
1306         return NULL;
1307 }
1308
1309 /*
1310  * The default/initial value for all object request flags is 0.  For
1311  * each flag, once its value is set to 1 it is never reset to 0
1312  * again.
1313  */
1314 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1315 {
1316         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1317                 struct rbd_device *rbd_dev;
1318
1319                 rbd_dev = obj_request->img_request->rbd_dev;
1320                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1321                         obj_request);
1322         }
1323 }
1324
1325 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1326 {
1327         smp_mb();
1328         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1329 }
1330
1331 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1332 {
1333         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1334                 struct rbd_device *rbd_dev = NULL;
1335
1336                 if (obj_request_img_data_test(obj_request))
1337                         rbd_dev = obj_request->img_request->rbd_dev;
1338                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1339                         obj_request);
1340         }
1341 }
1342
1343 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1344 {
1345         smp_mb();
1346         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1347 }
1348
1349 /*
1350  * This sets the KNOWN flag after (possibly) setting the EXISTS
1351  * flag.  The latter is set based on the "exists" value provided.
1352  *
1353  * Note that for our purposes once an object exists it never goes
1354  * away again.  It's possible that the response from two existence
1355  * checks are separated by the creation of the target object, and
1356  * the first ("doesn't exist") response arrives *after* the second
1357  * ("does exist").  In that case we ignore the second one.
1358  */
1359 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1360                                 bool exists)
1361 {
1362         if (exists)
1363                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1364         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1365         smp_mb();
1366 }
1367
1368 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1369 {
1370         smp_mb();
1371         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1372 }
1373
1374 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1375 {
1376         smp_mb();
1377         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1378 }
1379
1380 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1381 {
1382         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1383                 atomic_read(&obj_request->kref.refcount));
1384         kref_get(&obj_request->kref);
1385 }
1386
1387 static void rbd_obj_request_destroy(struct kref *kref);
1388 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1389 {
1390         rbd_assert(obj_request != NULL);
1391         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1392                 atomic_read(&obj_request->kref.refcount));
1393         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1394 }
1395
1396 static bool img_request_child_test(struct rbd_img_request *img_request);
1397 static void rbd_parent_request_destroy(struct kref *kref);
1398 static void rbd_img_request_destroy(struct kref *kref);
1399 static void rbd_img_request_put(struct rbd_img_request *img_request)
1400 {
1401         rbd_assert(img_request != NULL);
1402         dout("%s: img %p (was %d)\n", __func__, img_request,
1403                 atomic_read(&img_request->kref.refcount));
1404         if (img_request_child_test(img_request))
1405                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1406         else
1407                 kref_put(&img_request->kref, rbd_img_request_destroy);
1408 }
1409
1410 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1411                                         struct rbd_obj_request *obj_request)
1412 {
1413         rbd_assert(obj_request->img_request == NULL);
1414
1415         /* Image request now owns object's original reference */
1416         obj_request->img_request = img_request;
1417         obj_request->which = img_request->obj_request_count;
1418         rbd_assert(!obj_request_img_data_test(obj_request));
1419         obj_request_img_data_set(obj_request);
1420         rbd_assert(obj_request->which != BAD_WHICH);
1421         img_request->obj_request_count++;
1422         list_add_tail(&obj_request->links, &img_request->obj_requests);
1423         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1424                 obj_request->which);
1425 }
1426
1427 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1428                                         struct rbd_obj_request *obj_request)
1429 {
1430         rbd_assert(obj_request->which != BAD_WHICH);
1431
1432         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1433                 obj_request->which);
1434         list_del(&obj_request->links);
1435         rbd_assert(img_request->obj_request_count > 0);
1436         img_request->obj_request_count--;
1437         rbd_assert(obj_request->which == img_request->obj_request_count);
1438         obj_request->which = BAD_WHICH;
1439         rbd_assert(obj_request_img_data_test(obj_request));
1440         rbd_assert(obj_request->img_request == img_request);
1441         obj_request->img_request = NULL;
1442         obj_request->callback = NULL;
1443         rbd_obj_request_put(obj_request);
1444 }
1445
1446 static bool obj_request_type_valid(enum obj_request_type type)
1447 {
1448         switch (type) {
1449         case OBJ_REQUEST_NODATA:
1450         case OBJ_REQUEST_BIO:
1451         case OBJ_REQUEST_PAGES:
1452                 return true;
1453         default:
1454                 return false;
1455         }
1456 }
1457
1458 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1459                                 struct rbd_obj_request *obj_request)
1460 {
1461         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1462
1463         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1464 }
1465
1466 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1467 {
1468
1469         dout("%s: img %p\n", __func__, img_request);
1470
1471         /*
1472          * If no error occurred, compute the aggregate transfer
1473          * count for the image request.  We could instead use
1474          * atomic64_cmpxchg() to update it as each object request
1475          * completes; not clear which way is better off hand.
1476          */
1477         if (!img_request->result) {
1478                 struct rbd_obj_request *obj_request;
1479                 u64 xferred = 0;
1480
1481                 for_each_obj_request(img_request, obj_request)
1482                         xferred += obj_request->xferred;
1483                 img_request->xferred = xferred;
1484         }
1485
1486         if (img_request->callback)
1487                 img_request->callback(img_request);
1488         else
1489                 rbd_img_request_put(img_request);
1490 }
1491
1492 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1493
1494 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1495 {
1496         dout("%s: obj %p\n", __func__, obj_request);
1497
1498         return wait_for_completion_interruptible(&obj_request->completion);
1499 }
1500
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and normally never changes thereafter (helpers that clear the
 * child and layered flags are nonetheless provided).
 */
1506 static void img_request_write_set(struct rbd_img_request *img_request)
1507 {
1508         set_bit(IMG_REQ_WRITE, &img_request->flags);
1509         smp_mb();
1510 }
1511
1512 static bool img_request_write_test(struct rbd_img_request *img_request)
1513 {
1514         smp_mb();
1515         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1516 }
1517
1518 static void img_request_child_set(struct rbd_img_request *img_request)
1519 {
1520         set_bit(IMG_REQ_CHILD, &img_request->flags);
1521         smp_mb();
1522 }
1523
1524 static void img_request_child_clear(struct rbd_img_request *img_request)
1525 {
1526         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1527         smp_mb();
1528 }
1529
1530 static bool img_request_child_test(struct rbd_img_request *img_request)
1531 {
1532         smp_mb();
1533         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1534 }
1535
1536 static void img_request_layered_set(struct rbd_img_request *img_request)
1537 {
1538         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1539         smp_mb();
1540 }
1541
1542 static void img_request_layered_clear(struct rbd_img_request *img_request)
1543 {
1544         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1545         smp_mb();
1546 }
1547
1548 static bool img_request_layered_test(struct rbd_img_request *img_request)
1549 {
1550         smp_mb();
1551         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1552 }
1553
1554 static void
1555 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1556 {
1557         u64 xferred = obj_request->xferred;
1558         u64 length = obj_request->length;
1559
1560         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1561                 obj_request, obj_request->img_request, obj_request->result,
1562                 xferred, length);
1563         /*
1564          * ENOENT means a hole in the image.  We zero-fill the
1565          * entire length of the request.  A short read also implies
1566          * zero-fill to the end of the request.  Either way we
1567          * update the xferred count to indicate the whole request
1568          * was satisfied.
1569          */
1570         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1571         if (obj_request->result == -ENOENT) {
1572                 if (obj_request->type == OBJ_REQUEST_BIO)
1573                         zero_bio_chain(obj_request->bio_list, 0);
1574                 else
1575                         zero_pages(obj_request->pages, 0, length);
1576                 obj_request->result = 0;
1577                 obj_request->xferred = length;
1578         } else if (xferred < length && !obj_request->result) {
1579                 if (obj_request->type == OBJ_REQUEST_BIO)
1580                         zero_bio_chain(obj_request->bio_list, xferred);
1581                 else
1582                         zero_pages(obj_request->pages, xferred, length);
1583                 obj_request->xferred = length;
1584         }
1585         obj_request_done_set(obj_request);
1586 }
1587
1588 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1589 {
1590         dout("%s: obj %p cb %p\n", __func__, obj_request,
1591                 obj_request->callback);
1592         if (obj_request->callback)
1593                 obj_request->callback(obj_request);
1594         else
1595                 complete_all(&obj_request->completion);
1596 }
1597
/*
 * Completion handling for osd ops that need no post-processing:
 * the object request is simply marked done.
 */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1603
1604 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1605 {
1606         struct rbd_img_request *img_request = NULL;
1607         struct rbd_device *rbd_dev = NULL;
1608         bool layered = false;
1609
1610         if (obj_request_img_data_test(obj_request)) {
1611                 img_request = obj_request->img_request;
1612                 layered = img_request && img_request_layered_test(img_request);
1613                 rbd_dev = img_request->rbd_dev;
1614         }
1615
1616         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1617                 obj_request, img_request, obj_request->result,
1618                 obj_request->xferred, obj_request->length);
1619         if (layered && obj_request->result == -ENOENT &&
1620                         obj_request->img_offset < rbd_dev->parent_overlap)
1621                 rbd_img_parent_read(obj_request);
1622         else if (img_request)
1623                 rbd_img_obj_request_read_callback(obj_request);
1624         else
1625                 obj_request_done_set(obj_request);
1626 }
1627
1628 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1629 {
1630         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1631                 obj_request->result, obj_request->length);
1632         /*
1633          * There is no such thing as a successful short write.  Set
1634          * it to our originally-requested length.
1635          */
1636         obj_request->xferred = obj_request->length;
1637         obj_request_done_set(obj_request);
1638 }
1639
/*
 * Completion handling for an osd stat.  For a simple stat call
 * there's nothing to do beyond marking the object request done.
 * More is done elsewhere when the stat is part of a write sequence
 * for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1649
1650 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1651                                 struct ceph_msg *msg)
1652 {
1653         struct rbd_obj_request *obj_request = osd_req->r_priv;
1654         u16 opcode;
1655
1656         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1657         rbd_assert(osd_req == obj_request->osd_req);
1658         if (obj_request_img_data_test(obj_request)) {
1659                 rbd_assert(obj_request->img_request);
1660                 rbd_assert(obj_request->which != BAD_WHICH);
1661         } else {
1662                 rbd_assert(obj_request->which == BAD_WHICH);
1663         }
1664
1665         if (osd_req->r_result < 0)
1666                 obj_request->result = osd_req->r_result;
1667
1668         BUG_ON(osd_req->r_num_ops > 2);
1669
1670         /*
1671          * We support a 64-bit length, but ultimately it has to be
1672          * passed to blk_end_request(), which takes an unsigned int.
1673          */
1674         obj_request->xferred = osd_req->r_reply_op_len[0];
1675         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1676         opcode = osd_req->r_ops[0].op;
1677         switch (opcode) {
1678         case CEPH_OSD_OP_READ:
1679                 rbd_osd_read_callback(obj_request);
1680                 break;
1681         case CEPH_OSD_OP_WRITE:
1682                 rbd_osd_write_callback(obj_request);
1683                 break;
1684         case CEPH_OSD_OP_STAT:
1685                 rbd_osd_stat_callback(obj_request);
1686                 break;
1687         case CEPH_OSD_OP_CALL:
1688         case CEPH_OSD_OP_NOTIFY_ACK:
1689         case CEPH_OSD_OP_WATCH:
1690                 rbd_osd_trivial_callback(obj_request);
1691                 break;
1692         default:
1693                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1694                         obj_request->object_name, (unsigned short) opcode);
1695                 break;
1696         }
1697
1698         if (obj_request_done_test(obj_request))
1699                 rbd_obj_request_complete(obj_request);
1700 }
1701
1702 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1703 {
1704         struct rbd_img_request *img_request = obj_request->img_request;
1705         struct ceph_osd_request *osd_req = obj_request->osd_req;
1706         u64 snap_id;
1707
1708         rbd_assert(osd_req != NULL);
1709
1710         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1711         ceph_osdc_build_request(osd_req, obj_request->offset,
1712                         NULL, snap_id, NULL);
1713 }
1714
1715 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1716 {
1717         struct rbd_img_request *img_request = obj_request->img_request;
1718         struct ceph_osd_request *osd_req = obj_request->osd_req;
1719         struct ceph_snap_context *snapc;
1720         struct timespec mtime = CURRENT_TIME;
1721
1722         rbd_assert(osd_req != NULL);
1723
1724         snapc = img_request ? img_request->snapc : NULL;
1725         ceph_osdc_build_request(osd_req, obj_request->offset,
1726                         snapc, CEPH_NOSNAP, &mtime);
1727 }
1728
1729 static struct ceph_osd_request *rbd_osd_req_create(
1730                                         struct rbd_device *rbd_dev,
1731                                         bool write_request,
1732                                         struct rbd_obj_request *obj_request)
1733 {
1734         struct ceph_snap_context *snapc = NULL;
1735         struct ceph_osd_client *osdc;
1736         struct ceph_osd_request *osd_req;
1737
1738         if (obj_request_img_data_test(obj_request)) {
1739                 struct rbd_img_request *img_request = obj_request->img_request;
1740
1741                 rbd_assert(write_request ==
1742                                 img_request_write_test(img_request));
1743                 if (write_request)
1744                         snapc = img_request->snapc;
1745         }
1746
1747         /* Allocate and initialize the request, for the single op */
1748
1749         osdc = &rbd_dev->rbd_client->client->osdc;
1750         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1751         if (!osd_req)
1752                 return NULL;    /* ENOMEM */
1753
1754         if (write_request)
1755                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1756         else
1757                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1758
1759         osd_req->r_callback = rbd_osd_req_callback;
1760         osd_req->r_priv = obj_request;
1761
1762         osd_req->r_oid_len = strlen(obj_request->object_name);
1763         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1764         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1765
1766         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1767
1768         return osd_req;
1769 }
1770
1771 /*
1772  * Create a copyup osd request based on the information in the
1773  * object request supplied.  A copyup request has two osd ops,
1774  * a copyup method call, and a "normal" write request.
1775  */
1776 static struct ceph_osd_request *
1777 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1778 {
1779         struct rbd_img_request *img_request;
1780         struct ceph_snap_context *snapc;
1781         struct rbd_device *rbd_dev;
1782         struct ceph_osd_client *osdc;
1783         struct ceph_osd_request *osd_req;
1784
1785         rbd_assert(obj_request_img_data_test(obj_request));
1786         img_request = obj_request->img_request;
1787         rbd_assert(img_request);
1788         rbd_assert(img_request_write_test(img_request));
1789
1790         /* Allocate and initialize the request, for the two ops */
1791
1792         snapc = img_request->snapc;
1793         rbd_dev = img_request->rbd_dev;
1794         osdc = &rbd_dev->rbd_client->client->osdc;
1795         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1796         if (!osd_req)
1797                 return NULL;    /* ENOMEM */
1798
1799         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1800         osd_req->r_callback = rbd_osd_req_callback;
1801         osd_req->r_priv = obj_request;
1802
1803         osd_req->r_oid_len = strlen(obj_request->object_name);
1804         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1805         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1806
1807         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1808
1809         return osd_req;
1810 }
1811
1812
/* Drop a reference to an osd request via ceph_osdc_put_request() */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	struct ceph_osd_request *req = osd_req;

	ceph_osdc_put_request(req);
}
1817
1818 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1819
1820 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1821                                                 u64 offset, u64 length,
1822                                                 enum obj_request_type type)
1823 {
1824         struct rbd_obj_request *obj_request;
1825         size_t size;
1826         char *name;
1827
1828         rbd_assert(obj_request_type_valid(type));
1829
1830         size = strlen(object_name) + 1;
1831         name = kmalloc(size, GFP_KERNEL);
1832         if (!name)
1833                 return NULL;
1834
1835         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1836         if (!obj_request) {
1837                 kfree(name);
1838                 return NULL;
1839         }
1840
1841         obj_request->object_name = memcpy(name, object_name, size);
1842         obj_request->offset = offset;
1843         obj_request->length = length;
1844         obj_request->flags = 0;
1845         obj_request->which = BAD_WHICH;
1846         obj_request->type = type;
1847         INIT_LIST_HEAD(&obj_request->links);
1848         init_completion(&obj_request->completion);
1849         kref_init(&obj_request->kref);
1850
1851         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1852                 offset, length, (int)type, obj_request);
1853
1854         return obj_request;
1855 }
1856
1857 static void rbd_obj_request_destroy(struct kref *kref)
1858 {
1859         struct rbd_obj_request *obj_request;
1860
1861         obj_request = container_of(kref, struct rbd_obj_request, kref);
1862
1863         dout("%s: obj %p\n", __func__, obj_request);
1864
1865         rbd_assert(obj_request->img_request == NULL);
1866         rbd_assert(obj_request->which == BAD_WHICH);
1867
1868         if (obj_request->osd_req)
1869                 rbd_osd_req_destroy(obj_request->osd_req);
1870
1871         rbd_assert(obj_request_type_valid(obj_request->type));
1872         switch (obj_request->type) {
1873         case OBJ_REQUEST_NODATA:
1874                 break;          /* Nothing to do */
1875         case OBJ_REQUEST_BIO:
1876                 if (obj_request->bio_list)
1877                         bio_chain_put(obj_request->bio_list);
1878                 break;
1879         case OBJ_REQUEST_PAGES:
1880                 if (obj_request->pages)
1881                         ceph_release_page_vector(obj_request->pages,
1882                                                 obj_request->page_count);
1883                 break;
1884         }
1885
1886         kfree(obj_request->object_name);
1887         obj_request->object_name = NULL;
1888         kmem_cache_free(rbd_obj_request_cache, obj_request);
1889 }
1890
1891 /* It's OK to call this for a device with no parent */
1892
1893 static void rbd_spec_put(struct rbd_spec *spec);
1894 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1895 {
1896         rbd_dev_remove_parent(rbd_dev);
1897         rbd_spec_put(rbd_dev->parent_spec);
1898         rbd_dev->parent_spec = NULL;
1899         rbd_dev->parent_overlap = 0;
1900 }
1901
1902 /*
1903  * Parent image reference counting is used to determine when an
1904  * image's parent fields can be safely torn down--after there are no
1905  * more in-flight requests to the parent image.  When the last
1906  * reference is dropped, cleaning them up is safe.
1907  */
1908 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1909 {
1910         int counter;
1911
1912         if (!rbd_dev->parent_spec)
1913                 return;
1914
1915         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1916         if (counter > 0)
1917                 return;
1918
1919         /* Last reference; clean up parent data structures */
1920
1921         if (!counter)
1922                 rbd_dev_unparent(rbd_dev);
1923         else
1924                 rbd_warn(rbd_dev, "parent reference underflow\n");
1925 }
1926
1927 /*
1928  * If an image has a non-zero parent overlap, get a reference to its
1929  * parent.
1930  *
1931  * We must get the reference before checking for the overlap to
1932  * coordinate properly with zeroing the parent overlap in
1933  * rbd_dev_v2_parent_info() when an image gets flattened.  We
1934  * drop it again if there is no overlap.
1935  *
1936  * Returns true if the rbd device has a parent with a non-zero
1937  * overlap and a reference for it was successfully taken, or
1938  * false otherwise.
1939  */
1940 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1941 {
1942         int counter;
1943
1944         if (!rbd_dev->parent_spec)
1945                 return false;
1946
1947         counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1948         if (counter > 0 && rbd_dev->parent_overlap)
1949                 return true;
1950
1951         /* Image was flattened, but parent is not yet torn down */
1952
1953         if (counter < 0)
1954                 rbd_warn(rbd_dev, "parent reference overflow\n");
1955
1956         return false;
1957 }
1958
1959 /*
1960  * Caller is responsible for filling in the list of object requests
1961  * that comprises the image request, and the Linux request pointer
1962  * (if there is one).
1963  */
1964 static struct rbd_img_request *rbd_img_request_create(
1965                                         struct rbd_device *rbd_dev,
1966                                         u64 offset, u64 length,
1967                                         bool write_request)
1968 {
1969         struct rbd_img_request *img_request;
1970
1971         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1972         if (!img_request)
1973                 return NULL;
1974
1975         if (write_request) {
1976                 down_read(&rbd_dev->header_rwsem);
1977                 ceph_get_snap_context(rbd_dev->header.snapc);
1978                 up_read(&rbd_dev->header_rwsem);
1979         }
1980
1981         img_request->rq = NULL;
1982         img_request->rbd_dev = rbd_dev;
1983         img_request->offset = offset;
1984         img_request->length = length;
1985         img_request->flags = 0;
1986         if (write_request) {
1987                 img_request_write_set(img_request);
1988                 img_request->snapc = rbd_dev->header.snapc;
1989         } else {
1990                 img_request->snap_id = rbd_dev->spec->snap_id;
1991         }
1992         if (rbd_dev_parent_get(rbd_dev))
1993                 img_request_layered_set(img_request);
1994         spin_lock_init(&img_request->completion_lock);
1995         img_request->next_completion = 0;
1996         img_request->callback = NULL;
1997         img_request->result = 0;
1998         img_request->obj_request_count = 0;
1999         INIT_LIST_HEAD(&img_request->obj_requests);
2000         kref_init(&img_request->kref);
2001
2002         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2003                 write_request ? "write" : "read", offset, length,
2004                 img_request);
2005
2006         return img_request;
2007 }
2008
2009 static void rbd_img_request_destroy(struct kref *kref)
2010 {
2011         struct rbd_img_request *img_request;
2012         struct rbd_obj_request *obj_request;
2013         struct rbd_obj_request *next_obj_request;
2014
2015         img_request = container_of(kref, struct rbd_img_request, kref);
2016
2017         dout("%s: img %p\n", __func__, img_request);
2018
2019         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2020                 rbd_img_obj_request_del(img_request, obj_request);
2021         rbd_assert(img_request->obj_request_count == 0);
2022
2023         if (img_request_layered_test(img_request)) {
2024                 img_request_layered_clear(img_request);
2025                 rbd_dev_parent_put(img_request->rbd_dev);
2026         }
2027
2028         if (img_request_write_test(img_request))
2029                 ceph_put_snap_context(img_request->snapc);
2030
2031         kmem_cache_free(rbd_img_request_cache, img_request);
2032 }
2033
2034 static struct rbd_img_request *rbd_parent_request_create(
2035                                         struct rbd_obj_request *obj_request,
2036                                         u64 img_offset, u64 length)
2037 {
2038         struct rbd_img_request *parent_request;
2039         struct rbd_device *rbd_dev;
2040
2041         rbd_assert(obj_request->img_request);
2042         rbd_dev = obj_request->img_request->rbd_dev;
2043
2044         parent_request = rbd_img_request_create(rbd_dev->parent,
2045                                                 img_offset, length, false);
2046         if (!parent_request)
2047                 return NULL;
2048
2049         img_request_child_set(parent_request);
2050         rbd_obj_request_get(obj_request);
2051         parent_request->obj_request = obj_request;
2052
2053         return parent_request;
2054 }
2055
2056 static void rbd_parent_request_destroy(struct kref *kref)
2057 {
2058         struct rbd_img_request *parent_request;
2059         struct rbd_obj_request *orig_request;
2060
2061         parent_request = container_of(kref, struct rbd_img_request, kref);
2062         orig_request = parent_request->obj_request;
2063
2064         parent_request->obj_request = NULL;
2065         rbd_obj_request_put(orig_request);
2066         img_request_child_clear(parent_request);
2067
2068         rbd_img_request_destroy(kref);
2069 }
2070
2071 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2072 {
2073         struct rbd_img_request *img_request;
2074         unsigned int xferred;
2075         int result;
2076         bool more;
2077
2078         rbd_assert(obj_request_img_data_test(obj_request));
2079         img_request = obj_request->img_request;
2080
2081         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2082         xferred = (unsigned int)obj_request->xferred;
2083         result = obj_request->result;
2084         if (result) {
2085                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2086
2087                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2088                         img_request_write_test(img_request) ? "write" : "read",
2089                         obj_request->length, obj_request->img_offset,
2090                         obj_request->offset);
2091                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
2092                         result, xferred);
2093                 if (!img_request->result)
2094                         img_request->result = result;
2095         }
2096
2097         /* Image object requests don't own their page array */
2098
2099         if (obj_request->type == OBJ_REQUEST_PAGES) {
2100                 obj_request->pages = NULL;
2101                 obj_request->page_count = 0;
2102         }
2103
2104         if (img_request_child_test(img_request)) {
2105                 rbd_assert(img_request->obj_request != NULL);
2106                 more = obj_request->which < img_request->obj_request_count - 1;
2107         } else {
2108                 rbd_assert(img_request->rq != NULL);
2109                 more = blk_end_request(img_request->rq, result, xferred);
2110         }
2111
2112         return more;
2113 }
2114
2115 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2116 {
2117         struct rbd_img_request *img_request;
2118         u32 which = obj_request->which;
2119         bool more = true;
2120
2121         rbd_assert(obj_request_img_data_test(obj_request));
2122         img_request = obj_request->img_request;
2123
2124         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2125         rbd_assert(img_request != NULL);
2126         rbd_assert(img_request->obj_request_count > 0);
2127         rbd_assert(which != BAD_WHICH);
2128         rbd_assert(which < img_request->obj_request_count);
2129         rbd_assert(which >= img_request->next_completion);
2130
2131         spin_lock_irq(&img_request->completion_lock);
2132         if (which != img_request->next_completion)
2133                 goto out;
2134
2135         for_each_obj_request_from(img_request, obj_request) {
2136                 rbd_assert(more);
2137                 rbd_assert(which < img_request->obj_request_count);
2138
2139                 if (!obj_request_done_test(obj_request))
2140                         break;
2141                 more = rbd_img_obj_end_request(obj_request);
2142                 which++;
2143         }
2144
2145         rbd_assert(more ^ (which == img_request->obj_request_count));
2146         img_request->next_completion = which;
2147 out:
2148         spin_unlock_irq(&img_request->completion_lock);
2149
2150         if (!more)
2151                 rbd_img_request_complete(img_request);
2152 }
2153
2154 /*
2155  * Split up an image request into one or more object requests, each
2156  * to a different object.  The "type" parameter indicates whether
2157  * "data_desc" is the pointer to the head of a list of bio
2158  * structures, or the base of a page array.  In either case this
2159  * function assumes data_desc describes memory sufficient to hold
2160  * all data described by the image request.
2161  */
2162 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2163                                         enum obj_request_type type,
2164                                         void *data_desc)
2165 {
2166         struct rbd_device *rbd_dev = img_request->rbd_dev;
2167         struct rbd_obj_request *obj_request = NULL;
2168         struct rbd_obj_request *next_obj_request;
2169         bool write_request = img_request_write_test(img_request);
2170         struct bio *bio_list;
2171         unsigned int bio_offset = 0;
2172         struct page **pages;
2173         u64 img_offset;
2174         u64 resid;
2175         u16 opcode;
2176
2177         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2178                 (int)type, data_desc);
2179
2180         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2181         img_offset = img_request->offset;
2182         resid = img_request->length;
2183         rbd_assert(resid > 0);
2184
2185         if (type == OBJ_REQUEST_BIO) {
2186                 bio_list = data_desc;
2187                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2188         } else {
2189                 rbd_assert(type == OBJ_REQUEST_PAGES);
2190                 pages = data_desc;
2191         }
2192
2193         while (resid) {
2194                 struct ceph_osd_request *osd_req;
2195                 const char *object_name;
2196                 u64 offset;
2197                 u64 length;
2198
2199                 object_name = rbd_segment_name(rbd_dev, img_offset);
2200                 if (!object_name)
2201                         goto out_unwind;
2202                 offset = rbd_segment_offset(rbd_dev, img_offset);
2203                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2204                 obj_request = rbd_obj_request_create(object_name,
2205                                                 offset, length, type);
2206                 /* object request has its own copy of the object name */
2207                 rbd_segment_name_free(object_name);
2208                 if (!obj_request)
2209                         goto out_unwind;
2210
2211                 if (type == OBJ_REQUEST_BIO) {
2212                         unsigned int clone_size;
2213
2214                         rbd_assert(length <= (u64)UINT_MAX);
2215                         clone_size = (unsigned int)length;
2216                         obj_request->bio_list =
2217                                         bio_chain_clone_range(&bio_list,
2218                                                                 &bio_offset,
2219                                                                 clone_size,
2220                                                                 GFP_ATOMIC);
2221                         if (!obj_request->bio_list)
2222                                 goto out_partial;
2223                 } else {
2224                         unsigned int page_count;
2225
2226                         obj_request->pages = pages;
2227                         page_count = (u32)calc_pages_for(offset, length);
2228                         obj_request->page_count = page_count;
2229                         if ((offset + length) & ~PAGE_MASK)
2230                                 page_count--;   /* more on last page */
2231                         pages += page_count;
2232                 }
2233
2234                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2235                                                 obj_request);
2236                 if (!osd_req)
2237                         goto out_partial;
2238                 obj_request->osd_req = osd_req;
2239                 obj_request->callback = rbd_img_obj_callback;
2240
2241                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2242                                                 0, 0);
2243                 if (type == OBJ_REQUEST_BIO)
2244                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2245                                         obj_request->bio_list, length);
2246                 else
2247                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2248                                         obj_request->pages, length,
2249                                         offset & ~PAGE_MASK, false, false);
2250
2251                 if (write_request)
2252                         rbd_osd_req_format_write(obj_request);
2253                 else
2254                         rbd_osd_req_format_read(obj_request);
2255
2256                 obj_request->img_offset = img_offset;
2257                 rbd_img_obj_request_add(img_request, obj_request);
2258
2259                 img_offset += length;
2260                 resid -= length;
2261         }
2262
2263         return 0;
2264
2265 out_partial:
2266         rbd_obj_request_put(obj_request);
2267 out_unwind:
2268         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2269                 rbd_obj_request_put(obj_request);
2270
2271         return -ENOMEM;
2272 }
2273
2274 static void
2275 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2276 {
2277         struct rbd_img_request *img_request;
2278         struct rbd_device *rbd_dev;
2279         struct page **pages;
2280         u32 page_count;
2281
2282         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2283         rbd_assert(obj_request_img_data_test(obj_request));
2284         img_request = obj_request->img_request;
2285         rbd_assert(img_request);
2286
2287         rbd_dev = img_request->rbd_dev;
2288         rbd_assert(rbd_dev);
2289
2290         pages = obj_request->copyup_pages;
2291         rbd_assert(pages != NULL);
2292         obj_request->copyup_pages = NULL;
2293         page_count = obj_request->copyup_page_count;
2294         rbd_assert(page_count);
2295         obj_request->copyup_page_count = 0;
2296         ceph_release_page_vector(pages, page_count);
2297
2298         /*
2299          * We want the transfer count to reflect the size of the
2300          * original write request.  There is no such thing as a
2301          * successful short write, so if the request was successful
2302          * we can just set it to the originally-requested length.
2303          */
2304         if (!obj_request->result)
2305                 obj_request->xferred = obj_request->length;
2306
2307         /* Finish up with the normal image object callback */
2308
2309         rbd_img_obj_callback(obj_request);
2310 }
2311
2312 static void
2313 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2314 {
2315         struct rbd_obj_request *orig_request;
2316         struct ceph_osd_request *osd_req;
2317         struct ceph_osd_client *osdc;
2318         struct rbd_device *rbd_dev;
2319         struct page **pages;
2320         u32 page_count;
2321         int img_result;
2322         u64 parent_length;
2323         u64 offset;
2324         u64 length;
2325
2326         rbd_assert(img_request_child_test(img_request));
2327
2328         /* First get what we need from the image request */
2329
2330         pages = img_request->copyup_pages;
2331         rbd_assert(pages != NULL);
2332         img_request->copyup_pages = NULL;
2333         page_count = img_request->copyup_page_count;
2334         rbd_assert(page_count);
2335         img_request->copyup_page_count = 0;
2336
2337         orig_request = img_request->obj_request;
2338         rbd_assert(orig_request != NULL);
2339         rbd_assert(obj_request_type_valid(orig_request->type));
2340         img_result = img_request->result;
2341         parent_length = img_request->length;
2342         rbd_assert(parent_length == img_request->xferred);
2343         rbd_img_request_put(img_request);
2344
2345         rbd_assert(orig_request->img_request);
2346         rbd_dev = orig_request->img_request->rbd_dev;
2347         rbd_assert(rbd_dev);
2348
2349         /*
2350          * If the overlap has become 0 (most likely because the
2351          * image has been flattened) we need to free the pages
2352          * and re-submit the original write request.
2353          */
2354         if (!rbd_dev->parent_overlap) {
2355                 struct ceph_osd_client *osdc;
2356
2357                 ceph_release_page_vector(pages, page_count);
2358                 osdc = &rbd_dev->rbd_client->client->osdc;
2359                 img_result = rbd_obj_request_submit(osdc, orig_request);
2360                 if (!img_result)
2361                         return;
2362         }
2363
2364         if (img_result)
2365                 goto out_err;
2366
2367         /*
2368          * The original osd request is of no use to use any more.
2369          * We need a new one that can hold the two ops in a copyup
2370          * request.  Allocate the new copyup osd request for the
2371          * original request, and release the old one.
2372          */
2373         img_result = -ENOMEM;
2374         osd_req = rbd_osd_req_create_copyup(orig_request);
2375         if (!osd_req)
2376                 goto out_err;
2377         rbd_osd_req_destroy(orig_request->osd_req);
2378         orig_request->osd_req = osd_req;
2379         orig_request->copyup_pages = pages;
2380         orig_request->copyup_page_count = page_count;
2381
2382         /* Initialize the copyup op */
2383
2384         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2385         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2386                                                 false, false);
2387
2388         /* Then the original write request op */
2389
2390         offset = orig_request->offset;
2391         length = orig_request->length;
2392         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2393                                         offset, length, 0, 0);
2394         if (orig_request->type == OBJ_REQUEST_BIO)
2395                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2396                                         orig_request->bio_list, length);
2397         else
2398                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2399                                         orig_request->pages, length,
2400                                         offset & ~PAGE_MASK, false, false);
2401
2402         rbd_osd_req_format_write(orig_request);
2403
2404         /* All set, send it off. */
2405
2406         orig_request->callback = rbd_img_obj_copyup_callback;
2407         osdc = &rbd_dev->rbd_client->client->osdc;
2408         img_result = rbd_obj_request_submit(osdc, orig_request);
2409         if (!img_result)
2410                 return;
2411 out_err:
2412         /* Record the error code and complete the request */
2413
2414         orig_request->result = img_result;
2415         orig_request->xferred = 0;
2416         obj_request_done_set(orig_request);
2417         rbd_obj_request_complete(orig_request);
2418 }
2419
2420 /*
2421  * Read from the parent image the range of data that covers the
2422  * entire target of the given object request.  This is used for
2423  * satisfying a layered image write request when the target of an
2424  * object request from the image request does not exist.
2425  *
2426  * A page array big enough to hold the returned data is allocated
2427  * and supplied to rbd_img_request_fill() as the "data descriptor."
2428  * When the read completes, this page array will be transferred to
2429  * the original object request for the copyup operation.
2430  *
2431  * If an error occurs, record it as the result of the original
2432  * object request and mark it done so it gets completed.
2433  */
2434 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2435 {
2436         struct rbd_img_request *img_request = NULL;
2437         struct rbd_img_request *parent_request = NULL;
2438         struct rbd_device *rbd_dev;
2439         u64 img_offset;
2440         u64 length;
2441         struct page **pages = NULL;
2442         u32 page_count;
2443         int result;
2444
2445         rbd_assert(obj_request_img_data_test(obj_request));
2446         rbd_assert(obj_request_type_valid(obj_request->type));
2447
2448         img_request = obj_request->img_request;
2449         rbd_assert(img_request != NULL);
2450         rbd_dev = img_request->rbd_dev;
2451         rbd_assert(rbd_dev->parent != NULL);
2452
2453         /*
2454          * Determine the byte range covered by the object in the
2455          * child image to which the original request was to be sent.
2456          */
2457         img_offset = obj_request->img_offset - obj_request->offset;
2458         length = (u64)1 << rbd_dev->header.obj_order;
2459
2460         /*
2461          * There is no defined parent data beyond the parent
2462          * overlap, so limit what we read at that boundary if
2463          * necessary.
2464          */
2465         if (img_offset + length > rbd_dev->parent_overlap) {
2466                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2467                 length = rbd_dev->parent_overlap - img_offset;
2468         }
2469
2470         /*
2471          * Allocate a page array big enough to receive the data read
2472          * from the parent.
2473          */
2474         page_count = (u32)calc_pages_for(0, length);
2475         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2476         if (IS_ERR(pages)) {
2477                 result = PTR_ERR(pages);
2478                 pages = NULL;
2479                 goto out_err;
2480         }
2481
2482         result = -ENOMEM;
2483         parent_request = rbd_parent_request_create(obj_request,
2484                                                 img_offset, length);
2485         if (!parent_request)
2486                 goto out_err;
2487
2488         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2489         if (result)
2490                 goto out_err;
2491         parent_request->copyup_pages = pages;
2492         parent_request->copyup_page_count = page_count;
2493
2494         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2495         result = rbd_img_request_submit(parent_request);
2496         if (!result)
2497                 return 0;
2498
2499         parent_request->copyup_pages = NULL;
2500         parent_request->copyup_page_count = 0;
2501         parent_request->obj_request = NULL;
2502         rbd_obj_request_put(obj_request);
2503 out_err:
2504         if (pages)
2505                 ceph_release_page_vector(pages, page_count);
2506         if (parent_request)
2507                 rbd_img_request_put(parent_request);
2508         obj_request->result = result;
2509         obj_request->xferred = 0;
2510         obj_request_done_set(obj_request);
2511
2512         return result;
2513 }
2514
2515 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2516 {
2517         struct rbd_obj_request *orig_request;
2518         struct rbd_device *rbd_dev;
2519         int result;
2520
2521         rbd_assert(!obj_request_img_data_test(obj_request));
2522
2523         /*
2524          * All we need from the object request is the original
2525          * request and the result of the STAT op.  Grab those, then
2526          * we're done with the request.
2527          */
2528         orig_request = obj_request->obj_request;
2529         obj_request->obj_request = NULL;
2530         rbd_assert(orig_request);
2531         rbd_assert(orig_request->img_request);
2532
2533         result = obj_request->result;
2534         obj_request->result = 0;
2535
2536         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2537                 obj_request, orig_request, result,
2538                 obj_request->xferred, obj_request->length);
2539         rbd_obj_request_put(obj_request);
2540
2541         /*
2542          * If the overlap has become 0 (most likely because the
2543          * image has been flattened) we need to free the pages
2544          * and re-submit the original write request.
2545          */
2546         rbd_dev = orig_request->img_request->rbd_dev;
2547         if (!rbd_dev->parent_overlap) {
2548                 struct ceph_osd_client *osdc;
2549
2550                 rbd_obj_request_put(orig_request);
2551                 osdc = &rbd_dev->rbd_client->client->osdc;
2552                 result = rbd_obj_request_submit(osdc, orig_request);
2553                 if (!result)
2554                         return;
2555         }
2556
2557         /*
2558          * Our only purpose here is to determine whether the object
2559          * exists, and we don't want to treat the non-existence as
2560          * an error.  If something else comes back, transfer the
2561          * error to the original request and complete it now.
2562          */
2563         if (!result) {
2564                 obj_request_existence_set(orig_request, true);
2565         } else if (result == -ENOENT) {
2566                 obj_request_existence_set(orig_request, false);
2567         } else if (result) {
2568                 orig_request->result = result;
2569                 goto out;
2570         }
2571
2572         /*
2573          * Resubmit the original request now that we have recorded
2574          * whether the target object exists.
2575          */
2576         orig_request->result = rbd_img_obj_request_submit(orig_request);
2577 out:
2578         if (orig_request->result)
2579                 rbd_obj_request_complete(orig_request);
2580         rbd_obj_request_put(orig_request);
2581 }
2582
2583 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2584 {
2585         struct rbd_obj_request *stat_request;
2586         struct rbd_device *rbd_dev;
2587         struct ceph_osd_client *osdc;
2588         struct page **pages = NULL;
2589         u32 page_count;
2590         size_t size;
2591         int ret;
2592
2593         /*
2594          * The response data for a STAT call consists of:
2595          *     le64 length;
2596          *     struct {
2597          *         le32 tv_sec;
2598          *         le32 tv_nsec;
2599          *     } mtime;
2600          */
2601         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2602         page_count = (u32)calc_pages_for(0, size);
2603         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2604         if (IS_ERR(pages))
2605                 return PTR_ERR(pages);
2606
2607         ret = -ENOMEM;
2608         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2609                                                         OBJ_REQUEST_PAGES);
2610         if (!stat_request)
2611                 goto out;
2612
2613         rbd_obj_request_get(obj_request);
2614         stat_request->obj_request = obj_request;
2615         stat_request->pages = pages;
2616         stat_request->page_count = page_count;
2617
2618         rbd_assert(obj_request->img_request);
2619         rbd_dev = obj_request->img_request->rbd_dev;
2620         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2621                                                 stat_request);
2622         if (!stat_request->osd_req)
2623                 goto out;
2624         stat_request->callback = rbd_img_obj_exists_callback;
2625
2626         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2627         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2628                                         false, false);
2629         rbd_osd_req_format_read(stat_request);
2630
2631         osdc = &rbd_dev->rbd_client->client->osdc;
2632         ret = rbd_obj_request_submit(osdc, stat_request);
2633 out:
2634         if (ret)
2635                 rbd_obj_request_put(obj_request);
2636
2637         return ret;
2638 }
2639
2640 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2641 {
2642         struct rbd_img_request *img_request;
2643         struct rbd_device *rbd_dev;
2644         bool known;
2645
2646         rbd_assert(obj_request_img_data_test(obj_request));
2647
2648         img_request = obj_request->img_request;
2649         rbd_assert(img_request);
2650         rbd_dev = img_request->rbd_dev;
2651
2652         /*
2653          * Only writes to layered images need special handling.
2654          * Reads and non-layered writes are simple object requests.
2655          * Layered writes that start beyond the end of the overlap
2656          * with the parent have no parent data, so they too are
2657          * simple object requests.  Finally, if the target object is
2658          * known to already exist, its parent data has already been
2659          * copied, so a write to the object can also be handled as a
2660          * simple object request.
2661          */
2662         if (!img_request_write_test(img_request) ||
2663                 !img_request_layered_test(img_request) ||
2664                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2665                 ((known = obj_request_known_test(obj_request)) &&
2666                         obj_request_exists_test(obj_request))) {
2667
2668                 struct rbd_device *rbd_dev;
2669                 struct ceph_osd_client *osdc;
2670
2671                 rbd_dev = obj_request->img_request->rbd_dev;
2672                 osdc = &rbd_dev->rbd_client->client->osdc;
2673
2674                 return rbd_obj_request_submit(osdc, obj_request);
2675         }
2676
2677         /*
2678          * It's a layered write.  The target object might exist but
2679          * we may not know that yet.  If we know it doesn't exist,
2680          * start by reading the data for the full target object from
2681          * the parent so we can use it for a copyup to the target.
2682          */
2683         if (known)
2684                 return rbd_img_obj_parent_read_full(obj_request);
2685
2686         /* We don't know whether the target exists.  Go find out. */
2687
2688         return rbd_img_obj_exists_submit(obj_request);
2689 }
2690
2691 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2692 {
2693         struct rbd_obj_request *obj_request;
2694         struct rbd_obj_request *next_obj_request;
2695
2696         dout("%s: img %p\n", __func__, img_request);
2697         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2698                 int ret;
2699
2700                 ret = rbd_img_obj_request_submit(obj_request);
2701                 if (ret)
2702                         return ret;
2703         }
2704
2705         return 0;
2706 }
2707
2708 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2709 {
2710         struct rbd_obj_request *obj_request;
2711         struct rbd_device *rbd_dev;
2712         u64 obj_end;
2713         u64 img_xferred;
2714         int img_result;
2715
2716         rbd_assert(img_request_child_test(img_request));
2717
2718         /* First get what we need from the image request and release it */
2719
2720         obj_request = img_request->obj_request;
2721         img_xferred = img_request->xferred;
2722         img_result = img_request->result;
2723         rbd_img_request_put(img_request);
2724
2725         /*
2726          * If the overlap has become 0 (most likely because the
2727          * image has been flattened) we need to re-submit the
2728          * original request.
2729          */
2730         rbd_assert(obj_request);
2731         rbd_assert(obj_request->img_request);
2732         rbd_dev = obj_request->img_request->rbd_dev;
2733         if (!rbd_dev->parent_overlap) {
2734                 struct ceph_osd_client *osdc;
2735
2736                 osdc = &rbd_dev->rbd_client->client->osdc;
2737                 img_result = rbd_obj_request_submit(osdc, obj_request);
2738                 if (!img_result)
2739                         return;
2740         }
2741
2742         obj_request->result = img_result;
2743         if (obj_request->result)
2744                 goto out;
2745
2746         /*
2747          * We need to zero anything beyond the parent overlap
2748          * boundary.  Since rbd_img_obj_request_read_callback()
2749          * will zero anything beyond the end of a short read, an
2750          * easy way to do this is to pretend the data from the
2751          * parent came up short--ending at the overlap boundary.
2752          */
2753         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2754         obj_end = obj_request->img_offset + obj_request->length;
2755         if (obj_end > rbd_dev->parent_overlap) {
2756                 u64 xferred = 0;
2757
2758                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2759                         xferred = rbd_dev->parent_overlap -
2760                                         obj_request->img_offset;
2761
2762                 obj_request->xferred = min(img_xferred, xferred);
2763         } else {
2764                 obj_request->xferred = img_xferred;
2765         }
2766 out:
2767         rbd_img_obj_request_read_callback(obj_request);
2768         rbd_obj_request_complete(obj_request);
2769 }
2770
2771 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2772 {
2773         struct rbd_img_request *img_request;
2774         int result;
2775
2776         rbd_assert(obj_request_img_data_test(obj_request));
2777         rbd_assert(obj_request->img_request != NULL);
2778         rbd_assert(obj_request->result == (s32) -ENOENT);
2779         rbd_assert(obj_request_type_valid(obj_request->type));
2780
2781         /* rbd_read_finish(obj_request, obj_request->length); */
2782         img_request = rbd_parent_request_create(obj_request,
2783                                                 obj_request->img_offset,
2784                                                 obj_request->length);
2785         result = -ENOMEM;
2786         if (!img_request)
2787                 goto out_err;
2788
2789         if (obj_request->type == OBJ_REQUEST_BIO)
2790                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2791                                                 obj_request->bio_list);
2792         else
2793                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2794                                                 obj_request->pages);
2795         if (result)
2796                 goto out_err;
2797
2798         img_request->callback = rbd_img_parent_read_callback;
2799         result = rbd_img_request_submit(img_request);
2800         if (result)
2801                 goto out_err;
2802
2803         return;
2804 out_err:
2805         if (img_request)
2806                 rbd_img_request_put(img_request);
2807         obj_request->result = result;
2808         obj_request->xferred = 0;
2809         obj_request_done_set(obj_request);
2810 }
2811
2812 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2813 {
2814         struct rbd_obj_request *obj_request;
2815         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2816         int ret;
2817
2818         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2819                                                         OBJ_REQUEST_NODATA);
2820         if (!obj_request)
2821                 return -ENOMEM;
2822
2823         ret = -ENOMEM;
2824         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2825         if (!obj_request->osd_req)
2826                 goto out;
2827         obj_request->callback = rbd_obj_request_put;
2828
2829         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2830                                         notify_id, 0, 0);
2831         rbd_osd_req_format_read(obj_request);
2832
2833         ret = rbd_obj_request_submit(osdc, obj_request);
2834 out:
2835         if (ret)
2836                 rbd_obj_request_put(obj_request);
2837
2838         return ret;
2839 }
2840
2841 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2842 {
2843         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2844         int ret;
2845
2846         if (!rbd_dev)
2847                 return;
2848
2849         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2850                 rbd_dev->header_name, (unsigned long long)notify_id,
2851                 (unsigned int)opcode);
2852         ret = rbd_dev_refresh(rbd_dev);
2853         if (ret)
2854                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2855
2856         rbd_obj_notify_ack(rbd_dev, notify_id);
2857 }
2858
2859 /*
2860  * Request sync osd watch/unwatch.  The value of "start" determines
2861  * whether a watch request is being initiated or torn down.
2862  */
2863 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2864 {
2865         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2866         struct rbd_obj_request *obj_request;
2867         int ret;
2868
2869         rbd_assert(start ^ !!rbd_dev->watch_event);
2870         rbd_assert(start ^ !!rbd_dev->watch_request);
2871
2872         if (start) {
2873                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2874                                                 &rbd_dev->watch_event);
2875                 if (ret < 0)
2876                         return ret;
2877                 rbd_assert(rbd_dev->watch_event != NULL);
2878         }
2879
2880         ret = -ENOMEM;
2881         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2882                                                         OBJ_REQUEST_NODATA);
2883         if (!obj_request)
2884                 goto out_cancel;
2885
2886         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2887         if (!obj_request->osd_req)
2888                 goto out_cancel;
2889
2890         if (start)
2891                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2892         else
2893                 ceph_osdc_unregister_linger_request(osdc,
2894                                         rbd_dev->watch_request->osd_req);
2895
2896         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2897                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2898         rbd_osd_req_format_write(obj_request);
2899
2900         ret = rbd_obj_request_submit(osdc, obj_request);
2901         if (ret)
2902                 goto out_cancel;
2903         ret = rbd_obj_request_wait(obj_request);
2904         if (ret)
2905                 goto out_cancel;
2906         ret = obj_request->result;
2907         if (ret)
2908                 goto out_cancel;
2909
2910         /*
2911          * A watch request is set to linger, so the underlying osd
2912          * request won't go away until we unregister it.  We retain
2913          * a pointer to the object request during that time (in
2914          * rbd_dev->watch_request), so we'll keep a reference to
2915          * it.  We'll drop that reference (below) after we've
2916          * unregistered it.
2917          */
2918         if (start) {
2919                 rbd_dev->watch_request = obj_request;
2920
2921                 return 0;
2922         }
2923
2924         /* We have successfully torn down the watch request */
2925
2926         rbd_obj_request_put(rbd_dev->watch_request);
2927         rbd_dev->watch_request = NULL;
2928 out_cancel:
2929         /* Cancel the event if we're tearing down, or on error */
2930         ceph_osdc_cancel_event(rbd_dev->watch_event);
2931         rbd_dev->watch_event = NULL;
2932         if (obj_request)
2933                 rbd_obj_request_put(obj_request);
2934
2935         return ret;
2936 }
2937
2938 /*
2939  * Synchronous osd object method call.  Returns the number of bytes
2940  * returned in the outbound buffer, or a negative error code.
2941  */
2942 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2943                              const char *object_name,
2944                              const char *class_name,
2945                              const char *method_name,
2946                              const void *outbound,
2947                              size_t outbound_size,
2948                              void *inbound,
2949                              size_t inbound_size)
2950 {
2951         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2952         struct rbd_obj_request *obj_request;
2953         struct page **pages;
2954         u32 page_count;
2955         int ret;
2956
2957         /*
2958          * Method calls are ultimately read operations.  The result
2959          * should placed into the inbound buffer provided.  They
2960          * also supply outbound data--parameters for the object
2961          * method.  Currently if this is present it will be a
2962          * snapshot id.
2963          */
2964         page_count = (u32)calc_pages_for(0, inbound_size);
2965         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2966         if (IS_ERR(pages))
2967                 return PTR_ERR(pages);
2968
2969         ret = -ENOMEM;
2970         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2971                                                         OBJ_REQUEST_PAGES);
2972         if (!obj_request)
2973                 goto out;
2974
2975         obj_request->pages = pages;
2976         obj_request->page_count = page_count;
2977
2978         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2979         if (!obj_request->osd_req)
2980                 goto out;
2981
2982         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2983                                         class_name, method_name);
2984         if (outbound_size) {
2985                 struct ceph_pagelist *pagelist;
2986
2987                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2988                 if (!pagelist)
2989                         goto out;
2990
2991                 ceph_pagelist_init(pagelist);
2992                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2993                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2994                                                 pagelist);
2995         }
2996         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2997                                         obj_request->pages, inbound_size,
2998                                         0, false, false);
2999         rbd_osd_req_format_read(obj_request);
3000
3001         ret = rbd_obj_request_submit(osdc, obj_request);
3002         if (ret)
3003                 goto out;
3004         ret = rbd_obj_request_wait(obj_request);
3005         if (ret)
3006                 goto out;
3007
3008         ret = obj_request->result;
3009         if (ret < 0)
3010                 goto out;
3011
3012         rbd_assert(obj_request->xferred < (u64)INT_MAX);
3013         ret = (int)obj_request->xferred;
3014         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3015 out:
3016         if (obj_request)
3017                 rbd_obj_request_put(obj_request);
3018         else
3019                 ceph_release_page_vector(pages, page_count);
3020
3021         return ret;
3022 }
3023
3024 static void rbd_request_fn(struct request_queue *q)
3025                 __releases(q->queue_lock) __acquires(q->queue_lock)
3026 {
3027         struct rbd_device *rbd_dev = q->queuedata;
3028         bool read_only = rbd_dev->mapping.read_only;
3029         struct request *rq;
3030         int result;
3031
3032         while ((rq = blk_fetch_request(q))) {
3033                 bool write_request = rq_data_dir(rq) == WRITE;
3034                 struct rbd_img_request *img_request;
3035                 u64 offset;
3036                 u64 length;
3037
3038                 /* Ignore any non-FS requests that filter through. */
3039
3040                 if (rq->cmd_type != REQ_TYPE_FS) {
3041                         dout("%s: non-fs request type %d\n", __func__,
3042                                 (int) rq->cmd_type);
3043                         __blk_end_request_all(rq, 0);
3044                         continue;
3045                 }
3046
3047                 /* Ignore/skip any zero-length requests */
3048
3049                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3050                 length = (u64) blk_rq_bytes(rq);
3051
3052                 if (!length) {
3053                         dout("%s: zero-length request\n", __func__);
3054                         __blk_end_request_all(rq, 0);
3055                         continue;
3056                 }
3057
3058                 spin_unlock_irq(q->queue_lock);
3059
3060                 /* Disallow writes to a read-only device */
3061
3062                 if (write_request) {
3063                         result = -EROFS;
3064                         if (read_only)
3065                                 goto end_request;
3066                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3067                 }
3068
3069                 /*
3070                  * Quit early if the mapped snapshot no longer
3071                  * exists.  It's still possible the snapshot will
3072                  * have disappeared by the time our request arrives
3073                  * at the osd, but there's no sense in sending it if
3074                  * we already know.
3075                  */
3076                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3077                         dout("request for non-existent snapshot");
3078                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3079                         result = -ENXIO;
3080                         goto end_request;
3081                 }
3082
3083                 result = -EINVAL;
3084                 if (offset && length > U64_MAX - offset + 1) {
3085                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3086                                 offset, length);
3087                         goto end_request;       /* Shouldn't happen */
3088                 }
3089
3090                 result = -EIO;
3091                 if (offset + length > rbd_dev->mapping.size) {
3092                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3093                                 offset, length, rbd_dev->mapping.size);
3094                         goto end_request;
3095                 }
3096
3097                 result = -ENOMEM;
3098                 img_request = rbd_img_request_create(rbd_dev, offset, length,
3099                                                         write_request);
3100                 if (!img_request)
3101                         goto end_request;
3102
3103                 img_request->rq = rq;
3104
3105                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3106                                                 rq->bio);
3107                 if (!result)
3108                         result = rbd_img_request_submit(img_request);
3109                 if (result)
3110                         rbd_img_request_put(img_request);
3111 end_request:
3112                 spin_lock_irq(q->queue_lock);
3113                 if (result < 0) {
3114                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3115                                 write_request ? "write" : "read",
3116                                 length, offset, result);
3117
3118                         __blk_end_request_all(rq, result);
3119                 }
3120         }
3121 }
3122
3123 /*
3124  * a queue callback. Makes sure that we don't create a bio that spans across
3125  * multiple osd objects. One exception would be with a single page bios,
3126  * which we handle later at bio_chain_clone_range()
3127  */
3128 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3129                           struct bio_vec *bvec)
3130 {
3131         struct rbd_device *rbd_dev = q->queuedata;
3132         sector_t sector_offset;
3133         sector_t sectors_per_obj;
3134         sector_t obj_sector_offset;
3135         int ret;
3136
3137         /*
3138          * Find how far into its rbd object the partition-relative
3139          * bio start sector is to offset relative to the enclosing
3140          * device.
3141          */
3142         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3143         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3144         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3145
3146         /*
3147          * Compute the number of bytes from that offset to the end
3148          * of the object.  Account for what's already used by the bio.
3149          */
3150         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3151         if (ret > bmd->bi_size)
3152                 ret -= bmd->bi_size;
3153         else
3154                 ret = 0;
3155
3156         /*
3157          * Don't send back more than was asked for.  And if the bio
3158          * was empty, let the whole thing through because:  "Note
3159          * that a block device *must* allow a single page to be
3160          * added to an empty bio."
3161          */
3162         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3163         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3164                 ret = (int) bvec->bv_len;
3165
3166         return ret;
3167 }
3168
3169 static void rbd_free_disk(struct rbd_device *rbd_dev)
3170 {
3171         struct gendisk *disk = rbd_dev->disk;
3172
3173         if (!disk)
3174                 return;
3175
3176         rbd_dev->disk = NULL;
3177         if (disk->flags & GENHD_FL_UP) {
3178                 del_gendisk(disk);
3179                 if (disk->queue)
3180                         blk_cleanup_queue(disk->queue);
3181         }
3182         put_disk(disk);
3183 }
3184
3185 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3186                                 const char *object_name,
3187                                 u64 offset, u64 length, void *buf)
3188
3189 {
3190         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3191         struct rbd_obj_request *obj_request;
3192         struct page **pages = NULL;
3193         u32 page_count;
3194         size_t size;
3195         int ret;
3196
3197         page_count = (u32) calc_pages_for(offset, length);
3198         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3199         if (IS_ERR(pages))
3200                 ret = PTR_ERR(pages);
3201
3202         ret = -ENOMEM;
3203         obj_request = rbd_obj_request_create(object_name, offset, length,
3204                                                         OBJ_REQUEST_PAGES);
3205         if (!obj_request)
3206                 goto out;
3207
3208         obj_request->pages = pages;
3209         obj_request->page_count = page_count;
3210
3211         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3212         if (!obj_request->osd_req)
3213                 goto out;
3214
3215         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3216                                         offset, length, 0, 0);
3217         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3218                                         obj_request->pages,
3219                                         obj_request->length,
3220                                         obj_request->offset & ~PAGE_MASK,
3221                                         false, false);
3222         rbd_osd_req_format_read(obj_request);
3223
3224         ret = rbd_obj_request_submit(osdc, obj_request);
3225         if (ret)
3226                 goto out;
3227         ret = rbd_obj_request_wait(obj_request);
3228         if (ret)
3229                 goto out;
3230
3231         ret = obj_request->result;
3232         if (ret < 0)
3233                 goto out;
3234
3235         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3236         size = (size_t) obj_request->xferred;
3237         ceph_copy_from_page_vector(pages, buf, 0, size);
3238         rbd_assert(size <= (size_t)INT_MAX);
3239         ret = (int)size;
3240 out:
3241         if (obj_request)
3242                 rbd_obj_request_put(obj_request);
3243         else
3244                 ceph_release_page_vector(pages, page_count);
3245
3246         return ret;
3247 }
3248
3249 /*
3250  * Read the complete header for the given rbd device.  On successful
3251  * return, the rbd_dev->header field will contain up-to-date
3252  * information about the image.
3253  */
3254 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3255 {
3256         struct rbd_image_header_ondisk *ondisk = NULL;
3257         u32 snap_count = 0;
3258         u64 names_size = 0;
3259         u32 want_count;
3260         int ret;
3261
3262         /*
3263          * The complete header will include an array of its 64-bit
3264          * snapshot ids, followed by the names of those snapshots as
3265          * a contiguous block of NUL-terminated strings.  Note that
3266          * the number of snapshots could change by the time we read
3267          * it in, in which case we re-read it.
3268          */
3269         do {
3270                 size_t size;
3271
3272                 kfree(ondisk);
3273
3274                 size = sizeof (*ondisk);
3275                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3276                 size += names_size;
3277                 ondisk = kmalloc(size, GFP_KERNEL);
3278                 if (!ondisk)
3279                         return -ENOMEM;
3280
3281                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3282                                        0, size, ondisk);
3283                 if (ret < 0)
3284                         goto out;
3285                 if ((size_t)ret < size) {
3286                         ret = -ENXIO;
3287                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3288                                 size, ret);
3289                         goto out;
3290                 }
3291                 if (!rbd_dev_ondisk_valid(ondisk)) {
3292                         ret = -ENXIO;
3293                         rbd_warn(rbd_dev, "invalid header");
3294                         goto out;
3295                 }
3296
3297                 names_size = le64_to_cpu(ondisk->snap_names_len);
3298                 want_count = snap_count;
3299                 snap_count = le32_to_cpu(ondisk->snap_count);
3300         } while (snap_count != want_count);
3301
3302         ret = rbd_header_from_disk(rbd_dev, ondisk);
3303 out:
3304         kfree(ondisk);
3305
3306         return ret;
3307 }
3308
3309 /*
3310  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3311  * has disappeared from the (just updated) snapshot context.
3312  */
3313 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3314 {
3315         u64 snap_id;
3316
3317         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3318                 return;
3319
3320         snap_id = rbd_dev->spec->snap_id;
3321         if (snap_id == CEPH_NOSNAP)
3322                 return;
3323
3324         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3325                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3326 }
3327
3328 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3329 {
3330         u64 mapping_size;
3331         int ret;
3332
3333         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3334         mapping_size = rbd_dev->mapping.size;
3335         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3336         if (rbd_dev->image_format == 1)
3337                 ret = rbd_dev_v1_header_info(rbd_dev);
3338         else
3339                 ret = rbd_dev_v2_header_info(rbd_dev);
3340
3341         /* If it's a mapped snapshot, validate its EXISTS flag */
3342
3343         rbd_exists_validate(rbd_dev);
3344         mutex_unlock(&ctl_mutex);
3345         if (mapping_size != rbd_dev->mapping.size) {
3346                 sector_t size;
3347
3348                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3349                 dout("setting size to %llu sectors", (unsigned long long)size);
3350                 set_capacity(rbd_dev->disk, size);
3351                 revalidate_disk(rbd_dev->disk);
3352         }
3353
3354         return ret;
3355 }
3356
3357 static int rbd_init_disk(struct rbd_device *rbd_dev)
3358 {
3359         struct gendisk *disk;
3360         struct request_queue *q;
3361         u64 segment_size;
3362
3363         /* create gendisk info */
3364         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3365         if (!disk)
3366                 return -ENOMEM;
3367
3368         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3369                  rbd_dev->dev_id);
3370         disk->major = rbd_dev->major;
3371         disk->first_minor = 0;
3372         disk->fops = &rbd_bd_ops;
3373         disk->private_data = rbd_dev;
3374
3375         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3376         if (!q)
3377                 goto out_disk;
3378
3379         /* We use the default size, but let's be explicit about it. */
3380         blk_queue_physical_block_size(q, SECTOR_SIZE);
3381
3382         /* set io sizes to object size */
3383         segment_size = rbd_obj_bytes(&rbd_dev->header);
3384         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3385         blk_queue_max_segment_size(q, segment_size);
3386         blk_queue_io_min(q, segment_size);
3387         blk_queue_io_opt(q, segment_size);
3388
3389         blk_queue_merge_bvec(q, rbd_merge_bvec);
3390         disk->queue = q;
3391
3392         q->queuedata = rbd_dev;
3393
3394         rbd_dev->disk = disk;
3395
3396         return 0;
3397 out_disk:
3398         put_disk(disk);
3399
3400         return -ENOMEM;
3401 }
3402
3403 /*
3404   sysfs
3405 */
3406
3407 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3408 {
3409         return container_of(dev, struct rbd_device, dev);
3410 }
3411
3412 static ssize_t rbd_size_show(struct device *dev,
3413                              struct device_attribute *attr, char *buf)
3414 {
3415         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3416
3417         return sprintf(buf, "%llu\n",
3418                 (unsigned long long)rbd_dev->mapping.size);
3419 }
3420
3421 /*
3422  * Note this shows the features for whatever's mapped, which is not
3423  * necessarily the base image.
3424  */
3425 static ssize_t rbd_features_show(struct device *dev,
3426                              struct device_attribute *attr, char *buf)
3427 {
3428         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3429
3430         return sprintf(buf, "0x%016llx\n",
3431                         (unsigned long long)rbd_dev->mapping.features);
3432 }
3433
3434 static ssize_t rbd_major_show(struct device *dev,
3435                               struct device_attribute *attr, char *buf)
3436 {
3437         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3438
3439         if (rbd_dev->major)
3440                 return sprintf(buf, "%d\n", rbd_dev->major);
3441
3442         return sprintf(buf, "(none)\n");
3443
3444 }
3445
3446 static ssize_t rbd_client_id_show(struct device *dev,
3447                                   struct device_attribute *attr, char *buf)
3448 {
3449         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3450
3451         return sprintf(buf, "client%lld\n",
3452                         ceph_client_id(rbd_dev->rbd_client->client));
3453 }
3454
3455 static ssize_t rbd_pool_show(struct device *dev,
3456                              struct device_attribute *attr, char *buf)
3457 {
3458         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3459
3460         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3461 }
3462
3463 static ssize_t rbd_pool_id_show(struct device *dev,
3464                              struct device_attribute *attr, char *buf)
3465 {
3466         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3467
3468         return sprintf(buf, "%llu\n",
3469                         (unsigned long long) rbd_dev->spec->pool_id);
3470 }
3471
3472 static ssize_t rbd_name_show(struct device *dev,
3473                              struct device_attribute *attr, char *buf)
3474 {
3475         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3476
3477         if (rbd_dev->spec->image_name)
3478                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3479
3480         return sprintf(buf, "(unknown)\n");
3481 }
3482
3483 static ssize_t rbd_image_id_show(struct device *dev,
3484                              struct device_attribute *attr, char *buf)
3485 {
3486         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3487
3488         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3489 }
3490
3491 /*
3492  * Shows the name of the currently-mapped snapshot (or
3493  * RBD_SNAP_HEAD_NAME for the base image).
3494  */
3495 static ssize_t rbd_snap_show(struct device *dev,
3496                              struct device_attribute *attr,
3497                              char *buf)
3498 {
3499         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3500
3501         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3502 }
3503
3504 /*
3505  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3506  * for the parent image.  If there is no parent, simply shows
3507  * "(no parent image)".
3508  */
3509 static ssize_t rbd_parent_show(struct device *dev,
3510                              struct device_attribute *attr,
3511                              char *buf)
3512 {
3513         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3514         struct rbd_spec *spec = rbd_dev->parent_spec;
3515         int count;
3516         char *bufp = buf;
3517
3518         if (!spec)
3519                 return sprintf(buf, "(no parent image)\n");
3520
3521         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3522                         (unsigned long long) spec->pool_id, spec->pool_name);
3523         if (count < 0)
3524                 return count;
3525         bufp += count;
3526
3527         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3528                         spec->image_name ? spec->image_name : "(unknown)");
3529         if (count < 0)
3530                 return count;
3531         bufp += count;
3532
3533         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3534                         (unsigned long long) spec->snap_id, spec->snap_name);
3535         if (count < 0)
3536                 return count;
3537         bufp += count;
3538
3539         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3540         if (count < 0)
3541                 return count;
3542         bufp += count;
3543
3544         return (ssize_t) (bufp - buf);
3545 }
3546
3547 static ssize_t rbd_image_refresh(struct device *dev,
3548                                  struct device_attribute *attr,
3549                                  const char *buf,
3550                                  size_t size)
3551 {
3552         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3553         int ret;
3554
3555         ret = rbd_dev_refresh(rbd_dev);
3556         if (ret)
3557                 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3558
3559         return ret < 0 ? ret : size;
3560 }
3561
3562 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3563 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3564 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3565 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3566 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3567 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3568 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3569 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3570 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3571 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3572 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3573
3574 static struct attribute *rbd_attrs[] = {
3575         &dev_attr_size.attr,
3576         &dev_attr_features.attr,
3577         &dev_attr_major.attr,
3578         &dev_attr_client_id.attr,
3579         &dev_attr_pool.attr,
3580         &dev_attr_pool_id.attr,
3581         &dev_attr_name.attr,
3582         &dev_attr_image_id.attr,
3583         &dev_attr_current_snap.attr,
3584         &dev_attr_parent.attr,
3585         &dev_attr_refresh.attr,
3586         NULL
3587 };
3588
3589 static struct attribute_group rbd_attr_group = {
3590         .attrs = rbd_attrs,
3591 };
3592
3593 static const struct attribute_group *rbd_attr_groups[] = {
3594         &rbd_attr_group,
3595         NULL
3596 };
3597
/* Release callback for rbd_device_type; intentionally does nothing. */
static void rbd_sysfs_dev_release(struct device *dev)
{
        /* Nothing to do here */
}
3601
3602 static struct device_type rbd_device_type = {
3603         .name           = "rbd",
3604         .groups         = rbd_attr_groups,
3605         .release        = rbd_sysfs_dev_release,
3606 };
3607
3608 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3609 {
3610         kref_get(&spec->kref);
3611
3612         return spec;
3613 }
3614
3615 static void rbd_spec_free(struct kref *kref);
3616 static void rbd_spec_put(struct rbd_spec *spec)
3617 {
3618         if (spec)
3619                 kref_put(&spec->kref, rbd_spec_free);
3620 }
3621
3622 static struct rbd_spec *rbd_spec_alloc(void)
3623 {
3624         struct rbd_spec *spec;
3625
3626         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3627         if (!spec)
3628                 return NULL;
3629         kref_init(&spec->kref);
3630
3631         return spec;
3632 }
3633
3634 static void rbd_spec_free(struct kref *kref)
3635 {
3636         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3637
3638         kfree(spec->pool_name);
3639         kfree(spec->image_id);
3640         kfree(spec->image_name);
3641         kfree(spec->snap_name);
3642         kfree(spec);
3643 }
3644
3645 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3646                                 struct rbd_spec *spec)
3647 {
3648         struct rbd_device *rbd_dev;
3649
3650         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3651         if (!rbd_dev)
3652                 return NULL;
3653
3654         spin_lock_init(&rbd_dev->lock);
3655         rbd_dev->flags = 0;
3656         atomic_set(&rbd_dev->parent_ref, 0);
3657         INIT_LIST_HEAD(&rbd_dev->node);
3658         init_rwsem(&rbd_dev->header_rwsem);
3659
3660         rbd_dev->spec = spec;
3661         rbd_dev->rbd_client = rbdc;
3662
3663         /* Initialize the layout used for all rbd requests */
3664
3665         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3666         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3667         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3668         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3669
3670         return rbd_dev;
3671 }
3672
3673 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3674 {
3675         rbd_put_client(rbd_dev->rbd_client);
3676         rbd_spec_put(rbd_dev->spec);
3677         kfree(rbd_dev);
3678 }
3679
3680 /*
3681  * Get the size and object order for an image snapshot, or if
3682  * snap_id is CEPH_NOSNAP, gets this information for the base
3683  * image.
3684  */
3685 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3686                                 u8 *order, u64 *snap_size)
3687 {
3688         __le64 snapid = cpu_to_le64(snap_id);
3689         int ret;
3690         struct {
3691                 u8 order;
3692                 __le64 size;
3693         } __attribute__ ((packed)) size_buf = { 0 };
3694
3695         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3696                                 "rbd", "get_size",
3697                                 &snapid, sizeof (snapid),
3698                                 &size_buf, sizeof (size_buf));
3699         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3700         if (ret < 0)
3701                 return ret;
3702         if (ret < sizeof (size_buf))
3703                 return -ERANGE;
3704
3705         if (order)
3706                 *order = size_buf.order;
3707         *snap_size = le64_to_cpu(size_buf.size);
3708
3709         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3710                 (unsigned long long)snap_id, (unsigned int)*order,
3711                 (unsigned long long)*snap_size);
3712
3713         return 0;
3714 }
3715
3716 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3717 {
3718         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3719                                         &rbd_dev->header.obj_order,
3720                                         &rbd_dev->header.image_size);
3721 }
3722
3723 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3724 {
3725         void *reply_buf;
3726         int ret;
3727         void *p;
3728
3729         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3730         if (!reply_buf)
3731                 return -ENOMEM;
3732
3733         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3734                                 "rbd", "get_object_prefix", NULL, 0,
3735                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3736         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3737         if (ret < 0)
3738                 goto out;
3739
3740         p = reply_buf;
3741         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3742                                                 p + ret, NULL, GFP_NOIO);
3743         ret = 0;
3744
3745         if (IS_ERR(rbd_dev->header.object_prefix)) {
3746                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3747                 rbd_dev->header.object_prefix = NULL;
3748         } else {
3749                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3750         }
3751 out:
3752         kfree(reply_buf);
3753
3754         return ret;
3755 }
3756
3757 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3758                 u64 *snap_features)
3759 {
3760         __le64 snapid = cpu_to_le64(snap_id);
3761         struct {
3762                 __le64 features;
3763                 __le64 incompat;
3764         } __attribute__ ((packed)) features_buf = { 0 };
3765         u64 incompat;
3766         int ret;
3767
3768         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3769                                 "rbd", "get_features",
3770                                 &snapid, sizeof (snapid),
3771                                 &features_buf, sizeof (features_buf));
3772         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3773         if (ret < 0)
3774                 return ret;
3775         if (ret < sizeof (features_buf))
3776                 return -ERANGE;
3777
3778         incompat = le64_to_cpu(features_buf.incompat);
3779         if (incompat & ~RBD_FEATURES_SUPPORTED)
3780                 return -ENXIO;
3781
3782         *snap_features = le64_to_cpu(features_buf.features);
3783
3784         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3785                 (unsigned long long)snap_id,
3786                 (unsigned long long)*snap_features,
3787                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3788
3789         return 0;
3790 }
3791
3792 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3793 {
3794         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3795                                                 &rbd_dev->header.features);
3796 }
3797
3798 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3799 {
3800         struct rbd_spec *parent_spec;
3801         size_t size;
3802         void *reply_buf = NULL;
3803         __le64 snapid;
3804         void *p;
3805         void *end;
3806         u64 pool_id;
3807         char *image_id;
3808         u64 overlap;
3809         int ret;
3810
3811         parent_spec = rbd_spec_alloc();
3812         if (!parent_spec)
3813                 return -ENOMEM;
3814
3815         size = sizeof (__le64) +                                /* pool_id */
3816                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3817                 sizeof (__le64) +                               /* snap_id */
3818                 sizeof (__le64);                                /* overlap */
3819         reply_buf = kmalloc(size, GFP_KERNEL);
3820         if (!reply_buf) {
3821                 ret = -ENOMEM;
3822                 goto out_err;
3823         }
3824
3825         snapid = cpu_to_le64(CEPH_NOSNAP);
3826         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3827                                 "rbd", "get_parent",
3828                                 &snapid, sizeof (snapid),
3829                                 reply_buf, size);
3830         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3831         if (ret < 0)
3832                 goto out_err;
3833
3834         p = reply_buf;
3835         end = reply_buf + ret;
3836         ret = -ERANGE;
3837         ceph_decode_64_safe(&p, end, pool_id, out_err);
3838         if (pool_id == CEPH_NOPOOL) {
3839                 /*
3840                  * Either the parent never existed, or we have
3841                  * record of it but the image got flattened so it no
3842                  * longer has a parent.  When the parent of a
3843                  * layered image disappears we immediately set the
3844                  * overlap to 0.  The effect of this is that all new
3845                  * requests will be treated as if the image had no
3846                  * parent.
3847                  */
3848                 if (rbd_dev->parent_overlap) {
3849                         rbd_dev->parent_overlap = 0;
3850                         smp_mb();
3851                         rbd_dev_parent_put(rbd_dev);
3852                         pr_info("%s: clone image has been flattened\n",
3853                                 rbd_dev->disk->disk_name);
3854                 }
3855
3856                 goto out;       /* No parent?  No problem. */
3857         }
3858
3859         /* The ceph file layout needs to fit pool id in 32 bits */
3860
3861         ret = -EIO;
3862         if (pool_id > (u64)U32_MAX) {
3863                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3864                         (unsigned long long)pool_id, U32_MAX);
3865                 goto out_err;
3866         }
3867         parent_spec->pool_id = pool_id;
3868
3869         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3870         if (IS_ERR(image_id)) {
3871                 ret = PTR_ERR(image_id);
3872                 goto out_err;
3873         }
3874         parent_spec->image_id = image_id;
3875         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3876         ceph_decode_64_safe(&p, end, overlap, out_err);
3877
3878         if (overlap) {
3879                 rbd_spec_put(rbd_dev->parent_spec);
3880                 rbd_dev->parent_spec = parent_spec;
3881                 parent_spec = NULL;     /* rbd_dev now owns this */
3882                 rbd_dev->parent_overlap = overlap;
3883         } else {
3884                 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3885         }
3886 out:
3887         ret = 0;
3888 out_err:
3889         kfree(reply_buf);
3890         rbd_spec_put(parent_spec);
3891
3892         return ret;
3893 }
3894
3895 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3896 {
3897         struct {
3898                 __le64 stripe_unit;
3899                 __le64 stripe_count;
3900         } __attribute__ ((packed)) striping_info_buf = { 0 };
3901         size_t size = sizeof (striping_info_buf);
3902         void *p;
3903         u64 obj_size;
3904         u64 stripe_unit;
3905         u64 stripe_count;
3906         int ret;
3907
3908         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3909                                 "rbd", "get_stripe_unit_count", NULL, 0,
3910                                 (char *)&striping_info_buf, size);
3911         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3912         if (ret < 0)
3913                 return ret;
3914         if (ret < size)
3915                 return -ERANGE;
3916
3917         /*
3918          * We don't actually support the "fancy striping" feature
3919          * (STRIPINGV2) yet, but if the striping sizes are the
3920          * defaults the behavior is the same as before.  So find
3921          * out, and only fail if the image has non-default values.
3922          */
3923         ret = -EINVAL;
3924         obj_size = (u64)1 << rbd_dev->header.obj_order;
3925         p = &striping_info_buf;
3926         stripe_unit = ceph_decode_64(&p);
3927         if (stripe_unit != obj_size) {
3928                 rbd_warn(rbd_dev, "unsupported stripe unit "
3929                                 "(got %llu want %llu)",
3930                                 stripe_unit, obj_size);
3931                 return -EINVAL;
3932         }
3933         stripe_count = ceph_decode_64(&p);
3934         if (stripe_count != 1) {
3935                 rbd_warn(rbd_dev, "unsupported stripe count "
3936                                 "(got %llu want 1)", stripe_count);
3937                 return -EINVAL;
3938         }
3939         rbd_dev->header.stripe_unit = stripe_unit;
3940         rbd_dev->header.stripe_count = stripe_count;
3941
3942         return 0;
3943 }
3944
3945 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3946 {
3947         size_t image_id_size;
3948         char *image_id;
3949         void *p;
3950         void *end;
3951         size_t size;
3952         void *reply_buf = NULL;
3953         size_t len = 0;
3954         char *image_name = NULL;
3955         int ret;
3956
3957         rbd_assert(!rbd_dev->spec->image_name);
3958
3959         len = strlen(rbd_dev->spec->image_id);
3960         image_id_size = sizeof (__le32) + len;
3961         image_id = kmalloc(image_id_size, GFP_KERNEL);
3962         if (!image_id)
3963                 return NULL;
3964
3965         p = image_id;
3966         end = image_id + image_id_size;
3967         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3968
3969         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3970         reply_buf = kmalloc(size, GFP_KERNEL);
3971         if (!reply_buf)
3972                 goto out;
3973
3974         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3975                                 "rbd", "dir_get_name",
3976                                 image_id, image_id_size,
3977                                 reply_buf, size);
3978         if (ret < 0)
3979                 goto out;
3980         p = reply_buf;
3981         end = reply_buf + ret;
3982
3983         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3984         if (IS_ERR(image_name))
3985                 image_name = NULL;
3986         else
3987                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3988 out:
3989         kfree(reply_buf);
3990         kfree(image_id);
3991
3992         return image_name;
3993 }
3994
3995 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3996 {
3997         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3998         const char *snap_name;
3999         u32 which = 0;
4000
4001         /* Skip over names until we find the one we are looking for */
4002
4003         snap_name = rbd_dev->header.snap_names;
4004         while (which < snapc->num_snaps) {
4005                 if (!strcmp(name, snap_name))
4006                         return snapc->snaps[which];
4007                 snap_name += strlen(snap_name) + 1;
4008                 which++;
4009         }
4010         return CEPH_NOSNAP;
4011 }
4012
4013 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4014 {
4015         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4016         u32 which;
4017         bool found = false;
4018         u64 snap_id;
4019
4020         for (which = 0; !found && which < snapc->num_snaps; which++) {
4021                 const char *snap_name;
4022
4023                 snap_id = snapc->snaps[which];
4024                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4025                 if (IS_ERR(snap_name))
4026                         break;
4027                 found = !strcmp(name, snap_name);
4028                 kfree(snap_name);
4029         }
4030         return found ? snap_id : CEPH_NOSNAP;
4031 }
4032
4033 /*
4034  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4035  * no snapshot by that name is found, or if an error occurs.
4036  */
4037 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4038 {
4039         if (rbd_dev->image_format == 1)
4040                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4041
4042         return rbd_v2_snap_id_by_name(rbd_dev, name);
4043 }
4044
4045 /*
4046  * When an rbd image has a parent image, it is identified by the
4047  * pool, image, and snapshot ids (not names).  This function fills
4048  * in the names for those ids.  (It's OK if we can't figure out the
4049  * name for an image id, but the pool and snapshot ids should always
4050  * exist and have names.)  All names in an rbd spec are dynamically
4051  * allocated.
4052  *
4053  * When an image being mapped (not a parent) is probed, we have the
4054  * pool name and pool id, image name and image id, and the snapshot
4055  * name.  The only thing we're missing is the snapshot id.
4056  */
4057 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
4058 {
4059         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4060         struct rbd_spec *spec = rbd_dev->spec;
4061         const char *pool_name;
4062         const char *image_name;
4063         const char *snap_name;
4064         int ret;
4065
4066         /*
4067          * An image being mapped will have the pool name (etc.), but
4068          * we need to look up the snapshot id.
4069          */
4070         if (spec->pool_name) {
4071                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4072                         u64 snap_id;
4073
4074                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4075                         if (snap_id == CEPH_NOSNAP)
4076                                 return -ENOENT;
4077                         spec->snap_id = snap_id;
4078                 } else {
4079                         spec->snap_id = CEPH_NOSNAP;
4080                 }
4081
4082                 return 0;
4083         }
4084
4085         /* Get the pool name; we have to make our own copy of this */
4086
4087         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4088         if (!pool_name) {
4089                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4090                 return -EIO;
4091         }
4092         pool_name = kstrdup(pool_name, GFP_KERNEL);
4093         if (!pool_name)
4094                 return -ENOMEM;
4095
4096         /* Fetch the image name; tolerate failure here */
4097
4098         image_name = rbd_dev_image_name(rbd_dev);
4099         if (!image_name)
4100                 rbd_warn(rbd_dev, "unable to get image name");
4101
4102         /* Look up the snapshot name, and make a copy */
4103
4104         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4105         if (!snap_name) {
4106                 ret = -ENOMEM;
4107                 goto out_err;
4108         }
4109
4110         spec->pool_name = pool_name;
4111         spec->image_name = image_name;
4112         spec->snap_name = snap_name;
4113
4114         return 0;
4115 out_err:
4116         kfree(image_name);
4117         kfree(pool_name);
4118
4119         return ret;
4120 }
4121
4122 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4123 {
4124         size_t size;
4125         int ret;
4126         void *reply_buf;
4127         void *p;
4128         void *end;
4129         u64 seq;
4130         u32 snap_count;
4131         struct ceph_snap_context *snapc;
4132         u32 i;
4133
4134         /*
4135          * We'll need room for the seq value (maximum snapshot id),
4136          * snapshot count, and array of that many snapshot ids.
4137          * For now we have a fixed upper limit on the number we're
4138          * prepared to receive.
4139          */
4140         size = sizeof (__le64) + sizeof (__le32) +
4141                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
4142         reply_buf = kzalloc(size, GFP_KERNEL);
4143         if (!reply_buf)
4144                 return -ENOMEM;
4145
4146         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4147                                 "rbd", "get_snapcontext", NULL, 0,
4148                                 reply_buf, size);
4149         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4150         if (ret < 0)
4151                 goto out;
4152
4153         p = reply_buf;
4154         end = reply_buf + ret;
4155         ret = -ERANGE;
4156         ceph_decode_64_safe(&p, end, seq, out);
4157         ceph_decode_32_safe(&p, end, snap_count, out);
4158
4159         /*
4160          * Make sure the reported number of snapshot ids wouldn't go
4161          * beyond the end of our buffer.  But before checking that,
4162          * make sure the computed size of the snapshot context we
4163          * allocate is representable in a size_t.
4164          */
4165         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4166                                  / sizeof (u64)) {
4167                 ret = -EINVAL;
4168                 goto out;
4169         }
4170         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4171                 goto out;
4172         ret = 0;
4173
4174         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4175         if (!snapc) {
4176                 ret = -ENOMEM;
4177                 goto out;
4178         }
4179         snapc->seq = seq;
4180         for (i = 0; i < snap_count; i++)
4181                 snapc->snaps[i] = ceph_decode_64(&p);
4182
4183         ceph_put_snap_context(rbd_dev->header.snapc);
4184         rbd_dev->header.snapc = snapc;
4185
4186         dout("  snap context seq = %llu, snap_count = %u\n",
4187                 (unsigned long long)seq, (unsigned int)snap_count);
4188 out:
4189         kfree(reply_buf);
4190
4191         return ret;
4192 }
4193
4194 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4195                                         u64 snap_id)
4196 {
4197         size_t size;
4198         void *reply_buf;
4199         __le64 snapid;
4200         int ret;
4201         void *p;
4202         void *end;
4203         char *snap_name;
4204
4205         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4206         reply_buf = kmalloc(size, GFP_KERNEL);
4207         if (!reply_buf)
4208                 return ERR_PTR(-ENOMEM);
4209
4210         snapid = cpu_to_le64(snap_id);
4211         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4212                                 "rbd", "get_snapshot_name",
4213                                 &snapid, sizeof (snapid),
4214                                 reply_buf, size);
4215         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4216         if (ret < 0) {
4217                 snap_name = ERR_PTR(ret);
4218                 goto out;
4219         }
4220
4221         p = reply_buf;
4222         end = reply_buf + ret;
4223         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4224         if (IS_ERR(snap_name))
4225                 goto out;
4226
4227         dout("  snap_id 0x%016llx snap_name = %s\n",
4228                 (unsigned long long)snap_id, snap_name);
4229 out:
4230         kfree(reply_buf);
4231
4232         return snap_name;
4233 }
4234
4235 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4236 {
4237         bool first_time = rbd_dev->header.object_prefix == NULL;
4238         int ret;
4239
4240         down_write(&rbd_dev->header_rwsem);
4241
4242         if (first_time) {
4243                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4244                 if (ret)
4245                         goto out;
4246         }
4247
4248         /*
4249          * If the image supports layering, get the parent info.  We
4250          * need to probe the first time regardless.  Thereafter we
4251          * only need to if there's a parent, to see if it has
4252          * disappeared due to the mapped image getting flattened.
4253          */
4254         if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4255                         (first_time || rbd_dev->parent_spec)) {
4256                 bool warn;
4257
4258                 ret = rbd_dev_v2_parent_info(rbd_dev);
4259                 if (ret)
4260                         goto out;
4261
4262                 /*
4263                  * Print a warning if this is the initial probe and
4264                  * the image has a parent.  Don't print it if the
4265                  * image now being probed is itself a parent.  We
4266                  * can tell at this point because we won't know its
4267                  * pool name yet (just its pool id).
4268                  */
4269                 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4270                 if (first_time && warn)
4271                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4272                                         "is EXPERIMENTAL!");
4273         }
4274
4275         ret = rbd_dev_v2_image_size(rbd_dev);
4276         if (ret)
4277                 goto out;
4278
4279         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4280                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4281                         rbd_dev->mapping.size = rbd_dev->header.image_size;
4282
4283         ret = rbd_dev_v2_snap_context(rbd_dev);
4284         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4285 out:
4286         up_write(&rbd_dev->header_rwsem);
4287
4288         return ret;
4289 }
4290
4291 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4292 {
4293         struct device *dev;
4294         int ret;
4295
4296         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4297
4298         dev = &rbd_dev->dev;
4299         dev->bus = &rbd_bus_type;
4300         dev->type = &rbd_device_type;
4301         dev->parent = &rbd_root_dev;
4302         dev->release = rbd_dev_device_release;
4303         dev_set_name(dev, "%d", rbd_dev->dev_id);
4304         ret = device_register(dev);
4305
4306         mutex_unlock(&ctl_mutex);
4307
4308         return ret;
4309 }
4310
4311 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4312 {
4313         device_unregister(&rbd_dev->dev);
4314 }
4315
4316 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4317
4318 /*
4319  * Get a unique rbd identifier for the given new rbd_dev, and add
4320  * the rbd_dev to the global list.  The minimum rbd id is 1.
4321  */
4322 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4323 {
4324         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4325
4326         spin_lock(&rbd_dev_list_lock);
4327         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4328         spin_unlock(&rbd_dev_list_lock);
4329         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4330                 (unsigned long long) rbd_dev->dev_id);
4331 }
4332
4333 /*
4334  * Remove an rbd_dev from the global list, and record that its
4335  * identifier is no longer in use.
4336  */
4337 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4338 {
4339         struct list_head *tmp;
4340         int rbd_id = rbd_dev->dev_id;
4341         int max_id;
4342
4343         rbd_assert(rbd_id > 0);
4344
4345         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4346                 (unsigned long long) rbd_dev->dev_id);
4347         spin_lock(&rbd_dev_list_lock);
4348         list_del_init(&rbd_dev->node);
4349
4350         /*
4351          * If the id being "put" is not the current maximum, there
4352          * is nothing special we need to do.
4353          */
4354         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4355                 spin_unlock(&rbd_dev_list_lock);
4356                 return;
4357         }
4358
4359         /*
4360          * We need to update the current maximum id.  Search the
4361          * list to find out what it is.  We're more likely to find
4362          * the maximum at the end, so search the list backward.
4363          */
4364         max_id = 0;
4365         list_for_each_prev(tmp, &rbd_dev_list) {
4366                 struct rbd_device *rbd_dev;
4367
4368                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4369                 if (rbd_dev->dev_id > max_id)
4370                         max_id = rbd_dev->dev_id;
4371         }
4372         spin_unlock(&rbd_dev_list_lock);
4373
4374         /*
4375          * The max id could have been updated by rbd_dev_id_get(), in
4376          * which case it now accurately reflects the new maximum.
4377          * Be careful not to overwrite the maximum value in that
4378          * case.
4379          */
4380         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4381         dout("  max dev id has been reset\n");
4382 }
4383
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token starting there, i.e. the
 * number of consecutive non-white-space characters.  The string at
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, whitespace);	/* Skip to token start */
	*buf = start;

	return strcspn(start, whitespace);	/* Length of the token */
}
4402
/*
 * Locate the next token in *buf and, when it fits, copy it into the
 * caller's token buffer and terminate the copy with '\0'.  The string
 * at *buf must be terminated with '\0' on entry.
 *
 * Returns the length of the token (excluding the '\0').  A return of
 * 0 means no token was found; a return >= token_size means the token
 * did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the token that was
 * found, even when the token was too long to be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *start = *buf;

	*buf = start + len;
	if (len >= token_size)
		return len;		/* Too big; report the length only */

	memcpy(token, start, len);
	token[len] = '\0';

	return len;
}
4432
4433 /*
4434  * Finds the next token in *buf, dynamically allocates a buffer big
4435  * enough to hold a copy of it, and copies the token into the new
4436  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4437  * that a duplicate buffer is created even for a zero-length token.
4438  *
4439  * Returns a pointer to the newly-allocated duplicate, or a null
4440  * pointer if memory for the duplicate was not available.  If
4441  * the lenp argument is a non-null pointer, the length of the token
4442  * (not including the '\0') is returned in *lenp.
4443  *
4444  * If successful, the *buf pointer will be updated to point beyond
4445  * the end of the found token.
4446  *
4447  * Note: uses GFP_KERNEL for allocation.
4448  */
4449 static inline char *dup_token(const char **buf, size_t *lenp)
4450 {
4451         char *dup;
4452         size_t len;
4453
4454         len = next_token(buf);
4455         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4456         if (!dup)
4457                 return NULL;
4458         *(dup + len) = '\0';
4459         *buf += len;
4460
4461         if (lenp)
4462                 *lenp = len;
4463
4464         return dup;
4465 }
4466
4467 /*
4468  * Parse the options provided for an "rbd add" (i.e., rbd image
4469  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4470  * and the data written is passed here via a NUL-terminated buffer.
4471  * Returns 0 if successful or an error code otherwise.
4472  *
4473  * The information extracted from these options is recorded in
4474  * the other parameters which return dynamically-allocated
4475  * structures:
4476  *  ceph_opts
4477  *      The address of a pointer that will refer to a ceph options
4478  *      structure.  Caller must release the returned pointer using
4479  *      ceph_destroy_options() when it is no longer needed.
4480  *  rbd_opts
4481  *      Address of an rbd options pointer.  Fully initialized by
4482  *      this function; caller must release with kfree().
4483  *  spec
4484  *      Address of an rbd image specification pointer.  Fully
4485  *      initialized by this function based on parsed options.
4486  *      Caller must release with rbd_spec_put().
4487  *
4488  * The options passed take this form:
4489  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4490  * where:
4491  *  <mon_addrs>
4492  *      A comma-separated list of one or more monitor addresses.
4493  *      A monitor address is an ip address, optionally followed
4494  *      by a port number (separated by a colon).
4495  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4496  *  <options>
4497  *      A comma-separated list of ceph and/or rbd options.
4498  *  <pool_name>
4499  *      The name of the rados pool containing the rbd image.
4500  *  <image_name>
4501  *      The name of the image in that pool to map.
4502  *  <snap_id>
4503  *      An optional snapshot id.  If provided, the mapping will
4504  *      present data from the image at the time that snapshot was
4505  *      created.  The image head is used if no snapshot id is
4506  *      provided.  Snapshot mappings are always read-only.
4507  */
4508 static int rbd_add_parse_args(const char *buf,
4509                                 struct ceph_options **ceph_opts,
4510                                 struct rbd_options **opts,
4511                                 struct rbd_spec **rbd_spec)
4512 {
4513         size_t len;
4514         char *options;
4515         const char *mon_addrs;
4516         char *snap_name;
4517         size_t mon_addrs_size;
4518         struct rbd_spec *spec = NULL;
4519         struct rbd_options *rbd_opts = NULL;
4520         struct ceph_options *copts;
4521         int ret;
4522
4523         /* The first four tokens are required */
4524
4525         len = next_token(&buf);
4526         if (!len) {
4527                 rbd_warn(NULL, "no monitor address(es) provided");
4528                 return -EINVAL;
4529         }
4530         mon_addrs = buf;
4531         mon_addrs_size = len + 1;
4532         buf += len;
4533
4534         ret = -EINVAL;
4535         options = dup_token(&buf, NULL);
4536         if (!options)
4537                 return -ENOMEM;
4538         if (!*options) {
4539                 rbd_warn(NULL, "no options provided");
4540                 goto out_err;
4541         }
4542
4543         spec = rbd_spec_alloc();
4544         if (!spec)
4545                 goto out_mem;
4546
4547         spec->pool_name = dup_token(&buf, NULL);
4548         if (!spec->pool_name)
4549                 goto out_mem;
4550         if (!*spec->pool_name) {
4551                 rbd_warn(NULL, "no pool name provided");
4552                 goto out_err;
4553         }
4554
4555         spec->image_name = dup_token(&buf, NULL);
4556         if (!spec->image_name)
4557                 goto out_mem;
4558         if (!*spec->image_name) {
4559                 rbd_warn(NULL, "no image name provided");
4560                 goto out_err;
4561         }
4562
4563         /*
4564          * Snapshot name is optional; default is to use "-"
4565          * (indicating the head/no snapshot).
4566          */
4567         len = next_token(&buf);
4568         if (!len) {
4569                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4570                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4571         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4572                 ret = -ENAMETOOLONG;
4573                 goto out_err;
4574         }
4575         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4576         if (!snap_name)
4577                 goto out_mem;
4578         *(snap_name + len) = '\0';
4579         spec->snap_name = snap_name;
4580
4581         /* Initialize all rbd options to the defaults */
4582
4583         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4584         if (!rbd_opts)
4585                 goto out_mem;
4586
4587         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4588
4589         copts = ceph_parse_options(options, mon_addrs,
4590                                         mon_addrs + mon_addrs_size - 1,
4591                                         parse_rbd_opts_token, rbd_opts);
4592         if (IS_ERR(copts)) {
4593                 ret = PTR_ERR(copts);
4594                 goto out_err;
4595         }
4596         kfree(options);
4597
4598         *ceph_opts = copts;
4599         *opts = rbd_opts;
4600         *rbd_spec = spec;
4601
4602         return 0;
4603 out_mem:
4604         ret = -ENOMEM;
4605 out_err:
4606         kfree(rbd_opts);
4607         rbd_spec_put(spec);
4608         kfree(options);
4609
4610         return ret;
4611 }
4612
4613 /*
4614  * An rbd format 2 image has a unique identifier, distinct from the
4615  * name given to it by the user.  Internally, that identifier is
4616  * what's used to specify the names of objects related to the image.
4617  *
4618  * A special "rbd id" object is used to map an rbd image name to its
4619  * id.  If that object doesn't exist, then there is no v2 rbd image
4620  * with the supplied name.
4621  *
4622  * This function will record the given rbd_dev's image_id field if
4623  * it can be determined, and in that case will return 0.  If any
4624  * errors occur a negative errno will be returned and the rbd_dev's
4625  * image_id field will be unchanged (and should be NULL).
4626  */
4627 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4628 {
4629         int ret;
4630         size_t size;
4631         char *object_name;
4632         void *response;
4633         char *image_id;
4634
4635         /*
4636          * When probing a parent image, the image id is already
4637          * known (and the image name likely is not).  There's no
4638          * need to fetch the image id again in this case.  We
4639          * do still need to set the image format though.
4640          */
4641         if (rbd_dev->spec->image_id) {
4642                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4643
4644                 return 0;
4645         }
4646
4647         /*
4648          * First, see if the format 2 image id file exists, and if
4649          * so, get the image's persistent id from it.
4650          */
4651         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4652         object_name = kmalloc(size, GFP_NOIO);
4653         if (!object_name)
4654                 return -ENOMEM;
4655         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4656         dout("rbd id object name is %s\n", object_name);
4657
4658         /* Response will be an encoded string, which includes a length */
4659
4660         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4661         response = kzalloc(size, GFP_NOIO);
4662         if (!response) {
4663                 ret = -ENOMEM;
4664                 goto out;
4665         }
4666
4667         /* If it doesn't exist we'll assume it's a format 1 image */
4668
4669         ret = rbd_obj_method_sync(rbd_dev, object_name,
4670                                 "rbd", "get_id", NULL, 0,
4671                                 response, RBD_IMAGE_ID_LEN_MAX);
4672         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4673         if (ret == -ENOENT) {
4674                 image_id = kstrdup("", GFP_KERNEL);
4675                 ret = image_id ? 0 : -ENOMEM;
4676                 if (!ret)
4677                         rbd_dev->image_format = 1;
4678         } else if (ret > sizeof (__le32)) {
4679                 void *p = response;
4680
4681                 image_id = ceph_extract_encoded_string(&p, p + ret,
4682                                                 NULL, GFP_NOIO);
4683                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4684                 if (!ret)
4685                         rbd_dev->image_format = 2;
4686         } else {
4687                 ret = -EINVAL;
4688         }
4689
4690         if (!ret) {
4691                 rbd_dev->spec->image_id = image_id;
4692                 dout("image_id is %s\n", image_id);
4693         }
4694 out:
4695         kfree(response);
4696         kfree(object_name);
4697
4698         return ret;
4699 }
4700
4701 /*
4702  * Undo whatever state changes are made by v1 or v2 header info
4703  * call.
4704  */
4705 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4706 {
4707         struct rbd_image_header *header;
4708
4709         /* Drop parent reference unless it's already been done (or none) */
4710
4711         if (rbd_dev->parent_overlap)
4712                 rbd_dev_parent_put(rbd_dev);
4713
4714         /* Free dynamic fields from the header, then zero it out */
4715
4716         header = &rbd_dev->header;
4717         ceph_put_snap_context(header->snapc);
4718         kfree(header->snap_sizes);
4719         kfree(header->snap_names);
4720         kfree(header->object_prefix);
4721         memset(header, 0, sizeof (*header));
4722 }
4723
4724 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4725 {
4726         int ret;
4727
4728         ret = rbd_dev_v2_object_prefix(rbd_dev);
4729         if (ret)
4730                 goto out_err;
4731
4732         /*
4733          * Get the and check features for the image.  Currently the
4734          * features are assumed to never change.
4735          */
4736         ret = rbd_dev_v2_features(rbd_dev);
4737         if (ret)
4738                 goto out_err;
4739
4740         /* If the image supports fancy striping, get its parameters */
4741
4742         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4743                 ret = rbd_dev_v2_striping_info(rbd_dev);
4744                 if (ret < 0)
4745                         goto out_err;
4746         }
4747         /* No support for crypto and compression type format 2 images */
4748
4749         return 0;
4750 out_err:
4751         rbd_dev->header.features = 0;
4752         kfree(rbd_dev->header.object_prefix);
4753         rbd_dev->header.object_prefix = NULL;
4754
4755         return ret;
4756 }
4757
4758 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4759 {
4760         struct rbd_device *parent = NULL;
4761         struct rbd_spec *parent_spec;
4762         struct rbd_client *rbdc;
4763         int ret;
4764
4765         if (!rbd_dev->parent_spec)
4766                 return 0;
4767         /*
4768          * We need to pass a reference to the client and the parent
4769          * spec when creating the parent rbd_dev.  Images related by
4770          * parent/child relationships always share both.
4771          */
4772         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4773         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4774
4775         ret = -ENOMEM;
4776         parent = rbd_dev_create(rbdc, parent_spec);
4777         if (!parent)
4778                 goto out_err;
4779
4780         ret = rbd_dev_image_probe(parent, false);
4781         if (ret < 0)
4782                 goto out_err;
4783         rbd_dev->parent = parent;
4784         atomic_set(&rbd_dev->parent_ref, 1);
4785
4786         return 0;
4787 out_err:
4788         if (parent) {
4789                 rbd_dev_unparent(rbd_dev);
4790                 kfree(rbd_dev->header_name);
4791                 rbd_dev_destroy(parent);
4792         } else {
4793                 rbd_put_client(rbdc);
4794                 rbd_spec_put(parent_spec);
4795         }
4796
4797         return ret;
4798 }
4799
4800 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4801 {
4802         int ret;
4803
4804         /* generate unique id: find highest unique id, add one */
4805         rbd_dev_id_get(rbd_dev);
4806
4807         /* Fill in the device name, now that we have its id. */
4808         BUILD_BUG_ON(DEV_NAME_LEN
4809                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4810         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4811
4812         /* Get our block major device number. */
4813
4814         ret = register_blkdev(0, rbd_dev->name);
4815         if (ret < 0)
4816                 goto err_out_id;
4817         rbd_dev->major = ret;
4818
4819         /* Set up the blkdev mapping. */
4820
4821         ret = rbd_init_disk(rbd_dev);
4822         if (ret)
4823                 goto err_out_blkdev;
4824
4825         ret = rbd_dev_mapping_set(rbd_dev);
4826         if (ret)
4827                 goto err_out_disk;
4828         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4829
4830         ret = rbd_bus_add_dev(rbd_dev);
4831         if (ret)
4832                 goto err_out_mapping;
4833
4834         /* Everything's ready.  Announce the disk to the world. */
4835
4836         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4837         add_disk(rbd_dev->disk);
4838
4839         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4840                 (unsigned long long) rbd_dev->mapping.size);
4841
4842         return ret;
4843
4844 err_out_mapping:
4845         rbd_dev_mapping_clear(rbd_dev);
4846 err_out_disk:
4847         rbd_free_disk(rbd_dev);
4848 err_out_blkdev:
4849         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4850 err_out_id:
4851         rbd_dev_id_put(rbd_dev);
4852         rbd_dev_mapping_clear(rbd_dev);
4853
4854         return ret;
4855 }
4856
4857 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4858 {
4859         struct rbd_spec *spec = rbd_dev->spec;
4860         size_t size;
4861
4862         /* Record the header object name for this rbd image. */
4863
4864         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4865
4866         if (rbd_dev->image_format == 1)
4867                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4868         else
4869                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4870
4871         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4872         if (!rbd_dev->header_name)
4873                 return -ENOMEM;
4874
4875         if (rbd_dev->image_format == 1)
4876                 sprintf(rbd_dev->header_name, "%s%s",
4877                         spec->image_name, RBD_SUFFIX);
4878         else
4879                 sprintf(rbd_dev->header_name, "%s%s",
4880                         RBD_HEADER_PREFIX, spec->image_id);
4881         return 0;
4882 }
4883
4884 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4885 {
4886         rbd_dev_unprobe(rbd_dev);
4887         kfree(rbd_dev->header_name);
4888         rbd_dev->header_name = NULL;
4889         rbd_dev->image_format = 0;
4890         kfree(rbd_dev->spec->image_id);
4891         rbd_dev->spec->image_id = NULL;
4892
4893         rbd_dev_destroy(rbd_dev);
4894 }
4895
4896 /*
4897  * Probe for the existence of the header object for the given rbd
4898  * device.  If this image is the one being mapped (i.e., not a
4899  * parent), initiate a watch on its header object before using that
4900  * object to get detailed information about the rbd image.
4901  */
4902 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4903 {
4904         int ret;
4905         int tmp;
4906
4907         /*
4908          * Get the id from the image id object.  Unless there's an
4909          * error, rbd_dev->spec->image_id will be filled in with
4910          * a dynamically-allocated string, and rbd_dev->image_format
4911          * will be set to either 1 or 2.
4912          */
4913         ret = rbd_dev_image_id(rbd_dev);
4914         if (ret)
4915                 return ret;
4916         rbd_assert(rbd_dev->spec->image_id);
4917         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4918
4919         ret = rbd_dev_header_name(rbd_dev);
4920         if (ret)
4921                 goto err_out_format;
4922
4923         if (mapping) {
4924                 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4925                 if (ret)
4926                         goto out_header_name;
4927         }
4928
4929         if (rbd_dev->image_format == 1)
4930                 ret = rbd_dev_v1_header_info(rbd_dev);
4931         else
4932                 ret = rbd_dev_v2_header_info(rbd_dev);
4933         if (ret)
4934                 goto err_out_watch;
4935
4936         ret = rbd_dev_spec_update(rbd_dev);
4937         if (ret)
4938                 goto err_out_probe;
4939
4940         ret = rbd_dev_probe_parent(rbd_dev);
4941         if (ret)
4942                 goto err_out_probe;
4943
4944         dout("discovered format %u image, header name is %s\n",
4945                 rbd_dev->image_format, rbd_dev->header_name);
4946
4947         return 0;
4948 err_out_probe:
4949         rbd_dev_unprobe(rbd_dev);
4950 err_out_watch:
4951         if (mapping) {
4952                 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4953                 if (tmp)
4954                         rbd_warn(rbd_dev, "unable to tear down "
4955                                         "watch request (%d)\n", tmp);
4956         }
4957 out_header_name:
4958         kfree(rbd_dev->header_name);
4959         rbd_dev->header_name = NULL;
4960 err_out_format:
4961         rbd_dev->image_format = 0;
4962         kfree(rbd_dev->spec->image_id);
4963         rbd_dev->spec->image_id = NULL;
4964
4965         dout("probe failed, returning %d\n", ret);
4966
4967         return ret;
4968 }
4969
4970 static ssize_t rbd_add(struct bus_type *bus,
4971                        const char *buf,
4972                        size_t count)
4973 {
4974         struct rbd_device *rbd_dev = NULL;
4975         struct ceph_options *ceph_opts = NULL;
4976         struct rbd_options *rbd_opts = NULL;
4977         struct rbd_spec *spec = NULL;
4978         struct rbd_client *rbdc;
4979         struct ceph_osd_client *osdc;
4980         bool read_only;
4981         int rc = -ENOMEM;
4982
4983         if (!try_module_get(THIS_MODULE))
4984                 return -ENODEV;
4985
4986         /* parse add command */
4987         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4988         if (rc < 0)
4989                 goto err_out_module;
4990         read_only = rbd_opts->read_only;
4991         kfree(rbd_opts);
4992         rbd_opts = NULL;        /* done with this */
4993
4994         rbdc = rbd_get_client(ceph_opts);
4995         if (IS_ERR(rbdc)) {
4996                 rc = PTR_ERR(rbdc);
4997                 goto err_out_args;
4998         }
4999
5000         /* pick the pool */
5001         osdc = &rbdc->client->osdc;
5002         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5003         if (rc < 0)
5004                 goto err_out_client;
5005         spec->pool_id = (u64)rc;
5006
5007         /* The ceph file layout needs to fit pool id in 32 bits */
5008
5009         if (spec->pool_id > (u64)U32_MAX) {
5010                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5011                                 (unsigned long long)spec->pool_id, U32_MAX);
5012                 rc = -EIO;
5013                 goto err_out_client;
5014         }
5015
5016         rbd_dev = rbd_dev_create(rbdc, spec);
5017         if (!rbd_dev)
5018                 goto err_out_client;
5019         rbdc = NULL;            /* rbd_dev now owns this */
5020         spec = NULL;            /* rbd_dev now owns this */
5021
5022         rc = rbd_dev_image_probe(rbd_dev, true);
5023         if (rc < 0)
5024                 goto err_out_rbd_dev;
5025
5026         /* If we are mapping a snapshot it must be marked read-only */
5027
5028         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5029                 read_only = true;
5030         rbd_dev->mapping.read_only = read_only;
5031
5032         rc = rbd_dev_device_setup(rbd_dev);
5033         if (rc) {
5034                 rbd_dev_image_release(rbd_dev);
5035                 goto err_out_module;
5036         }
5037
5038         return count;
5039
5040 err_out_rbd_dev:
5041         rbd_dev_destroy(rbd_dev);
5042 err_out_client:
5043         rbd_put_client(rbdc);
5044 err_out_args:
5045         rbd_spec_put(spec);
5046 err_out_module:
5047         module_put(THIS_MODULE);
5048
5049         dout("Error adding device %s\n", buf);
5050
5051         return (ssize_t)rc;
5052 }
5053
5054 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
5055 {
5056         struct list_head *tmp;
5057         struct rbd_device *rbd_dev;
5058
5059         spin_lock(&rbd_dev_list_lock);
5060         list_for_each(tmp, &rbd_dev_list) {
5061                 rbd_dev = list_entry(tmp, struct rbd_device, node);
5062                 if (rbd_dev->dev_id == dev_id) {
5063                         spin_unlock(&rbd_dev_list_lock);
5064                         return rbd_dev;
5065                 }
5066         }
5067         spin_unlock(&rbd_dev_list_lock);
5068         return NULL;
5069 }
5070
5071 static void rbd_dev_device_release(struct device *dev)
5072 {
5073         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5074
5075         rbd_free_disk(rbd_dev);
5076         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5077         rbd_dev_mapping_clear(rbd_dev);
5078         unregister_blkdev(rbd_dev->major, rbd_dev->name);
5079         rbd_dev->major = 0;
5080         rbd_dev_id_put(rbd_dev);
5081         rbd_dev_mapping_clear(rbd_dev);
5082 }
5083
5084 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5085 {
5086         while (rbd_dev->parent) {
5087                 struct rbd_device *first = rbd_dev;
5088                 struct rbd_device *second = first->parent;
5089                 struct rbd_device *third;
5090
5091                 /*
5092                  * Follow to the parent with no grandparent and
5093                  * remove it.
5094                  */
5095                 while (second && (third = second->parent)) {
5096                         first = second;
5097                         second = third;
5098                 }
5099                 rbd_assert(second);
5100                 rbd_dev_image_release(second);
5101                 first->parent = NULL;
5102                 first->parent_overlap = 0;
5103
5104                 rbd_assert(first->parent_spec);
5105                 rbd_spec_put(first->parent_spec);
5106                 first->parent_spec = NULL;
5107         }
5108 }
5109
5110 static ssize_t rbd_remove(struct bus_type *bus,
5111                           const char *buf,
5112                           size_t count)
5113 {
5114         struct rbd_device *rbd_dev = NULL;
5115         int target_id;
5116         unsigned long ul;
5117         int ret;
5118
5119         ret = strict_strtoul(buf, 10, &ul);
5120         if (ret)
5121                 return ret;
5122
5123         /* convert to int; abort if we lost anything in the conversion */
5124         target_id = (int) ul;
5125         if (target_id != ul)
5126                 return -EINVAL;
5127
5128         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5129
5130         rbd_dev = __rbd_get_dev(target_id);
5131         if (!rbd_dev) {
5132                 ret = -ENOENT;
5133                 goto done;
5134         }
5135
5136         spin_lock_irq(&rbd_dev->lock);
5137         if (rbd_dev->open_count)
5138                 ret = -EBUSY;
5139         else
5140                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5141         spin_unlock_irq(&rbd_dev->lock);
5142         if (ret < 0)
5143                 goto done;
5144         rbd_bus_del_dev(rbd_dev);
5145         ret = rbd_dev_header_watch_sync(rbd_dev, false);
5146         if (ret)
5147                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
5148         rbd_dev_image_release(rbd_dev);
5149         module_put(THIS_MODULE);
5150         ret = count;
5151 done:
5152         mutex_unlock(&ctl_mutex);
5153
5154         return ret;
5155 }
5156
5157 /*
5158  * create control files in sysfs
5159  * /sys/bus/rbd/...
5160  */
5161 static int rbd_sysfs_init(void)
5162 {
5163         int ret;
5164
5165         ret = device_register(&rbd_root_dev);
5166         if (ret < 0)
5167                 return ret;
5168
5169         ret = bus_register(&rbd_bus_type);
5170         if (ret < 0)
5171                 device_unregister(&rbd_root_dev);
5172
5173         return ret;
5174 }
5175
5176 static void rbd_sysfs_cleanup(void)
5177 {
5178         bus_unregister(&rbd_bus_type);
5179         device_unregister(&rbd_root_dev);
5180 }
5181
5182 static int rbd_slab_init(void)
5183 {
5184         rbd_assert(!rbd_img_request_cache);
5185         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5186                                         sizeof (struct rbd_img_request),
5187                                         __alignof__(struct rbd_img_request),
5188                                         0, NULL);
5189         if (!rbd_img_request_cache)
5190                 return -ENOMEM;
5191
5192         rbd_assert(!rbd_obj_request_cache);
5193         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5194                                         sizeof (struct rbd_obj_request),
5195                                         __alignof__(struct rbd_obj_request),
5196                                         0, NULL);
5197         if (!rbd_obj_request_cache)
5198                 goto out_err;
5199
5200         rbd_assert(!rbd_segment_name_cache);
5201         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5202                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5203         if (rbd_segment_name_cache)
5204                 return 0;
5205 out_err:
5206         if (rbd_obj_request_cache) {
5207                 kmem_cache_destroy(rbd_obj_request_cache);
5208                 rbd_obj_request_cache = NULL;
5209         }
5210
5211         kmem_cache_destroy(rbd_img_request_cache);
5212         rbd_img_request_cache = NULL;
5213
5214         return -ENOMEM;
5215 }
5216
5217 static void rbd_slab_exit(void)
5218 {
5219         rbd_assert(rbd_segment_name_cache);
5220         kmem_cache_destroy(rbd_segment_name_cache);
5221         rbd_segment_name_cache = NULL;
5222
5223         rbd_assert(rbd_obj_request_cache);
5224         kmem_cache_destroy(rbd_obj_request_cache);
5225         rbd_obj_request_cache = NULL;
5226
5227         rbd_assert(rbd_img_request_cache);
5228         kmem_cache_destroy(rbd_img_request_cache);
5229         rbd_img_request_cache = NULL;
5230 }
5231
5232 static int __init rbd_init(void)
5233 {
5234         int rc;
5235
5236         if (!libceph_compatible(NULL)) {
5237                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5238
5239                 return -EINVAL;
5240         }
5241         rc = rbd_slab_init();
5242         if (rc)
5243                 return rc;
5244         rc = rbd_sysfs_init();
5245         if (rc)
5246                 rbd_slab_exit();
5247         else
5248                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5249
5250         return rc;
5251 }
5252
5253 static void __exit rbd_exit(void)
5254 {
5255         rbd_sysfs_cleanup();
5256         rbd_slab_exit();
5257 }
5258
5259 module_init(rbd_init);
5260 module_exit(rbd_exit);
5261
5262 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5263 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5264 MODULE_DESCRIPTION("rados block device");
5265
5266 /* following authorship retained from original osdblk.c */
5267 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5268
5269 MODULE_LICENSE("GPL");