1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44
45 #include "rbd_types.h"
46
47 #define RBD_DEBUG       /* Activate rbd_assert() calls */
48
49 /*
50  * The basic unit of block I/O is a sector.  It is interpreted in a
51  * number of contexts in Linux (blk, bio, genhd), but the default is
52  * universally 512 bytes.  These symbols are just slightly more
53  * meaningful than the bare numbers they represent.
54  */
55 #define SECTOR_SHIFT    9
56 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
57
58 /*
59  * Increment the given counter and return its updated value.
60  * If the counter is already 0 it will not be incremented.
61  * If the counter is already at its maximum value, -EINVAL is
62  * returned without updating it.
63  */
64 static int atomic_inc_return_safe(atomic_t *v)
65 {
66         unsigned int counter;
67
68         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
69         if (counter <= (unsigned int)INT_MAX)
70                 return (int)counter;
71
72         atomic_dec(v);
73
74         return -EINVAL;
75 }
76
77 /* Decrement the counter.  Return the resulting value, or -EINVAL */
78 static int atomic_dec_return_safe(atomic_t *v)
79 {
80         int counter;
81
82         counter = atomic_dec_return(v);
83         if (counter >= 0)
84                 return counter;
85
86         atomic_inc(v);
87
88         return -EINVAL;
89 }
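/*
 * A minimal usage sketch (illustrative, following the parent_ref
 * handling later in this file): the two helpers above form a
 * saturating get/put pair.  A counter that has dropped to 0 is never
 * resurrected, and one at INT_MAX reports -EINVAL instead of wrapping.
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *		... parent image is pinned; safe to use ...
 *		atomic_dec_return_safe(&rbd_dev->parent_ref);
 *	}
 */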
90
91 #define RBD_DRV_NAME "rbd"
92 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
93
94 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
95
96 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
97 #define RBD_MAX_SNAP_NAME_LEN   \
98                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
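/* e.g. 255 - 5 = 250 bytes for the name itself, with NAME_MAX == 255 */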
99
100 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
101
102 #define RBD_SNAP_HEAD_NAME      "-"
103
104 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
105
106 /* This allows a single page to hold an image name sent by the OSD */
107 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
108 #define RBD_IMAGE_ID_LEN_MAX    64
109
110 #define RBD_OBJ_PREFIX_LEN_MAX  64
111
112 /* Feature bits */
113
114 #define RBD_FEATURE_LAYERING    (1<<0)
115 #define RBD_FEATURE_STRIPINGV2  (1<<1)
116 #define RBD_FEATURES_ALL \
117             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
118
119 /* Features supported by this (client software) implementation. */
120
121 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
122
123 /*
124  * An RBD device name will be "rbd#", where the "rbd" comes from
125  * RBD_DRV_NAME above, and # is a unique integer identifier.
126  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
127  * enough to hold all possible device names.
128  */
129 #define DEV_NAME_LEN            32
130 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
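/*
 * Worked example: with 4-byte ints MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 = 11 characters, enough for the 10 digits of
 * 2147483647 plus a sign.  The 5/2 = 2.5 digits-per-byte estimate
 * safely overshoots log10(256) ~= 2.41, so the bound holds for any
 * int size.
 */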
131
132 /*
133  * block device image metadata (in-memory version)
134  */
135 struct rbd_image_header {
136         /* These fields never change for a given rbd image */
137         char *object_prefix;
138         __u8 obj_order;
139         __u8 crypt_type;
140         __u8 comp_type;
141         u64 stripe_unit;
142         u64 stripe_count;
143         u64 features;           /* Might be changeable someday? */
144
145         /* The remaining fields need to be updated occasionally */
146         u64 image_size;
147         struct ceph_snap_context *snapc;
148         char *snap_names;       /* format 1 only */
149         u64 *snap_sizes;        /* format 1 only */
150 };
151
152 /*
153  * An rbd image specification.
154  *
155  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
156  * identify an image.  Each rbd_dev structure includes a pointer to
157  * an rbd_spec structure that encapsulates this identity.
158  *
159  * Each of the ids in an rbd_spec has an associated name.  For a
160  * user-mapped image, the names are supplied and the ids associated
161  * with them are looked up.  For a layered image, a parent image is
162  * defined by the tuple, and the names are looked up.
163  *
164  * An rbd_dev structure contains a parent_spec pointer which is
165  * non-null if the image it represents is a child in a layered
166  * image.  This pointer will refer to the rbd_spec structure used
167  * by the parent rbd_dev for its own identity (i.e., the structure
168  * is shared between the parent and child).
169  *
170  * Since these structures are populated once, during the discovery
171  * phase of image construction, they are effectively immutable so
172  * we make no effort to synchronize access to them.
173  *
174  * Note that code herein does not assume the image name is known (it
175  * could be a null pointer).
176  */
177 struct rbd_spec {
178         u64             pool_id;
179         const char      *pool_name;
180
181         const char      *image_id;
182         const char      *image_name;
183
184         u64             snap_id;
185         const char      *snap_name;
186
187         struct kref     kref;
188 };
189
190 /*
191  * An instance of the client; multiple devices may share an rbd client.
192  */
193 struct rbd_client {
194         struct ceph_client      *client;
195         struct kref             kref;
196         struct list_head        node;
197 };
198
199 struct rbd_img_request;
200 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
201
202 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
203
204 struct rbd_obj_request;
205 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
206
207 enum obj_request_type {
208         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
209 };
210
211 enum obj_req_flags {
212         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
213         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
214         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
215         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
216 };
217
218 struct rbd_obj_request {
219         const char              *object_name;
220         u64                     offset;         /* object start byte */
221         u64                     length;         /* bytes from offset */
222         unsigned long           flags;
223
224         /*
225          * An object request associated with an image will have its
226          * img_data flag set; a standalone object request will not.
227          *
228          * A standalone object request will have which == BAD_WHICH
229          * and a null obj_request pointer.
230          *
231          * An object request initiated in support of a layered image
232          * object (to check for its existence before a write) will
233          * have which == BAD_WHICH and a non-null obj_request pointer.
234          *
235          * Finally, an object request for rbd image data will have
236          * which != BAD_WHICH, and will have a non-null img_request
237          * pointer.  The value of which will be in the range
238          * 0..(img_request->obj_request_count-1).
239          */
240         union {
241                 struct rbd_obj_request  *obj_request;   /* STAT op */
242                 struct {
243                         struct rbd_img_request  *img_request;
244                         u64                     img_offset;
245                         /* links for img_request->obj_requests list */
246                         struct list_head        links;
247                 };
248         };
249         u32                     which;          /* posn in image request list */
250
251         enum obj_request_type   type;
252         union {
253                 struct bio      *bio_list;
254                 struct {
255                         struct page     **pages;
256                         u32             page_count;
257                 };
258         };
259         struct page             **copyup_pages;
260         u32                     copyup_page_count;
261
262         struct ceph_osd_request *osd_req;
263
264         u64                     xferred;        /* bytes transferred */
265         int                     result;
266
267         rbd_obj_callback_t      callback;
268         struct completion       completion;
269
270         struct kref             kref;
271 };
272
273 enum img_req_flags {
274         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
275         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
276         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
277 };
278
279 struct rbd_img_request {
280         struct rbd_device       *rbd_dev;
281         u64                     offset; /* starting image byte offset */
282         u64                     length; /* byte count from offset */
283         unsigned long           flags;
284         union {
285                 u64                     snap_id;        /* for reads */
286                 struct ceph_snap_context *snapc;        /* for writes */
287         };
288         union {
289                 struct request          *rq;            /* block request */
290                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
291         };
292         struct page             **copyup_pages;
293         u32                     copyup_page_count;
294         spinlock_t              completion_lock;/* protects next_completion */
295         u32                     next_completion;
296         rbd_img_callback_t      callback;
297         u64                     xferred;/* aggregate bytes transferred */
298         int                     result; /* first nonzero obj_request result */
299
300         u32                     obj_request_count;
301         struct list_head        obj_requests;   /* rbd_obj_request structs */
302
303         struct kref             kref;
304 };
305
306 #define for_each_obj_request(ireq, oreq) \
307         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
308 #define for_each_obj_request_from(ireq, oreq) \
309         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
310 #define for_each_obj_request_safe(ireq, oreq, n) \
311         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
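/*
 * Usage sketch (this mirrors rbd_img_request_complete() below):
 *
 *	struct rbd_obj_request *obj_request;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */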
312
313 struct rbd_mapping {
314         u64                     size;
315         u64                     features;
316         bool                    read_only;
317 };
318
319 /*
320  * a single device
321  */
322 struct rbd_device {
323         int                     dev_id;         /* blkdev unique id */
324
325         int                     major;          /* blkdev assigned major */
326         struct gendisk          *disk;          /* blkdev's gendisk and rq */
327
328         u32                     image_format;   /* Either 1 or 2 */
329         struct rbd_client       *rbd_client;
330
331         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
332
333         spinlock_t              lock;           /* queue, flags, open_count */
334
335         struct rbd_image_header header;
336         unsigned long           flags;          /* possibly lock protected */
337         struct rbd_spec         *spec;
338
339         char                    *header_name;
340
341         struct ceph_file_layout layout;
342
343         struct ceph_osd_event   *watch_event;
344         struct rbd_obj_request  *watch_request;
345
346         struct rbd_spec         *parent_spec;
347         u64                     parent_overlap;
348         atomic_t                parent_ref;
349         struct rbd_device       *parent;
350
351         /* protects updating the header */
352         struct rw_semaphore     header_rwsem;
353
354         struct rbd_mapping      mapping;
355
356         struct list_head        node;
357
358         /* sysfs related */
359         struct device           dev;
360         unsigned long           open_count;     /* protected by lock */
361 };
362
363 /*
364  * Flag bits for rbd_dev->flags.  If atomicity is required,
365  * rbd_dev->lock is used to protect access.
366  *
367  * Currently, only the "removing" flag (which is coupled with the
368  * "open_count" field) requires atomic access.
369  */
370 enum rbd_dev_flags {
371         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
372         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
373 };
374
375 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
376
377 static LIST_HEAD(rbd_dev_list);    /* devices */
378 static DEFINE_SPINLOCK(rbd_dev_list_lock);
379
380 static LIST_HEAD(rbd_client_list);              /* clients */
381 static DEFINE_SPINLOCK(rbd_client_list_lock);
382
383 /* Slab caches for frequently-allocated structures */
384
385 static struct kmem_cache        *rbd_img_request_cache;
386 static struct kmem_cache        *rbd_obj_request_cache;
387 static struct kmem_cache        *rbd_segment_name_cache;
388
389 static int rbd_img_request_submit(struct rbd_img_request *img_request);
390
391 static void rbd_dev_device_release(struct device *dev);
392
393 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
394                        size_t count);
395 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
396                           size_t count);
397 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
398 static void rbd_spec_put(struct rbd_spec *spec);
399
400 static struct bus_attribute rbd_bus_attrs[] = {
401         __ATTR(add, S_IWUSR, NULL, rbd_add),
402         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
403         __ATTR_NULL
404 };
405
406 static struct bus_type rbd_bus_type = {
407         .name           = "rbd",
408         .bus_attrs      = rbd_bus_attrs,
409 };
410
411 static void rbd_root_dev_release(struct device *dev)
412 {
413 }
414
415 static struct device rbd_root_dev = {
416         .init_name =    "rbd",
417         .release =      rbd_root_dev_release,
418 };
419
420 static __printf(2, 3)
421 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
422 {
423         struct va_format vaf;
424         va_list args;
425
426         va_start(args, fmt);
427         vaf.fmt = fmt;
428         vaf.va = &args;
429
430         if (!rbd_dev)
431                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
432         else if (rbd_dev->disk)
433                 printk(KERN_WARNING "%s: %s: %pV\n",
434                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
435         else if (rbd_dev->spec && rbd_dev->spec->image_name)
436                 printk(KERN_WARNING "%s: image %s: %pV\n",
437                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
438         else if (rbd_dev->spec && rbd_dev->spec->image_id)
439                 printk(KERN_WARNING "%s: id %s: %pV\n",
440                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
441         else    /* punt */
442                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
443                         RBD_DRV_NAME, rbd_dev, &vaf);
444         va_end(args);
445 }
446
447 #ifdef RBD_DEBUG
448 #define rbd_assert(expr)                                                \
449                 if (unlikely(!(expr))) {                                \
450                         printk(KERN_ERR "\nAssertion failure in %s() "  \
451                                                 "at line %d:\n\n"       \
452                                         "\trbd_assert(%s);\n\n",        \
453                                         __func__, __LINE__, #expr);     \
454                         BUG();                                          \
455                 }
456 #else /* !RBD_DEBUG */
457 #  define rbd_assert(expr)      ((void) 0)
458 #endif /* !RBD_DEBUG */
459
460 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
461 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
462 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
463
464 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
465 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
466 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
467 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
468                                         u64 snap_id);
469 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
470                                 u8 *order, u64 *snap_size);
471 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
472                 u64 *snap_features);
473 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
474
475 static int rbd_open(struct block_device *bdev, fmode_t mode)
476 {
477         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
478         bool removing = false;
479
480         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
481                 return -EROFS;
482
483         spin_lock_irq(&rbd_dev->lock);
484         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
485                 removing = true;
486         else
487                 rbd_dev->open_count++;
488         spin_unlock_irq(&rbd_dev->lock);
489         if (removing)
490                 return -ENOENT;
491
492         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
493         (void) get_device(&rbd_dev->dev);
494         set_device_ro(bdev, rbd_dev->mapping.read_only);
495         mutex_unlock(&ctl_mutex);
496
497         return 0;
498 }
499
500 static void rbd_release(struct gendisk *disk, fmode_t mode)
501 {
502         struct rbd_device *rbd_dev = disk->private_data;
503         unsigned long open_count_before;
504
505         spin_lock_irq(&rbd_dev->lock);
506         open_count_before = rbd_dev->open_count--;
507         spin_unlock_irq(&rbd_dev->lock);
508         rbd_assert(open_count_before > 0);
509
510         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
511         put_device(&rbd_dev->dev);
512         mutex_unlock(&ctl_mutex);
513 }
514
515 static const struct block_device_operations rbd_bd_ops = {
516         .owner                  = THIS_MODULE,
517         .open                   = rbd_open,
518         .release                = rbd_release,
519 };
520
521 /*
522  * Initialize an rbd client instance.  Success or not, this function
523  * consumes ceph_opts.  Caller holds ctl_mutex.
524  */
525 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
526 {
527         struct rbd_client *rbdc;
528         int ret = -ENOMEM;
529
530         dout("%s:\n", __func__);
531         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
532         if (!rbdc)
533                 goto out_opt;
534
535         kref_init(&rbdc->kref);
536         INIT_LIST_HEAD(&rbdc->node);
537
538         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
539         if (IS_ERR(rbdc->client))
540                 goto out_rbdc;
541         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
542
543         ret = ceph_open_session(rbdc->client);
544         if (ret < 0)
545                 goto out_client;
546
547         spin_lock(&rbd_client_list_lock);
548         list_add_tail(&rbdc->node, &rbd_client_list);
549         spin_unlock(&rbd_client_list_lock);
550
551         dout("%s: rbdc %p\n", __func__, rbdc);
552
553         return rbdc;
554 out_client:
555         ceph_destroy_client(rbdc->client);
556 out_rbdc:
557         kfree(rbdc);
558 out_opt:
559         if (ceph_opts)
560                 ceph_destroy_options(ceph_opts);
561         dout("%s: error %d\n", __func__, ret);
562
563         return ERR_PTR(ret);
564 }
565
566 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
567 {
568         kref_get(&rbdc->kref);
569
570         return rbdc;
571 }
572
573 /*
574  * Find a ceph client with specific addr and configuration.  If
575  * found, bump its reference count.
576  */
577 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
578 {
579         struct rbd_client *client_node;
580         bool found = false;
581
582         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
583                 return NULL;
584
585         spin_lock(&rbd_client_list_lock);
586         list_for_each_entry(client_node, &rbd_client_list, node) {
587                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
588                         __rbd_get_client(client_node);
589
590                         found = true;
591                         break;
592                 }
593         }
594         spin_unlock(&rbd_client_list_lock);
595
596         return found ? client_node : NULL;
597 }
598
599 /*
600  * mount options
601  */
602 enum {
603         Opt_last_int,
604         /* int args above */
605         Opt_last_string,
606         /* string args above */
607         Opt_read_only,
608         Opt_read_write,
609         /* Boolean args above */
610         Opt_last_bool,
611 };
612
613 static match_table_t rbd_opts_tokens = {
614         /* int args above */
615         /* string args above */
616         {Opt_read_only, "read_only"},
617         {Opt_read_only, "ro"},          /* Alternate spelling */
618         {Opt_read_write, "read_write"},
619         {Opt_read_write, "rw"},         /* Alternate spelling */
620         /* Boolean args above */
621         {-1, NULL}
622 };
623
624 struct rbd_options {
625         bool    read_only;
626 };
627
628 #define RBD_READ_ONLY_DEFAULT   false
629
630 static int parse_rbd_opts_token(char *c, void *private)
631 {
632         struct rbd_options *rbd_opts = private;
633         substring_t argstr[MAX_OPT_ARGS];
634         int token, intval, ret;
635
636         token = match_token(c, rbd_opts_tokens, argstr);
637         if (token < 0)
638                 return -EINVAL;
639
640         if (token < Opt_last_int) {
641                 ret = match_int(&argstr[0], &intval);
642                 if (ret < 0) {
643                         pr_err("bad mount option arg (not int) "
644                                "at '%s'\n", c);
645                         return ret;
646                 }
647                 dout("got int token %d val %d\n", token, intval);
648         } else if (token > Opt_last_int && token < Opt_last_string) {
649                 dout("got string token %d val %s\n", token,
650                      argstr[0].from);
651         } else if (token > Opt_last_string && token < Opt_last_bool) {
652                 dout("got Boolean token %d\n", token);
653         } else {
654                 dout("got token %d\n", token);
655         }
656
657         switch (token) {
658         case Opt_read_only:
659                 rbd_opts->read_only = true;
660                 break;
661         case Opt_read_write:
662                 rbd_opts->read_only = false;
663                 break;
664         default:
665                 rbd_assert(false);
666                 break;
667         }
668         return 0;
669 }
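/*
 * For illustration only (the authoritative syntax is in
 * Documentation/ABI/testing/sysfs-bus-rbd): the boolean tokens above
 * arrive in the comma-separated option list of a mapping request, so
 * a string such as
 *
 *	echo "1.2.3.4:6789 name=admin,ro rbd myimage" > /sys/bus/rbd/add
 *
 * would map the image read-only.
 */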
670
671 /*
672  * Get a ceph client with specific addr and configuration; if one does
673  * not exist, create it.  Either way, ceph_opts is consumed by this
674  * function.
675  */
676 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
677 {
678         struct rbd_client *rbdc;
679
680         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
681         rbdc = rbd_client_find(ceph_opts);
682         if (rbdc)       /* using an existing client */
683                 ceph_destroy_options(ceph_opts);
684         else
685                 rbdc = rbd_client_create(ceph_opts);
686         mutex_unlock(&ctl_mutex);
687
688         return rbdc;
689 }
690
691 /*
692  * Destroy ceph client
693  *
694  * Caller must hold rbd_client_list_lock.
695  */
696 static void rbd_client_release(struct kref *kref)
697 {
698         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
699
700         dout("%s: rbdc %p\n", __func__, rbdc);
701         spin_lock(&rbd_client_list_lock);
702         list_del(&rbdc->node);
703         spin_unlock(&rbd_client_list_lock);
704
705         ceph_destroy_client(rbdc->client);
706         kfree(rbdc);
707 }
708
709 /*
710  * Drop reference to ceph client node. If it's not referenced anymore, release
711  * it.
712  */
713 static void rbd_put_client(struct rbd_client *rbdc)
714 {
715         if (rbdc)
716                 kref_put(&rbdc->kref, rbd_client_release);
717 }
718
719 static bool rbd_image_format_valid(u32 image_format)
720 {
721         return image_format == 1 || image_format == 2;
722 }
723
724 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
725 {
726         size_t size;
727         u32 snap_count;
728
729         /* The header has to start with the magic rbd header text */
730         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
731                 return false;
732
733         /* The bio layer requires at least sector-sized I/O */
734
735         if (ondisk->options.order < SECTOR_SHIFT)
736                 return false;
737
738         /* If we use u64 in a few spots we may be able to loosen this */
739
740         if (ondisk->options.order > 8 * sizeof (int) - 1)
741                 return false;
742
743         /*
744          * The size of a snapshot header has to fit in a size_t, and
745          * that limits the number of snapshots.
746          */
747         snap_count = le32_to_cpu(ondisk->snap_count);
748         size = SIZE_MAX - sizeof (struct ceph_snap_context);
749         if (snap_count > size / sizeof (__le64))
750                 return false;
751
752         /*
753  * Not only that, but the size of the entire snapshot
754          * header must also be representable in a size_t.
755          */
756         size -= snap_count * sizeof (__le64);
757         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
758                 return false;
759
760         return true;
761 }
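/*
 * Reasoning sketch: the snapshot context built from this data needs
 * roughly sizeof(struct ceph_snap_context) + snap_count * 8 bytes of
 * ids plus snap_names_len bytes of names.  The two checks above bound
 * the id array and the name block separately, so their sum cannot
 * overflow a size_t on any architecture.
 */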
762
763 /*
764  * Fill an rbd image header with information from the given format 1
765  * on-disk header.
766  */
767 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
768                                  struct rbd_image_header_ondisk *ondisk)
769 {
770         struct rbd_image_header *header = &rbd_dev->header;
771         bool first_time = header->object_prefix == NULL;
772         struct ceph_snap_context *snapc;
773         char *object_prefix = NULL;
774         char *snap_names = NULL;
775         u64 *snap_sizes = NULL;
776         u32 snap_count;
777         size_t size;
778         int ret = -ENOMEM;
779         u32 i;
780
781         /* Allocate this now to avoid having to handle failure below */
782
783         if (first_time) {
784                 size_t len;
785
786                 len = strnlen(ondisk->object_prefix,
787                                 sizeof (ondisk->object_prefix));
788                 object_prefix = kmalloc(len + 1, GFP_KERNEL);
789                 if (!object_prefix)
790                         return -ENOMEM;
791                 memcpy(object_prefix, ondisk->object_prefix, len);
792                 object_prefix[len] = '\0';
793         }
794
795         /* Allocate the snapshot context and fill it in */
796
797         snap_count = le32_to_cpu(ondisk->snap_count);
798         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
799         if (!snapc)
800                 goto out_err;
801         snapc->seq = le64_to_cpu(ondisk->snap_seq);
802         if (snap_count) {
803                 struct rbd_image_snap_ondisk *snaps;
804                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
805
806                 /* We'll keep a copy of the snapshot names... */
807
808                 if (snap_names_len > (u64)SIZE_MAX)
809                         goto out_2big;
810                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
811                 if (!snap_names)
812                         goto out_err;
813
814                 /* ...as well as the array of their sizes. */
815
816                 size = snap_count * sizeof (*header->snap_sizes);
817                 snap_sizes = kmalloc(size, GFP_KERNEL);
818                 if (!snap_sizes)
819                         goto out_err;
820
821                 /*
822                  * Copy the names, and fill in each snapshot's id
823                  * and size.
824                  *
825                  * Note that rbd_dev_v1_header_info() guarantees the
826                  * ondisk buffer we're working with has
827                  * snap_names_len bytes beyond the end of the
828  * snapshot id array, so this memcpy() is safe.
829                  */
830                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
831                 snaps = ondisk->snaps;
832                 for (i = 0; i < snap_count; i++) {
833                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
834                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
835                 }
836         }
837
838         /* We won't fail any more, fill in the header */
839
840         down_write(&rbd_dev->header_rwsem);
841         if (first_time) {
842                 header->object_prefix = object_prefix;
843                 header->obj_order = ondisk->options.order;
844                 header->crypt_type = ondisk->options.crypt_type;
845                 header->comp_type = ondisk->options.comp_type;
846                 /* The rest aren't used for format 1 images */
847                 header->stripe_unit = 0;
848                 header->stripe_count = 0;
849                 header->features = 0;
850         } else {
851                 ceph_put_snap_context(header->snapc);
852                 kfree(header->snap_names);
853                 kfree(header->snap_sizes);
854         }
855
856         /* The remaining fields always get updated (when we refresh) */
857
858         header->image_size = le64_to_cpu(ondisk->image_size);
859         header->snapc = snapc;
860         header->snap_names = snap_names;
861         header->snap_sizes = snap_sizes;
862
863         /* Make sure mapping size is consistent with header info */
864
865         if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
866                 if (rbd_dev->mapping.size != header->image_size)
867                         rbd_dev->mapping.size = header->image_size;
868
869         up_write(&rbd_dev->header_rwsem);
870
871         return 0;
872 out_2big:
873         ret = -EIO;
874 out_err:
875         kfree(snap_sizes);
876         kfree(snap_names);
877         ceph_put_snap_context(snapc);
878         kfree(object_prefix);
879
880         return ret;
881 }
882
883 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
884 {
885         const char *snap_name;
886
887         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
888
889         /* Skip over names until we find the one we are looking for */
890
891         snap_name = rbd_dev->header.snap_names;
892         while (which--)
893                 snap_name += strlen(snap_name) + 1;
894
895         return kstrdup(snap_name, GFP_KERNEL);
896 }
897
898 /*
899  * Snapshot id comparison function for use with qsort()/bsearch().
900  * Note that the result is for snapshots in *descending* order.
901  */
902 static int snapid_compare_reverse(const void *s1, const void *s2)
903 {
904         u64 snap_id1 = *(u64 *)s1;
905         u64 snap_id2 = *(u64 *)s2;
906
907         if (snap_id1 < snap_id2)
908                 return 1;
909         return snap_id1 == snap_id2 ? 0 : -1;
910 }
911
912 /*
913  * Search a snapshot context to see if the given snapshot id is
914  * present.
915  *
916  * Returns the position of the snapshot id in the array if it's found,
917  * or BAD_SNAP_INDEX otherwise.
918  *
919  * Note: The snapshot array is kept sorted (by the osd) in
920  * reverse order, highest snapshot id first.
921  */
922 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
923 {
924         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
925         u64 *found;
926
927         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
928                                 sizeof (snap_id), snapid_compare_reverse);
929
930         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
931 }
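/*
 * Example: for snapc->snaps = { 40, 30, 10 } (descending, as the osd
 * keeps it), this returns 1 for snap_id 30 and BAD_SNAP_INDEX for
 * snap_id 20.  bsearch() only requires that the array ordering and
 * the comparator agree, hence the reversed comparator above.
 */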
932
933 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
934                                         u64 snap_id)
935 {
936         u32 which;
937
938         which = rbd_dev_snap_index(rbd_dev, snap_id);
939         if (which == BAD_SNAP_INDEX)
940                 return NULL;
941
942         return _rbd_dev_v1_snap_name(rbd_dev, which);
943 }
944
945 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
946 {
947         if (snap_id == CEPH_NOSNAP)
948                 return RBD_SNAP_HEAD_NAME;
949
950         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
951         if (rbd_dev->image_format == 1)
952                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
953
954         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
955 }
956
957 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
958                                 u64 *snap_size)
959 {
960         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
961         if (snap_id == CEPH_NOSNAP) {
962                 *snap_size = rbd_dev->header.image_size;
963         } else if (rbd_dev->image_format == 1) {
964                 u32 which;
965
966                 which = rbd_dev_snap_index(rbd_dev, snap_id);
967                 if (which == BAD_SNAP_INDEX)
968                         return -ENOENT;
969
970                 *snap_size = rbd_dev->header.snap_sizes[which];
971         } else {
972                 u64 size = 0;
973                 int ret;
974
975                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
976                 if (ret)
977                         return ret;
978
979                 *snap_size = size;
980         }
981         return 0;
982 }
983
984 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
985                         u64 *snap_features)
986 {
987         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
988         if (snap_id == CEPH_NOSNAP) {
989                 *snap_features = rbd_dev->header.features;
990         } else if (rbd_dev->image_format == 1) {
991                 *snap_features = 0;     /* No features for format 1 */
992         } else {
993                 u64 features = 0;
994                 int ret;
995
996                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
997                 if (ret)
998                         return ret;
999
1000                 *snap_features = features;
1001         }
1002         return 0;
1003 }
1004
1005 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1006 {
1007         u64 snap_id = rbd_dev->spec->snap_id;
1008         u64 size = 0;
1009         u64 features = 0;
1010         int ret;
1011
1012         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1013         if (ret)
1014                 return ret;
1015         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1016         if (ret)
1017                 return ret;
1018
1019         rbd_dev->mapping.size = size;
1020         rbd_dev->mapping.features = features;
1021
1022         return 0;
1023 }
1024
1025 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1026 {
1027         rbd_dev->mapping.size = 0;
1028         rbd_dev->mapping.features = 0;
1029 }
1030
1031 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1032 {
1033         char *name;
1034         u64 segment;
1035         int ret;
1036         char *name_format;
1037
1038         name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1039         if (!name)
1040                 return NULL;
1041         segment = offset >> rbd_dev->header.obj_order;
1042         name_format = "%s.%012llx";
1043         if (rbd_dev->image_format == 2)
1044                 name_format = "%s.%016llx";
1045         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
1046                         rbd_dev->header.object_prefix, segment);
1047         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
1048                 pr_err("error formatting segment name for #%llu (%d)\n",
1049                         segment, ret);
1050                 kmem_cache_free(rbd_segment_name_cache, name);
1051                 name = NULL;
1052         }
1053
1054         return name;
1055 }
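/*
 * Example (hypothetical prefix): with the common 4 MB object size
 * (obj_order 22) and a format 1 object prefix of "rb.0.1234.5678",
 * byte offset 20 MB falls in segment 5 and maps to the object
 * "rb.0.1234.5678.000000000005"; a format 2 image would use the
 * 16-digit form instead.
 */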
1056
1057 static void rbd_segment_name_free(const char *name)
1058 {
1059         /* The explicit cast here is needed to drop the const qualifier */
1060
1061         kmem_cache_free(rbd_segment_name_cache, (void *)name);
1062 }
1063
1064 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1065 {
1066         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1067
1068         return offset & (segment_size - 1);
1069 }
1070
1071 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1072                                 u64 offset, u64 length)
1073 {
1074         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1075
1076         offset &= segment_size - 1;
1077
1078         rbd_assert(length <= U64_MAX - offset);
1079         if (offset + length > segment_size)
1080                 length = segment_size - offset;
1081
1082         return length;
1083 }
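/*
 * Worked example (obj_order 22, i.e. 4 MB segments): an 8 MB request
 * starting at image offset 3 MB has a segment offset of 3 MB and is
 * clipped here to 1 MB, the remainder of its first segment.  Callers
 * iterate, issuing one object request per segment touched.
 */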
1084
1085 /*
1086  * returns the size of an object in the image
1087  */
1088 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1089 {
1090         return (u64) 1 << header->obj_order;
1091 }
1092
1093 /*
1094  * bio helpers
1095  */
1096
1097 static void bio_chain_put(struct bio *chain)
1098 {
1099         struct bio *tmp;
1100
1101         while (chain) {
1102                 tmp = chain;
1103                 chain = chain->bi_next;
1104                 bio_put(tmp);
1105         }
1106 }
1107
1108 /*
1109  * zeros a bio chain, starting at a specific offset
1110  */
1111 static void zero_bio_chain(struct bio *chain, int start_ofs)
1112 {
1113         struct bio_vec *bv;
1114         unsigned long flags;
1115         void *buf;
1116         int i;
1117         int pos = 0;
1118
1119         while (chain) {
1120                 bio_for_each_segment(bv, chain, i) {
1121                         if (pos + bv->bv_len > start_ofs) {
1122                                 int remainder = max(start_ofs - pos, 0);
1123                                 buf = bvec_kmap_irq(bv, &flags);
1124                                 memset(buf + remainder, 0,
1125                                        bv->bv_len - remainder);
1126                                 flush_dcache_page(bv->bv_page);
1127                                 bvec_kunmap_irq(buf, &flags);
1128                         }
1129                         pos += bv->bv_len;
1130                 }
1131
1132                 chain = chain->bi_next;
1133         }
1134 }
1135
1136 /*
1137  * similar to zero_bio_chain(), zeros data defined by a page array,
1138  * starting at the given byte offset from the start of the array and
1139  * continuing up to the given end offset.  The pages array is
1140  * assumed to be big enough to hold all bytes up to the end.
1141  */
1142 static void zero_pages(struct page **pages, u64 offset, u64 end)
1143 {
1144         struct page **page = &pages[offset >> PAGE_SHIFT];
1145
1146         rbd_assert(end > offset);
1147         rbd_assert(end - offset <= (u64)SIZE_MAX);
1148         while (offset < end) {
1149                 size_t page_offset;
1150                 size_t length;
1151                 unsigned long flags;
1152                 void *kaddr;
1153
1154                 page_offset = offset & ~PAGE_MASK;
1155                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1156                 local_irq_save(flags);
1157                 kaddr = kmap_atomic(*page);
1158                 memset(kaddr + page_offset, 0, length);
1159                 flush_dcache_page(*page);
1160                 kunmap_atomic(kaddr);
1161                 local_irq_restore(flags);
1162
1163                 offset += length;
1164                 page++;
1165         }
1166 }
1167
1168 /*
1169  * Clone a portion of a bio, starting at the given byte offset
1170  * and continuing for the number of bytes indicated.
1171  */
1172 static struct bio *bio_clone_range(struct bio *bio_src,
1173                                         unsigned int offset,
1174                                         unsigned int len,
1175                                         gfp_t gfpmask)
1176 {
1177         struct bio_vec *bv;
1178         unsigned int resid;
1179         unsigned short idx;
1180         unsigned int voff;
1181         unsigned short end_idx;
1182         unsigned short vcnt;
1183         struct bio *bio;
1184
1185         /* Handle the easy case for the caller */
1186
1187         if (!offset && len == bio_src->bi_size)
1188                 return bio_clone(bio_src, gfpmask);
1189
1190         if (WARN_ON_ONCE(!len))
1191                 return NULL;
1192         if (WARN_ON_ONCE(len > bio_src->bi_size))
1193                 return NULL;
1194         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1195                 return NULL;
1196
1197         /* Find first affected segment... */
1198
1199         resid = offset;
1200         bio_for_each_segment(bv, bio_src, idx) {
1201                 if (resid < bv->bv_len)
1202                         break;
1203                 resid -= bv->bv_len;
1204         }
1205         voff = resid;
1206
1207         /* ...and the last affected segment */
1208
1209         resid += len;
1210         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1211                 if (resid <= bv->bv_len)
1212                         break;
1213                 resid -= bv->bv_len;
1214         }
1215         vcnt = end_idx - idx + 1;
1216
1217         /* Build the clone */
1218
1219         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1220         if (!bio)
1221                 return NULL;    /* ENOMEM */
1222
1223         bio->bi_bdev = bio_src->bi_bdev;
1224         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1225         bio->bi_rw = bio_src->bi_rw;
1226         bio->bi_flags |= 1 << BIO_CLONED;
1227
1228         /*
1229          * Copy over our part of the bio_vec, then update the first
1230          * and last (or only) entries.
1231          */
1232         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1233                         vcnt * sizeof (struct bio_vec));
1234         bio->bi_io_vec[0].bv_offset += voff;
1235         if (vcnt > 1) {
1236                 bio->bi_io_vec[0].bv_len -= voff;
1237                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1238         } else {
1239                 bio->bi_io_vec[0].bv_len = len;
1240         }
1241
1242         bio->bi_vcnt = vcnt;
1243         bio->bi_size = len;
1244         bio->bi_idx = 0;
1245
1246         return bio;
1247 }
1248
1249 /*
1250  * Clone a portion of a bio chain, starting at the given byte offset
1251  * into the first bio in the source chain and continuing for the
1252  * number of bytes indicated.  The result is another bio chain of
1253  * exactly the given length, or a null pointer on error.
1254  *
1255  * The bio_src and offset parameters are both in-out.  On entry they
1256  * refer to the first source bio and the offset into that bio where
1257  * the start of data to be cloned is located.
1258  *
1259  * On return, *bio_src is updated to refer to the bio in the source
1260  * chain that contains the first un-cloned byte, and *offset will
1261  * contain the offset of that byte within that bio.
1262  */
1263 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1264                                         unsigned int *offset,
1265                                         unsigned int len,
1266                                         gfp_t gfpmask)
1267 {
1268         struct bio *bi = *bio_src;
1269         unsigned int off = *offset;
1270         struct bio *chain = NULL;
1271         struct bio **end;
1272
1273         /* Build up a chain of clone bios up to the limit */
1274
1275         if (!bi || off >= bi->bi_size || !len)
1276                 return NULL;            /* Nothing to clone */
1277
1278         end = &chain;
1279         while (len) {
1280                 unsigned int bi_size;
1281                 struct bio *bio;
1282
1283                 if (!bi) {
1284                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1285                         goto out_err;   /* EINVAL; ran out of bio's */
1286                 }
1287                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1288                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1289                 if (!bio)
1290                         goto out_err;   /* ENOMEM */
1291
1292                 *end = bio;
1293                 end = &bio->bi_next;
1294
1295                 off += bi_size;
1296                 if (off == bi->bi_size) {
1297                         bi = bi->bi_next;
1298                         off = 0;
1299                 }
1300                 len -= bi_size;
1301         }
1302         *bio_src = bi;
1303         *offset = off;
1304
1305         return chain;
1306 out_err:
1307         bio_chain_put(chain);
1308
1309         return NULL;
1310 }
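/*
 * Usage sketch (illustrative; seg_len and rq are stand-ins): image
 * I/O is carved into per-segment clones by advancing the cursor pair
 * across successive calls:
 *
 *	struct bio *bio = rq->bio;
 *	unsigned int offset = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio, &offset, seg_len, GFP_ATOMIC);
 *
 * Afterwards bio and offset identify the first un-cloned byte, ready
 * for the next segment's clone.
 */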
1311
1312 /*
1313  * The default/initial value for all object request flags is 0.  For
1314  * each flag, once its value is set to 1 it is never reset to 0
1315  * again.
1316  */
1317 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1318 {
1319         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1320                 struct rbd_device *rbd_dev;
1321
1322                 rbd_dev = obj_request->img_request->rbd_dev;
1323                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1324                         obj_request);
1325         }
1326 }
1327
1328 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1329 {
1330         smp_mb();
1331         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1332 }
1333
1334 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1335 {
1336         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1337                 struct rbd_device *rbd_dev = NULL;
1338
1339                 if (obj_request_img_data_test(obj_request))
1340                         rbd_dev = obj_request->img_request->rbd_dev;
1341                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1342                         obj_request);
1343         }
1344 }
1345
1346 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1347 {
1348         smp_mb();
1349         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1350 }
1351
1352 /*
1353  * This sets the KNOWN flag after (possibly) setting the EXISTS
1354  * flag.  The latter is set based on the "exists" value provided.
1355  *
1356  * Note that for our purposes once an object exists it never goes
1357  * away again.  It's possible that the responses from two existence
1358  * checks are separated by the creation of the target object, and
1359  * the first ("doesn't exist") response arrives *after* the second
1360  * ("does exist").  In that case we ignore the second one.
1361  */
1362 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1363                                 bool exists)
1364 {
1365         if (exists)
1366                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1367         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1368         smp_mb();
1369 }
1370
1371 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1372 {
1373         smp_mb();
1374         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1375 }
1376
1377 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1378 {
1379         smp_mb();
1380         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1381 }
1382
1383 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1384 {
1385         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1386                 atomic_read(&obj_request->kref.refcount));
1387         kref_get(&obj_request->kref);
1388 }
1389
1390 static void rbd_obj_request_destroy(struct kref *kref);
1391 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1392 {
1393         rbd_assert(obj_request != NULL);
1394         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1395                 atomic_read(&obj_request->kref.refcount));
1396         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1397 }
1398
1399 static bool img_request_child_test(struct rbd_img_request *img_request);
1400 static void rbd_parent_request_destroy(struct kref *kref);
1401 static void rbd_img_request_destroy(struct kref *kref);
1402 static void rbd_img_request_put(struct rbd_img_request *img_request)
1403 {
1404         rbd_assert(img_request != NULL);
1405         dout("%s: img %p (was %d)\n", __func__, img_request,
1406                 atomic_read(&img_request->kref.refcount));
1407         if (img_request_child_test(img_request))
1408                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1409         else
1410                 kref_put(&img_request->kref, rbd_img_request_destroy);
1411 }
1412
1413 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1414                                         struct rbd_obj_request *obj_request)
1415 {
1416         rbd_assert(obj_request->img_request == NULL);
1417
1418         /* Image request now owns object's original reference */
1419         obj_request->img_request = img_request;
1420         obj_request->which = img_request->obj_request_count;
1421         rbd_assert(!obj_request_img_data_test(obj_request));
1422         obj_request_img_data_set(obj_request);
1423         rbd_assert(obj_request->which != BAD_WHICH);
1424         img_request->obj_request_count++;
1425         list_add_tail(&obj_request->links, &img_request->obj_requests);
1426         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1427                 obj_request->which);
1428 }
1429
1430 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1431                                         struct rbd_obj_request *obj_request)
1432 {
1433         rbd_assert(obj_request->which != BAD_WHICH);
1434
1435         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1436                 obj_request->which);
1437         list_del(&obj_request->links);
1438         rbd_assert(img_request->obj_request_count > 0);
1439         img_request->obj_request_count--;
1440         rbd_assert(obj_request->which == img_request->obj_request_count);
1441         obj_request->which = BAD_WHICH;
1442         rbd_assert(obj_request_img_data_test(obj_request));
1443         rbd_assert(obj_request->img_request == img_request);
1444         obj_request->img_request = NULL;
1445         obj_request->callback = NULL;
1446         rbd_obj_request_put(obj_request);
1447 }
1448
1449 static bool obj_request_type_valid(enum obj_request_type type)
1450 {
1451         switch (type) {
1452         case OBJ_REQUEST_NODATA:
1453         case OBJ_REQUEST_BIO:
1454         case OBJ_REQUEST_PAGES:
1455                 return true;
1456         default:
1457                 return false;
1458         }
1459 }
1460
1461 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1462                                 struct rbd_obj_request *obj_request)
1463 {
1464         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1465
1466         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1467 }
1468
1469 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1470 {
1471
1472         dout("%s: img %p\n", __func__, img_request);
1473
1474         /*
1475          * If no error occurred, compute the aggregate transfer
1476          * count for the image request.  We could instead use
1477          * atomic64_cmpxchg() to update it as each object request
1478          * completes; it's not clear offhand which way is better.
1479          */
1480         if (!img_request->result) {
1481                 struct rbd_obj_request *obj_request;
1482                 u64 xferred = 0;
1483
1484                 for_each_obj_request(img_request, obj_request)
1485                         xferred += obj_request->xferred;
1486                 img_request->xferred = xferred;
1487         }
1488
1489         if (img_request->callback)
1490                 img_request->callback(img_request);
1491         else
1492                 rbd_img_request_put(img_request);
1493 }
1494
1495 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1496
1497 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1498 {
1499         dout("%s: obj %p\n", __func__, obj_request);
1500
1501         return wait_for_completion_interruptible(&obj_request->completion);
1502 }
1503
1504 /*
1505  * The default/initial value for all image request flags is 0.  Each
1506  * is conditionally set to 1 at image request initialization time
1507  * and currently never changes thereafter.
1508  */
1509 static void img_request_write_set(struct rbd_img_request *img_request)
1510 {
1511         set_bit(IMG_REQ_WRITE, &img_request->flags);
1512         smp_mb();
1513 }
1514
1515 static bool img_request_write_test(struct rbd_img_request *img_request)
1516 {
1517         smp_mb();
1518         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1519 }
1520
1521 static void img_request_child_set(struct rbd_img_request *img_request)
1522 {
1523         set_bit(IMG_REQ_CHILD, &img_request->flags);
1524         smp_mb();
1525 }
1526
1527 static void img_request_child_clear(struct rbd_img_request *img_request)
1528 {
1529         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1530         smp_mb();
1531 }
1532
1533 static bool img_request_child_test(struct rbd_img_request *img_request)
1534 {
1535         smp_mb();
1536         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1537 }
1538
1539 static void img_request_layered_set(struct rbd_img_request *img_request)
1540 {
1541         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1542         smp_mb();
1543 }
1544
1545 static void img_request_layered_clear(struct rbd_img_request *img_request)
1546 {
1547         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1548         smp_mb();
1549 }
1550
1551 static bool img_request_layered_test(struct rbd_img_request *img_request)
1552 {
1553         smp_mb();
1554         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1555 }
1556
1557 static void
1558 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1559 {
1560         u64 xferred = obj_request->xferred;
1561         u64 length = obj_request->length;
1562
1563         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1564                 obj_request, obj_request->img_request, obj_request->result,
1565                 xferred, length);
1566         /*
1567          * ENOENT means a hole in the image.  We zero-fill the
1568          * entire length of the request.  A short read also implies
1569          * zero-fill to the end of the request.  Either way we
1570          * update the xferred count to indicate the whole request
1571          * was satisfied.
1572          */
1573         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1574         if (obj_request->result == -ENOENT) {
1575                 if (obj_request->type == OBJ_REQUEST_BIO)
1576                         zero_bio_chain(obj_request->bio_list, 0);
1577                 else
1578                         zero_pages(obj_request->pages, 0, length);
1579                 obj_request->result = 0;
1580                 obj_request->xferred = length;
1581         } else if (xferred < length && !obj_request->result) {
1582                 if (obj_request->type == OBJ_REQUEST_BIO)
1583                         zero_bio_chain(obj_request->bio_list, xferred);
1584                 else
1585                         zero_pages(obj_request->pages, xferred, length);
1586                 obj_request->xferred = length;
1587         }
1588         obj_request_done_set(obj_request);
1589 }
1590
1591 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1592 {
1593         dout("%s: obj %p cb %p\n", __func__, obj_request,
1594                 obj_request->callback);
1595         if (obj_request->callback)
1596                 obj_request->callback(obj_request);
1597         else
1598                 complete_all(&obj_request->completion);
1599 }
1600
1601 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1602 {
1603         dout("%s: obj %p\n", __func__, obj_request);
1604         obj_request_done_set(obj_request);
1605 }
1606
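/*
 * Handle completion of a read.  For a layered image, a read that
 * found no object (-ENOENT) within the parent overlap is redirected
 * to the parent image; other image reads get zero-fill handling in
 * rbd_img_obj_request_read_callback().
 */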
1607 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1608 {
1609         struct rbd_img_request *img_request = NULL;
1610         struct rbd_device *rbd_dev = NULL;
1611         bool layered = false;
1612
1613         if (obj_request_img_data_test(obj_request)) {
1614                 img_request = obj_request->img_request;
1615                 layered = img_request && img_request_layered_test(img_request);
1616                 rbd_dev = img_request->rbd_dev;
1617         }
1618
1619         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1620                 obj_request, img_request, obj_request->result,
1621                 obj_request->xferred, obj_request->length);
1622         if (layered && obj_request->result == -ENOENT &&
1623                         obj_request->img_offset < rbd_dev->parent_overlap)
1624                 rbd_img_parent_read(obj_request);
1625         else if (img_request)
1626                 rbd_img_obj_request_read_callback(obj_request);
1627         else
1628                 obj_request_done_set(obj_request);
1629 }
1630
1631 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1632 {
1633         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1634                 obj_request->result, obj_request->length);
1635         /*
1636          * There is no such thing as a successful short write.  Set
1637          * the transfer count to our originally-requested length.
1638          */
1639         obj_request->xferred = obj_request->length;
1640         obj_request_done_set(obj_request);
1641 }
1642
1643 /*
1644  * For a simple stat call there's nothing to do.  We'll do more if
1645  * this is part of a write sequence for a layered image.
1646  */
1647 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1648 {
1649         dout("%s: obj %p\n", __func__, obj_request);
1650         obj_request_done_set(obj_request);
1651 }
1652
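/*
 * Completion callback for the osd request underlying an object
 * request.  Record the result and transfer count, then dispatch on
 * the opcode of the request's first op.
 */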
1653 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1654                                 struct ceph_msg *msg)
1655 {
1656         struct rbd_obj_request *obj_request = osd_req->r_priv;
1657         u16 opcode;
1658
1659         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1660         rbd_assert(osd_req == obj_request->osd_req);
1661         if (obj_request_img_data_test(obj_request)) {
1662                 rbd_assert(obj_request->img_request);
1663                 rbd_assert(obj_request->which != BAD_WHICH);
1664         } else {
1665                 rbd_assert(obj_request->which == BAD_WHICH);
1666         }
1667
1668         if (osd_req->r_result < 0)
1669                 obj_request->result = osd_req->r_result;
1670
1671         BUG_ON(osd_req->r_num_ops > 2);
1672
1673         /*
1674          * We support a 64-bit length, but ultimately it has to be
1675          * passed to blk_end_request(), which takes an unsigned int.
1676          */
1677         obj_request->xferred = osd_req->r_reply_op_len[0];
1678         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1679         opcode = osd_req->r_ops[0].op;
1680         switch (opcode) {
1681         case CEPH_OSD_OP_READ:
1682                 rbd_osd_read_callback(obj_request);
1683                 break;
1684         case CEPH_OSD_OP_WRITE:
1685                 rbd_osd_write_callback(obj_request);
1686                 break;
1687         case CEPH_OSD_OP_STAT:
1688                 rbd_osd_stat_callback(obj_request);
1689                 break;
1690         case CEPH_OSD_OP_CALL:
1691         case CEPH_OSD_OP_NOTIFY_ACK:
1692         case CEPH_OSD_OP_WATCH:
1693                 rbd_osd_trivial_callback(obj_request);
1694                 break;
1695         default:
1696                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1697                         obj_request->object_name, (unsigned short) opcode);
1698                 break;
1699         }
1700
1701         if (obj_request_done_test(obj_request))
1702                 rbd_obj_request_complete(obj_request);
1703 }
1704
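/*
 * These two functions fill in the direction-dependent parts of an
 * osd request: the snapshot id for reads, and the snapshot context
 * and modification time for writes.
 */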
1705 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1706 {
1707         struct rbd_img_request *img_request = obj_request->img_request;
1708         struct ceph_osd_request *osd_req = obj_request->osd_req;
1709         u64 snap_id;
1710
1711         rbd_assert(osd_req != NULL);
1712
1713         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1714         ceph_osdc_build_request(osd_req, obj_request->offset,
1715                         NULL, snap_id, NULL);
1716 }
1717
1718 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1719 {
1720         struct rbd_img_request *img_request = obj_request->img_request;
1721         struct ceph_osd_request *osd_req = obj_request->osd_req;
1722         struct ceph_snap_context *snapc;
1723         struct timespec mtime = CURRENT_TIME;
1724
1725         rbd_assert(osd_req != NULL);
1726
1727         snapc = img_request ? img_request->snapc : NULL;
1728         ceph_osdc_build_request(osd_req, obj_request->offset,
1729                         snapc, CEPH_NOSNAP, &mtime);
1730 }
1731
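/*
 * Create the osd request for an ordinary (single op) object request.
 * The op itself is initialized by the caller, after which the
 * request is formatted for read or write and submitted.
 */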
1732 static struct ceph_osd_request *rbd_osd_req_create(
1733                                         struct rbd_device *rbd_dev,
1734                                         bool write_request,
1735                                         struct rbd_obj_request *obj_request)
1736 {
1737         struct ceph_snap_context *snapc = NULL;
1738         struct ceph_osd_client *osdc;
1739         struct ceph_osd_request *osd_req;
1740
1741         if (obj_request_img_data_test(obj_request)) {
1742                 struct rbd_img_request *img_request = obj_request->img_request;
1743
1744                 rbd_assert(write_request ==
1745                                 img_request_write_test(img_request));
1746                 if (write_request)
1747                         snapc = img_request->snapc;
1748         }
1749
1750         /* Allocate and initialize the request, for the single op */
1751
1752         osdc = &rbd_dev->rbd_client->client->osdc;
1753         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1754         if (!osd_req)
1755                 return NULL;    /* ENOMEM */
1756
1757         if (write_request)
1758                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1759         else
1760                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1761
1762         osd_req->r_callback = rbd_osd_req_callback;
1763         osd_req->r_priv = obj_request;
1764
1765         osd_req->r_oid_len = strlen(obj_request->object_name);
1766         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1767         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1768
1769         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1770
1771         return osd_req;
1772 }
1773
1774 /*
1775  * Create a copyup osd request based on the information in the
1776  * object request supplied.  A copyup request has two osd ops,
1777  * a copyup method call, and a "normal" write request.
1778  */
1779 static struct ceph_osd_request *
1780 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1781 {
1782         struct rbd_img_request *img_request;
1783         struct ceph_snap_context *snapc;
1784         struct rbd_device *rbd_dev;
1785         struct ceph_osd_client *osdc;
1786         struct ceph_osd_request *osd_req;
1787
1788         rbd_assert(obj_request_img_data_test(obj_request));
1789         img_request = obj_request->img_request;
1790         rbd_assert(img_request);
1791         rbd_assert(img_request_write_test(img_request));
1792
1793         /* Allocate and initialize the request, for the two ops */
1794
1795         snapc = img_request->snapc;
1796         rbd_dev = img_request->rbd_dev;
1797         osdc = &rbd_dev->rbd_client->client->osdc;
1798         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1799         if (!osd_req)
1800                 return NULL;    /* ENOMEM */
1801
1802         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1803         osd_req->r_callback = rbd_osd_req_callback;
1804         osd_req->r_priv = obj_request;
1805
1806         osd_req->r_oid_len = strlen(obj_request->object_name);
1807         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1808         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1809
1810         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1811
1812         return osd_req;
1813 }
1814
1816 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1817 {
1818         ceph_osdc_put_request(osd_req);
1819 }
1820
1821 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1822
1823 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1824                                                 u64 offset, u64 length,
1825                                                 enum obj_request_type type)
1826 {
1827         struct rbd_obj_request *obj_request;
1828         size_t size;
1829         char *name;
1830
1831         rbd_assert(obj_request_type_valid(type));
1832
1833         size = strlen(object_name) + 1;
1834         name = kmalloc(size, GFP_KERNEL);
1835         if (!name)
1836                 return NULL;
1837
1838         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1839         if (!obj_request) {
1840                 kfree(name);
1841                 return NULL;
1842         }
1843
1844         obj_request->object_name = memcpy(name, object_name, size);
1845         obj_request->offset = offset;
1846         obj_request->length = length;
1847         obj_request->flags = 0;
1848         obj_request->which = BAD_WHICH;
1849         obj_request->type = type;
1850         INIT_LIST_HEAD(&obj_request->links);
1851         init_completion(&obj_request->completion);
1852         kref_init(&obj_request->kref);
1853
1854         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1855                 offset, length, (int)type, obj_request);
1856
1857         return obj_request;
1858 }
1859
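/*
 * Release everything an object request owns: its osd request, its
 * bio chain or page vector (depending on its type), and its copy of
 * the object name.
 */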
1860 static void rbd_obj_request_destroy(struct kref *kref)
1861 {
1862         struct rbd_obj_request *obj_request;
1863
1864         obj_request = container_of(kref, struct rbd_obj_request, kref);
1865
1866         dout("%s: obj %p\n", __func__, obj_request);
1867
1868         rbd_assert(obj_request->img_request == NULL);
1869         rbd_assert(obj_request->which == BAD_WHICH);
1870
1871         if (obj_request->osd_req)
1872                 rbd_osd_req_destroy(obj_request->osd_req);
1873
1874         rbd_assert(obj_request_type_valid(obj_request->type));
1875         switch (obj_request->type) {
1876         case OBJ_REQUEST_NODATA:
1877                 break;          /* Nothing to do */
1878         case OBJ_REQUEST_BIO:
1879                 if (obj_request->bio_list)
1880                         bio_chain_put(obj_request->bio_list);
1881                 break;
1882         case OBJ_REQUEST_PAGES:
1883                 if (obj_request->pages)
1884                         ceph_release_page_vector(obj_request->pages,
1885                                                 obj_request->page_count);
1886                 break;
1887         }
1888
1889         kfree(obj_request->object_name);
1890         obj_request->object_name = NULL;
1891         kmem_cache_free(rbd_obj_request_cache, obj_request);
1892 }
1893
1894 /* It's OK to call this for a device with no parent */
1895
1896 static void rbd_spec_put(struct rbd_spec *spec);
1897 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1898 {
1899         rbd_dev_remove_parent(rbd_dev);
1900         rbd_spec_put(rbd_dev->parent_spec);
1901         rbd_dev->parent_spec = NULL;
1902         rbd_dev->parent_overlap = 0;
1903 }
1904
1905 /*
1906  * Parent image reference counting is used to determine when an
1907  * image's parent fields can be safely torn down--that is, only
1908  * after there are no more in-flight requests to the parent image.
1909  * Once the last reference is dropped it is safe to clean them up.
1910  */
1911 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1912 {
1913         int counter;
1914
1915         if (!rbd_dev->parent_spec)
1916                 return;
1917
1918         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1919         if (counter > 0)
1920                 return;
1921
1922         /* Last reference; clean up parent data structures */
1923
1924         if (!counter)
1925                 rbd_dev_unparent(rbd_dev);
1926         else
1927                 rbd_warn(rbd_dev, "parent reference underflow\n");
1928 }
1929
1930 /*
1931  * If an image has a non-zero parent overlap, get a reference to its
1932  * parent.
1933  *
1934  * We must get the reference before checking for the overlap to
1935  * coordinate properly with zeroing the parent overlap in
1936  * rbd_dev_v2_parent_info() when an image gets flattened.  We
1937  * drop it again if there is no overlap.
1938  *
1939  * Returns true if the rbd device has a parent with a non-zero
1940  * overlap and a reference for it was successfully taken, or
1941  * false otherwise.
1942  */
1943 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1944 {
1945         int counter;
1946
1947         if (!rbd_dev->parent_spec)
1948                 return false;
1949
1950         counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1951         if (counter > 0 && rbd_dev->parent_overlap)
1952                 return true;
1953
1954         /* Image was flattened, but parent is not yet torn down */
1955
1956         if (counter < 0)
1957                 rbd_warn(rbd_dev, "parent reference overflow\n");
1958
1959         return false;
1960 }
1961
1962 /*
1963  * Caller is responsible for filling in the list of object requests
1964  * that comprises the image request, and the Linux request pointer
1965  * (if there is one).
1966  */
1967 static struct rbd_img_request *rbd_img_request_create(
1968                                         struct rbd_device *rbd_dev,
1969                                         u64 offset, u64 length,
1970                                         bool write_request)
1971 {
1972         struct rbd_img_request *img_request;
1973
1974         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1975         if (!img_request)
1976                 return NULL;
1977
1978         if (write_request) {
1979                 down_read(&rbd_dev->header_rwsem);
1980                 ceph_get_snap_context(rbd_dev->header.snapc);
1981                 up_read(&rbd_dev->header_rwsem);
1982         }
1983
1984         img_request->rq = NULL;
1985         img_request->rbd_dev = rbd_dev;
1986         img_request->offset = offset;
1987         img_request->length = length;
1988         img_request->flags = 0;
1989         if (write_request) {
1990                 img_request_write_set(img_request);
1991                 img_request->snapc = rbd_dev->header.snapc;
1992         } else {
1993                 img_request->snap_id = rbd_dev->spec->snap_id;
1994         }
1995         if (rbd_dev_parent_get(rbd_dev))
1996                 img_request_layered_set(img_request);
1997         spin_lock_init(&img_request->completion_lock);
1998         img_request->next_completion = 0;
1999         img_request->callback = NULL;
2000         img_request->result = 0;
2001         img_request->obj_request_count = 0;
2002         INIT_LIST_HEAD(&img_request->obj_requests);
2003         kref_init(&img_request->kref);
2004
2005         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2006                 write_request ? "write" : "read", offset, length,
2007                 img_request);
2008
2009         return img_request;
2010 }
2011
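/*
 * Tear down an image request: delete its remaining object requests,
 * drop the parent reference taken for layered requests, and release
 * the snapshot context taken for writes.
 */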
2012 static void rbd_img_request_destroy(struct kref *kref)
2013 {
2014         struct rbd_img_request *img_request;
2015         struct rbd_obj_request *obj_request;
2016         struct rbd_obj_request *next_obj_request;
2017
2018         img_request = container_of(kref, struct rbd_img_request, kref);
2019
2020         dout("%s: img %p\n", __func__, img_request);
2021
2022         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2023                 rbd_img_obj_request_del(img_request, obj_request);
2024         rbd_assert(img_request->obj_request_count == 0);
2025
2026         if (img_request_layered_test(img_request)) {
2027                 img_request_layered_clear(img_request);
2028                 rbd_dev_parent_put(img_request->rbd_dev);
2029         }
2030
2031         if (img_request_write_test(img_request))
2032                 ceph_put_snap_context(img_request->snapc);
2033
2034         kmem_cache_free(rbd_img_request_cache, img_request);
2035 }
2036
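/*
 * Create an image request directed at the parent image on behalf of
 * the given object request.  The new request is marked as a child
 * and holds a reference to the originating object request; both are
 * undone in rbd_parent_request_destroy().
 */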
2037 static struct rbd_img_request *rbd_parent_request_create(
2038                                         struct rbd_obj_request *obj_request,
2039                                         u64 img_offset, u64 length)
2040 {
2041         struct rbd_img_request *parent_request;
2042         struct rbd_device *rbd_dev;
2043
2044         rbd_assert(obj_request->img_request);
2045         rbd_dev = obj_request->img_request->rbd_dev;
2046
2047         parent_request = rbd_img_request_create(rbd_dev->parent,
2048                                                 img_offset, length, false);
2049         if (!parent_request)
2050                 return NULL;
2051
2052         img_request_child_set(parent_request);
2053         rbd_obj_request_get(obj_request);
2054         parent_request->obj_request = obj_request;
2055
2056         return parent_request;
2057 }
2058
2059 static void rbd_parent_request_destroy(struct kref *kref)
2060 {
2061         struct rbd_img_request *parent_request;
2062         struct rbd_obj_request *orig_request;
2063
2064         parent_request = container_of(kref, struct rbd_img_request, kref);
2065         orig_request = parent_request->obj_request;
2066
2067         parent_request->obj_request = NULL;
2068         rbd_obj_request_put(orig_request);
2069         img_request_child_clear(parent_request);
2070
2071         rbd_img_request_destroy(kref);
2072 }
2073
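/*
 * Account for the completion of one object request within an image
 * request.  For an image request that came from the block layer,
 * the result is handed to blk_end_request().  Returns true if more
 * of the image request remains to be completed.
 */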
2074 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2075 {
2076         struct rbd_img_request *img_request;
2077         unsigned int xferred;
2078         int result;
2079         bool more;
2080
2081         rbd_assert(obj_request_img_data_test(obj_request));
2082         img_request = obj_request->img_request;
2083
2084         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2085         xferred = (unsigned int)obj_request->xferred;
2086         result = obj_request->result;
2087         if (result) {
2088                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2089
2090                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2091                         img_request_write_test(img_request) ? "write" : "read",
2092                         obj_request->length, obj_request->img_offset,
2093                         obj_request->offset);
2094                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
2095                         result, xferred);
2096                 if (!img_request->result)
2097                         img_request->result = result;
2098         }
2099
2100         /* Image object requests don't own their page array */
2101
2102         if (obj_request->type == OBJ_REQUEST_PAGES) {
2103                 obj_request->pages = NULL;
2104                 obj_request->page_count = 0;
2105         }
2106
2107         if (img_request_child_test(img_request)) {
2108                 rbd_assert(img_request->obj_request != NULL);
2109                 more = obj_request->which < img_request->obj_request_count - 1;
2110         } else {
2111                 rbd_assert(img_request->rq != NULL);
2112                 more = blk_end_request(img_request->rq, result, xferred);
2113         }
2114
2115         return more;
2116 }
2117
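/*
 * Callback for object requests that belong to an image request.
 * Object requests can complete in any order, but their results must
 * be reported in order, so only the run of consecutively completed
 * requests starting at next_completion is ended here.
 */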
2118 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2119 {
2120         struct rbd_img_request *img_request;
2121         u32 which = obj_request->which;
2122         bool more = true;
2123
2124         rbd_assert(obj_request_img_data_test(obj_request));
2125         img_request = obj_request->img_request;
2126
2127         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2128         rbd_assert(img_request != NULL);
2129         rbd_assert(img_request->obj_request_count > 0);
2130         rbd_assert(which != BAD_WHICH);
2131         rbd_assert(which < img_request->obj_request_count);
2132         rbd_assert(which >= img_request->next_completion);
2133
2134         spin_lock_irq(&img_request->completion_lock);
2135         if (which != img_request->next_completion)
2136                 goto out;
2137
2138         for_each_obj_request_from(img_request, obj_request) {
2139                 rbd_assert(more);
2140                 rbd_assert(which < img_request->obj_request_count);
2141
2142                 if (!obj_request_done_test(obj_request))
2143                         break;
2144                 more = rbd_img_obj_end_request(obj_request);
2145                 which++;
2146         }
2147
2148         rbd_assert(more ^ (which == img_request->obj_request_count));
2149         img_request->next_completion = which;
2150 out:
2151         spin_unlock_irq(&img_request->completion_lock);
2152
2153         if (!more)
2154                 rbd_img_request_complete(img_request);
2155 }
2156
2157 /*
2158  * Split up an image request into one or more object requests, each
2159  * to a different object.  The "type" parameter indicates whether
2160  * "data_desc" is the pointer to the head of a list of bio
2161  * structures, or the base of a page array.  In either case this
2162  * function assumes data_desc describes memory sufficient to hold
2163  * all data described by the image request.
2164  */
2165 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2166                                         enum obj_request_type type,
2167                                         void *data_desc)
2168 {
2169         struct rbd_device *rbd_dev = img_request->rbd_dev;
2170         struct rbd_obj_request *obj_request = NULL;
2171         struct rbd_obj_request *next_obj_request;
2172         bool write_request = img_request_write_test(img_request);
2173         struct bio *bio_list;
2174         unsigned int bio_offset = 0;
2175         struct page **pages;
2176         u64 img_offset;
2177         u64 resid;
2178         u16 opcode;
2179
2180         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2181                 (int)type, data_desc);
2182
2183         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2184         img_offset = img_request->offset;
2185         resid = img_request->length;
2186         rbd_assert(resid > 0);
2187
2188         if (type == OBJ_REQUEST_BIO) {
2189                 bio_list = data_desc;
2190                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2191         } else {
2192                 rbd_assert(type == OBJ_REQUEST_PAGES);
2193                 pages = data_desc;
2194         }
2195
2196         while (resid) {
2197                 struct ceph_osd_request *osd_req;
2198                 const char *object_name;
2199                 u64 offset;
2200                 u64 length;
2201
2202                 object_name = rbd_segment_name(rbd_dev, img_offset);
2203                 if (!object_name)
2204                         goto out_unwind;
2205                 offset = rbd_segment_offset(rbd_dev, img_offset);
2206                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2207                 obj_request = rbd_obj_request_create(object_name,
2208                                                 offset, length, type);
2209                 /* object request has its own copy of the object name */
2210                 rbd_segment_name_free(object_name);
2211                 if (!obj_request)
2212                         goto out_unwind;
2213
2214                 if (type == OBJ_REQUEST_BIO) {
2215                         unsigned int clone_size;
2216
2217                         rbd_assert(length <= (u64)UINT_MAX);
2218                         clone_size = (unsigned int)length;
2219                         obj_request->bio_list =
2220                                         bio_chain_clone_range(&bio_list,
2221                                                                 &bio_offset,
2222                                                                 clone_size,
2223                                                                 GFP_ATOMIC);
2224                         if (!obj_request->bio_list)
2225                                 goto out_partial;
2226                 } else {
2227                         unsigned int page_count;
2228
2229                         obj_request->pages = pages;
2230                         page_count = (u32)calc_pages_for(offset, length);
2231                         obj_request->page_count = page_count;
2232                         if ((offset + length) & ~PAGE_MASK)
2233                                 page_count--;   /* more on last page */
2234                         pages += page_count;
2235                 }
2236
2237                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2238                                                 obj_request);
2239                 if (!osd_req)
2240                         goto out_partial;
2241                 obj_request->osd_req = osd_req;
2242                 obj_request->callback = rbd_img_obj_callback;
2243
2244                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2245                                                 0, 0);
2246                 if (type == OBJ_REQUEST_BIO)
2247                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2248                                         obj_request->bio_list, length);
2249                 else
2250                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2251                                         obj_request->pages, length,
2252                                         offset & ~PAGE_MASK, false, false);
2253
2254                 /*
2255                  * set obj_request->img_request before formatting
2256                  * the osd_request so that it gets the right snapc
2257                  */
2258                 rbd_img_obj_request_add(img_request, obj_request);
2259                 if (write_request)
2260                         rbd_osd_req_format_write(obj_request);
2261                 else
2262                         rbd_osd_req_format_read(obj_request);
2263
2264                 obj_request->img_offset = img_offset;
2265
2266                 img_offset += length;
2267                 resid -= length;
2268         }
2269
2270         return 0;
2271
2272 out_partial:
2273         rbd_obj_request_put(obj_request);
2274 out_unwind:
2275         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2276                 rbd_img_obj_request_del(img_request, obj_request);
2277
2278         return -ENOMEM;
2279 }
2280
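/*
 * Completion callback for a copyup request.  Release the pages that
 * held the parent data, account for the full length of the original
 * write, and finish up via the normal image object callback.
 */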
2281 static void
2282 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2283 {
2284         struct rbd_img_request *img_request;
2285         struct rbd_device *rbd_dev;
2286         struct page **pages;
2287         u32 page_count;
2288
2289         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2290         rbd_assert(obj_request_img_data_test(obj_request));
2291         img_request = obj_request->img_request;
2292         rbd_assert(img_request);
2293
2294         rbd_dev = img_request->rbd_dev;
2295         rbd_assert(rbd_dev);
2296
2297         pages = obj_request->copyup_pages;
2298         rbd_assert(pages != NULL);
2299         obj_request->copyup_pages = NULL;
2300         page_count = obj_request->copyup_page_count;
2301         rbd_assert(page_count);
2302         obj_request->copyup_page_count = 0;
2303         ceph_release_page_vector(pages, page_count);
2304
2305         /*
2306          * We want the transfer count to reflect the size of the
2307          * original write request.  There is no such thing as a
2308          * successful short write, so if the request was successful
2309          * we can just set it to the originally-requested length.
2310          */
2311         if (!obj_request->result)
2312                 obj_request->xferred = obj_request->length;
2313
2314         /* Finish up with the normal image object callback */
2315
2316         rbd_img_obj_callback(obj_request);
2317 }
2318
2319 static void
2320 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2321 {
2322         struct rbd_obj_request *orig_request;
2323         struct ceph_osd_request *osd_req;
2324         struct ceph_osd_client *osdc;
2325         struct rbd_device *rbd_dev;
2326         struct page **pages;
2327         u32 page_count;
2328         int img_result;
2329         u64 parent_length;
2330         u64 offset;
2331         u64 length;
2332
2333         rbd_assert(img_request_child_test(img_request));
2334
2335         /* First get what we need from the image request */
2336
2337         pages = img_request->copyup_pages;
2338         rbd_assert(pages != NULL);
2339         img_request->copyup_pages = NULL;
2340         page_count = img_request->copyup_page_count;
2341         rbd_assert(page_count);
2342         img_request->copyup_page_count = 0;
2343
2344         orig_request = img_request->obj_request;
2345         rbd_assert(orig_request != NULL);
2346         rbd_assert(obj_request_type_valid(orig_request->type));
2347         img_result = img_request->result;
2348         parent_length = img_request->length;
2349         rbd_assert(parent_length == img_request->xferred);
2350         rbd_img_request_put(img_request);
2351
2352         rbd_assert(orig_request->img_request);
2353         rbd_dev = orig_request->img_request->rbd_dev;
2354         rbd_assert(rbd_dev);
2355
2356         /*
2357          * If the overlap has become 0 (most likely because the
2358          * image has been flattened) we need to free the pages
2359          * and re-submit the original write request.
2360          */
2361         if (!rbd_dev->parent_overlap) {
2362                 struct ceph_osd_client *osdc;
2363
2364                 ceph_release_page_vector(pages, page_count);
2365                 osdc = &rbd_dev->rbd_client->client->osdc;
2366                 img_result = rbd_obj_request_submit(osdc, orig_request);
2367                 if (!img_result)
2368                         return;
2369         }
2370
2371         if (img_result)
2372                 goto out_err;
2373
2374         /*
2375  * The original osd request is of no use to us any more.
2376          * We need a new one that can hold the two ops in a copyup
2377          * request.  Allocate the new copyup osd request for the
2378          * original request, and release the old one.
2379          */
2380         img_result = -ENOMEM;
2381         osd_req = rbd_osd_req_create_copyup(orig_request);
2382         if (!osd_req)
2383                 goto out_err;
2384         rbd_osd_req_destroy(orig_request->osd_req);
2385         orig_request->osd_req = osd_req;
2386         orig_request->copyup_pages = pages;
2387         orig_request->copyup_page_count = page_count;
2388
2389         /* Initialize the copyup op */
2390
2391         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2392         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2393                                                 false, false);
2394
2395         /* Then the original write request op */
2396
2397         offset = orig_request->offset;
2398         length = orig_request->length;
2399         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2400                                         offset, length, 0, 0);
2401         if (orig_request->type == OBJ_REQUEST_BIO)
2402                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2403                                         orig_request->bio_list, length);
2404         else
2405                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2406                                         orig_request->pages, length,
2407                                         offset & ~PAGE_MASK, false, false);
2408
2409         rbd_osd_req_format_write(orig_request);
2410
2411         /* All set, send it off. */
2412
2413         orig_request->callback = rbd_img_obj_copyup_callback;
2414         osdc = &rbd_dev->rbd_client->client->osdc;
2415         img_result = rbd_obj_request_submit(osdc, orig_request);
2416         if (!img_result)
2417                 return;
2418 out_err:
2419         /* Record the error code and complete the request */
2420
2421         orig_request->result = img_result;
2422         orig_request->xferred = 0;
2423         obj_request_done_set(orig_request);
2424         rbd_obj_request_complete(orig_request);
2425 }
2426
2427 /*
2428  * Read from the parent image the range of data that covers the
2429  * entire target of the given object request.  This is used for
2430  * satisfying a layered image write request when the target of an
2431  * object request from the image request does not exist.
2432  *
2433  * A page array big enough to hold the returned data is allocated
2434  * and supplied to rbd_img_request_fill() as the "data descriptor."
2435  * When the read completes, this page array will be transferred to
2436  * the original object request for the copyup operation.
2437  *
2438  * If an error occurs, record it as the result of the original
2439  * object request and mark it done so it gets completed.
2440  */
2441 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2442 {
2443         struct rbd_img_request *img_request = NULL;
2444         struct rbd_img_request *parent_request = NULL;
2445         struct rbd_device *rbd_dev;
2446         u64 img_offset;
2447         u64 length;
2448         struct page **pages = NULL;
2449         u32 page_count;
2450         int result;
2451
2452         rbd_assert(obj_request_img_data_test(obj_request));
2453         rbd_assert(obj_request_type_valid(obj_request->type));
2454
2455         img_request = obj_request->img_request;
2456         rbd_assert(img_request != NULL);
2457         rbd_dev = img_request->rbd_dev;
2458         rbd_assert(rbd_dev->parent != NULL);
2459
2460         /*
2461          * Determine the byte range in the child image covered by
2462          * the object to which the original request was to be sent.
2463          */
2464         img_offset = obj_request->img_offset - obj_request->offset;
2465         length = (u64)1 << rbd_dev->header.obj_order;
2466
2467         /*
2468          * There is no defined parent data beyond the parent
2469          * overlap, so limit what we read at that boundary if
2470          * necessary.
2471          */
2472         if (img_offset + length > rbd_dev->parent_overlap) {
2473                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2474                 length = rbd_dev->parent_overlap - img_offset;
2475         }
2476
2477         /*
2478          * Allocate a page array big enough to receive the data read
2479          * from the parent.
2480          */
2481         page_count = (u32)calc_pages_for(0, length);
2482         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2483         if (IS_ERR(pages)) {
2484                 result = PTR_ERR(pages);
2485                 pages = NULL;
2486                 goto out_err;
2487         }
2488
2489         result = -ENOMEM;
2490         parent_request = rbd_parent_request_create(obj_request,
2491                                                 img_offset, length);
2492         if (!parent_request)
2493                 goto out_err;
2494
2495         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2496         if (result)
2497                 goto out_err;
2498         parent_request->copyup_pages = pages;
2499         parent_request->copyup_page_count = page_count;
2500
2501         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2502         result = rbd_img_request_submit(parent_request);
2503         if (!result)
2504                 return 0;
2505
2506         parent_request->copyup_pages = NULL;
2507         parent_request->copyup_page_count = 0;
2508         parent_request->obj_request = NULL;
2509         rbd_obj_request_put(obj_request);
2510 out_err:
2511         if (pages)
2512                 ceph_release_page_vector(pages, page_count);
2513         if (parent_request)
2514                 rbd_img_request_put(parent_request);
2515         obj_request->result = result;
2516         obj_request->xferred = 0;
2517         obj_request_done_set(obj_request);
2518
2519         return result;
2520 }
2521
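/*
 * Handle completion of the STAT request issued by
 * rbd_img_obj_exists_submit():  record whether the target object
 * exists, then re-submit the original object request.
 */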
2522 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2523 {
2524         struct rbd_obj_request *orig_request;
2525         struct rbd_device *rbd_dev;
2526         int result;
2527
2528         rbd_assert(!obj_request_img_data_test(obj_request));
2529
2530         /*
2531          * All we need from the object request is the original
2532          * request and the result of the STAT op.  Grab those, then
2533          * we're done with the request.
2534          */
2535         orig_request = obj_request->obj_request;
2536         obj_request->obj_request = NULL;
2537         rbd_assert(orig_request);
2538         rbd_assert(orig_request->img_request);
2539         rbd_obj_request_put(orig_request);
2540
2541         result = obj_request->result;
2542         obj_request->result = 0;
2543
2544         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2545                 obj_request, orig_request, result,
2546                 obj_request->xferred, obj_request->length);
2547         rbd_obj_request_put(obj_request);
2548
2549         /*
2550          * If the overlap has become 0 (most likely because the
2551          * image has been flattened) we need to re-submit the
2552          * original write request.
2553          */
2554         rbd_dev = orig_request->img_request->rbd_dev;
2555         if (!rbd_dev->parent_overlap) {
2556                 struct ceph_osd_client *osdc;
2557
2558                 osdc = &rbd_dev->rbd_client->client->osdc;
2559                 result = rbd_obj_request_submit(osdc, orig_request);
2560                 if (!result)
2561                         return;
2562         }
2563
2564         /*
2565          * Our only purpose here is to determine whether the object
2566          * exists, and we don't want to treat the non-existence as
2567          * an error.  If something else comes back, transfer the
2568          * error to the original request and complete it now.
2569          */
2570         if (!result) {
2571                 obj_request_existence_set(orig_request, true);
2572         } else if (result == -ENOENT) {
2573                 obj_request_existence_set(orig_request, false);
2574         } else if (result) {
2575                 orig_request->result = result;
2576                 goto out;
2577         }
2578
2579         /*
2580          * Resubmit the original request now that we have recorded
2581          * whether the target object exists.
2582          */
2583         orig_request->result = rbd_img_obj_request_submit(orig_request);
2584 out:
2585         if (orig_request->result)
2586                 rbd_obj_request_complete(orig_request);
2587 }
2588
2589 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2590 {
2591         struct rbd_obj_request *stat_request;
2592         struct rbd_device *rbd_dev;
2593         struct ceph_osd_client *osdc;
2594         struct page **pages = NULL;
2595         u32 page_count;
2596         size_t size;
2597         int ret;
2598
2599         /*
2600          * The response data for a STAT call consists of:
2601          *     le64 length;
2602          *     struct {
2603          *         le32 tv_sec;
2604          *         le32 tv_nsec;
2605          *     } mtime;
2606          */
2607         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2608         page_count = (u32)calc_pages_for(0, size);
2609         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2610         if (IS_ERR(pages))
2611                 return PTR_ERR(pages);
2612
2613         ret = -ENOMEM;
2614         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2615                                                         OBJ_REQUEST_PAGES);
2616         if (!stat_request)
2617                 goto out;
2618
2619         rbd_obj_request_get(obj_request);
2620         stat_request->obj_request = obj_request;
2621         stat_request->pages = pages;
2622         stat_request->page_count = page_count;
2623
2624         rbd_assert(obj_request->img_request);
2625         rbd_dev = obj_request->img_request->rbd_dev;
2626         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2627                                                 stat_request);
2628         if (!stat_request->osd_req)
2629                 goto out;
2630         stat_request->callback = rbd_img_obj_exists_callback;
2631
2632         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2633         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2634                                         false, false);
2635         rbd_osd_req_format_read(stat_request);
2636
2637         osdc = &rbd_dev->rbd_client->client->osdc;
2638         ret = rbd_obj_request_submit(osdc, stat_request);
2639 out:
2640         if (ret)
2641                 rbd_obj_request_put(obj_request);
2642
2643         return ret;
2644 }
2645
2646 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2647 {
2648         struct rbd_img_request *img_request;
2649         struct rbd_device *rbd_dev;
2650         bool known;
2651
2652         rbd_assert(obj_request_img_data_test(obj_request));
2653
2654         img_request = obj_request->img_request;
2655         rbd_assert(img_request);
2656         rbd_dev = img_request->rbd_dev;
2657
2658         /*
2659          * Only writes to layered images need special handling.
2660          * Reads and non-layered writes are simple object requests.
2661          * Layered writes that start beyond the end of the overlap
2662          * with the parent have no parent data, so they too are
2663          * simple object requests.  Finally, if the target object is
2664          * known to already exist, its parent data has already been
2665          * copied, so a write to the object can also be handled as a
2666          * simple object request.
2667          */
2668         if (!img_request_write_test(img_request) ||
2669                 !img_request_layered_test(img_request) ||
2670                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2671                 ((known = obj_request_known_test(obj_request)) &&
2672                         obj_request_exists_test(obj_request))) {
2673
2674                 struct rbd_device *rbd_dev;
2675                 struct ceph_osd_client *osdc;
2676
2677                 rbd_dev = obj_request->img_request->rbd_dev;
2678                 osdc = &rbd_dev->rbd_client->client->osdc;
2679
2680                 return rbd_obj_request_submit(osdc, obj_request);
2681         }
2682
2683         /*
2684          * It's a layered write.  The target object might exist but
2685          * we may not know that yet.  If we know it doesn't exist,
2686          * start by reading the data for the full target object from
2687          * the parent so we can use it for a copyup to the target.
2688          */
2689         if (known)
2690                 return rbd_img_obj_parent_read_full(obj_request);
2691
2692         /* We don't know whether the target exists.  Go find out. */
2693
2694         return rbd_img_obj_exists_submit(obj_request);
2695 }
2696
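/*
 * Submit all of the object requests that make up an image request,
 * stopping at the first submission error.
 */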
2697 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2698 {
2699         struct rbd_obj_request *obj_request;
2700         struct rbd_obj_request *next_obj_request;
2701
2702         dout("%s: img %p\n", __func__, img_request);
2703         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2704                 int ret;
2705
2706                 ret = rbd_img_obj_request_submit(obj_request);
2707                 if (ret)
2708                         return ret;
2709         }
2710
2711         return 0;
2712 }
2713
2714 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2715 {
2716         struct rbd_obj_request *obj_request;
2717         struct rbd_device *rbd_dev;
2718         u64 obj_end;
2719         u64 img_xferred;
2720         int img_result;
2721
2722         rbd_assert(img_request_child_test(img_request));
2723
2724         /* First get what we need from the image request and release it */
2725
2726         obj_request = img_request->obj_request;
2727         img_xferred = img_request->xferred;
2728         img_result = img_request->result;
2729         rbd_img_request_put(img_request);
2730
2731         /*
2732          * If the overlap has become 0 (most likely because the
2733          * image has been flattened) we need to re-submit the
2734          * original request.
2735          */
2736         rbd_assert(obj_request);
2737         rbd_assert(obj_request->img_request);
2738         rbd_dev = obj_request->img_request->rbd_dev;
2739         if (!rbd_dev->parent_overlap) {
2740                 struct ceph_osd_client *osdc;
2741
2742                 osdc = &rbd_dev->rbd_client->client->osdc;
2743                 img_result = rbd_obj_request_submit(osdc, obj_request);
2744                 if (!img_result)
2745                         return;
2746         }
2747
2748         obj_request->result = img_result;
2749         if (obj_request->result)
2750                 goto out;
2751
2752         /*
2753          * We need to zero anything beyond the parent overlap
2754          * boundary.  Since rbd_img_obj_request_read_callback()
2755          * will zero anything beyond the end of a short read, an
2756          * easy way to do this is to pretend the data from the
2757          * parent came up short--ending at the overlap boundary.
2758          */
2759         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2760         obj_end = obj_request->img_offset + obj_request->length;
2761         if (obj_end > rbd_dev->parent_overlap) {
2762                 u64 xferred = 0;
2763
2764                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2765                         xferred = rbd_dev->parent_overlap -
2766                                         obj_request->img_offset;
2767
2768                 obj_request->xferred = min(img_xferred, xferred);
2769         } else {
2770                 obj_request->xferred = img_xferred;
2771         }
2772 out:
2773         rbd_img_obj_request_read_callback(obj_request);
2774         rbd_obj_request_complete(obj_request);
2775 }
2776
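/*
 * An object read on a layered image came back -ENOENT.  Build an
 * equivalent image request against the parent image and submit it;
 * its callback finishes the original object request.
 */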
2777 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2778 {
2779         struct rbd_img_request *img_request;
2780         int result;
2781
2782         rbd_assert(obj_request_img_data_test(obj_request));
2783         rbd_assert(obj_request->img_request != NULL);
2784         rbd_assert(obj_request->result == (s32) -ENOENT);
2785         rbd_assert(obj_request_type_valid(obj_request->type));
2786
2787
2788         img_request = rbd_parent_request_create(obj_request,
2789                                                 obj_request->img_offset,
2790                                                 obj_request->length);
2791         result = -ENOMEM;
2792         if (!img_request)
2793                 goto out_err;
2794
2795         if (obj_request->type == OBJ_REQUEST_BIO)
2796                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2797                                                 obj_request->bio_list);
2798         else
2799                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2800                                                 obj_request->pages);
2801         if (result)
2802                 goto out_err;
2803
2804         img_request->callback = rbd_img_parent_read_callback;
2805         result = rbd_img_request_submit(img_request);
2806         if (result)
2807                 goto out_err;
2808
2809         return;
2810 out_err:
2811         if (img_request)
2812                 rbd_img_request_put(img_request);
2813         obj_request->result = result;
2814         obj_request->xferred = 0;
2815         obj_request_done_set(obj_request);
2816 }
2817
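/*
 * Send an acknowledgement for a notification received on the header
 * object.  The object request puts itself once the ack completes.
 */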
2818 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2819 {
2820         struct rbd_obj_request *obj_request;
2821         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2822         int ret;
2823
2824         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2825                                                         OBJ_REQUEST_NODATA);
2826         if (!obj_request)
2827                 return -ENOMEM;
2828
2829         ret = -ENOMEM;
2830         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2831         if (!obj_request->osd_req)
2832                 goto out;
2833         obj_request->callback = rbd_obj_request_put;
2834
2835         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2836                                         notify_id, 0, 0);
2837         rbd_osd_req_format_read(obj_request);
2838
2839         ret = rbd_obj_request_submit(osdc, obj_request);
2840 out:
2841         if (ret)
2842                 rbd_obj_request_put(obj_request);
2843
2844         return ret;
2845 }
2846
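/*
 * Callback invoked when the header object we watch is updated:
 * refresh our view of the image header, then acknowledge the
 * notification.
 */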
2847 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2848 {
2849         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2850         int ret;
2851
2852         if (!rbd_dev)
2853                 return;
2854
2855         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2856                 rbd_dev->header_name, (unsigned long long)notify_id,
2857                 (unsigned int)opcode);
2858         ret = rbd_dev_refresh(rbd_dev);
2859         if (ret)
2860                 rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
2861
2862         rbd_obj_notify_ack(rbd_dev, notify_id);
2863 }
2864
2865 /*
2866  * Request sync osd watch/unwatch.  The value of "start" determines
2867  * whether a watch request is being initiated or torn down.
2868  */
2869 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2870 {
2871         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2872         struct rbd_obj_request *obj_request;
2873         int ret;
2874
2875         rbd_assert(start ^ !!rbd_dev->watch_event);
2876         rbd_assert(start ^ !!rbd_dev->watch_request);
2877
2878         if (start) {
2879                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2880                                                 &rbd_dev->watch_event);
2881                 if (ret < 0)
2882                         return ret;
2883                 rbd_assert(rbd_dev->watch_event != NULL);
2884         }
2885
2886         ret = -ENOMEM;
2887         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2888                                                         OBJ_REQUEST_NODATA);
2889         if (!obj_request)
2890                 goto out_cancel;
2891
2892         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2893         if (!obj_request->osd_req)
2894                 goto out_cancel;
2895
2896         if (start)
2897                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2898         else
2899                 ceph_osdc_unregister_linger_request(osdc,
2900                                         rbd_dev->watch_request->osd_req);
2901
2902         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2903                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2904         rbd_osd_req_format_write(obj_request);
2905
2906         ret = rbd_obj_request_submit(osdc, obj_request);
2907         if (ret)
2908                 goto out_cancel;
2909         ret = rbd_obj_request_wait(obj_request);
2910         if (ret)
2911                 goto out_cancel;
2912         ret = obj_request->result;
2913         if (ret)
2914                 goto out_cancel;
2915
2916         /*
2917          * A watch request is set to linger, so the underlying osd
2918          * request won't go away until we unregister it.  We retain
2919          * a pointer to the object request during that time (in
2920          * rbd_dev->watch_request), so we'll keep a reference to
2921          * it.  We'll drop that reference (below) after we've
2922          * unregistered it.
2923          */
2924         if (start) {
2925                 rbd_dev->watch_request = obj_request;
2926
2927                 return 0;
2928         }
2929
2930         /* We have successfully torn down the watch request */
2931
2932         rbd_obj_request_put(rbd_dev->watch_request);
2933         rbd_dev->watch_request = NULL;
2934 out_cancel:
2935         /* Cancel the event if we're tearing down, or on error */
2936         ceph_osdc_cancel_event(rbd_dev->watch_event);
2937         rbd_dev->watch_event = NULL;
2938         if (obj_request)
2939                 rbd_obj_request_put(obj_request);
2940
2941         return ret;
2942 }
2943
2944 /*
2945  * Synchronous osd object method call.  Returns the number of bytes
2946  * returned in the inbound buffer, or a negative error code.
2947  */
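/*
 * Illustrative sketch of a call (the class and method names below
 * are hypothetical, not part of this driver's API):
 *
 *      char inbound[64];
 *      __le64 snap_id = cpu_to_le64(CEPH_NOSNAP);
 *      int ret;
 *
 *      ret = rbd_obj_method_sync(rbd_dev, object_name,
 *                      "some_class", "some_method",
 *                      &snap_id, sizeof (snap_id),
 *                      inbound, sizeof (inbound));
 *
 * On success, ret is the number of bytes the method placed in the
 * inbound buffer.
 */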
2948 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2949                              const char *object_name,
2950                              const char *class_name,
2951                              const char *method_name,
2952                              const void *outbound,
2953                              size_t outbound_size,
2954                              void *inbound,
2955                              size_t inbound_size)
2956 {
2957         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2958         struct rbd_obj_request *obj_request;
2959         struct page **pages;
2960         u32 page_count;
2961         int ret;
2962
2963         /*
2964          * Method calls are ultimately read operations.  The result
2965          * should be placed into the inbound buffer provided.  Callers
2966          * may also supply outbound data--parameters for the object
2967          * method.  Currently if this is present it will be a
2968          * snapshot id.
2969          */
2970         page_count = (u32)calc_pages_for(0, inbound_size);
2971         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2972         if (IS_ERR(pages))
2973                 return PTR_ERR(pages);
2974
2975         ret = -ENOMEM;
2976         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2977                                                         OBJ_REQUEST_PAGES);
2978         if (!obj_request)
2979                 goto out;
2980
2981         obj_request->pages = pages;
2982         obj_request->page_count = page_count;
2983
2984         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2985         if (!obj_request->osd_req)
2986                 goto out;
2987
2988         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2989                                         class_name, method_name);
2990         if (outbound_size) {
2991                 struct ceph_pagelist *pagelist;
2992
2993                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2994                 if (!pagelist)
2995                         goto out;
2996
2997                 ceph_pagelist_init(pagelist);
2998                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2999                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3000                                                 pagelist);
3001         }
3002         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3003                                         obj_request->pages, inbound_size,
3004                                         0, false, false);
3005         rbd_osd_req_format_read(obj_request);
3006
3007         ret = rbd_obj_request_submit(osdc, obj_request);
3008         if (ret)
3009                 goto out;
3010         ret = rbd_obj_request_wait(obj_request);
3011         if (ret)
3012                 goto out;
3013
3014         ret = obj_request->result;
3015         if (ret < 0)
3016                 goto out;
3017
3018         rbd_assert(obj_request->xferred < (u64)INT_MAX);
3019         ret = (int)obj_request->xferred;
3020         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3021 out:
3022         if (obj_request)
3023                 rbd_obj_request_put(obj_request);
3024         else
3025                 ceph_release_page_vector(pages, page_count);
3026
3027         return ret;
3028 }
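/*
 * Illustrative call of the helper above, mirroring how later code in
 * this file fetches the object prefix ("rbd" class, no outbound data).
 * The return value, when non-negative, is the byte count received.
 */
#if 0
	char reply[RBD_OBJ_PREFIX_LEN_MAX];
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,		/* no parameters */
				reply, sizeof (reply));
	if (ret >= 0)
		dout("got %d reply bytes\n", ret);
#endif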
3029
3030 static void rbd_request_fn(struct request_queue *q)
3031                 __releases(q->queue_lock) __acquires(q->queue_lock)
3032 {
3033         struct rbd_device *rbd_dev = q->queuedata;
3034         bool read_only = rbd_dev->mapping.read_only;
3035         struct request *rq;
3036         int result;
3037
3038         while ((rq = blk_fetch_request(q))) {
3039                 bool write_request = rq_data_dir(rq) == WRITE;
3040                 struct rbd_img_request *img_request;
3041                 u64 offset;
3042                 u64 length;
3043
3044                 /* Ignore any non-FS requests that filter through. */
3045
3046                 if (rq->cmd_type != REQ_TYPE_FS) {
3047                         dout("%s: non-fs request type %d\n", __func__,
3048                                 (int) rq->cmd_type);
3049                         __blk_end_request_all(rq, 0);
3050                         continue;
3051                 }
3052
3053                 /* Ignore/skip any zero-length requests */
3054
3055                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3056                 length = (u64) blk_rq_bytes(rq);
3057
3058                 if (!length) {
3059                         dout("%s: zero-length request\n", __func__);
3060                         __blk_end_request_all(rq, 0);
3061                         continue;
3062                 }
3063
3064                 spin_unlock_irq(q->queue_lock);
3065
3066                 /* Disallow writes to a read-only device */
3067
3068                 if (write_request) {
3069                         result = -EROFS;
3070                         if (read_only)
3071                                 goto end_request;
3072                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3073                 }
3074
3075                 /*
3076                  * Quit early if the mapped snapshot no longer
3077                  * exists.  It's still possible the snapshot will
3078                  * have disappeared by the time our request arrives
3079                  * at the osd, but there's no sense in sending it if
3080                  * we already know.
3081                  */
3082                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3083                         dout("request for non-existent snapshot");
3084                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3085                         result = -ENXIO;
3086                         goto end_request;
3087                 }
3088
3089                 result = -EINVAL;
3090                 if (offset && length > U64_MAX - offset + 1) {
3091                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3092                                 offset, length);
3093                         goto end_request;       /* Shouldn't happen */
3094                 }
3095
3096                 result = -EIO;
3097                 if (offset + length > rbd_dev->mapping.size) {
3098                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3099                                 offset, length, rbd_dev->mapping.size);
3100                         goto end_request;
3101                 }
3102
3103                 result = -ENOMEM;
3104                 img_request = rbd_img_request_create(rbd_dev, offset, length,
3105                                                         write_request);
3106                 if (!img_request)
3107                         goto end_request;
3108
3109                 img_request->rq = rq;
3110
3111                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3112                                                 rq->bio);
3113                 if (!result)
3114                         result = rbd_img_request_submit(img_request);
3115                 if (result)
3116                         rbd_img_request_put(img_request);
3117 end_request:
3118                 spin_lock_irq(q->queue_lock);
3119                 if (result < 0) {
3120                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3121                                 write_request ? "write" : "read",
3122                                 length, offset, result);
3123
3124                         __blk_end_request_all(rq, result);
3125                 }
3126         }
3127 }
3128
3129 /*
3130  * A queue callback.  Makes sure that we don't create a bio that spans
3131  * multiple osd objects.  One exception would be single-page bios,
3132  * which we handle later at bio_chain_clone_range().
3133  */
3134 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3135                           struct bio_vec *bvec)
3136 {
3137         struct rbd_device *rbd_dev = q->queuedata;
3138         sector_t sector_offset;
3139         sector_t sectors_per_obj;
3140         sector_t obj_sector_offset;
3141         int ret;
3142
3143         /*
3144          * Find how far into its rbd object the bio's starting sector
3145          * falls.  The bio sector is partition-relative, so first make
3146          * it relative to the enclosing whole device.
3147          */
3148         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3149         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3150         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3151
3152         /*
3153          * Compute the number of bytes from that offset to the end
3154          * of the object.  Account for what's already used by the bio.
3155          */
3156         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3157         if (ret > bmd->bi_size)
3158                 ret -= bmd->bi_size;
3159         else
3160                 ret = 0;
3161
3162         /*
3163          * Don't send back more than was asked for.  And if the bio
3164          * was empty, let the whole thing through because:  "Note
3165          * that a block device *must* allow a single page to be
3166          * added to an empty bio."
3167          */
3168         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3169         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3170                 ret = (int) bvec->bv_len;
3171
3172         return ret;
3173 }
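/*
 * Worked example for the arithmetic above, assuming the default 4 MiB
 * objects (obj_order 22, so sectors_per_obj == 8192):  a bio starting
 * at device-relative sector 8190 with 512 bytes already in it has
 * (8192 - 8190) << 9 == 1024 bytes to the object boundary, of which
 * 512 remain after subtracting bi_size.  A 4096-byte bvec offered to
 * that bio is therefore limited to 512 bytes; had the bio been empty,
 * the full bvec would have been allowed through instead.
 */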
3174
3175 static void rbd_free_disk(struct rbd_device *rbd_dev)
3176 {
3177         struct gendisk *disk = rbd_dev->disk;
3178
3179         if (!disk)
3180                 return;
3181
3182         rbd_dev->disk = NULL;
3183         if (disk->flags & GENHD_FL_UP) {
3184                 del_gendisk(disk);
3185                 if (disk->queue)
3186                         blk_cleanup_queue(disk->queue);
3187         }
3188         put_disk(disk);
3189 }
3190
3191 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3192                                 const char *object_name,
3193                                 u64 offset, u64 length, void *buf)
3194
3195 {
3196         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3197         struct rbd_obj_request *obj_request;
3198         struct page **pages = NULL;
3199         u32 page_count;
3200         size_t size;
3201         int ret;
3202
3203         page_count = (u32) calc_pages_for(offset, length);
3204         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3205         if (IS_ERR(pages))
3206                 return PTR_ERR(pages);
3207
3208         ret = -ENOMEM;
3209         obj_request = rbd_obj_request_create(object_name, offset, length,
3210                                                         OBJ_REQUEST_PAGES);
3211         if (!obj_request)
3212                 goto out;
3213
3214         obj_request->pages = pages;
3215         obj_request->page_count = page_count;
3216
3217         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3218         if (!obj_request->osd_req)
3219                 goto out;
3220
3221         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3222                                         offset, length, 0, 0);
3223         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3224                                         obj_request->pages,
3225                                         obj_request->length,
3226                                         obj_request->offset & ~PAGE_MASK,
3227                                         false, false);
3228         rbd_osd_req_format_read(obj_request);
3229
3230         ret = rbd_obj_request_submit(osdc, obj_request);
3231         if (ret)
3232                 goto out;
3233         ret = rbd_obj_request_wait(obj_request);
3234         if (ret)
3235                 goto out;
3236
3237         ret = obj_request->result;
3238         if (ret < 0)
3239                 goto out;
3240
3241         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3242         size = (size_t) obj_request->xferred;
3243         ceph_copy_from_page_vector(pages, buf, 0, size);
3244         rbd_assert(size <= (size_t)INT_MAX);
3245         ret = (int)size;
3246 out:
3247         if (obj_request)
3248                 rbd_obj_request_put(obj_request);
3249         else
3250                 ceph_release_page_vector(pages, page_count);
3251
3252         return ret;
3253 }
3254
3255 /*
3256  * Read the complete header for the given rbd device.  On successful
3257  * return, the rbd_dev->header field will contain up-to-date
3258  * information about the image.
3259  */
3260 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3261 {
3262         struct rbd_image_header_ondisk *ondisk = NULL;
3263         u32 snap_count = 0;
3264         u64 names_size = 0;
3265         u32 want_count;
3266         int ret;
3267
3268         /*
3269          * The complete header will include an array of its 64-bit
3270          * snapshot ids, followed by the names of those snapshots as
3271          * a contiguous block of NUL-terminated strings.  Note that
3272          * the number of snapshots could change by the time we read
3273          * it in, in which case we re-read it.
3274          */
3275         do {
3276                 size_t size;
3277
3278                 kfree(ondisk);
3279
3280                 size = sizeof (*ondisk);
3281                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3282                 size += names_size;
3283                 ondisk = kmalloc(size, GFP_KERNEL);
3284                 if (!ondisk)
3285                         return -ENOMEM;
3286
3287                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3288                                        0, size, ondisk);
3289                 if (ret < 0)
3290                         goto out;
3291                 if ((size_t)ret < size) {
3292                         ret = -ENXIO;
3293                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3294                                 size, ret);
3295                         goto out;
3296                 }
3297                 if (!rbd_dev_ondisk_valid(ondisk)) {
3298                         ret = -ENXIO;
3299                         rbd_warn(rbd_dev, "invalid header");
3300                         goto out;
3301                 }
3302
3303                 names_size = le64_to_cpu(ondisk->snap_names_len);
3304                 want_count = snap_count;
3305                 snap_count = le32_to_cpu(ondisk->snap_count);
3306         } while (snap_count != want_count);
3307
3308         ret = rbd_header_from_disk(rbd_dev, ondisk);
3309 out:
3310         kfree(ondisk);
3311
3312         return ret;
3313 }
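/*
 * For reference, the complete v1 header read above is laid out on disk
 * as (sizes match the allocation in the loop):
 *
 *	struct rbd_image_header_ondisk			fixed-size part
 *	struct rbd_image_snap_ondisk[snap_count]	one per snapshot
 *	char snap_names[snap_names_len]			NUL-terminated names
 *
 * So an image with two snapshots named "a" and "b" needs
 * sizeof (*ondisk) + 2 * sizeof (struct rbd_image_snap_ondisk) + 4 bytes.
 */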
3314
3315 /*
3316  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3317  * has disappeared from the (just updated) snapshot context.
3318  */
3319 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3320 {
3321         u64 snap_id;
3322
3323         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3324                 return;
3325
3326         snap_id = rbd_dev->spec->snap_id;
3327         if (snap_id == CEPH_NOSNAP)
3328                 return;
3329
3330         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3331                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3332 }
3333
3334 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3335 {
3336         u64 mapping_size;
3337         int ret;
3338
3339         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3340         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3341         mapping_size = rbd_dev->mapping.size;
3342         if (rbd_dev->image_format == 1)
3343                 ret = rbd_dev_v1_header_info(rbd_dev);
3344         else
3345                 ret = rbd_dev_v2_header_info(rbd_dev);
3346
3347         /* If it's a mapped snapshot, validate its EXISTS flag */
3348
3349         rbd_exists_validate(rbd_dev);
3350         mutex_unlock(&ctl_mutex);
3351         if (mapping_size != rbd_dev->mapping.size) {
3352                 sector_t size;
3353
3354                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3355                 dout("setting size to %llu sectors", (unsigned long long)size);
3356                 set_capacity(rbd_dev->disk, size);
3357                 revalidate_disk(rbd_dev->disk);
3358         }
3359
3360         return ret;
3361 }
3362
3363 static int rbd_init_disk(struct rbd_device *rbd_dev)
3364 {
3365         struct gendisk *disk;
3366         struct request_queue *q;
3367         u64 segment_size;
3368
3369         /* create gendisk info */
3370         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3371         if (!disk)
3372                 return -ENOMEM;
3373
3374         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3375                  rbd_dev->dev_id);
3376         disk->major = rbd_dev->major;
3377         disk->first_minor = 0;
3378         disk->fops = &rbd_bd_ops;
3379         disk->private_data = rbd_dev;
3380
3381         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3382         if (!q)
3383                 goto out_disk;
3384
3385         /* We use the default size, but let's be explicit about it. */
3386         blk_queue_physical_block_size(q, SECTOR_SIZE);
3387
3388         /* set io sizes to object size */
3389         segment_size = rbd_obj_bytes(&rbd_dev->header);
3390         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3391         blk_queue_max_segment_size(q, segment_size);
3392         blk_queue_io_min(q, segment_size);
3393         blk_queue_io_opt(q, segment_size);
3394
3395         blk_queue_merge_bvec(q, rbd_merge_bvec);
3396         disk->queue = q;
3397
3398         q->queuedata = rbd_dev;
3399
3400         rbd_dev->disk = disk;
3401
3402         return 0;
3403 out_disk:
3404         put_disk(disk);
3405
3406         return -ENOMEM;
3407 }
3408
3409 /*
3410  * sysfs
3411  */
3412
3413 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3414 {
3415         return container_of(dev, struct rbd_device, dev);
3416 }
3417
3418 static ssize_t rbd_size_show(struct device *dev,
3419                              struct device_attribute *attr, char *buf)
3420 {
3421         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3422
3423         return sprintf(buf, "%llu\n",
3424                 (unsigned long long)rbd_dev->mapping.size);
3425 }
3426
3427 /*
3428  * Note this shows the features for whatever's mapped, which is not
3429  * necessarily the base image.
3430  */
3431 static ssize_t rbd_features_show(struct device *dev,
3432                              struct device_attribute *attr, char *buf)
3433 {
3434         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3435
3436         return sprintf(buf, "0x%016llx\n",
3437                         (unsigned long long)rbd_dev->mapping.features);
3438 }
3439
3440 static ssize_t rbd_major_show(struct device *dev,
3441                               struct device_attribute *attr, char *buf)
3442 {
3443         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3444
3445         if (rbd_dev->major)
3446                 return sprintf(buf, "%d\n", rbd_dev->major);
3447
3448         return sprintf(buf, "(none)\n");
3450 }
3451
3452 static ssize_t rbd_client_id_show(struct device *dev,
3453                                   struct device_attribute *attr, char *buf)
3454 {
3455         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3456
3457         return sprintf(buf, "client%lld\n",
3458                         ceph_client_id(rbd_dev->rbd_client->client));
3459 }
3460
3461 static ssize_t rbd_pool_show(struct device *dev,
3462                              struct device_attribute *attr, char *buf)
3463 {
3464         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3465
3466         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3467 }
3468
3469 static ssize_t rbd_pool_id_show(struct device *dev,
3470                              struct device_attribute *attr, char *buf)
3471 {
3472         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3473
3474         return sprintf(buf, "%llu\n",
3475                         (unsigned long long) rbd_dev->spec->pool_id);
3476 }
3477
3478 static ssize_t rbd_name_show(struct device *dev,
3479                              struct device_attribute *attr, char *buf)
3480 {
3481         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3482
3483         if (rbd_dev->spec->image_name)
3484                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3485
3486         return sprintf(buf, "(unknown)\n");
3487 }
3488
3489 static ssize_t rbd_image_id_show(struct device *dev,
3490                              struct device_attribute *attr, char *buf)
3491 {
3492         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3493
3494         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3495 }
3496
3497 /*
3498  * Shows the name of the currently-mapped snapshot (or
3499  * RBD_SNAP_HEAD_NAME for the base image).
3500  */
3501 static ssize_t rbd_snap_show(struct device *dev,
3502                              struct device_attribute *attr,
3503                              char *buf)
3504 {
3505         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3506
3507         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3508 }
3509
3510 /*
3511  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3512  * for the parent image.  If there is no parent, simply shows
3513  * "(no parent image)".
3514  */
3515 static ssize_t rbd_parent_show(struct device *dev,
3516                              struct device_attribute *attr,
3517                              char *buf)
3518 {
3519         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3520         struct rbd_spec *spec = rbd_dev->parent_spec;
3521         int count;
3522         char *bufp = buf;
3523
3524         if (!spec)
3525                 return sprintf(buf, "(no parent image)\n");
3526
3527         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3528                         (unsigned long long) spec->pool_id, spec->pool_name);
3529         if (count < 0)
3530                 return count;
3531         bufp += count;
3532
3533         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3534                         spec->image_name ? spec->image_name : "(unknown)");
3535         if (count < 0)
3536                 return count;
3537         bufp += count;
3538
3539         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3540                         (unsigned long long) spec->snap_id, spec->snap_name);
3541         if (count < 0)
3542                 return count;
3543         bufp += count;
3544
3545         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3546         if (count < 0)
3547                 return count;
3548         bufp += count;
3549
3550         return (ssize_t) (bufp - buf);
3551 }
3552
3553 static ssize_t rbd_image_refresh(struct device *dev,
3554                                  struct device_attribute *attr,
3555                                  const char *buf,
3556                                  size_t size)
3557 {
3558         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3559         int ret;
3560
3561         ret = rbd_dev_refresh(rbd_dev);
3562         if (ret)
3563                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3564
3565         return ret < 0 ? ret : size;
3566 }
3567
3568 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3569 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3570 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3571 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3572 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3573 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3574 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3575 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3576 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3577 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3578 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3579
3580 static struct attribute *rbd_attrs[] = {
3581         &dev_attr_size.attr,
3582         &dev_attr_features.attr,
3583         &dev_attr_major.attr,
3584         &dev_attr_client_id.attr,
3585         &dev_attr_pool.attr,
3586         &dev_attr_pool_id.attr,
3587         &dev_attr_name.attr,
3588         &dev_attr_image_id.attr,
3589         &dev_attr_current_snap.attr,
3590         &dev_attr_parent.attr,
3591         &dev_attr_refresh.attr,
3592         NULL
3593 };
3594
3595 static struct attribute_group rbd_attr_group = {
3596         .attrs = rbd_attrs,
3597 };
3598
3599 static const struct attribute_group *rbd_attr_groups[] = {
3600         &rbd_attr_group,
3601         NULL
3602 };
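/*
 * Userspace view of the attributes defined above (illustrative sketch,
 * not driver code; the device id "0" is just an example).  Each device
 * registered on the rbd bus exposes these files under
 * /sys/bus/rbd/devices/<id>/.
 */
#if 0
#include <stdio.h>

int main(void)
{
	char line[64];
	FILE *f = fopen("/sys/bus/rbd/devices/0/size", "r");

	if (!f)
		return 1;
	if (fgets(line, sizeof(line), f))
		printf("mapped size: %s", line);	/* bytes */
	fclose(f);
	return 0;
}
#endif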
3603
3604 static void rbd_sysfs_dev_release(struct device *dev)
3605 {
3606 }
3607
3608 static struct device_type rbd_device_type = {
3609         .name           = "rbd",
3610         .groups         = rbd_attr_groups,
3611         .release        = rbd_sysfs_dev_release,
3612 };
3613
3614 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3615 {
3616         kref_get(&spec->kref);
3617
3618         return spec;
3619 }
3620
3621 static void rbd_spec_free(struct kref *kref);
3622 static void rbd_spec_put(struct rbd_spec *spec)
3623 {
3624         if (spec)
3625                 kref_put(&spec->kref, rbd_spec_free);
3626 }
3627
3628 static struct rbd_spec *rbd_spec_alloc(void)
3629 {
3630         struct rbd_spec *spec;
3631
3632         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3633         if (!spec)
3634                 return NULL;
3635         kref_init(&spec->kref);
3636
3637         return spec;
3638 }
3639
3640 static void rbd_spec_free(struct kref *kref)
3641 {
3642         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3643
3644         kfree(spec->pool_name);
3645         kfree(spec->image_id);
3646         kfree(spec->image_name);
3647         kfree(spec->snap_name);
3648         kfree(spec);
3649 }
3650
3651 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3652                                 struct rbd_spec *spec)
3653 {
3654         struct rbd_device *rbd_dev;
3655
3656         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3657         if (!rbd_dev)
3658                 return NULL;
3659
3660         spin_lock_init(&rbd_dev->lock);
3661         rbd_dev->flags = 0;
3662         atomic_set(&rbd_dev->parent_ref, 0);
3663         INIT_LIST_HEAD(&rbd_dev->node);
3664         init_rwsem(&rbd_dev->header_rwsem);
3665
3666         rbd_dev->spec = spec;
3667         rbd_dev->rbd_client = rbdc;
3668
3669         /* Initialize the layout used for all rbd requests */
3670
3671         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3672         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3673         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3674         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3675
3676         return rbd_dev;
3677 }
3678
3679 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3680 {
3681         rbd_put_client(rbd_dev->rbd_client);
3682         rbd_spec_put(rbd_dev->spec);
3683         kfree(rbd_dev);
3684 }
3685
3686 /*
3687  * Get the size and object order for an image snapshot, or if
3688  * snap_id is CEPH_NOSNAP, gets this information for the base
3689  * image.
3690  */
3691 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3692                                 u8 *order, u64 *snap_size)
3693 {
3694         __le64 snapid = cpu_to_le64(snap_id);
3695         int ret;
3696         struct {
3697                 u8 order;
3698                 __le64 size;
3699         } __attribute__ ((packed)) size_buf = { 0 };
3700
3701         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3702                                 "rbd", "get_size",
3703                                 &snapid, sizeof (snapid),
3704                                 &size_buf, sizeof (size_buf));
3705         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3706         if (ret < 0)
3707                 return ret;
3708         if (ret < sizeof (size_buf))
3709                 return -ERANGE;
3710
3711         if (order)
3712                 *order = size_buf.order;
3713         *snap_size = le64_to_cpu(size_buf.size);
3714
3715         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3716                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3717                 (unsigned long long)*snap_size);
3718
3719         return 0;
3720 }
3721
3722 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3723 {
3724         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3725                                         &rbd_dev->header.obj_order,
3726                                         &rbd_dev->header.image_size);
3727 }
3728
3729 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3730 {
3731         void *reply_buf;
3732         int ret;
3733         void *p;
3734
3735         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3736         if (!reply_buf)
3737                 return -ENOMEM;
3738
3739         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3740                                 "rbd", "get_object_prefix", NULL, 0,
3741                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3742         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3743         if (ret < 0)
3744                 goto out;
3745
3746         p = reply_buf;
3747         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3748                                                 p + ret, NULL, GFP_NOIO);
3749         ret = 0;
3750
3751         if (IS_ERR(rbd_dev->header.object_prefix)) {
3752                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3753                 rbd_dev->header.object_prefix = NULL;
3754         } else {
3755                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3756         }
3757 out:
3758         kfree(reply_buf);
3759
3760         return ret;
3761 }
3762
3763 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3764                 u64 *snap_features)
3765 {
3766         __le64 snapid = cpu_to_le64(snap_id);
3767         struct {
3768                 __le64 features;
3769                 __le64 incompat;
3770         } __attribute__ ((packed)) features_buf = { 0 };
3771         u64 incompat;
3772         int ret;
3773
3774         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3775                                 "rbd", "get_features",
3776                                 &snapid, sizeof (snapid),
3777                                 &features_buf, sizeof (features_buf));
3778         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3779         if (ret < 0)
3780                 return ret;
3781         if (ret < sizeof (features_buf))
3782                 return -ERANGE;
3783
3784         incompat = le64_to_cpu(features_buf.incompat);
3785         if (incompat & ~RBD_FEATURES_SUPPORTED)
3786                 return -ENXIO;
3787
3788         *snap_features = le64_to_cpu(features_buf.features);
3789
3790         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3791                 (unsigned long long)snap_id,
3792                 (unsigned long long)*snap_features,
3793                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3794
3795         return 0;
3796 }
3797
3798 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3799 {
3800         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3801                                                 &rbd_dev->header.features);
3802 }
3803
3804 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3805 {
3806         struct rbd_spec *parent_spec;
3807         size_t size;
3808         void *reply_buf = NULL;
3809         __le64 snapid;
3810         void *p;
3811         void *end;
3812         u64 pool_id;
3813         char *image_id;
3814         u64 snap_id;
3815         u64 overlap;
3816         int ret;
3817
3818         parent_spec = rbd_spec_alloc();
3819         if (!parent_spec)
3820                 return -ENOMEM;
3821
3822         size = sizeof (__le64) +                                /* pool_id */
3823                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3824                 sizeof (__le64) +                               /* snap_id */
3825                 sizeof (__le64);                                /* overlap */
3826         reply_buf = kmalloc(size, GFP_KERNEL);
3827         if (!reply_buf) {
3828                 ret = -ENOMEM;
3829                 goto out_err;
3830         }
3831
3832         snapid = cpu_to_le64(CEPH_NOSNAP);
3833         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3834                                 "rbd", "get_parent",
3835                                 &snapid, sizeof (snapid),
3836                                 reply_buf, size);
3837         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3838         if (ret < 0)
3839                 goto out_err;
3840
3841         p = reply_buf;
3842         end = reply_buf + ret;
3843         ret = -ERANGE;
3844         ceph_decode_64_safe(&p, end, pool_id, out_err);
3845         if (pool_id == CEPH_NOPOOL) {
3846                 /*
3847                  * Either the parent never existed, or we have
3848                  * record of it but the image got flattened so it no
3849                  * longer has a parent.  When the parent of a
3850                  * layered image disappears we immediately set the
3851                  * overlap to 0.  The effect of this is that all new
3852                  * requests will be treated as if the image had no
3853                  * parent.
3854                  */
3855                 if (rbd_dev->parent_overlap) {
3856                         rbd_dev->parent_overlap = 0;
3857                         smp_mb();
3858                         rbd_dev_parent_put(rbd_dev);
3859                         pr_info("%s: clone image has been flattened\n",
3860                                 rbd_dev->disk->disk_name);
3861                 }
3862
3863                 goto out;       /* No parent?  No problem. */
3864         }
3865
3866         /* The ceph file layout needs to fit pool id in 32 bits */
3867
3868         ret = -EIO;
3869         if (pool_id > (u64)U32_MAX) {
3870                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3871                         (unsigned long long)pool_id, U32_MAX);
3872                 goto out_err;
3873         }
3874
3875         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3876         if (IS_ERR(image_id)) {
3877                 ret = PTR_ERR(image_id);
3878                 goto out_err;
3879         }
3880         ceph_decode_64_safe(&p, end, snap_id, out_err);
3881         ceph_decode_64_safe(&p, end, overlap, out_err);
3882
3883         /*
3884          * The parent won't change (except when the clone is
3885          * flattened, which was handled above).  So we only need
3886          * to record the parent spec if we have not already done so.
3887          */
3888         if (!rbd_dev->parent_spec) {
3889                 parent_spec->pool_id = pool_id;
3890                 parent_spec->image_id = image_id;
3891                 parent_spec->snap_id = snap_id;
3892                 rbd_dev->parent_spec = parent_spec;
3893                 parent_spec = NULL;     /* rbd_dev now owns this */
3894         }
3895
3896         /*
3897          * We always update the parent overlap.  If it's zero we
3898          * treat it specially.
3899          */
3900         rbd_dev->parent_overlap = overlap;
3901         smp_mb();
3902         if (!overlap) {
3903
3904                 /* A null parent_spec indicates it's the initial probe */
3905
3906                 if (parent_spec) {
3907                         /*
3908                          * The overlap has become zero, so the clone
3909                          * must have been resized down to 0 at some
3910                          * point.  Treat this the same as a flatten.
3911                          */
3912                         rbd_dev_parent_put(rbd_dev);
3913                         pr_info("%s: clone image now standalone\n",
3914                                 rbd_dev->disk->disk_name);
3915                 } else {
3916                         /*
3917                          * For the initial probe, if we find the
3918                          * overlap is zero we just pretend there was
3919                          * no parent image.
3920                          */
3921                         rbd_warn(rbd_dev, "ignoring parent of "
3922                                                 "clone with overlap 0\n");
3923                 }
3924         }
3925 out:
3926         ret = 0;
3927 out_err:
3928         kfree(reply_buf);
3929         rbd_spec_put(parent_spec);
3930
3931         return ret;
3932 }
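/*
 * For reference, the "get_parent" reply decoded above is laid out as:
 *
 *	__le64	pool_id		CEPH_NOPOOL if there is no parent
 *	string	image_id	__le32 length followed by the bytes
 *	__le64	snap_id
 *	__le64	overlap		bytes of the parent visible to the clone
 */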
3933
3934 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3935 {
3936         struct {
3937                 __le64 stripe_unit;
3938                 __le64 stripe_count;
3939         } __attribute__ ((packed)) striping_info_buf = { 0 };
3940         size_t size = sizeof (striping_info_buf);
3941         void *p;
3942         u64 obj_size;
3943         u64 stripe_unit;
3944         u64 stripe_count;
3945         int ret;
3946
3947         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3948                                 "rbd", "get_stripe_unit_count", NULL, 0,
3949                                 (char *)&striping_info_buf, size);
3950         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3951         if (ret < 0)
3952                 return ret;
3953         if (ret < size)
3954                 return -ERANGE;
3955
3956         /*
3957          * We don't actually support the "fancy striping" feature
3958          * (STRIPINGV2) yet, but if the striping sizes are the
3959          * defaults the behavior is the same as before.  So find
3960          * out, and only fail if the image has non-default values.
3961          */
3962         ret = -EINVAL;
3963         obj_size = (u64)1 << rbd_dev->header.obj_order;
3964         p = &striping_info_buf;
3965         stripe_unit = ceph_decode_64(&p);
3966         if (stripe_unit != obj_size) {
3967                 rbd_warn(rbd_dev, "unsupported stripe unit "
3968                                 "(got %llu want %llu)",
3969                                 stripe_unit, obj_size);
3970                 return -EINVAL;
3971         }
3972         stripe_count = ceph_decode_64(&p);
3973         if (stripe_count != 1) {
3974                 rbd_warn(rbd_dev, "unsupported stripe count "
3975                                 "(got %llu want 1)", stripe_count);
3976                 return -EINVAL;
3977         }
3978         rbd_dev->header.stripe_unit = stripe_unit;
3979         rbd_dev->header.stripe_count = stripe_count;
3980
3981         return 0;
3982 }
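/*
 * Example of what the check above accepts: with the default 4 MiB
 * objects (obj_order 22) the only valid reply is stripe_unit == 4194304
 * and stripe_count == 1, i.e. the degenerate layout that behaves
 * exactly as if STRIPINGV2 were absent.
 */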
3983
3984 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3985 {
3986         size_t image_id_size;
3987         char *image_id;
3988         void *p;
3989         void *end;
3990         size_t size;
3991         void *reply_buf = NULL;
3992         size_t len = 0;
3993         char *image_name = NULL;
3994         int ret;
3995
3996         rbd_assert(!rbd_dev->spec->image_name);
3997
3998         len = strlen(rbd_dev->spec->image_id);
3999         image_id_size = sizeof (__le32) + len;
4000         image_id = kmalloc(image_id_size, GFP_KERNEL);
4001         if (!image_id)
4002                 return NULL;
4003
4004         p = image_id;
4005         end = image_id + image_id_size;
4006         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4007
4008         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4009         reply_buf = kmalloc(size, GFP_KERNEL);
4010         if (!reply_buf)
4011                 goto out;
4012
4013         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4014                                 "rbd", "dir_get_name",
4015                                 image_id, image_id_size,
4016                                 reply_buf, size);
4017         if (ret < 0)
4018                 goto out;
4019         p = reply_buf;
4020         end = reply_buf + ret;
4021
4022         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4023         if (IS_ERR(image_name))
4024                 image_name = NULL;
4025         else
4026                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4027 out:
4028         kfree(reply_buf);
4029         kfree(image_id);
4030
4031         return image_name;
4032 }
4033
4034 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4035 {
4036         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4037         const char *snap_name;
4038         u32 which = 0;
4039
4040         /* Skip over names until we find the one we are looking for */
4041
4042         snap_name = rbd_dev->header.snap_names;
4043         while (which < snapc->num_snaps) {
4044                 if (!strcmp(name, snap_name))
4045                         return snapc->snaps[which];
4046                 snap_name += strlen(snap_name) + 1;
4047                 which++;
4048         }
4049         return CEPH_NOSNAP;
4050 }
4051
4052 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4053 {
4054         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4055         u32 which;
4056         bool found = false;
4057         u64 snap_id;
4058
4059         for (which = 0; !found && which < snapc->num_snaps; which++) {
4060                 const char *snap_name;
4061
4062                 snap_id = snapc->snaps[which];
4063                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4064                 if (IS_ERR(snap_name))
4065                         break;
4066                 found = !strcmp(name, snap_name);
4067                 kfree(snap_name);
4068         }
4069         return found ? snap_id : CEPH_NOSNAP;
4070 }
4071
4072 /*
4073  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4074  * no snapshot by that name is found, or if an error occurs.
4075  */
4076 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4077 {
4078         if (rbd_dev->image_format == 1)
4079                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4080
4081         return rbd_v2_snap_id_by_name(rbd_dev, name);
4082 }
4083
4084 /*
4085  * When an rbd image has a parent image, it is identified by the
4086  * pool, image, and snapshot ids (not names).  This function fills
4087  * in the names for those ids.  (It's OK if we can't figure out the
4088  * name for an image id, but the pool and snapshot ids should always
4089  * exist and have names.)  All names in an rbd spec are dynamically
4090  * allocated.
4091  *
4092  * When an image being mapped (not a parent) is probed, we have the
4093  * pool name and pool id, image name and image id, and the snapshot
4094  * name.  The only thing we're missing is the snapshot id.
4095  */
4096 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
4097 {
4098         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4099         struct rbd_spec *spec = rbd_dev->spec;
4100         const char *pool_name;
4101         const char *image_name;
4102         const char *snap_name;
4103         int ret;
4104
4105         /*
4106          * An image being mapped will have the pool name (etc.), but
4107          * we need to look up the snapshot id.
4108          */
4109         if (spec->pool_name) {
4110                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4111                         u64 snap_id;
4112
4113                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4114                         if (snap_id == CEPH_NOSNAP)
4115                                 return -ENOENT;
4116                         spec->snap_id = snap_id;
4117                 } else {
4118                         spec->snap_id = CEPH_NOSNAP;
4119                 }
4120
4121                 return 0;
4122         }
4123
4124         /* Get the pool name; we have to make our own copy of this */
4125
4126         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4127         if (!pool_name) {
4128                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4129                 return -EIO;
4130         }
4131         pool_name = kstrdup(pool_name, GFP_KERNEL);
4132         if (!pool_name)
4133                 return -ENOMEM;
4134
4135         /* Fetch the image name; tolerate failure here */
4136
4137         image_name = rbd_dev_image_name(rbd_dev);
4138         if (!image_name)
4139                 rbd_warn(rbd_dev, "unable to get image name");
4140
4141         /* Look up the snapshot name, and make a copy */
4142
4143         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4144         if (!snap_name) {
4145                 ret = -ENOMEM;
4146                 goto out_err;
4147         }
4148
4149         spec->pool_name = pool_name;
4150         spec->image_name = image_name;
4151         spec->snap_name = snap_name;
4152
4153         return 0;
4154 out_err:
4155         kfree(image_name);
4156         kfree(pool_name);
4157
4158         return ret;
4159 }
4160
4161 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4162 {
4163         size_t size;
4164         int ret;
4165         void *reply_buf;
4166         void *p;
4167         void *end;
4168         u64 seq;
4169         u32 snap_count;
4170         struct ceph_snap_context *snapc;
4171         u32 i;
4172
4173         /*
4174          * We'll need room for the seq value (maximum snapshot id),
4175          * snapshot count, and array of that many snapshot ids.
4176          * For now we have a fixed upper limit on the number we're
4177          * prepared to receive.
4178          */
4179         size = sizeof (__le64) + sizeof (__le32) +
4180                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
4181         reply_buf = kzalloc(size, GFP_KERNEL);
4182         if (!reply_buf)
4183                 return -ENOMEM;
4184
4185         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4186                                 "rbd", "get_snapcontext", NULL, 0,
4187                                 reply_buf, size);
4188         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4189         if (ret < 0)
4190                 goto out;
4191
4192         p = reply_buf;
4193         end = reply_buf + ret;
4194         ret = -ERANGE;
4195         ceph_decode_64_safe(&p, end, seq, out);
4196         ceph_decode_32_safe(&p, end, snap_count, out);
4197
4198         /*
4199          * Make sure the reported number of snapshot ids wouldn't go
4200          * beyond the end of our buffer.  But before checking that,
4201          * make sure the computed size of the snapshot context we
4202          * allocate is representable in a size_t.
4203          */
4204         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4205                                  / sizeof (u64)) {
4206                 ret = -EINVAL;
4207                 goto out;
4208         }
4209         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4210                 goto out;
4211         ret = 0;
4212
4213         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4214         if (!snapc) {
4215                 ret = -ENOMEM;
4216                 goto out;
4217         }
4218         snapc->seq = seq;
4219         for (i = 0; i < snap_count; i++)
4220                 snapc->snaps[i] = ceph_decode_64(&p);
4221
4222         ceph_put_snap_context(rbd_dev->header.snapc);
4223         rbd_dev->header.snapc = snapc;
4224
4225         dout("  snap context seq = %llu, snap_count = %u\n",
4226                 (unsigned long long)seq, (unsigned int)snap_count);
4227 out:
4228         kfree(reply_buf);
4229
4230         return ret;
4231 }
4232
4233 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4234                                         u64 snap_id)
4235 {
4236         size_t size;
4237         void *reply_buf;
4238         __le64 snapid;
4239         int ret;
4240         void *p;
4241         void *end;
4242         char *snap_name;
4243
4244         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4245         reply_buf = kmalloc(size, GFP_KERNEL);
4246         if (!reply_buf)
4247                 return ERR_PTR(-ENOMEM);
4248
4249         snapid = cpu_to_le64(snap_id);
4250         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4251                                 "rbd", "get_snapshot_name",
4252                                 &snapid, sizeof (snapid),
4253                                 reply_buf, size);
4254         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4255         if (ret < 0) {
4256                 snap_name = ERR_PTR(ret);
4257                 goto out;
4258         }
4259
4260         p = reply_buf;
4261         end = reply_buf + ret;
4262         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4263         if (IS_ERR(snap_name))
4264                 goto out;
4265
4266         dout("  snap_id 0x%016llx snap_name = %s\n",
4267                 (unsigned long long)snap_id, snap_name);
4268 out:
4269         kfree(reply_buf);
4270
4271         return snap_name;
4272 }
4273
4274 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4275 {
4276         bool first_time = rbd_dev->header.object_prefix == NULL;
4277         int ret;
4278
4279         down_write(&rbd_dev->header_rwsem);
4280
4281         ret = rbd_dev_v2_image_size(rbd_dev);
4282         if (ret)
4283                 goto out;
4284
4285         if (first_time) {
4286                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4287                 if (ret)
4288                         goto out;
4289         }
4290
4291         /*
4292          * If the image supports layering, get the parent info.  We
4293          * need to probe the first time regardless.  Thereafter we
4294          * only need to do so if there's a parent, to see if it has
4295          * disappeared due to the mapped image getting flattened.
4296          */
4297         if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4298                         (first_time || rbd_dev->parent_spec)) {
4299                 bool warn;
4300
4301                 ret = rbd_dev_v2_parent_info(rbd_dev);
4302                 if (ret)
4303                         goto out;
4304
4305                 /*
4306                  * Print a warning if this is the initial probe and
4307                  * the image has a parent.  Don't print it if the
4308                  * image now being probed is itself a parent.  We
4309                  * can tell at this point because we won't know its
4310                  * pool name yet (just its pool id).
4311                  */
4312                 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4313                 if (first_time && warn)
4314                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4315                                         "is EXPERIMENTAL!");
4316         }
4317
4318         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4319             rbd_dev->mapping.size != rbd_dev->header.image_size)
4320                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4321
4322         ret = rbd_dev_v2_snap_context(rbd_dev);
4323         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4324 out:
4325         up_write(&rbd_dev->header_rwsem);
4326
4327         return ret;
4328 }
4329
4330 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4331 {
4332         struct device *dev;
4333         int ret;
4334
4335         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4336
4337         dev = &rbd_dev->dev;
4338         dev->bus = &rbd_bus_type;
4339         dev->type = &rbd_device_type;
4340         dev->parent = &rbd_root_dev;
4341         dev->release = rbd_dev_device_release;
4342         dev_set_name(dev, "%d", rbd_dev->dev_id);
4343         ret = device_register(dev);
4344
4345         mutex_unlock(&ctl_mutex);
4346
4347         return ret;
4348 }
4349
4350 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4351 {
4352         device_unregister(&rbd_dev->dev);
4353 }
4354
4355 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4356
4357 /*
4358  * Get a unique rbd identifier for the given new rbd_dev, and add
4359  * the rbd_dev to the global list.  The minimum rbd id is 1.
4360  */
4361 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4362 {
4363         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4364
4365         spin_lock(&rbd_dev_list_lock);
4366         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4367         spin_unlock(&rbd_dev_list_lock);
4368         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4369                 (unsigned long long) rbd_dev->dev_id);
4370 }
4371
4372 /*
4373  * Remove an rbd_dev from the global list, and record that its
4374  * identifier is no longer in use.
4375  */
4376 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4377 {
4378         struct list_head *tmp;
4379         int rbd_id = rbd_dev->dev_id;
4380         int max_id;
4381
4382         rbd_assert(rbd_id > 0);
4383
4384         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4385                 (unsigned long long) rbd_dev->dev_id);
4386         spin_lock(&rbd_dev_list_lock);
4387         list_del_init(&rbd_dev->node);
4388
4389         /*
4390          * If the id being "put" is not the current maximum, there
4391          * is nothing special we need to do.
4392          */
4393         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4394                 spin_unlock(&rbd_dev_list_lock);
4395                 return;
4396         }
4397
4398         /*
4399          * We need to update the current maximum id.  Search the
4400          * list to find out what it is.  We're more likely to find
4401          * the maximum at the end, so search the list backward.
4402          */
4403         max_id = 0;
4404         list_for_each_prev(tmp, &rbd_dev_list) {
4405                 struct rbd_device *rbd_dev;
4406
4407                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4408                 if (rbd_dev->dev_id > max_id)
4409                         max_id = rbd_dev->dev_id;
4410         }
4411         spin_unlock(&rbd_dev_list_lock);
4412
4413         /*
4414          * The max id could have been updated by rbd_dev_id_get(), in
4415          * which case it now accurately reflects the new maximum.
4416          * Be careful not to overwrite the maximum value in that
4417          * case.
4418          */
4419         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4420         dout("  max dev id has been reset\n");
4421 }
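/*
 * Example of the bookkeeping above: with devices 1, 2 and 3 mapped,
 * rbd_dev_id_max is 3.  Unmapping device 3 rescans the list and finds
 * 2, but if a racing rbd_dev_id_get() already bumped the maximum to 4,
 * the cmpxchg (expecting 3) fails and the newer maximum is preserved.
 */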
4422
4423 /*
4424  * Skips over white space at *buf, and updates *buf to point to the
4425  * first found non-space character (if any). Returns the length of
4426  * the token (string of non-white space characters) found.  Note
4427  * that *buf must be terminated with '\0'.
4428  */
4429 static inline size_t next_token(const char **buf)
4430 {
4431         /*
4432          * These are the characters that produce nonzero for
4433          * isspace() in the "C" and "POSIX" locales.
4434          */
4435         const char *spaces = " \f\n\r\t\v";
4436
4437         *buf += strspn(*buf, spaces);   /* Find start of token */
4438
4439         return strcspn(*buf, spaces);   /* Return token length */
4440 }
4441
4442 /*
4443  * Finds the next token in *buf, and if the provided token buffer is
4444  * big enough, copies the found token into it.  The result, if
4445  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4446  * must be terminated with '\0' on entry.
4447  *
4448  * Returns the length of the token found (not including the '\0').
4449  * Return value will be 0 if no token is found, and it will be >=
4450  * token_size if the token would not fit.
4451  *
4452  * The *buf pointer will be updated to point beyond the end of the
4453  * found token.  Note that this occurs even if the token buffer is
4454  * too small to hold it.
4455  */
4456 static inline size_t copy_token(const char **buf,
4457                                 char *token,
4458                                 size_t token_size)
4459 {
4460         size_t len;
4461
4462         len = next_token(buf);
4463         if (len < token_size) {
4464                 memcpy(token, *buf, len);
4465                 *(token + len) = '\0';
4466         }
4467         *buf += len;
4468
4469         return len;
4470 }
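
/*
 * Example of copy_token() truncation semantics (buffer size chosen
 * for illustration):
 *
 *	char small[4];
 *	const char *p = "monitor rest";
 *	size_t len = copy_token(&p, small, sizeof (small));
 *	// len == 7 >= sizeof (small), so "monitor" was not copied and
 *	// small[] is untouched, but p was still advanced to " rest".
 *	// Callers must check len < token_size before using the copy.
 */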
4471
4472 /*
4473  * Finds the next token in *buf, dynamically allocates a buffer big
4474  * enough to hold a copy of it, and copies the token into the new
4475  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4476  * that a duplicate buffer is created even for a zero-length token.
4477  *
4478  * Returns a pointer to the newly-allocated duplicate, or a null
4479  * pointer if memory for the duplicate was not available.  If
4480  * the lenp argument is a non-null pointer, the length of the token
4481  * (not including the '\0') is returned in *lenp.
4482  *
4483  * If successful, the *buf pointer will be updated to point beyond
4484  * the end of the found token.
4485  *
4486  * Note: uses GFP_KERNEL for allocation.
4487  */
4488 static inline char *dup_token(const char **buf, size_t *lenp)
4489 {
4490         char *dup;
4491         size_t len;
4492
4493         len = next_token(buf);
4494         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4495         if (!dup)
4496                 return NULL;
4497         *(dup + len) = '\0';
4498         *buf += len;
4499
4500         if (lenp)
4501                 *lenp = len;
4502
4503         return dup;
4504 }
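
/*
 * Note that the kmemdup() above deliberately copies len + 1 bytes:
 * the extra byte is the delimiter (or terminating '\0') that follows
 * the token in *buf, and it is immediately overwritten with '\0', so
 * the duplicate is always properly terminated.
 */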
4505
4506 /*
4507  * Parse the options provided for an "rbd add" (i.e., rbd image
4508  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4509  * and the data written is passed here via a NUL-terminated buffer.
4510  * Returns 0 if successful or an error code otherwise.
4511  *
4512  * The information extracted from these options is recorded in
4513  * the other parameters which return dynamically-allocated
4514  * structures:
4515  *  ceph_opts
4516  *      The address of a pointer that will refer to a ceph options
4517  *      structure.  Caller must release the returned pointer using
4518  *      ceph_destroy_options() when it is no longer needed.
4519  *  rbd_opts
4520  *      Address of an rbd options pointer.  Fully initialized by
4521  *      this function; caller must release with kfree().
4522  *  spec
4523  *      Address of an rbd image specification pointer.  Fully
4524  *      initialized by this function based on parsed options.
4525  *      Caller must release with rbd_spec_put().
4526  *
4527  * The options passed take this form:
4528  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4529  * where:
4530  *  <mon_addrs>
4531  *      A comma-separated list of one or more monitor addresses.
4532  *      A monitor address is an ip address, optionally followed
4533  *      by a port number (separated by a colon).
4534  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4535  *  <options>
4536  *      A comma-separated list of ceph and/or rbd options.
4537  *  <pool_name>
4538  *      The name of the rados pool containing the rbd image.
4539  *  <image_name>
4540  *      The name of the image in that pool to map.
4541  *  <snap_name>
4542  *      An optional snapshot name.  If provided, the mapping will
4543  *      present data from the image at the time that snapshot was
4544  *      created.  The image head is used if no snapshot name is
4545  *      provided.  Snapshot mappings are always read-only.
4546  */
4547 static int rbd_add_parse_args(const char *buf,
4548                                 struct ceph_options **ceph_opts,
4549                                 struct rbd_options **opts,
4550                                 struct rbd_spec **rbd_spec)
4551 {
4552         size_t len;
4553         char *options;
4554         const char *mon_addrs;
4555         char *snap_name;
4556         size_t mon_addrs_size;
4557         struct rbd_spec *spec = NULL;
4558         struct rbd_options *rbd_opts = NULL;
4559         struct ceph_options *copts;
4560         int ret;
4561
4562         /* The first four tokens are required */
4563
4564         len = next_token(&buf);
4565         if (!len) {
4566                 rbd_warn(NULL, "no monitor address(es) provided");
4567                 return -EINVAL;
4568         }
4569         mon_addrs = buf;
4570         mon_addrs_size = len + 1;
4571         buf += len;
4572
4573         ret = -EINVAL;
4574         options = dup_token(&buf, NULL);
4575         if (!options)
4576                 return -ENOMEM;
4577         if (!*options) {
4578                 rbd_warn(NULL, "no options provided");
4579                 goto out_err;
4580         }
4581
4582         spec = rbd_spec_alloc();
4583         if (!spec)
4584                 goto out_mem;
4585
4586         spec->pool_name = dup_token(&buf, NULL);
4587         if (!spec->pool_name)
4588                 goto out_mem;
4589         if (!*spec->pool_name) {
4590                 rbd_warn(NULL, "no pool name provided");
4591                 goto out_err;
4592         }
4593
4594         spec->image_name = dup_token(&buf, NULL);
4595         if (!spec->image_name)
4596                 goto out_mem;
4597         if (!*spec->image_name) {
4598                 rbd_warn(NULL, "no image name provided");
4599                 goto out_err;
4600         }
4601
4602         /*
4603          * Snapshot name is optional; default is to use "-"
4604          * (indicating the head/no snapshot).
4605          */
4606         len = next_token(&buf);
4607         if (!len) {
4608                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4609                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4610         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4611                 ret = -ENAMETOOLONG;
4612                 goto out_err;
4613         }
4614         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4615         if (!snap_name)
4616                 goto out_mem;
4617         *(snap_name + len) = '\0';
4618         spec->snap_name = snap_name;
4619
4620         /* Initialize all rbd options to the defaults */
4621
4622         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4623         if (!rbd_opts)
4624                 goto out_mem;
4625
4626         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4627
4628         copts = ceph_parse_options(options, mon_addrs,
4629                                         mon_addrs + mon_addrs_size - 1,
4630                                         parse_rbd_opts_token, rbd_opts);
4631         if (IS_ERR(copts)) {
4632                 ret = PTR_ERR(copts);
4633                 goto out_err;
4634         }
4635         kfree(options);
4636
4637         *ceph_opts = copts;
4638         *opts = rbd_opts;
4639         *rbd_spec = spec;
4640
4641         return 0;
4642 out_mem:
4643         ret = -ENOMEM;
4644 out_err:
4645         kfree(rbd_opts);
4646         rbd_spec_put(spec);
4647         kfree(options);
4648
4649         return ret;
4650 }
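
/*
 * For reference, a request in the format parsed above looks like this
 * (monitor address, pool, image and snapshot names are made up):
 *
 *	$ echo "192.168.0.1:6789 name=admin mypool myimage mysnap" \
 *		> /sys/bus/rbd/add
 *
 * This maps snapshot "mysnap" of image "myimage" in pool "mypool",
 * read-only, using the monitor at 192.168.0.1:6789.
 */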
4651
4652 /*
4653  * An rbd format 2 image has a unique identifier, distinct from the
4654  * name given to it by the user.  Internally, that identifier is
4655  * what's used to specify the names of objects related to the image.
4656  *
4657  * A special "rbd id" object is used to map an rbd image name to its
4658  * id.  If that object doesn't exist, then there is no v2 rbd image
4659  * with the supplied name.
4660  *
4661  * This function will record the given rbd_dev's image_id field if
4662  * it can be determined, and in that case will return 0.  If any
4663  * errors occur a negative errno will be returned and the rbd_dev's
4664  * image_id field will be unchanged (and should be NULL).
4665  */
4666 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4667 {
4668         int ret;
4669         size_t size;
4670         char *object_name;
4671         void *response;
4672         char *image_id;
4673
4674         /*
4675          * When probing a parent image, the image id is already
4676          * known (and the image name likely is not).  There's no
4677          * need to fetch the image id again in this case.  We
4678          * do still need to set the image format though.
4679          */
4680         if (rbd_dev->spec->image_id) {
4681                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4682
4683                 return 0;
4684         }
4685
4686         /*
4687          * First, see if the format 2 image id file exists, and if
4688          * so, get the image's persistent id from it.
4689          */
4690         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4691         object_name = kmalloc(size, GFP_NOIO);
4692         if (!object_name)
4693                 return -ENOMEM;
4694         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4695         dout("rbd id object name is %s\n", object_name);
4696
4697         /* Response will be an encoded string, which includes a length */
4698
4699         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4700         response = kzalloc(size, GFP_NOIO);
4701         if (!response) {
4702                 ret = -ENOMEM;
4703                 goto out;
4704         }
4705
4706         /* If it doesn't exist we'll assume it's a format 1 image */
4707
4708         ret = rbd_obj_method_sync(rbd_dev, object_name,
4709                                 "rbd", "get_id", NULL, 0,
4710                                 response, RBD_IMAGE_ID_LEN_MAX);
4711         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4712         if (ret == -ENOENT) {
4713                 image_id = kstrdup("", GFP_KERNEL);
4714                 ret = image_id ? 0 : -ENOMEM;
4715                 if (!ret)
4716                         rbd_dev->image_format = 1;
4717         } else if (ret > sizeof (__le32)) {
4718                 void *p = response;
4719
4720                 image_id = ceph_extract_encoded_string(&p, p + ret,
4721                                                 NULL, GFP_NOIO);
4722                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4723                 if (!ret)
4724                         rbd_dev->image_format = 2;
4725         } else {
4726                 ret = -EINVAL;
4727         }
4728
4729         if (!ret) {
4730                 rbd_dev->spec->image_id = image_id;
4731                 dout("image_id is %s\n", image_id);
4732         }
4733 out:
4734         kfree(response);
4735         kfree(object_name);
4736
4737         return ret;
4738 }
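
/*
 * As an example of the lookup above:  for an image named "foo" the id
 * object probed is the image name prefixed with RBD_ID_PREFIX
 * ("rbd_id." in rbd_types.h), i.e. "rbd_id.foo".  If that object
 * exists, its "get_id" class method returns the persistent image id
 * (say "10052ae8944a") and the image is format 2; if it is absent
 * (-ENOENT) the image is assumed to be format 1 and gets the empty
 * string as its id.
 */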
4739
4740 /*
4741  * Undo whatever state changes are made by a v1 or v2 header info
4742  * call.
4743  */
4744 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4745 {
4746         struct rbd_image_header *header;
4747
4748         /* Drop parent reference unless already done (or there was none) */
4749
4750         if (rbd_dev->parent_overlap)
4751                 rbd_dev_parent_put(rbd_dev);
4752
4753         /* Free dynamic fields from the header, then zero it out */
4754
4755         header = &rbd_dev->header;
4756         ceph_put_snap_context(header->snapc);
4757         kfree(header->snap_sizes);
4758         kfree(header->snap_names);
4759         kfree(header->object_prefix);
4760         memset(header, 0, sizeof (*header));
4761 }
4762
4763 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4764 {
4765         int ret;
4766
4767         ret = rbd_dev_v2_object_prefix(rbd_dev);
4768         if (ret)
4769                 goto out_err;
4770
4771         /*
4772          * Get and check the features for the image.  Currently the
4773          * features are assumed to never change.
4774          */
4775         ret = rbd_dev_v2_features(rbd_dev);
4776         if (ret)
4777                 goto out_err;
4778
4779         /* If the image supports fancy striping, get its parameters */
4780
4781         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4782                 ret = rbd_dev_v2_striping_info(rbd_dev);
4783                 if (ret < 0)
4784                         goto out_err;
4785         }
4786         /* No support for crypto or compression in format 2 images */
4787
4788         return 0;
4789 out_err:
4790         rbd_dev->header.features = 0;
4791         kfree(rbd_dev->header.object_prefix);
4792         rbd_dev->header.object_prefix = NULL;
4793
4794         return ret;
4795 }
4796
4797 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4798 {
4799         struct rbd_device *parent = NULL;
4800         struct rbd_spec *parent_spec;
4801         struct rbd_client *rbdc;
4802         int ret;
4803
4804         if (!rbd_dev->parent_spec)
4805                 return 0;
4806         /*
4807          * We need to pass a reference to the client and the parent
4808          * spec when creating the parent rbd_dev.  Images related by
4809          * parent/child relationships always share both.
4810          */
4811         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4812         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4813
4814         ret = -ENOMEM;
4815         parent = rbd_dev_create(rbdc, parent_spec);
4816         if (!parent)
4817                 goto out_err;
4818
4819         ret = rbd_dev_image_probe(parent, false);
4820         if (ret < 0)
4821                 goto out_err;
4822         rbd_dev->parent = parent;
4823         atomic_set(&rbd_dev->parent_ref, 1);
4824
4825         return 0;
4826 out_err:
4827         if (parent) {
4828                 rbd_dev_unparent(rbd_dev);
4830                 rbd_dev_destroy(parent);
4831         } else {
4832                 rbd_put_client(rbdc);
4833                 rbd_spec_put(parent_spec);
4834         }
4835
4836         return ret;
4837 }
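
/*
 * Note the asymmetry in the error path above:  once rbd_dev_create()
 * succeeds, the client and parent spec references are owned by the
 * parent device and are dropped by rbd_dev_destroy(); only when
 * creation fails must they be put back by hand.
 */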
4838
4839 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4840 {
4841         int ret;
4842
4843         /* Generate a unique id: find the highest existing id and add one */
4844         rbd_dev_id_get(rbd_dev);
4845
4846         /* Fill in the device name, now that we have its id. */
4847         BUILD_BUG_ON(DEV_NAME_LEN
4848                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4849         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4850
4851         /* Get our block major device number. */
4852
4853         ret = register_blkdev(0, rbd_dev->name);
4854         if (ret < 0)
4855                 goto err_out_id;
4856         rbd_dev->major = ret;
4857
4858         /* Set up the blkdev mapping. */
4859
4860         ret = rbd_init_disk(rbd_dev);
4861         if (ret)
4862                 goto err_out_blkdev;
4863
4864         ret = rbd_dev_mapping_set(rbd_dev);
4865         if (ret)
4866                 goto err_out_disk;
4867         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4868
4869         ret = rbd_bus_add_dev(rbd_dev);
4870         if (ret)
4871                 goto err_out_mapping;
4872
4873         /* Everything's ready.  Announce the disk to the world. */
4874
4875         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4876         add_disk(rbd_dev->disk);
4877
4878         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4879                 (unsigned long long) rbd_dev->mapping.size);
4880
4881         return ret;
4882
4883 err_out_mapping:
4884         rbd_dev_mapping_clear(rbd_dev);
4885 err_out_disk:
4886         rbd_free_disk(rbd_dev);
4887 err_out_blkdev:
4888         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4889 err_out_id:
4890         rbd_dev_id_put(rbd_dev);
4892
4893         return ret;
4894 }
4895
4896 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4897 {
4898         struct rbd_spec *spec = rbd_dev->spec;
4899         size_t size;
4900
4901         /* Record the header object name for this rbd image. */
4902
4903         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4904
4905         if (rbd_dev->image_format == 1)
4906                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4907         else
4908                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4909
4910         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4911         if (!rbd_dev->header_name)
4912                 return -ENOMEM;
4913
4914         if (rbd_dev->image_format == 1)
4915                 sprintf(rbd_dev->header_name, "%s%s",
4916                         spec->image_name, RBD_SUFFIX);
4917         else
4918                 sprintf(rbd_dev->header_name, "%s%s",
4919                         RBD_HEADER_PREFIX, spec->image_id);
4920         return 0;
4921 }
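
/*
 * Concretely (assuming the RBD_SUFFIX and RBD_HEADER_PREFIX values
 * from rbd_types.h, ".rbd" and "rbd_header."):  a format 1 image
 * named "foo" gets header object "foo.rbd", while a format 2 image
 * with id "10052ae8944a" gets "rbd_header.10052ae8944a".
 */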
4922
4923 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4924 {
4925         rbd_dev_unprobe(rbd_dev);
4926         kfree(rbd_dev->header_name);
4927         rbd_dev->header_name = NULL;
4928         rbd_dev->image_format = 0;
4929         kfree(rbd_dev->spec->image_id);
4930         rbd_dev->spec->image_id = NULL;
4931
4932         rbd_dev_destroy(rbd_dev);
4933 }
4934
4935 /*
4936  * Probe for the existence of the header object for the given rbd
4937  * device.  If this image is the one being mapped (i.e., not a
4938  * parent), initiate a watch on its header object before using that
4939  * object to get detailed information about the rbd image.
4940  */
4941 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4942 {
4943         int ret;
4944         int tmp;
4945
4946         /*
4947          * Get the id from the image id object.  Unless there's an
4948          * error, rbd_dev->spec->image_id will be filled in with
4949          * a dynamically-allocated string, and rbd_dev->image_format
4950          * will be set to either 1 or 2.
4951          */
4952         ret = rbd_dev_image_id(rbd_dev);
4953         if (ret)
4954                 return ret;
4955         rbd_assert(rbd_dev->spec->image_id);
4956         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4957
4958         ret = rbd_dev_header_name(rbd_dev);
4959         if (ret)
4960                 goto err_out_format;
4961
4962         if (mapping) {
4963                 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4964                 if (ret)
4965                         goto out_header_name;
4966         }
4967
4968         if (rbd_dev->image_format == 1)
4969                 ret = rbd_dev_v1_header_info(rbd_dev);
4970         else
4971                 ret = rbd_dev_v2_header_info(rbd_dev);
4972         if (ret)
4973                 goto err_out_watch;
4974
4975         ret = rbd_dev_spec_update(rbd_dev);
4976         if (ret)
4977                 goto err_out_probe;
4978
4979         ret = rbd_dev_probe_parent(rbd_dev);
4980         if (ret)
4981                 goto err_out_probe;
4982
4983         dout("discovered format %u image, header name is %s\n",
4984                 rbd_dev->image_format, rbd_dev->header_name);
4985
4986         return 0;
4987 err_out_probe:
4988         rbd_dev_unprobe(rbd_dev);
4989 err_out_watch:
4990         if (mapping) {
4991                 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4992                 if (tmp)
4993                         rbd_warn(rbd_dev,
4994                                  "unable to tear down watch request (%d)\n", tmp);
4995         }
4996 out_header_name:
4997         kfree(rbd_dev->header_name);
4998         rbd_dev->header_name = NULL;
4999 err_out_format:
5000         rbd_dev->image_format = 0;
5001         kfree(rbd_dev->spec->image_id);
5002         rbd_dev->spec->image_id = NULL;
5003
5004         dout("probe failed, returning %d\n", ret);
5005
5006         return ret;
5007 }
5008
5009 static ssize_t rbd_add(struct bus_type *bus,
5010                        const char *buf,
5011                        size_t count)
5012 {
5013         struct rbd_device *rbd_dev = NULL;
5014         struct ceph_options *ceph_opts = NULL;
5015         struct rbd_options *rbd_opts = NULL;
5016         struct rbd_spec *spec = NULL;
5017         struct rbd_client *rbdc;
5018         struct ceph_osd_client *osdc;
5019         bool read_only;
5020         int rc = -ENOMEM;
5021
5022         if (!try_module_get(THIS_MODULE))
5023                 return -ENODEV;
5024
5025         /* parse add command */
5026         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5027         if (rc < 0)
5028                 goto err_out_module;
5029         read_only = rbd_opts->read_only;
5030         kfree(rbd_opts);
5031         rbd_opts = NULL;        /* done with this */
5032
5033         rbdc = rbd_get_client(ceph_opts);
5034         if (IS_ERR(rbdc)) {
5035                 rc = PTR_ERR(rbdc);
5036                 goto err_out_args;
5037         }
5038
5039         /* pick the pool */
5040         osdc = &rbdc->client->osdc;
5041         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5042         if (rc < 0)
5043                 goto err_out_client;
5044         spec->pool_id = (u64)rc;
5045
5046         /* The ceph file layout needs to fit pool id in 32 bits */
5047         /* The ceph file layout only has room for a 32-bit pool id */
5048         if (spec->pool_id > (u64)U32_MAX) {
5049                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5050                                 (unsigned long long)spec->pool_id, U32_MAX);
5051                 rc = -EIO;
5052                 goto err_out_client;
5053         }
5054
5055         rbd_dev = rbd_dev_create(rbdc, spec);
5056         if (!rbd_dev)
5057                 goto err_out_client;
5058         rbdc = NULL;            /* rbd_dev now owns this */
5059         spec = NULL;            /* rbd_dev now owns this */
5060
5061         rc = rbd_dev_image_probe(rbd_dev, true);
5062         if (rc < 0)
5063                 goto err_out_rbd_dev;
5064
5065         /* If we are mapping a snapshot it must be marked read-only */
5066
5067         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5068                 read_only = true;
5069         rbd_dev->mapping.read_only = read_only;
5070
5071         rc = rbd_dev_device_setup(rbd_dev);
5072         if (rc) {
5073                 rbd_dev_image_release(rbd_dev);
5074                 goto err_out_module;
5075         }
5076
5077         return count;
5078
5079 err_out_rbd_dev:
5080         rbd_dev_destroy(rbd_dev);
5081 err_out_client:
5082         rbd_put_client(rbdc);
5083 err_out_args:
5084         rbd_spec_put(spec);
5085 err_out_module:
5086         module_put(THIS_MODULE);
5087
5088         dout("Error adding device %s\n", buf);
5089
5090         return (ssize_t)rc;
5091 }
5092
5093 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
5094 {
5095         struct list_head *tmp;
5096         struct rbd_device *rbd_dev;
5097
5098         spin_lock(&rbd_dev_list_lock);
5099         list_for_each(tmp, &rbd_dev_list) {
5100                 rbd_dev = list_entry(tmp, struct rbd_device, node);
5101                 if (rbd_dev->dev_id == dev_id) {
5102                         spin_unlock(&rbd_dev_list_lock);
5103                         return rbd_dev;
5104                 }
5105         }
5106         spin_unlock(&rbd_dev_list_lock);
5107         return NULL;
5108 }
5109
5110 static void rbd_dev_device_release(struct device *dev)
5111 {
5112         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5113
5114         rbd_free_disk(rbd_dev);
5115         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5116         rbd_dev_mapping_clear(rbd_dev);
5117         unregister_blkdev(rbd_dev->major, rbd_dev->name);
5118         rbd_dev->major = 0;
5119         rbd_dev_id_put(rbd_dev);
5121 }
5122
5123 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5124 {
5125         while (rbd_dev->parent) {
5126                 struct rbd_device *first = rbd_dev;
5127                 struct rbd_device *second = first->parent;
5128                 struct rbd_device *third;
5129
5130                 /*
5131                  * Follow to the parent with no grandparent and
5132                  * remove it.
5133                  */
5134                 while (second && (third = second->parent)) {
5135                         first = second;
5136                         second = third;
5137                 }
5138                 rbd_assert(second);
5139                 rbd_dev_image_release(second);
5140                 first->parent = NULL;
5141                 first->parent_overlap = 0;
5142
5143                 rbd_assert(first->parent_spec);
5144                 rbd_spec_put(first->parent_spec);
5145                 first->parent_spec = NULL;
5146         }
5147 }
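
/*
 * To illustrate the loop above, consider a hypothetical clone chain
 * three images deep, top -> mid -> base:  the inner walk stops with
 * first == "mid" and second == "base", so "base" is released and
 * unhooked first; the next outer pass then releases "mid" and unhooks
 * it from "top".  Ancestors are always torn down from the far end of
 * the chain inward.
 */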
5148
5149 static ssize_t rbd_remove(struct bus_type *bus,
5150                           const char *buf,
5151                           size_t count)
5152 {
5153         struct rbd_device *rbd_dev = NULL;
5154         int target_id;
5155         unsigned long ul;
5156         int ret;
5157
5158         ret = kstrtoul(buf, 10, &ul);
5159         if (ret)
5160                 return ret;
5161
5162         /* convert to int; abort if we lost anything in the conversion */
5163         target_id = (int) ul;
5164         if (target_id != ul)
5165                 return -EINVAL;
5166
5167         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5168
5169         rbd_dev = __rbd_get_dev(target_id);
5170         if (!rbd_dev) {
5171                 ret = -ENOENT;
5172                 goto done;
5173         }
5174
5175         spin_lock_irq(&rbd_dev->lock);
5176         if (rbd_dev->open_count)
5177                 ret = -EBUSY;
5178         else
5179                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5180         spin_unlock_irq(&rbd_dev->lock);
5181         if (ret < 0)
5182                 goto done;
5183         rbd_bus_del_dev(rbd_dev);
5184         ret = rbd_dev_header_watch_sync(rbd_dev, false);
5185         if (ret)
5186                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
5187         rbd_dev_image_release(rbd_dev);
5188         module_put(THIS_MODULE);
5189         ret = count;
5190 done:
5191         mutex_unlock(&ctl_mutex);
5192
5193         return ret;
5194 }
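
/*
 * A mapping is removed by writing its device id (the N in /dev/rbdN)
 * to the bus control file, for example:
 *
 *	$ echo 1 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the device is still open, and
 * with -ENOENT if no mapping has the given id.
 */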
5195
5196 /*
5197  * create control files in sysfs
5198  * /sys/bus/rbd/...
5199  */
5200 static int rbd_sysfs_init(void)
5201 {
5202         int ret;
5203
5204         ret = device_register(&rbd_root_dev);
5205         if (ret < 0)
5206                 return ret;
5207
5208         ret = bus_register(&rbd_bus_type);
5209         if (ret < 0)
5210                 device_unregister(&rbd_root_dev);
5211
5212         return ret;
5213 }
5214
5215 static void rbd_sysfs_cleanup(void)
5216 {
5217         bus_unregister(&rbd_bus_type);
5218         device_unregister(&rbd_root_dev);
5219 }
5220
5221 static int rbd_slab_init(void)
5222 {
5223         rbd_assert(!rbd_img_request_cache);
5224         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5225                                         sizeof (struct rbd_img_request),
5226                                         __alignof__(struct rbd_img_request),
5227                                         0, NULL);
5228         if (!rbd_img_request_cache)
5229                 return -ENOMEM;
5230
5231         rbd_assert(!rbd_obj_request_cache);
5232         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5233                                         sizeof (struct rbd_obj_request),
5234                                         __alignof__(struct rbd_obj_request),
5235                                         0, NULL);
5236         if (!rbd_obj_request_cache)
5237                 goto out_err;
5238
5239         rbd_assert(!rbd_segment_name_cache);
5240         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5241                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5242         if (rbd_segment_name_cache)
5243                 return 0;
5244 out_err:
5245         if (rbd_obj_request_cache) {
5246                 kmem_cache_destroy(rbd_obj_request_cache);
5247                 rbd_obj_request_cache = NULL;
5248         }
5249
5250         kmem_cache_destroy(rbd_img_request_cache);
5251         rbd_img_request_cache = NULL;
5252
5253         return -ENOMEM;
5254 }
5255
5256 static void rbd_slab_exit(void)
5257 {
5258         rbd_assert(rbd_segment_name_cache);
5259         kmem_cache_destroy(rbd_segment_name_cache);
5260         rbd_segment_name_cache = NULL;
5261
5262         rbd_assert(rbd_obj_request_cache);
5263         kmem_cache_destroy(rbd_obj_request_cache);
5264         rbd_obj_request_cache = NULL;
5265
5266         rbd_assert(rbd_img_request_cache);
5267         kmem_cache_destroy(rbd_img_request_cache);
5268         rbd_img_request_cache = NULL;
5269 }
5270
5271 static int __init rbd_init(void)
5272 {
5273         int rc;
5274
5275         if (!libceph_compatible(NULL)) {
5276                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5277
5278                 return -EINVAL;
5279         }
5280         rc = rbd_slab_init();
5281         if (rc)
5282                 return rc;
5283         rc = rbd_sysfs_init();
5284         if (rc)
5285                 rbd_slab_exit();
5286         else
5287                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5288
5289         return rc;
5290 }
5291
5292 static void __exit rbd_exit(void)
5293 {
5294         rbd_sysfs_cleanup();
5295         rbd_slab_exit();
5296 }
5297
5298 module_init(rbd_init);
5299 module_exit(rbd_exit);
5300
5301 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5302 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5303 MODULE_DESCRIPTION("rados block device");
5304
5305 /* following authorship retained from original osdblk.c */
5306 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5307
5308 MODULE_LICENSE("GPL");