/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}
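
/*
 * Overflow behavior of the check above: once the counter has been
 * pushed past INT_MAX it wraps negative, which viewed as an unsigned
 * value exceeds INT_MAX, so the increment is backed out and -EINVAL
 * is returned on each subsequent call.
 */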

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
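
/*
 * The formula above allows 2.5 decimal digits per byte (a bit more
 * than log10(256) ~= 2.41) plus one character for a sign: with
 * 4-byte ints, (5 * 4) / 2 + 1 == 11, enough for "-2147483648".
 */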

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
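
/*
 * Example use: rbd_img_request_complete() below walks an image
 * request with for_each_obj_request() to sum the xferred counts of
 * its object requests.
 */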

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
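
/*
 * For example, if the options portion of the string written to
 * /sys/bus/rbd/add includes "read_only" (or its "ro" alias), the
 * token table above maps it to Opt_read_only and the image is
 * mapped read-only.
 */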

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that the result orders snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}
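
/*
 * For example, comparing ids 8 and 5 returns -1, so 8 sorts ahead
 * of 5 and an array ends up ordered highest snapshot id first.
 */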

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;
        char *name_format;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        name_format = "%s.%012llx";
        if (rbd_dev->image_format == 2)
                name_format = "%s.%016llx";
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* name came from the segment name slab cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
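
/*
 * Worked example, assuming an object order of 22 (4 MiB objects):
 * image offset 0x500000 falls in segment 1 at offset 0x100000, and
 * a 4 MiB request starting there is clipped to 3 MiB so that it
 * does not cross into segment 2.
 */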

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}
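
/* e.g. an object order of 22 yields 1 << 22 == 4 MiB objects */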

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
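
/*
 * For example, with 4 KiB pages, zero_pages(pages, 6144, 10240)
 * zeroes the last 2 KiB of pages[1] and the first 2 KiB of pages[2].
 */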

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
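
/*
 * Example: splitting a 12 KiB chain into 4 KiB object requests takes
 * three bio_chain_clone_range() calls with len 4096; each call
 * advances *bio_src and *offset so the next clone picks up exactly
 * where the previous one ended.
 */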

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        if (img_request_child_test(img_request))
                kref_put(&img_request->kref, rbd_parent_request_destroy);
        else
                kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
1512 static void img_request_write_set(struct rbd_img_request *img_request)
1513 {
1514         set_bit(IMG_REQ_WRITE, &img_request->flags);
1515         smp_mb();
1516 }
1517
1518 static bool img_request_write_test(struct rbd_img_request *img_request)
1519 {
1520         smp_mb();
1521         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1522 }
1523
1524 static void img_request_child_set(struct rbd_img_request *img_request)
1525 {
1526         set_bit(IMG_REQ_CHILD, &img_request->flags);
1527         smp_mb();
1528 }
1529
1530 static void img_request_child_clear(struct rbd_img_request *img_request)
1531 {
1532         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1533         smp_mb();
1534 }
1535
1536 static bool img_request_child_test(struct rbd_img_request *img_request)
1537 {
1538         smp_mb();
1539         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1540 }
1541
1542 static void img_request_layered_set(struct rbd_img_request *img_request)
1543 {
1544         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1545         smp_mb();
1546 }
1547
1548 static void img_request_layered_clear(struct rbd_img_request *img_request)
1549 {
1550         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1551         smp_mb();
1552 }
1553
1554 static bool img_request_layered_test(struct rbd_img_request *img_request)
1555 {
1556         smp_mb();
1557         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1558 }
1559
1560 static void
1561 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1562 {
1563         u64 xferred = obj_request->xferred;
1564         u64 length = obj_request->length;
1565
1566         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1567                 obj_request, obj_request->img_request, obj_request->result,
1568                 xferred, length);
1569         /*
1570          * ENOENT means a hole in the image.  We zero-fill the
1571          * entire length of the request.  A short read also implies
1572          * zero-fill to the end of the request.  Either way we
1573          * update the xferred count to indicate the whole request
1574          * was satisfied.
1575          */
1576         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1577         if (obj_request->result == -ENOENT) {
1578                 if (obj_request->type == OBJ_REQUEST_BIO)
1579                         zero_bio_chain(obj_request->bio_list, 0);
1580                 else
1581                         zero_pages(obj_request->pages, 0, length);
1582                 obj_request->result = 0;
1583                 obj_request->xferred = length;
1584         } else if (xferred < length && !obj_request->result) {
1585                 if (obj_request->type == OBJ_REQUEST_BIO)
1586                         zero_bio_chain(obj_request->bio_list, xferred);
1587                 else
1588                         zero_pages(obj_request->pages, xferred, length);
1589                 obj_request->xferred = length;
1590         }
1591         obj_request_done_set(obj_request);
1592 }
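
/*
 * Worked example (assumed numbers): a 4096-byte read that returns
 * 1024 bytes with no error has bytes 1024..4095 zero-filled and is
 * reported as a full 4096-byte transfer; a read that fails with
 * -ENOENT (a hole) is zero-filled in full and likewise reported as
 * a successful transfer of the whole request.
 */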
1593
1594 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1595 {
1596         dout("%s: obj %p cb %p\n", __func__, obj_request,
1597                 obj_request->callback);
1598         if (obj_request->callback)
1599                 obj_request->callback(obj_request);
1600         else
1601                 complete_all(&obj_request->completion);
1602 }
1603
1604 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1605 {
1606         dout("%s: obj %p\n", __func__, obj_request);
1607         obj_request_done_set(obj_request);
1608 }
1609
1610 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1611 {
1612         struct rbd_img_request *img_request = NULL;
1613         struct rbd_device *rbd_dev = NULL;
1614         bool layered = false;
1615
1616         if (obj_request_img_data_test(obj_request)) {
1617                 img_request = obj_request->img_request;
1618                 layered = img_request && img_request_layered_test(img_request);
1619                 rbd_dev = img_request->rbd_dev;
1620         }
1621
1622         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1623                 obj_request, img_request, obj_request->result,
1624                 obj_request->xferred, obj_request->length);
1625         if (layered && obj_request->result == -ENOENT &&
1626                         obj_request->img_offset < rbd_dev->parent_overlap)
1627                 rbd_img_parent_read(obj_request);
1628         else if (img_request)
1629                 rbd_img_obj_request_read_callback(obj_request);
1630         else
1631                 obj_request_done_set(obj_request);
1632 }
1633
1634 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1635 {
1636         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1637                 obj_request->result, obj_request->length);
1638         /*
1639          * There is no such thing as a successful short write.  Set
1640          * the transfer count to our originally-requested length.
1641          */
1642         obj_request->xferred = obj_request->length;
1643         obj_request_done_set(obj_request);
1644 }
1645
1646 /*
1647  * For a simple stat call there's nothing to do.  We'll do more if
1648  * this is part of a write sequence for a layered image.
1649  */
1650 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1651 {
1652         dout("%s: obj %p\n", __func__, obj_request);
1653         obj_request_done_set(obj_request);
1654 }
1655
1656 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1657                                 struct ceph_msg *msg)
1658 {
1659         struct rbd_obj_request *obj_request = osd_req->r_priv;
1660         u16 opcode;
1661
1662         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1663         rbd_assert(osd_req == obj_request->osd_req);
1664         if (obj_request_img_data_test(obj_request)) {
1665                 rbd_assert(obj_request->img_request);
1666                 rbd_assert(obj_request->which != BAD_WHICH);
1667         } else {
1668                 rbd_assert(obj_request->which == BAD_WHICH);
1669         }
1670
1671         if (osd_req->r_result < 0)
1672                 obj_request->result = osd_req->r_result;
1673
1674         BUG_ON(osd_req->r_num_ops > 2);
1675
1676         /*
1677          * We support a 64-bit length, but ultimately it has to be
1678          * passed to blk_end_request(), which takes an unsigned int.
1679          */
1680         obj_request->xferred = osd_req->r_reply_op_len[0];
1681         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1682         opcode = osd_req->r_ops[0].op;
1683         switch (opcode) {
1684         case CEPH_OSD_OP_READ:
1685                 rbd_osd_read_callback(obj_request);
1686                 break;
1687         case CEPH_OSD_OP_WRITE:
1688                 rbd_osd_write_callback(obj_request);
1689                 break;
1690         case CEPH_OSD_OP_STAT:
1691                 rbd_osd_stat_callback(obj_request);
1692                 break;
1693         case CEPH_OSD_OP_CALL:
1694         case CEPH_OSD_OP_NOTIFY_ACK:
1695         case CEPH_OSD_OP_WATCH:
1696                 rbd_osd_trivial_callback(obj_request);
1697                 break;
1698         default:
1699                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1700                         obj_request->object_name, (unsigned short) opcode);
1701                 break;
1702         }
1703
1704         if (obj_request_done_test(obj_request))
1705                 rbd_obj_request_complete(obj_request);
1706 }
1707
1708 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1709 {
1710         struct rbd_img_request *img_request = obj_request->img_request;
1711         struct ceph_osd_request *osd_req = obj_request->osd_req;
1712         u64 snap_id;
1713
1714         rbd_assert(osd_req != NULL);
1715
1716         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1717         ceph_osdc_build_request(osd_req, obj_request->offset,
1718                         NULL, snap_id, NULL);
1719 }
1720
1721 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1722 {
1723         struct rbd_img_request *img_request = obj_request->img_request;
1724         struct ceph_osd_request *osd_req = obj_request->osd_req;
1725         struct ceph_snap_context *snapc;
1726         struct timespec mtime = CURRENT_TIME;
1727
1728         rbd_assert(osd_req != NULL);
1729
1730         snapc = img_request ? img_request->snapc : NULL;
1731         ceph_osdc_build_request(osd_req, obj_request->offset,
1732                         snapc, CEPH_NOSNAP, &mtime);
1733 }
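
/*
 * Note the asymmetry above: a read is tagged only with the snapshot
 * id to read from, while a write carries the entire snapshot
 * context (plus an mtime) so the OSD can preserve object data that
 * existing snapshots still need.
 */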
1734
1735 static struct ceph_osd_request *rbd_osd_req_create(
1736                                         struct rbd_device *rbd_dev,
1737                                         bool write_request,
1738                                         struct rbd_obj_request *obj_request)
1739 {
1740         struct ceph_snap_context *snapc = NULL;
1741         struct ceph_osd_client *osdc;
1742         struct ceph_osd_request *osd_req;
1743
1744         if (obj_request_img_data_test(obj_request)) {
1745                 struct rbd_img_request *img_request = obj_request->img_request;
1746
1747                 rbd_assert(write_request ==
1748                                 img_request_write_test(img_request));
1749                 if (write_request)
1750                         snapc = img_request->snapc;
1751         }
1752
1753         /* Allocate and initialize the request, for the single op */
1754
1755         osdc = &rbd_dev->rbd_client->client->osdc;
1756         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1757         if (!osd_req)
1758                 return NULL;    /* ENOMEM */
1759
1760         if (write_request)
1761                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1762         else
1763                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1764
1765         osd_req->r_callback = rbd_osd_req_callback;
1766         osd_req->r_priv = obj_request;
1767
1768         osd_req->r_oid_len = strlen(obj_request->object_name);
1769         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1770         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1771
1772         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1773
1774         return osd_req;
1775 }
1776
1777 /*
1778  * Create a copyup osd request based on the information in the
1779  * object request supplied.  A copyup request has two osd ops,
1780  * a copyup method call, and a "normal" write request.
1781  */
1782 static struct ceph_osd_request *
1783 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1784 {
1785         struct rbd_img_request *img_request;
1786         struct ceph_snap_context *snapc;
1787         struct rbd_device *rbd_dev;
1788         struct ceph_osd_client *osdc;
1789         struct ceph_osd_request *osd_req;
1790
1791         rbd_assert(obj_request_img_data_test(obj_request));
1792         img_request = obj_request->img_request;
1793         rbd_assert(img_request);
1794         rbd_assert(img_request_write_test(img_request));
1795
1796         /* Allocate and initialize the request, for the two ops */
1797
1798         snapc = img_request->snapc;
1799         rbd_dev = img_request->rbd_dev;
1800         osdc = &rbd_dev->rbd_client->client->osdc;
1801         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1802         if (!osd_req)
1803                 return NULL;    /* ENOMEM */
1804
1805         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1806         osd_req->r_callback = rbd_osd_req_callback;
1807         osd_req->r_priv = obj_request;
1808
1809         osd_req->r_oid_len = strlen(obj_request->object_name);
1810         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1811         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1812
1813         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1814
1815         return osd_req;
1816 }
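
/*
 * Sketch of how the caller fills in the two ops; this mirrors the
 * real code in rbd_img_obj_parent_read_full_callback() below:
 *
 *      osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL,
 *                                      "rbd", "copyup");
 *      osd_req_op_cls_request_data_pages(osd_req, 0, pages,
 *                                      parent_length, 0, false, false);
 *      osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
 *                                      offset, length, 0, 0);
 */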
1817
1818
1819 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1820 {
1821         ceph_osdc_put_request(osd_req);
1822 }
1823
1824 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1825
1826 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1827                                                 u64 offset, u64 length,
1828                                                 enum obj_request_type type)
1829 {
1830         struct rbd_obj_request *obj_request;
1831         size_t size;
1832         char *name;
1833
1834         rbd_assert(obj_request_type_valid(type));
1835
1836         size = strlen(object_name) + 1;
1837         name = kmalloc(size, GFP_KERNEL);
1838         if (!name)
1839                 return NULL;
1840
1841         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1842         if (!obj_request) {
1843                 kfree(name);
1844                 return NULL;
1845         }
1846
1847         obj_request->object_name = memcpy(name, object_name, size);
1848         obj_request->offset = offset;
1849         obj_request->length = length;
1850         obj_request->flags = 0;
1851         obj_request->which = BAD_WHICH;
1852         obj_request->type = type;
1853         INIT_LIST_HEAD(&obj_request->links);
1854         init_completion(&obj_request->completion);
1855         kref_init(&obj_request->kref);
1856
1857         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1858                 offset, length, (int)type, obj_request);
1859
1860         return obj_request;
1861 }
1862
1863 static void rbd_obj_request_destroy(struct kref *kref)
1864 {
1865         struct rbd_obj_request *obj_request;
1866
1867         obj_request = container_of(kref, struct rbd_obj_request, kref);
1868
1869         dout("%s: obj %p\n", __func__, obj_request);
1870
1871         rbd_assert(obj_request->img_request == NULL);
1872         rbd_assert(obj_request->which == BAD_WHICH);
1873
1874         if (obj_request->osd_req)
1875                 rbd_osd_req_destroy(obj_request->osd_req);
1876
1877         rbd_assert(obj_request_type_valid(obj_request->type));
1878         switch (obj_request->type) {
1879         case OBJ_REQUEST_NODATA:
1880                 break;          /* Nothing to do */
1881         case OBJ_REQUEST_BIO:
1882                 if (obj_request->bio_list)
1883                         bio_chain_put(obj_request->bio_list);
1884                 break;
1885         case OBJ_REQUEST_PAGES:
1886                 if (obj_request->pages)
1887                         ceph_release_page_vector(obj_request->pages,
1888                                                 obj_request->page_count);
1889                 break;
1890         }
1891
1892         kfree(obj_request->object_name);
1893         obj_request->object_name = NULL;
1894         kmem_cache_free(rbd_obj_request_cache, obj_request);
1895 }
1896
1897 /* It's OK to call this for a device with no parent */
1898
1899 static void rbd_spec_put(struct rbd_spec *spec);
1900 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1901 {
1902         rbd_dev_remove_parent(rbd_dev);
1903         rbd_spec_put(rbd_dev->parent_spec);
1904         rbd_dev->parent_spec = NULL;
1905         rbd_dev->parent_overlap = 0;
1906 }
1907
1908 /*
1909  * Parent image reference counting is used to determine when an
1910  * image's parent fields can be safely torn down--after there are no
1911  * more in-flight requests to the parent image.  When the last
1912  * reference is dropped, cleaning them up is safe.
1913  */
1914 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1915 {
1916         int counter;
1917
1918         if (!rbd_dev->parent_spec)
1919                 return;
1920
1921         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1922         if (counter > 0)
1923                 return;
1924
1925         /* Last reference; clean up parent data structures */
1926
1927         if (!counter)
1928                 rbd_dev_unparent(rbd_dev);
1929         else
1930                 rbd_warn(rbd_dev, "parent reference underflow\n");
1931 }
1932
1933 /*
1934  * If an image has a non-zero parent overlap, get a reference to its
1935  * parent.
1936  *
1937  * We must get the reference before checking for the overlap to
1938  * coordinate properly with zeroing the parent overlap in
1939  * rbd_dev_v2_parent_info() when an image gets flattened.  We
1940  * drop it again if there is no overlap.
1941  *
1942  * Returns true if the rbd device has a parent with a non-zero
1943  * overlap and a reference for it was successfully taken, or
1944  * false otherwise.
1945  */
1946 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1947 {
1948         int counter;
1949
1950         if (!rbd_dev->parent_spec)
1951                 return false;
1952
1953         counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1954         if (counter > 0 && rbd_dev->parent_overlap)
1955                 return true;
1956
1957         /* Image was flattened, but parent is not yet torn down */
1958
1959         if (counter < 0)
1960                 rbd_warn(rbd_dev, "parent reference overflow\n");
1961
1962         return false;
1963 }
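
/*
 * Typical pairing, taken from the image request code below:
 * rbd_img_request_create() takes the parent reference when the
 * mapping is layered, and rbd_img_request_destroy() drops it.
 *
 *      if (rbd_dev_parent_get(rbd_dev))
 *              img_request_layered_set(img_request);
 *      ...
 *      if (img_request_layered_test(img_request)) {
 *              img_request_layered_clear(img_request);
 *              rbd_dev_parent_put(img_request->rbd_dev);
 *      }
 */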
1964
1965 /*
1966  * Caller is responsible for filling in the list of object requests
1967  * that comprises the image request, and the Linux request pointer
1968  * (if there is one).
1969  */
1970 static struct rbd_img_request *rbd_img_request_create(
1971                                         struct rbd_device *rbd_dev,
1972                                         u64 offset, u64 length,
1973                                         bool write_request)
1974 {
1975         struct rbd_img_request *img_request;
1976
1977         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1978         if (!img_request)
1979                 return NULL;
1980
1981         if (write_request) {
1982                 down_read(&rbd_dev->header_rwsem);
1983                 ceph_get_snap_context(rbd_dev->header.snapc);
1984                 up_read(&rbd_dev->header_rwsem);
1985         }
1986
1987         img_request->rq = NULL;
1988         img_request->rbd_dev = rbd_dev;
1989         img_request->offset = offset;
1990         img_request->length = length;
1991         img_request->flags = 0;
1992         if (write_request) {
1993                 img_request_write_set(img_request);
1994                 img_request->snapc = rbd_dev->header.snapc;
1995         } else {
1996                 img_request->snap_id = rbd_dev->spec->snap_id;
1997         }
1998         if (rbd_dev_parent_get(rbd_dev))
1999                 img_request_layered_set(img_request);
2000         spin_lock_init(&img_request->completion_lock);
2001         img_request->next_completion = 0;
2002         img_request->callback = NULL;
2003         img_request->result = 0;
2004         img_request->obj_request_count = 0;
2005         INIT_LIST_HEAD(&img_request->obj_requests);
2006         kref_init(&img_request->kref);
2007
2008         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2009                 write_request ? "write" : "read", offset, length,
2010                 img_request);
2011
2012         return img_request;
2013 }
2014
2015 static void rbd_img_request_destroy(struct kref *kref)
2016 {
2017         struct rbd_img_request *img_request;
2018         struct rbd_obj_request *obj_request;
2019         struct rbd_obj_request *next_obj_request;
2020
2021         img_request = container_of(kref, struct rbd_img_request, kref);
2022
2023         dout("%s: img %p\n", __func__, img_request);
2024
2025         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2026                 rbd_img_obj_request_del(img_request, obj_request);
2027         rbd_assert(img_request->obj_request_count == 0);
2028
2029         if (img_request_layered_test(img_request)) {
2030                 img_request_layered_clear(img_request);
2031                 rbd_dev_parent_put(img_request->rbd_dev);
2032         }
2033
2034         if (img_request_write_test(img_request))
2035                 ceph_put_snap_context(img_request->snapc);
2036
2037         kmem_cache_free(rbd_img_request_cache, img_request);
2038 }
2039
2040 static struct rbd_img_request *rbd_parent_request_create(
2041                                         struct rbd_obj_request *obj_request,
2042                                         u64 img_offset, u64 length)
2043 {
2044         struct rbd_img_request *parent_request;
2045         struct rbd_device *rbd_dev;
2046
2047         rbd_assert(obj_request->img_request);
2048         rbd_dev = obj_request->img_request->rbd_dev;
2049
2050         parent_request = rbd_img_request_create(rbd_dev->parent,
2051                                                 img_offset, length, false);
2052         if (!parent_request)
2053                 return NULL;
2054
2055         img_request_child_set(parent_request);
2056         rbd_obj_request_get(obj_request);
2057         parent_request->obj_request = obj_request;
2058
2059         return parent_request;
2060 }
2061
2062 static void rbd_parent_request_destroy(struct kref *kref)
2063 {
2064         struct rbd_img_request *parent_request;
2065         struct rbd_obj_request *orig_request;
2066
2067         parent_request = container_of(kref, struct rbd_img_request, kref);
2068         orig_request = parent_request->obj_request;
2069
2070         parent_request->obj_request = NULL;
2071         rbd_obj_request_put(orig_request);
2072         img_request_child_clear(parent_request);
2073
2074         rbd_img_request_destroy(kref);
2075 }
2076
2077 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2078 {
2079         struct rbd_img_request *img_request;
2080         unsigned int xferred;
2081         int result;
2082         bool more;
2083
2084         rbd_assert(obj_request_img_data_test(obj_request));
2085         img_request = obj_request->img_request;
2086
2087         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2088         xferred = (unsigned int)obj_request->xferred;
2089         result = obj_request->result;
2090         if (result) {
2091                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2092
2093                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2094                         img_request_write_test(img_request) ? "write" : "read",
2095                         obj_request->length, obj_request->img_offset,
2096                         obj_request->offset);
2097                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
2098                         result, xferred);
2099                 if (!img_request->result)
2100                         img_request->result = result;
2101         }
2102
2103         /* Image object requests don't own their page array */
2104
2105         if (obj_request->type == OBJ_REQUEST_PAGES) {
2106                 obj_request->pages = NULL;
2107                 obj_request->page_count = 0;
2108         }
2109
2110         if (img_request_child_test(img_request)) {
2111                 rbd_assert(img_request->obj_request != NULL);
2112                 more = obj_request->which < img_request->obj_request_count - 1;
2113         } else {
2114                 rbd_assert(img_request->rq != NULL);
2115                 more = blk_end_request(img_request->rq, result, xferred);
2116         }
2117
2118         return more;
2119 }
2120
2121 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2122 {
2123         struct rbd_img_request *img_request;
2124         u32 which = obj_request->which;
2125         bool more = true;
2126
2127         rbd_assert(obj_request_img_data_test(obj_request));
2128         img_request = obj_request->img_request;
2129
2130         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2131         rbd_assert(img_request != NULL);
2132         rbd_assert(img_request->obj_request_count > 0);
2133         rbd_assert(which != BAD_WHICH);
2134         rbd_assert(which < img_request->obj_request_count);
2135         rbd_assert(which >= img_request->next_completion);
2136
2137         spin_lock_irq(&img_request->completion_lock);
2138         if (which != img_request->next_completion)
2139                 goto out;
2140
2141         for_each_obj_request_from(img_request, obj_request) {
2142                 rbd_assert(more);
2143                 rbd_assert(which < img_request->obj_request_count);
2144
2145                 if (!obj_request_done_test(obj_request))
2146                         break;
2147                 more = rbd_img_obj_end_request(obj_request);
2148                 which++;
2149         }
2150
2151         rbd_assert(more ^ (which == img_request->obj_request_count));
2152         img_request->next_completion = which;
2153 out:
2154         spin_unlock_irq(&img_request->completion_lock);
2155
2156         if (!more)
2157                 rbd_img_request_complete(img_request);
2158 }
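
/*
 * Worked example (assuming an image request with three object
 * requests, which = 0, 1 and 2): if object 1 completes first, the
 * function above returns early because next_completion is still 0.
 * When object 0 completes, the loop ends requests 0 and 1 and
 * next_completion advances to 2; the image request as a whole
 * completes only after object 2 does.
 */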
2159
2160 /*
2161  * Split up an image request into one or more object requests, each
2162  * to a different object.  The "type" parameter indicates whether
2163  * "data_desc" is the pointer to the head of a list of bio
2164  * structures, or the base of a page array.  In either case this
2165  * function assumes data_desc describes memory sufficient to hold
2166  * all data described by the image request.
2167  */
2168 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2169                                         enum obj_request_type type,
2170                                         void *data_desc)
2171 {
2172         struct rbd_device *rbd_dev = img_request->rbd_dev;
2173         struct rbd_obj_request *obj_request = NULL;
2174         struct rbd_obj_request *next_obj_request;
2175         bool write_request = img_request_write_test(img_request);
2176         struct bio *bio_list;
2177         unsigned int bio_offset = 0;
2178         struct page **pages;
2179         u64 img_offset;
2180         u64 resid;
2181         u16 opcode;
2182
2183         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2184                 (int)type, data_desc);
2185
2186         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2187         img_offset = img_request->offset;
2188         resid = img_request->length;
2189         rbd_assert(resid > 0);
2190
2191         if (type == OBJ_REQUEST_BIO) {
2192                 bio_list = data_desc;
2193                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2194         } else {
2195                 rbd_assert(type == OBJ_REQUEST_PAGES);
2196                 pages = data_desc;
2197         }
2198
2199         while (resid) {
2200                 struct ceph_osd_request *osd_req;
2201                 const char *object_name;
2202                 u64 offset;
2203                 u64 length;
2204
2205                 object_name = rbd_segment_name(rbd_dev, img_offset);
2206                 if (!object_name)
2207                         goto out_unwind;
2208                 offset = rbd_segment_offset(rbd_dev, img_offset);
2209                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2210                 obj_request = rbd_obj_request_create(object_name,
2211                                                 offset, length, type);
2212                 /* object request has its own copy of the object name */
2213                 rbd_segment_name_free(object_name);
2214                 if (!obj_request)
2215                         goto out_unwind;
2216
2217                 if (type == OBJ_REQUEST_BIO) {
2218                         unsigned int clone_size;
2219
2220                         rbd_assert(length <= (u64)UINT_MAX);
2221                         clone_size = (unsigned int)length;
2222                         obj_request->bio_list =
2223                                         bio_chain_clone_range(&bio_list,
2224                                                                 &bio_offset,
2225                                                                 clone_size,
2226                                                                 GFP_ATOMIC);
2227                         if (!obj_request->bio_list)
2228                                 goto out_partial;
2229                 } else {
2230                         unsigned int page_count;
2231
2232                         obj_request->pages = pages;
2233                         page_count = (u32)calc_pages_for(offset, length);
2234                         obj_request->page_count = page_count;
2235                         if ((offset + length) & ~PAGE_MASK)
2236                                 page_count--;   /* more on last page */
2237                         pages += page_count;
2238                 }
2239
2240                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2241                                                 obj_request);
2242                 if (!osd_req)
2243                         goto out_partial;
2244                 obj_request->osd_req = osd_req;
2245                 obj_request->callback = rbd_img_obj_callback;
2246
2247                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2248                                                 0, 0);
2249                 if (type == OBJ_REQUEST_BIO)
2250                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2251                                         obj_request->bio_list, length);
2252                 else
2253                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2254                                         obj_request->pages, length,
2255                                         offset & ~PAGE_MASK, false, false);
2256
2257                 if (write_request)
2258                         rbd_osd_req_format_write(obj_request);
2259                 else
2260                         rbd_osd_req_format_read(obj_request);
2261
2262                 obj_request->img_offset = img_offset;
2263                 rbd_img_obj_request_add(img_request, obj_request);
2264
2265                 img_offset += length;
2266                 resid -= length;
2267         }
2268
2269         return 0;
2270
2271 out_partial:
2272         rbd_obj_request_put(obj_request);
2273 out_unwind:
2274         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2275                 rbd_obj_request_put(obj_request);
2276
2277         return -ENOMEM;
2278 }
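
/*
 * Worked example (assuming the default 4 MiB object size): a 6 MiB
 * image request starting at image offset 3 MiB becomes three object
 * requests above: the final 1 MiB of the first object, all of the
 * second, and the first 1 MiB of the third.
 */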
2279
2280 static void
2281 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2282 {
2283         struct rbd_img_request *img_request;
2284         struct rbd_device *rbd_dev;
2285         struct page **pages;
2286         u32 page_count;
2287
2288         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2289         rbd_assert(obj_request_img_data_test(obj_request));
2290         img_request = obj_request->img_request;
2291         rbd_assert(img_request);
2292
2293         rbd_dev = img_request->rbd_dev;
2294         rbd_assert(rbd_dev);
2295
2296         pages = obj_request->copyup_pages;
2297         rbd_assert(pages != NULL);
2298         obj_request->copyup_pages = NULL;
2299         page_count = obj_request->copyup_page_count;
2300         rbd_assert(page_count);
2301         obj_request->copyup_page_count = 0;
2302         ceph_release_page_vector(pages, page_count);
2303
2304         /*
2305          * We want the transfer count to reflect the size of the
2306          * original write request.  There is no such thing as a
2307          * successful short write, so if the request was successful
2308          * we can just set it to the originally-requested length.
2309          */
2310         if (!obj_request->result)
2311                 obj_request->xferred = obj_request->length;
2312
2313         /* Finish up with the normal image object callback */
2314
2315         rbd_img_obj_callback(obj_request);
2316 }
2317
2318 static void
2319 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2320 {
2321         struct rbd_obj_request *orig_request;
2322         struct ceph_osd_request *osd_req;
2323         struct ceph_osd_client *osdc;
2324         struct rbd_device *rbd_dev;
2325         struct page **pages;
2326         u32 page_count;
2327         int img_result;
2328         u64 parent_length;
2329         u64 offset;
2330         u64 length;
2331
2332         rbd_assert(img_request_child_test(img_request));
2333
2334         /* First get what we need from the image request */
2335
2336         pages = img_request->copyup_pages;
2337         rbd_assert(pages != NULL);
2338         img_request->copyup_pages = NULL;
2339         page_count = img_request->copyup_page_count;
2340         rbd_assert(page_count);
2341         img_request->copyup_page_count = 0;
2342
2343         orig_request = img_request->obj_request;
2344         rbd_assert(orig_request != NULL);
2345         rbd_assert(obj_request_type_valid(orig_request->type));
2346         img_result = img_request->result;
2347         parent_length = img_request->length;
2348         rbd_assert(parent_length == img_request->xferred);
2349         rbd_img_request_put(img_request);
2350
2351         rbd_assert(orig_request->img_request);
2352         rbd_dev = orig_request->img_request->rbd_dev;
2353         rbd_assert(rbd_dev);
2354
2355         /*
2356          * If the overlap has become 0 (most likely because the
2357          * image has been flattened) we need to free the pages
2358          * and re-submit the original write request.
2359          */
2360         if (!rbd_dev->parent_overlap) {
2361                 struct ceph_osd_client *osdc;
2362
2363                 ceph_release_page_vector(pages, page_count);
2364                 osdc = &rbd_dev->rbd_client->client->osdc;
2365                 img_result = rbd_obj_request_submit(osdc, orig_request);
2366                 if (!img_result)
2367                         return;
2368         }
2369
2370         if (img_result)
2371                 goto out_err;
2372
2373         /*
2374  * The original osd request is of no use to us any more.
2375          * We need a new one that can hold the two ops in a copyup
2376          * request.  Allocate the new copyup osd request for the
2377          * original request, and release the old one.
2378          */
2379         img_result = -ENOMEM;
2380         osd_req = rbd_osd_req_create_copyup(orig_request);
2381         if (!osd_req)
2382                 goto out_err;
2383         rbd_osd_req_destroy(orig_request->osd_req);
2384         orig_request->osd_req = osd_req;
2385         orig_request->copyup_pages = pages;
2386         orig_request->copyup_page_count = page_count;
2387
2388         /* Initialize the copyup op */
2389
2390         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2391         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2392                                                 false, false);
2393
2394         /* Then the original write request op */
2395
2396         offset = orig_request->offset;
2397         length = orig_request->length;
2398         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2399                                         offset, length, 0, 0);
2400         if (orig_request->type == OBJ_REQUEST_BIO)
2401                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2402                                         orig_request->bio_list, length);
2403         else
2404                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2405                                         orig_request->pages, length,
2406                                         offset & ~PAGE_MASK, false, false);
2407
2408         rbd_osd_req_format_write(orig_request);
2409
2410         /* All set, send it off. */
2411
2412         orig_request->callback = rbd_img_obj_copyup_callback;
2413         osdc = &rbd_dev->rbd_client->client->osdc;
2414         img_result = rbd_obj_request_submit(osdc, orig_request);
2415         if (!img_result)
2416                 return;
2417 out_err:
2418         /* Record the error code and complete the request */
2419
2420         orig_request->result = img_result;
2421         orig_request->xferred = 0;
2422         obj_request_done_set(orig_request);
2423         rbd_obj_request_complete(orig_request);
2424 }
2425
2426 /*
2427  * Read from the parent image the range of data that covers the
2428  * entire target of the given object request.  This is used for
2429  * satisfying a layered image write request when the target of an
2430  * object request from the image request does not exist.
2431  *
2432  * A page array big enough to hold the returned data is allocated
2433  * and supplied to rbd_img_request_fill() as the "data descriptor."
2434  * When the read completes, this page array will be transferred to
2435  * the original object request for the copyup operation.
2436  *
2437  * If an error occurs, record it as the result of the original
2438  * object request and mark it done so it gets completed.
2439  */
2440 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2441 {
2442         struct rbd_img_request *img_request = NULL;
2443         struct rbd_img_request *parent_request = NULL;
2444         struct rbd_device *rbd_dev;
2445         u64 img_offset;
2446         u64 length;
2447         struct page **pages = NULL;
2448         u32 page_count;
2449         int result;
2450
2451         rbd_assert(obj_request_img_data_test(obj_request));
2452         rbd_assert(obj_request_type_valid(obj_request->type));
2453
2454         img_request = obj_request->img_request;
2455         rbd_assert(img_request != NULL);
2456         rbd_dev = img_request->rbd_dev;
2457         rbd_assert(rbd_dev->parent != NULL);
2458
2459         /*
2460          * Determine the byte range covered by the object in the
2461          * child image to which the original request was to be sent.
2462          */
2463         img_offset = obj_request->img_offset - obj_request->offset;
2464         length = (u64)1 << rbd_dev->header.obj_order;
2465
2466         /*
2467          * There is no defined parent data beyond the parent
2468          * overlap, so limit what we read at that boundary if
2469          * necessary.
2470          */
2471         if (img_offset + length > rbd_dev->parent_overlap) {
2472                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2473                 length = rbd_dev->parent_overlap - img_offset;
2474         }
2475
2476         /*
2477          * Allocate a page array big enough to receive the data read
2478          * from the parent.
2479          */
2480         page_count = (u32)calc_pages_for(0, length);
2481         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2482         if (IS_ERR(pages)) {
2483                 result = PTR_ERR(pages);
2484                 pages = NULL;
2485                 goto out_err;
2486         }
2487
2488         result = -ENOMEM;
2489         parent_request = rbd_parent_request_create(obj_request,
2490                                                 img_offset, length);
2491         if (!parent_request)
2492                 goto out_err;
2493
2494         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2495         if (result)
2496                 goto out_err;
2497         parent_request->copyup_pages = pages;
2498         parent_request->copyup_page_count = page_count;
2499
2500         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2501         result = rbd_img_request_submit(parent_request);
2502         if (!result)
2503                 return 0;
2504
2505         parent_request->copyup_pages = NULL;
2506         parent_request->copyup_page_count = 0;
2507         parent_request->obj_request = NULL;
2508         rbd_obj_request_put(obj_request);
2509 out_err:
2510         if (pages)
2511                 ceph_release_page_vector(pages, page_count);
2512         if (parent_request)
2513                 rbd_img_request_put(parent_request);
2514         obj_request->result = result;
2515         obj_request->xferred = 0;
2516         obj_request_done_set(obj_request);
2517
2518         return result;
2519 }
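
/*
 * Worked example (assumed numbers, 4 MiB objects): a write landing
 * in the object covering [8 MiB, 12 MiB) of an image whose parent
 * overlap is 10 MiB reads only [8 MiB, 10 MiB) from the parent;
 * beyond the overlap there is no parent data to copy up.
 */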
2520
2521 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2522 {
2523         struct rbd_obj_request *orig_request;
2524         struct rbd_device *rbd_dev;
2525         int result;
2526
2527         rbd_assert(!obj_request_img_data_test(obj_request));
2528
2529         /*
2530          * All we need from the object request is the original
2531          * request and the result of the STAT op.  Grab those, then
2532          * we're done with the request.
2533          */
2534         orig_request = obj_request->obj_request;
2535         obj_request->obj_request = NULL;
2536         rbd_assert(orig_request);
2537         rbd_assert(orig_request->img_request);
2538
2539         result = obj_request->result;
2540         obj_request->result = 0;
2541
2542         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2543                 obj_request, orig_request, result,
2544                 obj_request->xferred, obj_request->length);
2545         rbd_obj_request_put(obj_request);
2546
2547         /*
2548          * If the overlap has become 0 (most likely because the image has
2549          * been flattened) just re-submit the original write request.
2550          */
2551         rbd_dev = orig_request->img_request->rbd_dev;
2552         if (!rbd_dev->parent_overlap) {
2553                 struct ceph_osd_client *osdc;
2554
2555                 osdc = &rbd_dev->rbd_client->client->osdc;
2556                 result = rbd_obj_request_submit(osdc, orig_request);
2557                 if (!result) {
2558                         rbd_obj_request_put(orig_request);
2559                         return;
2560                 }
2561         }
2562
2563         /*
2564          * Our only purpose here is to determine whether the object
2565          * exists, and we don't want to treat the non-existence as
2566          * an error.  If something else comes back, transfer the
2567          * error to the original request and complete it now.
2568          */
2569         if (!result) {
2570                 obj_request_existence_set(orig_request, true);
2571         } else if (result == -ENOENT) {
2572                 obj_request_existence_set(orig_request, false);
2573         } else if (result) {
2574                 orig_request->result = result;
2575                 goto out;
2576         }
2577
2578         /*
2579          * Resubmit the original request now that we have recorded
2580          * whether the target object exists.
2581          */
2582         orig_request->result = rbd_img_obj_request_submit(orig_request);
2583 out:
2584         if (orig_request->result)
2585                 rbd_obj_request_complete(orig_request);
2586         rbd_obj_request_put(orig_request);
2587 }
2588
2589 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2590 {
2591         struct rbd_obj_request *stat_request;
2592         struct rbd_device *rbd_dev;
2593         struct ceph_osd_client *osdc;
2594         struct page **pages = NULL;
2595         u32 page_count;
2596         size_t size;
2597         int ret;
2598
2599         /*
2600          * The response data for a STAT call consists of:
2601          *     le64 length;
2602          *     struct {
2603          *         le32 tv_sec;
2604          *         le32 tv_nsec;
2605          *     } mtime;
2606          */
2607         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2608         page_count = (u32)calc_pages_for(0, size);
2609         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2610         if (IS_ERR(pages))
2611                 return PTR_ERR(pages);
2612
2613         ret = -ENOMEM;
2614         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2615                                                         OBJ_REQUEST_PAGES);
2616         if (!stat_request)
2617                 goto out;
2618
2619         rbd_obj_request_get(obj_request);
2620         stat_request->obj_request = obj_request;
2621         stat_request->pages = pages;
2622         stat_request->page_count = page_count;
2623
2624         rbd_assert(obj_request->img_request);
2625         rbd_dev = obj_request->img_request->rbd_dev;
2626         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2627                                                 stat_request);
2628         if (!stat_request->osd_req)
2629                 goto out;
2630         stat_request->callback = rbd_img_obj_exists_callback;
2631
2632         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2633         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2634                                         false, false);
2635         rbd_osd_req_format_read(stat_request);
2636
2637         osdc = &rbd_dev->rbd_client->client->osdc;
2638         ret = rbd_obj_request_submit(osdc, stat_request);
2639 out:
2640         if (ret)
2641                 rbd_obj_request_put(obj_request);
2642
2643         return ret;
2644 }
2645
2646 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2647 {
2648         struct rbd_img_request *img_request;
2649         struct rbd_device *rbd_dev;
2650         bool known;
2651
2652         rbd_assert(obj_request_img_data_test(obj_request));
2653
2654         img_request = obj_request->img_request;
2655         rbd_assert(img_request);
2656         rbd_dev = img_request->rbd_dev;
2657
2658         /*
2659          * Only writes to layered images need special handling.
2660          * Reads and non-layered writes are simple object requests.
2661          * Layered writes that start beyond the end of the overlap
2662          * with the parent have no parent data, so they too are
2663          * simple object requests.  Finally, if the target object is
2664          * known to already exist, its parent data has already been
2665          * copied, so a write to the object can also be handled as a
2666          * simple object request.
2667          */
2668         if (!img_request_write_test(img_request) ||
2669                 !img_request_layered_test(img_request) ||
2670                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2671                 ((known = obj_request_known_test(obj_request)) &&
2672                         obj_request_exists_test(obj_request))) {
2673
2674                 struct rbd_device *rbd_dev;
2675                 struct ceph_osd_client *osdc;
2676
2677                 rbd_dev = obj_request->img_request->rbd_dev;
2678                 osdc = &rbd_dev->rbd_client->client->osdc;
2679
2680                 return rbd_obj_request_submit(osdc, obj_request);
2681         }
2682
2683         /*
2684          * It's a layered write.  The target object might exist but
2685          * we may not know that yet.  If we know it doesn't exist,
2686          * start by reading the data for the full target object from
2687          * the parent so we can use it for a copyup to the target.
2688          */
2689         if (known)
2690                 return rbd_img_obj_parent_read_full(obj_request);
2691
2692         /* We don't know whether the target exists.  Go find out. */
2693
2694         return rbd_img_obj_exists_submit(obj_request);
2695 }
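
/*
 * Decision summary for the function above (sketch):
 *
 *      read, non-layered write, write entirely beyond
 *      the overlap, or target known to exist   -> plain object request
 *      target known not to exist               -> parent read + copyup
 *      target existence unknown                -> STAT first, then retry
 */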
2696
2697 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2698 {
2699         struct rbd_obj_request *obj_request;
2700         struct rbd_obj_request *next_obj_request;
2701
2702         dout("%s: img %p\n", __func__, img_request);
2703         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2704                 int ret;
2705
2706                 ret = rbd_img_obj_request_submit(obj_request);
2707                 if (ret)
2708                         return ret;
2709         }
2710
2711         return 0;
2712 }
2713
2714 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2715 {
2716         struct rbd_obj_request *obj_request;
2717         struct rbd_device *rbd_dev;
2718         u64 obj_end;
2719         u64 img_xferred;
2720         int img_result;
2721
2722         rbd_assert(img_request_child_test(img_request));
2723
2724         /* First get what we need from the image request and release it */
2725
2726         obj_request = img_request->obj_request;
2727         img_xferred = img_request->xferred;
2728         img_result = img_request->result;
2729         rbd_img_request_put(img_request);
2730
2731         /*
2732          * If the overlap has become 0 (most likely because the
2733          * image has been flattened) we need to re-submit the
2734          * original request.
2735          */
2736         rbd_assert(obj_request);
2737         rbd_assert(obj_request->img_request);
2738         rbd_dev = obj_request->img_request->rbd_dev;
2739         if (!rbd_dev->parent_overlap) {
2740                 struct ceph_osd_client *osdc;
2741
2742                 osdc = &rbd_dev->rbd_client->client->osdc;
2743                 img_result = rbd_obj_request_submit(osdc, obj_request);
2744                 if (!img_result)
2745                         return;
2746         }
2747
2748         obj_request->result = img_result;
2749         if (obj_request->result)
2750                 goto out;
2751
2752         /*
2753          * We need to zero anything beyond the parent overlap
2754          * boundary.  Since rbd_img_obj_request_read_callback()
2755          * will zero anything beyond the end of a short read, an
2756          * easy way to do this is to pretend the data from the
2757          * parent came up short--ending at the overlap boundary.
2758          */
2759         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2760         obj_end = obj_request->img_offset + obj_request->length;
2761         if (obj_end > rbd_dev->parent_overlap) {
2762                 u64 xferred = 0;
2763
2764                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2765                         xferred = rbd_dev->parent_overlap -
2766                                         obj_request->img_offset;
2767
2768                 obj_request->xferred = min(img_xferred, xferred);
2769         } else {
2770                 obj_request->xferred = img_xferred;
2771         }
2772 out:
2773         rbd_img_obj_request_read_callback(obj_request);
2774         rbd_obj_request_complete(obj_request);
2775 }
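
/*
 * Worked example (assumed numbers): an object read covering
 * [6 MiB, 10 MiB) of the child image with a 7 MiB parent overlap
 * has defined parent data only up to 7 MiB.  Reporting xferred as
 * 1 MiB makes rbd_img_obj_request_read_callback() zero-fill the
 * remaining 3 MiB, which is exactly the undefined range.
 */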
2776
2777 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2778 {
2779         struct rbd_img_request *img_request;
2780         int result;
2781
2782         rbd_assert(obj_request_img_data_test(obj_request));
2783         rbd_assert(obj_request->img_request != NULL);
2784         rbd_assert(obj_request->result == (s32) -ENOENT);
2785         rbd_assert(obj_request_type_valid(obj_request->type));
2786
2787         /* rbd_read_finish(obj_request, obj_request->length); */
2788         img_request = rbd_parent_request_create(obj_request,
2789                                                 obj_request->img_offset,
2790                                                 obj_request->length);
2791         result = -ENOMEM;
2792         if (!img_request)
2793                 goto out_err;
2794
2795         if (obj_request->type == OBJ_REQUEST_BIO)
2796                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2797                                                 obj_request->bio_list);
2798         else
2799                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2800                                                 obj_request->pages);
2801         if (result)
2802                 goto out_err;
2803
2804         img_request->callback = rbd_img_parent_read_callback;
2805         result = rbd_img_request_submit(img_request);
2806         if (result)
2807                 goto out_err;
2808
2809         return;
2810 out_err:
2811         if (img_request)
2812                 rbd_img_request_put(img_request);
2813         obj_request->result = result;
2814         obj_request->xferred = 0;
2815         obj_request_done_set(obj_request);
2816 }
2817
2818 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2819 {
2820         struct rbd_obj_request *obj_request;
2821         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2822         int ret;
2823
2824         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2825                                                         OBJ_REQUEST_NODATA);
2826         if (!obj_request)
2827                 return -ENOMEM;
2828
2829         ret = -ENOMEM;
2830         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2831         if (!obj_request->osd_req)
2832                 goto out;
2833         obj_request->callback = rbd_obj_request_put;
2834
2835         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2836                                         notify_id, 0, 0);
2837         rbd_osd_req_format_read(obj_request);
2838
2839         ret = rbd_obj_request_submit(osdc, obj_request);
2840 out:
2841         if (ret)
2842                 rbd_obj_request_put(obj_request);
2843
2844         return ret;
2845 }
2846
2847 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2848 {
2849         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2850         int ret;
2851
2852         if (!rbd_dev)
2853                 return;
2854
2855         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2856                 rbd_dev->header_name, (unsigned long long)notify_id,
2857                 (unsigned int)opcode);
2858         ret = rbd_dev_refresh(rbd_dev);
2859         if (ret)
2860                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2861
2862         rbd_obj_notify_ack(rbd_dev, notify_id);
2863 }
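
/*
 * The notification round trip, in brief: a client that updates the
 * image header sends a notify on the header object; every watcher
 * is called back here, refreshes its cached header, and answers
 * with rbd_obj_notify_ack() so the notifier can complete.
 */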
2864
2865 /*
2866  * Request sync osd watch/unwatch.  The value of "start" determines
2867  * whether a watch request is being initiated or torn down.
2868  */
2869 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2870 {
2871         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2872         struct rbd_obj_request *obj_request;
2873         int ret;
2874
2875         rbd_assert(start ^ !!rbd_dev->watch_event);
2876         rbd_assert(start ^ !!rbd_dev->watch_request);
2877
2878         if (start) {
2879                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2880                                                 &rbd_dev->watch_event);
2881                 if (ret < 0)
2882                         return ret;
2883                 rbd_assert(rbd_dev->watch_event != NULL);
2884         }
2885
2886         ret = -ENOMEM;
2887         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2888                                                         OBJ_REQUEST_NODATA);
2889         if (!obj_request)
2890                 goto out_cancel;
2891
2892         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2893         if (!obj_request->osd_req)
2894                 goto out_cancel;
2895
2896         if (start)
2897                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2898         else
2899                 ceph_osdc_unregister_linger_request(osdc,
2900                                         rbd_dev->watch_request->osd_req);
2901
2902         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2903                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2904         rbd_osd_req_format_write(obj_request);
2905
2906         ret = rbd_obj_request_submit(osdc, obj_request);
2907         if (ret)
2908                 goto out_cancel;
2909         ret = rbd_obj_request_wait(obj_request);
2910         if (ret)
2911                 goto out_cancel;
2912         ret = obj_request->result;
2913         if (ret)
2914                 goto out_cancel;
2915
2916         /*
2917          * A watch request is set to linger, so the underlying osd
2918          * request won't go away until we unregister it.  We retain
2919          * a pointer to the object request during that time (in
2920          * rbd_dev->watch_request), so we'll keep a reference to
2921          * it.  We'll drop that reference (below) after we've
2922          * unregistered it.
2923          */
2924         if (start) {
2925                 rbd_dev->watch_request = obj_request;
2926
2927                 return 0;
2928         }
2929
2930         /* We have successfully torn down the watch request */
2931
2932         rbd_obj_request_put(rbd_dev->watch_request);
2933         rbd_dev->watch_request = NULL;
2934 out_cancel:
2935         /* Cancel the event if we're tearing down, or on error */
2936         ceph_osdc_cancel_event(rbd_dev->watch_event);
2937         rbd_dev->watch_event = NULL;
2938         if (obj_request)
2939                 rbd_obj_request_put(obj_request);
2940
2941         return ret;
2942 }
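
/*
 * Usage sketch (assumed call sites elsewhere in the driver): the
 * watch is registered once the header object name is known, and
 * torn down again when the device is being released.
 *
 *      ret = rbd_dev_header_watch_sync(rbd_dev, true);
 *      ...
 *      ret = rbd_dev_header_watch_sync(rbd_dev, false);
 */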
2943
2944 /*
2945  * Synchronous osd object method call.  Returns the number of bytes
2946  * returned in the outbound buffer, or a negative error code.
2947  */
2948 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2949                              const char *object_name,
2950                              const char *class_name,
2951                              const char *method_name,
2952                              const void *outbound,
2953                              size_t outbound_size,
2954                              void *inbound,
2955                              size_t inbound_size)
2956 {
2957         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2958         struct rbd_obj_request *obj_request;
2959         struct page **pages;
2960         u32 page_count;
2961         int ret;
2962
2963         /*
2964          * Method calls are ultimately read operations.  The result
2965          * should be placed into the inbound buffer provided.  They
2966          * also supply outbound data--parameters for the object
2967          * method.  Currently if this is present it will be a
2968          * snapshot id.
2969          */
2970         page_count = (u32)calc_pages_for(0, inbound_size);
2971         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2972         if (IS_ERR(pages))
2973                 return PTR_ERR(pages);
2974
2975         ret = -ENOMEM;
2976         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2977                                                         OBJ_REQUEST_PAGES);
2978         if (!obj_request)
2979                 goto out;
2980
2981         obj_request->pages = pages;
2982         obj_request->page_count = page_count;
2983
2984         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2985         if (!obj_request->osd_req)
2986                 goto out;
2987
2988         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2989                                         class_name, method_name);
2990         if (outbound_size) {
2991                 struct ceph_pagelist *pagelist;
2992
2993                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2994                 if (!pagelist)
2995                         goto out;
2996
2997                 ceph_pagelist_init(pagelist);
2998                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2999                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3000                                                 pagelist);
3001         }
3002         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3003                                         obj_request->pages, inbound_size,
3004                                         0, false, false);
3005         rbd_osd_req_format_read(obj_request);
3006
3007         ret = rbd_obj_request_submit(osdc, obj_request);
3008         if (ret)
3009                 goto out;
3010         ret = rbd_obj_request_wait(obj_request);
3011         if (ret)
3012                 goto out;
3013
3014         ret = obj_request->result;
3015         if (ret < 0)
3016                 goto out;
3017
3018         rbd_assert(obj_request->xferred < (u64)INT_MAX);
3019         ret = (int)obj_request->xferred;
3020         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3021 out:
3022         if (obj_request)
3023                 rbd_obj_request_put(obj_request);
3024         else
3025                 ceph_release_page_vector(pages, page_count);
3026
3027         return ret;
3028 }
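
/*
 * A sketch of a typical call, mirroring the v2 image methods later in
 * this file (not an additional call site):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 *
 * A non-negative return is the number of bytes the method placed in
 * the inbound buffer; callers compare it with the size they expect.
 */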
3029
3030 static void rbd_request_fn(struct request_queue *q)
3031                 __releases(q->queue_lock) __acquires(q->queue_lock)
3032 {
3033         struct rbd_device *rbd_dev = q->queuedata;
3034         bool read_only = rbd_dev->mapping.read_only;
3035         struct request *rq;
3036         int result;
3037
3038         while ((rq = blk_fetch_request(q))) {
3039                 bool write_request = rq_data_dir(rq) == WRITE;
3040                 struct rbd_img_request *img_request;
3041                 u64 offset;
3042                 u64 length;
3043
3044                 /* Ignore any non-FS requests that filter through. */
3045
3046                 if (rq->cmd_type != REQ_TYPE_FS) {
3047                         dout("%s: non-fs request type %d\n", __func__,
3048                                 (int) rq->cmd_type);
3049                         __blk_end_request_all(rq, 0);
3050                         continue;
3051                 }
3052
3053                 /* Ignore/skip any zero-length requests */
3054
3055                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3056                 length = (u64) blk_rq_bytes(rq);
3057
3058                 if (!length) {
3059                         dout("%s: zero-length request\n", __func__);
3060                         __blk_end_request_all(rq, 0);
3061                         continue;
3062                 }
3063
3064                 spin_unlock_irq(q->queue_lock);
3065
3066                 /* Disallow writes to a read-only device */
3067
3068                 if (write_request) {
3069                         result = -EROFS;
3070                         if (read_only)
3071                                 goto end_request;
3072                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3073                 }
3074
3075                 /*
3076                  * Quit early if the mapped snapshot no longer
3077                  * exists.  It's still possible the snapshot will
3078                  * have disappeared by the time our request arrives
3079                  * at the osd, but there's no sense in sending it if
3080                  * we already know.
3081                  */
3082                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3083                         dout("request for non-existent snapshot");
3084                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3085                         result = -ENXIO;
3086                         goto end_request;
3087                 }
3088
3089                 result = -EINVAL;
3090                 if (offset && length > U64_MAX - offset + 1) {
3091                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3092                                 offset, length);
3093                         goto end_request;       /* Shouldn't happen */
3094                 }
3095
3096                 result = -EIO;
3097                 if (offset + length > rbd_dev->mapping.size) {
3098                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3099                                 offset, length, rbd_dev->mapping.size);
3100                         goto end_request;
3101                 }
3102
3103                 result = -ENOMEM;
3104                 img_request = rbd_img_request_create(rbd_dev, offset, length,
3105                                                         write_request);
3106                 if (!img_request)
3107                         goto end_request;
3108
3109                 img_request->rq = rq;
3110
3111                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3112                                                 rq->bio);
3113                 if (!result)
3114                         result = rbd_img_request_submit(img_request);
3115                 if (result)
3116                         rbd_img_request_put(img_request);
3117 end_request:
3118                 spin_lock_irq(q->queue_lock);
3119                 if (result < 0) {
3120                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3121                                 write_request ? "write" : "read",
3122                                 length, offset, result);
3123
3124                         __blk_end_request_all(rq, result);
3125                 }
3126         }
3127 }
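
/*
 * To summarize the loop above: non-FS and zero-length requests are
 * completed immediately; everything else is validated with the queue
 * lock dropped (writability, snapshot existence, range), wrapped in an
 * image request, filled from its bio chain, and submitted.  Only
 * failures are completed here; a successful submission completes the
 * block request later, when the image request itself finishes.
 */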
3128
3129 /*
3130  * A queue callback.  Makes sure that we don't create a bio that spans
3131  * multiple osd objects.  One exception would be single-page bios,
3132  * which we handle later at bio_chain_clone_range().
3133  */
3134 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3135                           struct bio_vec *bvec)
3136 {
3137         struct rbd_device *rbd_dev = q->queuedata;
3138         sector_t sector_offset;
3139         sector_t sectors_per_obj;
3140         sector_t obj_sector_offset;
3141         int ret;
3142
3143         /*
3144          * Convert the bio's partition-relative start sector to an
3145          * offset relative to the enclosing device, then find how far
3146          * into its rbd object that offset falls.
3147          */
3148         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3149         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3150         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3151
3152         /*
3153          * Compute the number of bytes from that offset to the end
3154          * of the object.  Account for what's already used by the bio.
3155          */
3156         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3157         if (ret > bmd->bi_size)
3158                 ret -= bmd->bi_size;
3159         else
3160                 ret = 0;
3161
3162         /*
3163          * Don't send back more than was asked for.  And if the bio
3164          * was empty, let the whole thing through because:  "Note
3165          * that a block device *must* allow a single page to be
3166          * added to an empty bio."
3167          */
3168         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3169         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3170                 ret = (int) bvec->bv_len;
3171
3172         return ret;
3173 }
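
/*
 * Worked example, assuming the default object order of 22 (4 MiB
 * objects): sectors_per_obj = 1 << (22 - 9) = 8192.  For a bio
 * starting at device sector 8000 that already holds 100 KiB,
 * obj_sector_offset is 8000, leaving (8192 - 8000) << 9 = 96 KiB to
 * the object boundary; subtracting the 100 KiB already in the bio
 * leaves nothing, so the added bvec is refused (ret 0) and the I/O
 * is split at the boundary.
 */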
3174
3175 static void rbd_free_disk(struct rbd_device *rbd_dev)
3176 {
3177         struct gendisk *disk = rbd_dev->disk;
3178
3179         if (!disk)
3180                 return;
3181
3182         rbd_dev->disk = NULL;
3183         if (disk->flags & GENHD_FL_UP) {
3184                 del_gendisk(disk);
3185                 if (disk->queue)
3186                         blk_cleanup_queue(disk->queue);
3187         }
3188         put_disk(disk);
3189 }
3190
3191 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3192                                 const char *object_name,
3193                                 u64 offset, u64 length, void *buf)
3194
3195 {
3196         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3197         struct rbd_obj_request *obj_request;
3198         struct page **pages = NULL;
3199         u32 page_count;
3200         size_t size;
3201         int ret;
3202
3203         page_count = (u32) calc_pages_for(offset, length);
3204         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3205         if (IS_ERR(pages))
3206                 return PTR_ERR(pages);
3207
3208         ret = -ENOMEM;
3209         obj_request = rbd_obj_request_create(object_name, offset, length,
3210                                                         OBJ_REQUEST_PAGES);
3211         if (!obj_request)
3212                 goto out;
3213
3214         obj_request->pages = pages;
3215         obj_request->page_count = page_count;
3216
3217         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3218         if (!obj_request->osd_req)
3219                 goto out;
3220
3221         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3222                                         offset, length, 0, 0);
3223         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3224                                         obj_request->pages,
3225                                         obj_request->length,
3226                                         obj_request->offset & ~PAGE_MASK,
3227                                         false, false);
3228         rbd_osd_req_format_read(obj_request);
3229
3230         ret = rbd_obj_request_submit(osdc, obj_request);
3231         if (ret)
3232                 goto out;
3233         ret = rbd_obj_request_wait(obj_request);
3234         if (ret)
3235                 goto out;
3236
3237         ret = obj_request->result;
3238         if (ret < 0)
3239                 goto out;
3240
3241         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3242         size = (size_t) obj_request->xferred;
3243         ceph_copy_from_page_vector(pages, buf, 0, size);
3244         rbd_assert(size <= (size_t)INT_MAX);
3245         ret = (int)size;
3246 out:
3247         if (obj_request)
3248                 rbd_obj_request_put(obj_request);
3249         else
3250                 ceph_release_page_vector(pages, page_count);
3251
3252         return ret;
3253 }
3254
3255 /*
3256  * Read the complete header for the given rbd device.  On successful
3257  * return, the rbd_dev->header field will contain up-to-date
3258  * information about the image.
3259  */
3260 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3261 {
3262         struct rbd_image_header_ondisk *ondisk = NULL;
3263         u32 snap_count = 0;
3264         u64 names_size = 0;
3265         u32 want_count;
3266         int ret;
3267
3268         /*
3269          * The complete header will include an array of its 64-bit
3270          * snapshot ids, followed by the names of those snapshots as
3271          * a contiguous block of NUL-terminated strings.  Note that
3272          * the number of snapshots could change by the time we read
3273          * it in, in which case we re-read it.
3274          */
3275         do {
3276                 size_t size;
3277
3278                 kfree(ondisk);
3279
3280                 size = sizeof (*ondisk);
3281                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3282                 size += names_size;
3283                 ondisk = kmalloc(size, GFP_KERNEL);
3284                 if (!ondisk)
3285                         return -ENOMEM;
3286
3287                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3288                                        0, size, ondisk);
3289                 if (ret < 0)
3290                         goto out;
3291                 if ((size_t)ret < size) {
3292                         ret = -ENXIO;
3293                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3294                                 size, ret);
3295                         goto out;
3296                 }
3297                 if (!rbd_dev_ondisk_valid(ondisk)) {
3298                         ret = -ENXIO;
3299                         rbd_warn(rbd_dev, "invalid header");
3300                         goto out;
3301                 }
3302
3303                 names_size = le64_to_cpu(ondisk->snap_names_len);
3304                 want_count = snap_count;
3305                 snap_count = le32_to_cpu(ondisk->snap_count);
3306         } while (snap_count != want_count);
3307
3308         ret = rbd_header_from_disk(rbd_dev, ondisk);
3309 out:
3310         kfree(ondisk);
3311
3312         return ret;
3313 }
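
/*
 * The on-disk layout the loop above sizes for, informally:
 *
 *	struct rbd_image_header_ondisk		fixed-size header
 *	struct rbd_image_snap_ondisk		x snap_count
 *	snapshot names				NUL-terminated, back to back
 *
 * snap_count and snap_names_len come from the header itself, so the
 * first pass (snap_count == 0) learns the real sizes and the read is
 * retried until it sees the count it allocated for.
 */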
3314
3315 /*
3316  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3317  * has disappeared from the (just updated) snapshot context.
3318  */
3319 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3320 {
3321         u64 snap_id;
3322
3323         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3324                 return;
3325
3326         snap_id = rbd_dev->spec->snap_id;
3327         if (snap_id == CEPH_NOSNAP)
3328                 return;
3329
3330         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3331                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3332 }
3333
3334 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3335 {
3336         u64 mapping_size;
3337         int ret;
3338
3339         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3340         mapping_size = rbd_dev->mapping.size;
3341         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3342         if (rbd_dev->image_format == 1)
3343                 ret = rbd_dev_v1_header_info(rbd_dev);
3344         else
3345                 ret = rbd_dev_v2_header_info(rbd_dev);
3346
3347         /* If it's a mapped snapshot, validate its EXISTS flag */
3348
3349         rbd_exists_validate(rbd_dev);
3350         mutex_unlock(&ctl_mutex);
3351         if (mapping_size != rbd_dev->mapping.size) {
3352                 sector_t size;
3353
3354                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3355                 dout("setting size to %llu sectors", (unsigned long long)size);
3356                 set_capacity(rbd_dev->disk, size);
3357                 revalidate_disk(rbd_dev->disk);
3358         }
3359
3360         return ret;
3361 }
3362
3363 static int rbd_init_disk(struct rbd_device *rbd_dev)
3364 {
3365         struct gendisk *disk;
3366         struct request_queue *q;
3367         u64 segment_size;
3368
3369         /* create gendisk info */
3370         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3371         if (!disk)
3372                 return -ENOMEM;
3373
3374         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3375                  rbd_dev->dev_id);
3376         disk->major = rbd_dev->major;
3377         disk->first_minor = 0;
3378         disk->fops = &rbd_bd_ops;
3379         disk->private_data = rbd_dev;
3380
3381         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3382         if (!q)
3383                 goto out_disk;
3384
3385         /* We use the default size, but let's be explicit about it. */
3386         blk_queue_physical_block_size(q, SECTOR_SIZE);
3387
3388         /* set io sizes to object size */
3389         segment_size = rbd_obj_bytes(&rbd_dev->header);
3390         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3391         blk_queue_max_segment_size(q, segment_size);
3392         blk_queue_io_min(q, segment_size);
3393         blk_queue_io_opt(q, segment_size);
3394
3395         blk_queue_merge_bvec(q, rbd_merge_bvec);
3396         disk->queue = q;
3397
3398         q->queuedata = rbd_dev;
3399
3400         rbd_dev->disk = disk;
3401
3402         return 0;
3403 out_disk:
3404         put_disk(disk);
3405
3406         return -ENOMEM;
3407 }
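
/*
 * Capping max_hw_sectors and the segment size at the object size, and
 * advertising it as the minimum/optimal I/O size, works together with
 * rbd_merge_bvec() above to keep requests from spanning object
 * boundaries wherever possible.
 */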
3408
3409 /*
3410   sysfs
3411 */
3412
3413 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3414 {
3415         return container_of(dev, struct rbd_device, dev);
3416 }
3417
3418 static ssize_t rbd_size_show(struct device *dev,
3419                              struct device_attribute *attr, char *buf)
3420 {
3421         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3422
3423         return sprintf(buf, "%llu\n",
3424                 (unsigned long long)rbd_dev->mapping.size);
3425 }
3426
3427 /*
3428  * Note this shows the features for whatever's mapped, which is not
3429  * necessarily the base image.
3430  */
3431 static ssize_t rbd_features_show(struct device *dev,
3432                              struct device_attribute *attr, char *buf)
3433 {
3434         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3435
3436         return sprintf(buf, "0x%016llx\n",
3437                         (unsigned long long)rbd_dev->mapping.features);
3438 }
3439
3440 static ssize_t rbd_major_show(struct device *dev,
3441                               struct device_attribute *attr, char *buf)
3442 {
3443         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3444
3445         if (rbd_dev->major)
3446                 return sprintf(buf, "%d\n", rbd_dev->major);
3447
3448         return sprintf(buf, "(none)\n");
3449
3450 }
3451
3452 static ssize_t rbd_client_id_show(struct device *dev,
3453                                   struct device_attribute *attr, char *buf)
3454 {
3455         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3456
3457         return sprintf(buf, "client%lld\n",
3458                         ceph_client_id(rbd_dev->rbd_client->client));
3459 }
3460
3461 static ssize_t rbd_pool_show(struct device *dev,
3462                              struct device_attribute *attr, char *buf)
3463 {
3464         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3465
3466         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3467 }
3468
3469 static ssize_t rbd_pool_id_show(struct device *dev,
3470                              struct device_attribute *attr, char *buf)
3471 {
3472         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3473
3474         return sprintf(buf, "%llu\n",
3475                         (unsigned long long) rbd_dev->spec->pool_id);
3476 }
3477
3478 static ssize_t rbd_name_show(struct device *dev,
3479                              struct device_attribute *attr, char *buf)
3480 {
3481         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3482
3483         if (rbd_dev->spec->image_name)
3484                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3485
3486         return sprintf(buf, "(unknown)\n");
3487 }
3488
3489 static ssize_t rbd_image_id_show(struct device *dev,
3490                              struct device_attribute *attr, char *buf)
3491 {
3492         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3493
3494         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3495 }
3496
3497 /*
3498  * Shows the name of the currently-mapped snapshot (or
3499  * RBD_SNAP_HEAD_NAME for the base image).
3500  */
3501 static ssize_t rbd_snap_show(struct device *dev,
3502                              struct device_attribute *attr,
3503                              char *buf)
3504 {
3505         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3506
3507         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3508 }
3509
3510 /*
3511  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3512  * for the parent image.  If there is no parent, simply shows
3513  * "(no parent image)".
3514  */
3515 static ssize_t rbd_parent_show(struct device *dev,
3516                              struct device_attribute *attr,
3517                              char *buf)
3518 {
3519         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3520         struct rbd_spec *spec = rbd_dev->parent_spec;
3521         int count;
3522         char *bufp = buf;
3523
3524         if (!spec)
3525                 return sprintf(buf, "(no parent image)\n");
3526
3527         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3528                         (unsigned long long) spec->pool_id, spec->pool_name);
3529         if (count < 0)
3530                 return count;
3531         bufp += count;
3532
3533         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3534                         spec->image_name ? spec->image_name : "(unknown)");
3535         if (count < 0)
3536                 return count;
3537         bufp += count;
3538
3539         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3540                         (unsigned long long) spec->snap_id, spec->snap_name);
3541         if (count < 0)
3542                 return count;
3543         bufp += count;
3544
3545         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3546         if (count < 0)
3547                 return count;
3548         bufp += count;
3549
3550         return (ssize_t) (bufp - buf);
3551 }
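
/*
 * Sample output (values illustrative):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 10056b8b4567
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */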
3552
3553 static ssize_t rbd_image_refresh(struct device *dev,
3554                                  struct device_attribute *attr,
3555                                  const char *buf,
3556                                  size_t size)
3557 {
3558         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3559         int ret;
3560
3561         ret = rbd_dev_refresh(rbd_dev);
3562         if (ret)
3563                 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3564
3565         return ret < 0 ? ret : size;
3566 }
3567
3568 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3569 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3570 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3571 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3572 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3573 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3574 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3575 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3576 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3577 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3578 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3579
3580 static struct attribute *rbd_attrs[] = {
3581         &dev_attr_size.attr,
3582         &dev_attr_features.attr,
3583         &dev_attr_major.attr,
3584         &dev_attr_client_id.attr,
3585         &dev_attr_pool.attr,
3586         &dev_attr_pool_id.attr,
3587         &dev_attr_name.attr,
3588         &dev_attr_image_id.attr,
3589         &dev_attr_current_snap.attr,
3590         &dev_attr_parent.attr,
3591         &dev_attr_refresh.attr,
3592         NULL
3593 };
3594
3595 static struct attribute_group rbd_attr_group = {
3596         .attrs = rbd_attrs,
3597 };
3598
3599 static const struct attribute_group *rbd_attr_groups[] = {
3600         &rbd_attr_group,
3601         NULL
3602 };
3603
3604 static void rbd_sysfs_dev_release(struct device *dev)
3605 {
3606 }
3607
3608 static struct device_type rbd_device_type = {
3609         .name           = "rbd",
3610         .groups         = rbd_attr_groups,
3611         .release        = rbd_sysfs_dev_release,
3612 };
3613
3614 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3615 {
3616         kref_get(&spec->kref);
3617
3618         return spec;
3619 }
3620
3621 static void rbd_spec_free(struct kref *kref);
3622 static void rbd_spec_put(struct rbd_spec *spec)
3623 {
3624         if (spec)
3625                 kref_put(&spec->kref, rbd_spec_free);
3626 }
3627
3628 static struct rbd_spec *rbd_spec_alloc(void)
3629 {
3630         struct rbd_spec *spec;
3631
3632         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3633         if (!spec)
3634                 return NULL;
3635         kref_init(&spec->kref);
3636
3637         return spec;
3638 }
3639
3640 static void rbd_spec_free(struct kref *kref)
3641 {
3642         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3643
3644         kfree(spec->pool_name);
3645         kfree(spec->image_id);
3646         kfree(spec->image_name);
3647         kfree(spec->snap_name);
3648         kfree(spec);
3649 }
3650
3651 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3652                                 struct rbd_spec *spec)
3653 {
3654         struct rbd_device *rbd_dev;
3655
3656         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3657         if (!rbd_dev)
3658                 return NULL;
3659
3660         spin_lock_init(&rbd_dev->lock);
3661         rbd_dev->flags = 0;
3662         atomic_set(&rbd_dev->parent_ref, 0);
3663         INIT_LIST_HEAD(&rbd_dev->node);
3664         init_rwsem(&rbd_dev->header_rwsem);
3665
3666         rbd_dev->spec = spec;
3667         rbd_dev->rbd_client = rbdc;
3668
3669         /* Initialize the layout used for all rbd requests */
3670
3671         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3672         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3673         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3674         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3675
3676         return rbd_dev;
3677 }
3678
3679 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3680 {
3681         rbd_put_client(rbd_dev->rbd_client);
3682         rbd_spec_put(rbd_dev->spec);
3683         kfree(rbd_dev);
3684 }
3685
3686 /*
3687  * Get the size and object order for an image snapshot, or if
3688  * snap_id is CEPH_NOSNAP, get this information for the base
3689  * image.
3690  */
3691 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3692                                 u8 *order, u64 *snap_size)
3693 {
3694         __le64 snapid = cpu_to_le64(snap_id);
3695         int ret;
3696         struct {
3697                 u8 order;
3698                 __le64 size;
3699         } __attribute__ ((packed)) size_buf = { 0 };
3700
3701         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3702                                 "rbd", "get_size",
3703                                 &snapid, sizeof (snapid),
3704                                 &size_buf, sizeof (size_buf));
3705         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3706         if (ret < 0)
3707                 return ret;
3708         if (ret < sizeof (size_buf))
3709                 return -ERANGE;
3710
3711         if (order)
3712                 *order = size_buf.order;
3713         *snap_size = le64_to_cpu(size_buf.size);
3714
3715         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3716                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3717                 (unsigned long long)*snap_size);
3718
3719         return 0;
3720 }
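
/*
 * The "get_size" reply decoded above is simply a packed byte holding
 * the object order followed by a little-endian 64-bit image size; an
 * order of 22, for example, means 1 << 22 = 4 MiB objects (the rbd
 * default).
 */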
3721
3722 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3723 {
3724         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3725                                         &rbd_dev->header.obj_order,
3726                                         &rbd_dev->header.image_size);
3727 }
3728
3729 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3730 {
3731         void *reply_buf;
3732         int ret;
3733         void *p;
3734
3735         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3736         if (!reply_buf)
3737                 return -ENOMEM;
3738
3739         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3740                                 "rbd", "get_object_prefix", NULL, 0,
3741                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3742         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3743         if (ret < 0)
3744                 goto out;
3745
3746         p = reply_buf;
3747         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3748                                                 p + ret, NULL, GFP_NOIO);
3749         ret = 0;
3750
3751         if (IS_ERR(rbd_dev->header.object_prefix)) {
3752                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3753                 rbd_dev->header.object_prefix = NULL;
3754         } else {
3755                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3756         }
3757 out:
3758         kfree(reply_buf);
3759
3760         return ret;
3761 }
3762
3763 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3764                 u64 *snap_features)
3765 {
3766         __le64 snapid = cpu_to_le64(snap_id);
3767         struct {
3768                 __le64 features;
3769                 __le64 incompat;
3770         } __attribute__ ((packed)) features_buf = { 0 };
3771         u64 incompat;
3772         int ret;
3773
3774         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3775                                 "rbd", "get_features",
3776                                 &snapid, sizeof (snapid),
3777                                 &features_buf, sizeof (features_buf));
3778         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3779         if (ret < 0)
3780                 return ret;
3781         if (ret < sizeof (features_buf))
3782                 return -ERANGE;
3783
3784         incompat = le64_to_cpu(features_buf.incompat);
3785         if (incompat & ~RBD_FEATURES_SUPPORTED)
3786                 return -ENXIO;
3787
3788         *snap_features = le64_to_cpu(features_buf.features);
3789
3790         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3791                 (unsigned long long)snap_id,
3792                 (unsigned long long)*snap_features,
3793                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3794
3795         return 0;
3796 }
3797
3798 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3799 {
3800         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3801                                                 &rbd_dev->header.features);
3802 }
3803
3804 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3805 {
3806         struct rbd_spec *parent_spec;
3807         size_t size;
3808         void *reply_buf = NULL;
3809         __le64 snapid;
3810         void *p;
3811         void *end;
3812         u64 pool_id;
3813         char *image_id;
3814         u64 overlap;
3815         int ret;
3816
3817         parent_spec = rbd_spec_alloc();
3818         if (!parent_spec)
3819                 return -ENOMEM;
3820
3821         size = sizeof (__le64) +                                /* pool_id */
3822                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3823                 sizeof (__le64) +                               /* snap_id */
3824                 sizeof (__le64);                                /* overlap */
3825         reply_buf = kmalloc(size, GFP_KERNEL);
3826         if (!reply_buf) {
3827                 ret = -ENOMEM;
3828                 goto out_err;
3829         }
3830
3831         snapid = cpu_to_le64(CEPH_NOSNAP);
3832         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3833                                 "rbd", "get_parent",
3834                                 &snapid, sizeof (snapid),
3835                                 reply_buf, size);
3836         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3837         if (ret < 0)
3838                 goto out_err;
3839
3840         p = reply_buf;
3841         end = reply_buf + ret;
3842         ret = -ERANGE;
3843         ceph_decode_64_safe(&p, end, pool_id, out_err);
3844         if (pool_id == CEPH_NOPOOL) {
3845                 /*
3846                  * Either the parent never existed, or we have
3847                  * record of it but the image got flattened so it no
3848                  * longer has a parent.  When the parent of a
3849                  * layered image disappears we immediately set the
3850                  * overlap to 0.  The effect of this is that all new
3851                  * requests will be treated as if the image had no
3852                  * parent.
3853                  */
3854                 if (rbd_dev->parent_overlap) {
3855                         rbd_dev->parent_overlap = 0;
3856                         smp_mb();
3857                         rbd_dev_parent_put(rbd_dev);
3858                         pr_info("%s: clone image has been flattened\n",
3859                                 rbd_dev->disk->disk_name);
3860                 }
3861
3862                 goto out;       /* No parent?  No problem. */
3863         }
3864
3865         /* The ceph file layout needs to fit pool id in 32 bits */
3866
3867         ret = -EIO;
3868         if (pool_id > (u64)U32_MAX) {
3869                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3870                         (unsigned long long)pool_id, U32_MAX);
3871                 goto out_err;
3872         }
3873         parent_spec->pool_id = pool_id;
3874
3875         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3876         if (IS_ERR(image_id)) {
3877                 ret = PTR_ERR(image_id);
3878                 goto out_err;
3879         }
3880         parent_spec->image_id = image_id;
3881         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3882         ceph_decode_64_safe(&p, end, overlap, out_err);
3883
3884         if (overlap) {
3885                 rbd_spec_put(rbd_dev->parent_spec);
3886                 rbd_dev->parent_spec = parent_spec;
3887                 parent_spec = NULL;     /* rbd_dev now owns this */
3888                 rbd_dev->parent_overlap = overlap;
3889         } else {
3890                 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3891         }
3892 out:
3893         ret = 0;
3894 out_err:
3895         kfree(reply_buf);
3896         rbd_spec_put(parent_spec);
3897
3898         return ret;
3899 }
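
/*
 * For reference, the "get_parent" reply decoded above:
 *
 *	__le64	pool_id		CEPH_NOPOOL if there is no parent
 *	string	image_id	__le32 length followed by the bytes
 *	__le64	snap_id
 *	__le64	overlap		bytes of this image backed by the parent
 */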
3900
3901 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3902 {
3903         struct {
3904                 __le64 stripe_unit;
3905                 __le64 stripe_count;
3906         } __attribute__ ((packed)) striping_info_buf = { 0 };
3907         size_t size = sizeof (striping_info_buf);
3908         void *p;
3909         u64 obj_size;
3910         u64 stripe_unit;
3911         u64 stripe_count;
3912         int ret;
3913
3914         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3915                                 "rbd", "get_stripe_unit_count", NULL, 0,
3916                                 (char *)&striping_info_buf, size);
3917         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3918         if (ret < 0)
3919                 return ret;
3920         if (ret < size)
3921                 return -ERANGE;
3922
3923         /*
3924          * We don't actually support the "fancy striping" feature
3925          * (STRIPINGV2) yet, but if the striping sizes are the
3926          * defaults the behavior is the same as before.  So find
3927          * out, and only fail if the image has non-default values.
3928          */
3929         ret = -EINVAL;
3930         obj_size = (u64)1 << rbd_dev->header.obj_order;
3931         p = &striping_info_buf;
3932         stripe_unit = ceph_decode_64(&p);
3933         if (stripe_unit != obj_size) {
3934                 rbd_warn(rbd_dev, "unsupported stripe unit "
3935                                 "(got %llu want %llu)",
3936                                 stripe_unit, obj_size);
3937                 return -EINVAL;
3938         }
3939         stripe_count = ceph_decode_64(&p);
3940         if (stripe_count != 1) {
3941                 rbd_warn(rbd_dev, "unsupported stripe count "
3942                                 "(got %llu want 1)", stripe_count);
3943                 return -EINVAL;
3944         }
3945         rbd_dev->header.stripe_unit = stripe_unit;
3946         rbd_dev->header.stripe_count = stripe_count;
3947
3948         return 0;
3949 }
3950
3951 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3952 {
3953         size_t image_id_size;
3954         char *image_id;
3955         void *p;
3956         void *end;
3957         size_t size;
3958         void *reply_buf = NULL;
3959         size_t len = 0;
3960         char *image_name = NULL;
3961         int ret;
3962
3963         rbd_assert(!rbd_dev->spec->image_name);
3964
3965         len = strlen(rbd_dev->spec->image_id);
3966         image_id_size = sizeof (__le32) + len;
3967         image_id = kmalloc(image_id_size, GFP_KERNEL);
3968         if (!image_id)
3969                 return NULL;
3970
3971         p = image_id;
3972         end = image_id + image_id_size;
3973         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3974
3975         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3976         reply_buf = kmalloc(size, GFP_KERNEL);
3977         if (!reply_buf)
3978                 goto out;
3979
3980         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3981                                 "rbd", "dir_get_name",
3982                                 image_id, image_id_size,
3983                                 reply_buf, size);
3984         if (ret < 0)
3985                 goto out;
3986         p = reply_buf;
3987         end = reply_buf + ret;
3988
3989         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3990         if (IS_ERR(image_name))
3991                 image_name = NULL;
3992         else
3993                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3994 out:
3995         kfree(reply_buf);
3996         kfree(image_id);
3997
3998         return image_name;
3999 }
4000
4001 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4002 {
4003         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4004         const char *snap_name;
4005         u32 which = 0;
4006
4007         /* Skip over names until we find the one we are looking for */
4008
4009         snap_name = rbd_dev->header.snap_names;
4010         while (which < snapc->num_snaps) {
4011                 if (!strcmp(name, snap_name))
4012                         return snapc->snaps[which];
4013                 snap_name += strlen(snap_name) + 1;
4014                 which++;
4015         }
4016         return CEPH_NOSNAP;
4017 }
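
/*
 * Illustration of the v1 lookup above: snapc->snaps[] holds the ids
 * while header.snap_names holds the matching names back to back, e.g.
 * (values illustrative)
 *
 *	snaps[]    = { 12, 9, 3 }
 *	snap_names = "mon\0sun\0base\0"
 *
 * so the name "sun" resolves to snapshot id 9.
 */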
4018
4019 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4020 {
4021         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4022         u32 which;
4023         bool found = false;
4024         u64 snap_id;
4025
4026         for (which = 0; !found && which < snapc->num_snaps; which++) {
4027                 const char *snap_name;
4028
4029                 snap_id = snapc->snaps[which];
4030                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4031                 if (IS_ERR(snap_name))
4032                         break;
4033                 found = !strcmp(name, snap_name);
4034                 kfree(snap_name);
4035         }
4036         return found ? snap_id : CEPH_NOSNAP;
4037 }
4038
4039 /*
4040  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4041  * no snapshot by that name is found, or if an error occurs.
4042  */
4043 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4044 {
4045         if (rbd_dev->image_format == 1)
4046                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4047
4048         return rbd_v2_snap_id_by_name(rbd_dev, name);
4049 }
4050
4051 /*
4052  * When an rbd image has a parent image, it is identified by the
4053  * pool, image, and snapshot ids (not names).  This function fills
4054  * in the names for those ids.  (It's OK if we can't figure out the
4055  * name for an image id, but the pool and snapshot ids should always
4056  * exist and have names.)  All names in an rbd spec are dynamically
4057  * allocated.
4058  *
4059  * When an image being mapped (not a parent) is probed, we have the
4060  * pool name and pool id, image name and image id, and the snapshot
4061  * name.  The only thing we're missing is the snapshot id.
4062  */
4063 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
4064 {
4065         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4066         struct rbd_spec *spec = rbd_dev->spec;
4067         const char *pool_name;
4068         const char *image_name;
4069         const char *snap_name;
4070         int ret;
4071
4072         /*
4073          * An image being mapped will have the pool name (etc.), but
4074          * we need to look up the snapshot id.
4075          */
4076         if (spec->pool_name) {
4077                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4078                         u64 snap_id;
4079
4080                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4081                         if (snap_id == CEPH_NOSNAP)
4082                                 return -ENOENT;
4083                         spec->snap_id = snap_id;
4084                 } else {
4085                         spec->snap_id = CEPH_NOSNAP;
4086                 }
4087
4088                 return 0;
4089         }
4090
4091         /* Get the pool name; we have to make our own copy of this */
4092
4093         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4094         if (!pool_name) {
4095                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4096                 return -EIO;
4097         }
4098         pool_name = kstrdup(pool_name, GFP_KERNEL);
4099         if (!pool_name)
4100                 return -ENOMEM;
4101
4102         /* Fetch the image name; tolerate failure here */
4103
4104         image_name = rbd_dev_image_name(rbd_dev);
4105         if (!image_name)
4106                 rbd_warn(rbd_dev, "unable to get image name");
4107
4108         /* Look up the snapshot name, and make a copy */
4109
4110         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4111         if (!snap_name) {
4112                 ret = -ENOMEM;
4113                 goto out_err;
4114         }
4115
4116         spec->pool_name = pool_name;
4117         spec->image_name = image_name;
4118         spec->snap_name = snap_name;
4119
4120         return 0;
4121 out_err:
4122         kfree(image_name);
4123         kfree(pool_name);
4124
4125         return ret;
4126 }
4127
4128 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4129 {
4130         size_t size;
4131         int ret;
4132         void *reply_buf;
4133         void *p;
4134         void *end;
4135         u64 seq;
4136         u32 snap_count;
4137         struct ceph_snap_context *snapc;
4138         u32 i;
4139
4140         /*
4141          * We'll need room for the seq value (maximum snapshot id),
4142          * snapshot count, and array of that many snapshot ids.
4143          * For now we have a fixed upper limit on the number we're
4144          * prepared to receive.
4145          */
4146         size = sizeof (__le64) + sizeof (__le32) +
4147                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
4148         reply_buf = kzalloc(size, GFP_KERNEL);
4149         if (!reply_buf)
4150                 return -ENOMEM;
4151
4152         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4153                                 "rbd", "get_snapcontext", NULL, 0,
4154                                 reply_buf, size);
4155         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4156         if (ret < 0)
4157                 goto out;
4158
4159         p = reply_buf;
4160         end = reply_buf + ret;
4161         ret = -ERANGE;
4162         ceph_decode_64_safe(&p, end, seq, out);
4163         ceph_decode_32_safe(&p, end, snap_count, out);
4164
4165         /*
4166          * Make sure the reported number of snapshot ids wouldn't go
4167          * beyond the end of our buffer.  But before checking that,
4168          * make sure the computed size of the snapshot context we
4169          * allocate is representable in a size_t.
4170          */
4171         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4172                                  / sizeof (u64)) {
4173                 ret = -EINVAL;
4174                 goto out;
4175         }
4176         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4177                 goto out;
4178         ret = 0;
4179
4180         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4181         if (!snapc) {
4182                 ret = -ENOMEM;
4183                 goto out;
4184         }
4185         snapc->seq = seq;
4186         for (i = 0; i < snap_count; i++)
4187                 snapc->snaps[i] = ceph_decode_64(&p);
4188
4189         ceph_put_snap_context(rbd_dev->header.snapc);
4190         rbd_dev->header.snapc = snapc;
4191
4192         dout("  snap context seq = %llu, snap_count = %u\n",
4193                 (unsigned long long)seq, (unsigned int)snap_count);
4194 out:
4195         kfree(reply_buf);
4196
4197         return ret;
4198 }
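
/*
 * The "get_snapcontext" reply decoded above:
 *
 *	__le64	seq			highest snapshot id so far
 *	__le32	snap_count
 *	__le64	snaps[snap_count]	one id per snapshot
 */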
4199
4200 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4201                                         u64 snap_id)
4202 {
4203         size_t size;
4204         void *reply_buf;
4205         __le64 snapid;
4206         int ret;
4207         void *p;
4208         void *end;
4209         char *snap_name;
4210
4211         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4212         reply_buf = kmalloc(size, GFP_KERNEL);
4213         if (!reply_buf)
4214                 return ERR_PTR(-ENOMEM);
4215
4216         snapid = cpu_to_le64(snap_id);
4217         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4218                                 "rbd", "get_snapshot_name",
4219                                 &snapid, sizeof (snapid),
4220                                 reply_buf, size);
4221         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4222         if (ret < 0) {
4223                 snap_name = ERR_PTR(ret);
4224                 goto out;
4225         }
4226
4227         p = reply_buf;
4228         end = reply_buf + ret;
4229         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4230         if (IS_ERR(snap_name))
4231                 goto out;
4232
4233         dout("  snap_id 0x%016llx snap_name = %s\n",
4234                 (unsigned long long)snap_id, snap_name);
4235 out:
4236         kfree(reply_buf);
4237
4238         return snap_name;
4239 }
4240
4241 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4242 {
4243         bool first_time = rbd_dev->header.object_prefix == NULL;
4244         int ret;
4245
4246         down_write(&rbd_dev->header_rwsem);
4247
4248         ret = rbd_dev_v2_image_size(rbd_dev);
4249         if (ret)
4250                 goto out;
4251
4252         if (first_time) {
4253                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4254                 if (ret)
4255                         goto out;
4256         }
4257
4258         /*
4259          * If the image supports layering, get the parent info.  We
4260          * need to probe the first time regardless.  Thereafter we
4261          * only need to if there's a parent, to see if it has
4262          * disappeared due to the mapped image getting flattened.
4263          */
4264         if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4265                         (first_time || rbd_dev->parent_spec)) {
4266                 bool warn;
4267
4268                 ret = rbd_dev_v2_parent_info(rbd_dev);
4269                 if (ret)
4270                         goto out;
4271
4272                 /*
4273                  * Print a warning if this is the initial probe and
4274                  * the image has a parent.  Don't print it if the
4275                  * image now being probed is itself a parent.  We
4276                  * can tell at this point because we won't know its
4277                  * pool name yet (just its pool id).
4278                  */
4279                 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4280                 if (first_time && warn)
4281                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4282                                         "is EXPERIMENTAL!");
4283         }
4284
4285         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4286             rbd_dev->mapping.size != rbd_dev->header.image_size)
4287                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4288
4289         ret = rbd_dev_v2_snap_context(rbd_dev);
4290         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4291 out:
4292         up_write(&rbd_dev->header_rwsem);
4293
4294         return ret;
4295 }
4296
4297 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4298 {
4299         struct device *dev;
4300         int ret;
4301
4302         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4303
4304         dev = &rbd_dev->dev;
4305         dev->bus = &rbd_bus_type;
4306         dev->type = &rbd_device_type;
4307         dev->parent = &rbd_root_dev;
4308         dev->release = rbd_dev_device_release;
4309         dev_set_name(dev, "%d", rbd_dev->dev_id);
4310         ret = device_register(dev);
4311
4312         mutex_unlock(&ctl_mutex);
4313
4314         return ret;
4315 }
4316
4317 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4318 {
4319         device_unregister(&rbd_dev->dev);
4320 }
4321
4322 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4323
4324 /*
4325  * Get a unique rbd identifier for the given new rbd_dev, and add
4326  * the rbd_dev to the global list.  The minimum rbd id is 1.
4327  */
4328 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4329 {
4330         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4331
4332         spin_lock(&rbd_dev_list_lock);
4333         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4334         spin_unlock(&rbd_dev_list_lock);
4335         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4336                 (unsigned long long) rbd_dev->dev_id);
4337 }
4338
4339 /*
4340  * Remove an rbd_dev from the global list, and record that its
4341  * identifier is no longer in use.
4342  */
4343 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4344 {
4345         struct list_head *tmp;
4346         int rbd_id = rbd_dev->dev_id;
4347         int max_id;
4348
4349         rbd_assert(rbd_id > 0);
4350
4351         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4352                 (unsigned long long) rbd_dev->dev_id);
4353         spin_lock(&rbd_dev_list_lock);
4354         list_del_init(&rbd_dev->node);
4355
4356         /*
4357          * If the id being "put" is not the current maximum, there
4358          * is nothing special we need to do.
4359          */
4360         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4361                 spin_unlock(&rbd_dev_list_lock);
4362                 return;
4363         }
4364
4365         /*
4366          * We need to update the current maximum id.  Search the
4367          * list to find out what it is.  We're more likely to find
4368          * the maximum at the end, so search the list backward.
4369          */
4370         max_id = 0;
4371         list_for_each_prev(tmp, &rbd_dev_list) {
4372                 struct rbd_device *rbd_dev;
4373
4374                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4375                 if (rbd_dev->dev_id > max_id)
4376                         max_id = rbd_dev->dev_id;
4377         }
4378         spin_unlock(&rbd_dev_list_lock);
4379
4380         /*
4381          * The max id could have been updated by rbd_dev_id_get(), in
4382          * which case it now accurately reflects the new maximum.
4383          * Be careful not to overwrite the maximum value in that
4384          * case.
4385          */
4386         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4387         dout("  max dev id has been reset\n");
4388 }
4389
4390 /*
4391  * Skips over white space at *buf, and updates *buf to point to the
4392  * first found non-space character (if any). Returns the length of
4393  * the token (string of non-white space characters) found.  Note
4394  * that *buf must be terminated with '\0'.
4395  */
4396 static inline size_t next_token(const char **buf)
4397 {
4398         /*
4399          * These are the characters that produce nonzero for
4400          * isspace() in the "C" and "POSIX" locales.
4401          */
4402         const char *spaces = " \f\n\r\t\v";
4403
4404         *buf += strspn(*buf, spaces);   /* Find start of token */
4405
4406         return strcspn(*buf, spaces);   /* Return token length */
4407 }
4408
4409 /*
4410  * Finds the next token in *buf, and if the provided token buffer is
4411  * big enough, copies the found token into it.  The result, if
4412  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4413  * must be terminated with '\0' on entry.
4414  *
4415  * Returns the length of the token found (not including the '\0').
4416  * Return value will be 0 if no token is found, and it will be >=
4417  * token_size if the token would not fit.
4418  *
4419  * The *buf pointer will be updated to point beyond the end of the
4420  * found token.  Note that this occurs even if the token buffer is
4421  * too small to hold it.
4422  */
4423 static inline size_t copy_token(const char **buf,
4424                                 char *token,
4425                                 size_t token_size)
4426 {
4427         size_t len;
4428
4429         len = next_token(buf);
4430         if (len < token_size) {
4431                 memcpy(token, *buf, len);
4432                 *(token + len) = '\0';
4433         }
4434         *buf += len;
4435
4436         return len;
4437 }
4438
4439 /*
4440  * Finds the next token in *buf, dynamically allocates a buffer big
4441  * enough to hold a copy of it, and copies the token into the new
4442  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4443  * that a duplicate buffer is created even for a zero-length token.
4444  *
4445  * Returns a pointer to the newly-allocated duplicate, or a null
4446  * pointer if memory for the duplicate was not available.  If
4447  * the lenp argument is a non-null pointer, the length of the token
4448  * (not including the '\0') is returned in *lenp.
4449  *
4450  * If successful, the *buf pointer will be updated to point beyond
4451  * the end of the found token.
4452  *
4453  * Note: uses GFP_KERNEL for allocation.
4454  */
4455 static inline char *dup_token(const char **buf, size_t *lenp)
4456 {
4457         char *dup;
4458         size_t len;
4459
4460         len = next_token(buf);
4461         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4462         if (!dup)
4463                 return NULL;
4464         *(dup + len) = '\0';
4465         *buf += len;
4466
4467         if (lenp)
4468                 *lenp = len;
4469
4470         return dup;
4471 }
4472
4473 /*
4474  * Parse the options provided for an "rbd add" (i.e., rbd image
4475  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4476  * and the data written is passed here via a NUL-terminated buffer.
4477  * Returns 0 if successful or an error code otherwise.
4478  *
4479  * The information extracted from these options is recorded in
4480  * the other parameters which return dynamically-allocated
4481  * structures:
4482  *  ceph_opts
4483  *      The address of a pointer that will refer to a ceph options
4484  *      structure.  Caller must release the returned pointer using
4485  *      ceph_destroy_options() when it is no longer needed.
4486  *  rbd_opts
4487  *      Address of an rbd options pointer.  Fully initialized by
4488  *      this function; caller must release with kfree().
4489  *  spec
4490  *      Address of an rbd image specification pointer.  Fully
4491  *      initialized by this function based on parsed options.
4492  *      Caller must release with rbd_spec_put().
4493  *
4494  * The options passed take this form:
4495  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4496  * where:
4497  *  <mon_addrs>
4498  *      A comma-separated list of one or more monitor addresses.
4499  *      A monitor address is an IP address, optionally followed
4500  *      by a port number (separated by a colon).
4501  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4502  *  <options>
4503  *      A comma-separated list of ceph and/or rbd options.
4504  *  <pool_name>
4505  *      The name of the rados pool containing the rbd image.
4506  *  <image_name>
4507  *      The name of the image in that pool to map.
4508  *  <snap_name>
4509  *      An optional snapshot name.  If provided, the mapping will
4510  *      present data from the image at the time that snapshot was
4511  *      created.  The image head is used if no snapshot name is
4512  *      provided.  Snapshot mappings are always read-only.
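 *
 * For example, a complete request might look like this (all values
 * illustrative):
 *
 *   192.168.0.1:6789 name=admin,secret=<key> rbd foo snap1
 *
 * which maps snapshot "snap1" of image "foo" in pool "rbd".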
4513  */
4514 static int rbd_add_parse_args(const char *buf,
4515                                 struct ceph_options **ceph_opts,
4516                                 struct rbd_options **opts,
4517                                 struct rbd_spec **rbd_spec)
4518 {
4519         size_t len;
4520         char *options;
4521         const char *mon_addrs;
4522         char *snap_name;
4523         size_t mon_addrs_size;
4524         struct rbd_spec *spec = NULL;
4525         struct rbd_options *rbd_opts = NULL;
4526         struct ceph_options *copts;
4527         int ret;
4528
4529         /* The first four tokens are required */
4530
4531         len = next_token(&buf);
4532         if (!len) {
4533                 rbd_warn(NULL, "no monitor address(es) provided");
4534                 return -EINVAL;
4535         }
4536         mon_addrs = buf;
4537         mon_addrs_size = len + 1;
4538         buf += len;
4539
4540         ret = -EINVAL;
4541         options = dup_token(&buf, NULL);
4542         if (!options)
4543                 return -ENOMEM;
4544         if (!*options) {
4545                 rbd_warn(NULL, "no options provided");
4546                 goto out_err;
4547         }
4548
4549         spec = rbd_spec_alloc();
4550         if (!spec)
4551                 goto out_mem;
4552
4553         spec->pool_name = dup_token(&buf, NULL);
4554         if (!spec->pool_name)
4555                 goto out_mem;
4556         if (!*spec->pool_name) {
4557                 rbd_warn(NULL, "no pool name provided");
4558                 goto out_err;
4559         }
4560
4561         spec->image_name = dup_token(&buf, NULL);
4562         if (!spec->image_name)
4563                 goto out_mem;
4564         if (!*spec->image_name) {
4565                 rbd_warn(NULL, "no image name provided");
4566                 goto out_err;
4567         }
4568
4569         /*
4570          * Snapshot name is optional; default is to use "-"
4571          * (indicating the head/no snapshot).
4572          */
4573         len = next_token(&buf);
4574         if (!len) {
4575                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4576                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4577         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4578                 ret = -ENAMETOOLONG;
4579                 goto out_err;
4580         }
4581         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4582         if (!snap_name)
4583                 goto out_mem;
4584         *(snap_name + len) = '\0';
4585         spec->snap_name = snap_name;
4586
4587         /* Initialize all rbd options to the defaults */
4588
4589         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4590         if (!rbd_opts)
4591                 goto out_mem;
4592
4593         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4594
4595         copts = ceph_parse_options(options, mon_addrs,
4596                                         mon_addrs + mon_addrs_size - 1,
4597                                         parse_rbd_opts_token, rbd_opts);
4598         if (IS_ERR(copts)) {
4599                 ret = PTR_ERR(copts);
4600                 goto out_err;
4601         }
4602         kfree(options);
4603
4604         *ceph_opts = copts;
4605         *opts = rbd_opts;
4606         *rbd_spec = spec;
4607
4608         return 0;
4609 out_mem:
4610         ret = -ENOMEM;
4611 out_err:
4612         kfree(rbd_opts);
4613         rbd_spec_put(spec);
4614         kfree(options);
4615
4616         return ret;
4617 }
4618
4619 /*
4620  * An rbd format 2 image has a unique identifier, distinct from the
4621  * name given to it by the user.  Internally, that identifier is
4622  * what's used to specify the names of objects related to the image.
4623  *
4624  * A special "rbd id" object is used to map an rbd image name to its
4625  * id.  If that object doesn't exist, then there is no v2 rbd image
4626  * with the supplied name.
4627  *
4628  * This function will record the given rbd_dev's image_id field if
4629  * it can be determined, and in that case will return 0.  If any
4630  * errors occur a negative errno will be returned and the rbd_dev's
4631  * image_id field will be unchanged (and should be NULL).
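 *
 * For example (name illustrative): the id of an image named "foo" is
 * stored in an object named "rbd_id.foo", whose content is an encoded
 * string: a __le32 length followed by the id bytes.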
4632  */
4633 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4634 {
4635         int ret;
4636         size_t size;
4637         char *object_name;
4638         void *response;
4639         char *image_id;
4640
4641         /*
4642          * When probing a parent image, the image id is already
4643          * known (and the image name likely is not).  There's no
4644          * need to fetch the image id again in this case.  We
4645          * do still need to set the image format though.
4646          */
4647         if (rbd_dev->spec->image_id) {
4648                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4649
4650                 return 0;
4651         }
4652
4653         /*
4654          * First, see if the format 2 image id file exists, and if
4655          * so, get the image's persistent id from it.
4656          */
4657         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4658         object_name = kmalloc(size, GFP_NOIO);
4659         if (!object_name)
4660                 return -ENOMEM;
4661         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4662         dout("rbd id object name is %s\n", object_name);
4663
4664         /* Response will be an encoded string, which includes a length */
4665
4666         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4667         response = kzalloc(size, GFP_NOIO);
4668         if (!response) {
4669                 ret = -ENOMEM;
4670                 goto out;
4671         }
4672
4673         /* If it doesn't exist we'll assume it's a format 1 image */
4674
4675         ret = rbd_obj_method_sync(rbd_dev, object_name,
4676                                 "rbd", "get_id", NULL, 0,
4677                                 response, RBD_IMAGE_ID_LEN_MAX);
4678         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4679         if (ret == -ENOENT) {
4680                 image_id = kstrdup("", GFP_KERNEL);
4681                 ret = image_id ? 0 : -ENOMEM;
4682                 if (!ret)
4683                         rbd_dev->image_format = 1;
        } else if (ret < 0) {
                goto out;       /* pass any other error back to the caller */
        } else if (ret > sizeof (__le32)) {
4685                 void *p = response;
4686
4687                 image_id = ceph_extract_encoded_string(&p, p + ret,
4688                                                 NULL, GFP_NOIO);
4689                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4690                 if (!ret)
4691                         rbd_dev->image_format = 2;
4692         } else {
4693                 ret = -EINVAL;
4694         }
4695
4696         if (!ret) {
4697                 rbd_dev->spec->image_id = image_id;
4698                 dout("image_id is %s\n", image_id);
4699         }
4700 out:
4701         kfree(response);
4702         kfree(object_name);
4703
4704         return ret;
4705 }
4706
4707 /*
4708  * Undo whatever state changes are made by v1 or v2 header info
4709  * call.
4710  */
4711 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4712 {
4713         struct rbd_image_header *header;
4714
4715         /* Drop parent reference unless it's already been done (or none) */
4716
4717         if (rbd_dev->parent_overlap)
4718                 rbd_dev_parent_put(rbd_dev);
4719
4720         /* Free dynamic fields from the header, then zero it out */
4721
4722         header = &rbd_dev->header;
4723         ceph_put_snap_context(header->snapc);
4724         kfree(header->snap_sizes);
4725         kfree(header->snap_names);
4726         kfree(header->object_prefix);
4727         memset(header, 0, sizeof (*header));
4728 }
4729
4730 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4731 {
4732         int ret;
4733
4734         ret = rbd_dev_v2_object_prefix(rbd_dev);
4735         if (ret)
4736                 goto out_err;
4737
4738         /*
4739          * Get and check the features for the image.  Currently the
4740          * features are assumed never to change.
4741          */
4742         ret = rbd_dev_v2_features(rbd_dev);
4743         if (ret)
4744                 goto out_err;
4745
4746         /* If the image supports fancy striping, get its parameters */
4747
4748         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4749                 ret = rbd_dev_v2_striping_info(rbd_dev);
4750                 if (ret < 0)
4751                         goto out_err;
4752         }
4753         /* No support for crypto or compression types in format 2 images */
4754
4755         return 0;
4756 out_err:
4757         rbd_dev->header.features = 0;
4758         kfree(rbd_dev->header.object_prefix);
4759         rbd_dev->header.object_prefix = NULL;
4760
4761         return ret;
4762 }
4763
4764 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4765 {
4766         struct rbd_device *parent = NULL;
4767         struct rbd_spec *parent_spec;
4768         struct rbd_client *rbdc;
4769         int ret;
4770
4771         if (!rbd_dev->parent_spec)
4772                 return 0;
4773         /*
4774          * We need to pass a reference to the client and the parent
4775          * spec when creating the parent rbd_dev.  Images related by
4776          * parent/child relationships always share both.
4777          */
4778         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4779         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4780
4781         ret = -ENOMEM;
4782         parent = rbd_dev_create(rbdc, parent_spec);
4783         if (!parent)
4784                 goto out_err;
4785
4786         ret = rbd_dev_image_probe(parent, false);
4787         if (ret < 0)
4788                 goto out_err;
4789         rbd_dev->parent = parent;
4790         atomic_set(&rbd_dev->parent_ref, 1);
4791
4792         return 0;
4793 out_err:
4794         if (parent) {
4795                 rbd_dev_unparent(rbd_dev);
4796                 kfree(rbd_dev->header_name);
4797                 rbd_dev_destroy(parent);
4798         } else {
4799                 rbd_put_client(rbdc);
4800                 rbd_spec_put(parent_spec);
4801         }
4802
4803         return ret;
4804 }
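
/*
 * Note: rbd_dev_probe_parent() and rbd_dev_image_probe() call each
 * other, so probing a mapped image walks its entire ancestor chain.
 * Parents are probed with mapping == false, so only the device being
 * mapped registers a header watch.
 */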
4805
4806 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4807 {
4808         int ret;
4809
4810         /* generate unique id: find highest unique id, add one */
4811         rbd_dev_id_get(rbd_dev);
4812
4813         /* Fill in the device name, now that we have its id. */
4814         BUILD_BUG_ON(DEV_NAME_LEN
4815                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4816         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4817
4818         /* Get our block major device number. */
4819
4820         ret = register_blkdev(0, rbd_dev->name);
4821         if (ret < 0)
4822                 goto err_out_id;
4823         rbd_dev->major = ret;
4824
4825         /* Set up the blkdev mapping. */
4826
4827         ret = rbd_init_disk(rbd_dev);
4828         if (ret)
4829                 goto err_out_blkdev;
4830
4831         ret = rbd_dev_mapping_set(rbd_dev);
4832         if (ret)
4833                 goto err_out_disk;
4834         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4835
4836         ret = rbd_bus_add_dev(rbd_dev);
4837         if (ret)
4838                 goto err_out_mapping;
4839
4840         /* Everything's ready.  Announce the disk to the world. */
4841
4842         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4843         add_disk(rbd_dev->disk);
4844
4845         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4846                 (unsigned long long) rbd_dev->mapping.size);
4847
4848         return ret;
4849
4850 err_out_mapping:
4851         rbd_dev_mapping_clear(rbd_dev);
4852 err_out_disk:
4853         rbd_free_disk(rbd_dev);
4854 err_out_blkdev:
4855         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4856 err_out_id:
4857         rbd_dev_id_put(rbd_dev);
4859
4860         return ret;
4861 }
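
/*
 * For example (id illustrative): an rbd_device that gets dev_id 2 is
 * named "rbd2", registers its block major under that name, and its
 * gendisk appears as /dev/rbd2.
 */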
4862
4863 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4864 {
4865         struct rbd_spec *spec = rbd_dev->spec;
4866         size_t size;
4867
4868         /* Record the header object name for this rbd image. */
4869
4870         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4871
4872         if (rbd_dev->image_format == 1)
4873                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4874         else
4875                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4876
4877         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4878         if (!rbd_dev->header_name)
4879                 return -ENOMEM;
4880
4881         if (rbd_dev->image_format == 1)
4882                 sprintf(rbd_dev->header_name, "%s%s",
4883                         spec->image_name, RBD_SUFFIX);
4884         else
4885                 sprintf(rbd_dev->header_name, "%s%s",
4886                         RBD_HEADER_PREFIX, spec->image_id);
4887         return 0;
4888 }
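
/*
 * For example (names illustrative): a format 1 image named "foo" keeps
 * its header in object "foo.rbd", while a format 2 image with id
 * "10052ae8944a" uses "rbd_header.10052ae8944a" (assuming RBD_SUFFIX
 * ".rbd" and RBD_HEADER_PREFIX "rbd_header." from rbd_types.h).
 */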
4889
4890 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4891 {
4892         rbd_dev_unprobe(rbd_dev);
4893         kfree(rbd_dev->header_name);
4894         rbd_dev->header_name = NULL;
4895         rbd_dev->image_format = 0;
4896         kfree(rbd_dev->spec->image_id);
4897         rbd_dev->spec->image_id = NULL;
4898
4899         rbd_dev_destroy(rbd_dev);
4900 }
4901
4902 /*
4903  * Probe for the existence of the header object for the given rbd
4904  * device.  If this image is the one being mapped (i.e., not a
4905  * parent), initiate a watch on its header object before using that
4906  * object to get detailed information about the rbd image.
4907  */
4908 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4909 {
4910         int ret;
4911         int tmp;
4912
4913         /*
4914          * Get the id from the image id object.  Unless there's an
4915          * error, rbd_dev->spec->image_id will be filled in with
4916          * a dynamically-allocated string, and rbd_dev->image_format
4917          * will be set to either 1 or 2.
4918          */
4919         ret = rbd_dev_image_id(rbd_dev);
4920         if (ret)
4921                 return ret;
4922         rbd_assert(rbd_dev->spec->image_id);
4923         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4924
4925         ret = rbd_dev_header_name(rbd_dev);
4926         if (ret)
4927                 goto err_out_format;
4928
4929         if (mapping) {
4930                 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4931                 if (ret)
4932                         goto out_header_name;
4933         }
4934
4935         if (rbd_dev->image_format == 1)
4936                 ret = rbd_dev_v1_header_info(rbd_dev);
4937         else
4938                 ret = rbd_dev_v2_header_info(rbd_dev);
4939         if (ret)
4940                 goto err_out_watch;
4941
4942         ret = rbd_dev_spec_update(rbd_dev);
4943         if (ret)
4944                 goto err_out_probe;
4945
4946         ret = rbd_dev_probe_parent(rbd_dev);
4947         if (ret)
4948                 goto err_out_probe;
4949
4950         dout("discovered format %u image, header name is %s\n",
4951                 rbd_dev->image_format, rbd_dev->header_name);
4952
4953         return 0;
4954 err_out_probe:
4955         rbd_dev_unprobe(rbd_dev);
4956 err_out_watch:
4957         if (mapping) {
4958                 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4959                 if (tmp)
4960                         rbd_warn(rbd_dev, "unable to tear down "
4961                                         "watch request (%d)\n", tmp);
4962         }
4963 out_header_name:
4964         kfree(rbd_dev->header_name);
4965         rbd_dev->header_name = NULL;
4966 err_out_format:
4967         rbd_dev->image_format = 0;
4968         kfree(rbd_dev->spec->image_id);
4969         rbd_dev->spec->image_id = NULL;
4970
4971         dout("probe failed, returning %d\n", ret);
4972
4973         return ret;
4974 }
4975
4976 static ssize_t rbd_add(struct bus_type *bus,
4977                        const char *buf,
4978                        size_t count)
4979 {
4980         struct rbd_device *rbd_dev = NULL;
4981         struct ceph_options *ceph_opts = NULL;
4982         struct rbd_options *rbd_opts = NULL;
4983         struct rbd_spec *spec = NULL;
4984         struct rbd_client *rbdc;
4985         struct ceph_osd_client *osdc;
4986         bool read_only;
4987         int rc = -ENOMEM;
4988
4989         if (!try_module_get(THIS_MODULE))
4990                 return -ENODEV;
4991
4992         /* parse add command */
4993         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4994         if (rc < 0)
4995                 goto err_out_module;
4996         read_only = rbd_opts->read_only;
4997         kfree(rbd_opts);
4998         rbd_opts = NULL;        /* done with this */
4999
5000         rbdc = rbd_get_client(ceph_opts);
5001         if (IS_ERR(rbdc)) {
5002                 rc = PTR_ERR(rbdc);
5003                 goto err_out_args;
5004         }
5005
5006         /* pick the pool */
5007         osdc = &rbdc->client->osdc;
5008         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5009         if (rc < 0)
5010                 goto err_out_client;
5011         spec->pool_id = (u64)rc;
5012
5013         /* The ceph file layout needs to fit pool id in 32 bits */
5014
5015         if (spec->pool_id > (u64)U32_MAX) {
5016                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5017                                 (unsigned long long)spec->pool_id, U32_MAX);
5018                 rc = -EIO;
5019                 goto err_out_client;
5020         }
5021
5022         rbd_dev = rbd_dev_create(rbdc, spec);
5023         if (!rbd_dev)
5024                 goto err_out_client;
5025         rbdc = NULL;            /* rbd_dev now owns this */
5026         spec = NULL;            /* rbd_dev now owns this */
5027
5028         rc = rbd_dev_image_probe(rbd_dev, true);
5029         if (rc < 0)
5030                 goto err_out_rbd_dev;
5031
5032         /* If we are mapping a snapshot it must be marked read-only */
5033
5034         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5035                 read_only = true;
5036         rbd_dev->mapping.read_only = read_only;
5037
5038         rc = rbd_dev_device_setup(rbd_dev);
5039         if (rc) {
5040                 rbd_dev_image_release(rbd_dev);
5041                 goto err_out_module;
5042         }
5043
5044         return count;
5045
5046 err_out_rbd_dev:
5047         rbd_dev_destroy(rbd_dev);
5048 err_out_client:
5049         rbd_put_client(rbdc);
5050 err_out_args:
5051         rbd_spec_put(spec);
5052 err_out_module:
5053         module_put(THIS_MODULE);
5054
5055         dout("Error adding device %s\n", buf);
5056
5057         return (ssize_t)rc;
5058 }
5059
5060 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
5061 {
5062         struct list_head *tmp;
5063         struct rbd_device *rbd_dev;
5064
5065         spin_lock(&rbd_dev_list_lock);
5066         list_for_each(tmp, &rbd_dev_list) {
5067                 rbd_dev = list_entry(tmp, struct rbd_device, node);
5068                 if (rbd_dev->dev_id == dev_id) {
5069                         spin_unlock(&rbd_dev_list_lock);
5070                         return rbd_dev;
5071                 }
5072         }
5073         spin_unlock(&rbd_dev_list_lock);
5074         return NULL;
5075 }
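
/*
 * Note: rbd_dev_list_lock is dropped before returning, so the caller
 * must otherwise ensure the device cannot go away; rbd_remove() does
 * this by holding ctl_mutex across the lookup and teardown.
 */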
5076
5077 static void rbd_dev_device_release(struct device *dev)
5078 {
5079         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5080
5081         rbd_free_disk(rbd_dev);
5082         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5083         rbd_dev_mapping_clear(rbd_dev);
5084         unregister_blkdev(rbd_dev->major, rbd_dev->name);
5085         rbd_dev->major = 0;
5086         rbd_dev_id_put(rbd_dev);
5088 }
5089
5090 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5091 {
5092         while (rbd_dev->parent) {
5093                 struct rbd_device *first = rbd_dev;
5094                 struct rbd_device *second = first->parent;
5095                 struct rbd_device *third;
5096
5097                 /*
5098                  * Follow to the parent with no grandparent and
5099                  * remove it.
5100                  */
5101                 while (second && (third = second->parent)) {
5102                         first = second;
5103                         second = third;
5104                 }
5105                 rbd_assert(second);
5106                 rbd_dev_image_release(second);
5107                 first->parent = NULL;
5108                 first->parent_overlap = 0;
5109
5110                 rbd_assert(first->parent_spec);
5111                 rbd_spec_put(first->parent_spec);
5112                 first->parent_spec = NULL;
5113         }
5114 }
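
/*
 * Illustrative teardown order for a chain mapped_dev -> parent ->
 * grandparent: the grandparent image is released first, then the
 * parent, and the loop ends once the mapped device has no parent.
 */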
5115
5116 static ssize_t rbd_remove(struct bus_type *bus,
5117                           const char *buf,
5118                           size_t count)
5119 {
5120         struct rbd_device *rbd_dev = NULL;
5121         int target_id;
5122         unsigned long ul;
5123         int ret;
5124
5125         ret = kstrtoul(buf, 10, &ul);
5126         if (ret)
5127                 return ret;
5128
5129         /* convert to int; abort if we lost anything in the conversion */
5130         target_id = (int) ul;
5131         if (target_id != ul)
5132                 return -EINVAL;
5133
5134         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5135
5136         rbd_dev = __rbd_get_dev(target_id);
5137         if (!rbd_dev) {
5138                 ret = -ENOENT;
5139                 goto done;
5140         }
5141
5142         spin_lock_irq(&rbd_dev->lock);
5143         if (rbd_dev->open_count)
5144                 ret = -EBUSY;
5145         else
5146                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5147         spin_unlock_irq(&rbd_dev->lock);
5148         if (ret < 0)
5149                 goto done;
5150         rbd_bus_del_dev(rbd_dev);
5151         ret = rbd_dev_header_watch_sync(rbd_dev, false);
5152         if (ret)
5153                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
5154         rbd_dev_image_release(rbd_dev);
5155         module_put(THIS_MODULE);
5156         ret = count;
5157 done:
5158         mutex_unlock(&ctl_mutex);
5159
5160         return ret;
5161 }
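
/*
 * For example (id illustrative): writing "2" to /sys/bus/rbd/remove
 * unmaps the device with dev_id 2; the write fails with -EBUSY if the
 * device is still open.
 */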
5162
5163 /*
5164  * create control files in sysfs
5165  * /sys/bus/rbd/...
5166  */
5167 static int rbd_sysfs_init(void)
5168 {
5169         int ret;
5170
5171         ret = device_register(&rbd_root_dev);
5172         if (ret < 0)
5173                 return ret;
5174
5175         ret = bus_register(&rbd_bus_type);
5176         if (ret < 0)
5177                 device_unregister(&rbd_root_dev);
5178
5179         return ret;
5180 }
5181
5182 static void rbd_sysfs_cleanup(void)
5183 {
5184         bus_unregister(&rbd_bus_type);
5185         device_unregister(&rbd_root_dev);
5186 }
5187
5188 static int rbd_slab_init(void)
5189 {
5190         rbd_assert(!rbd_img_request_cache);
5191         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5192                                         sizeof (struct rbd_img_request),
5193                                         __alignof__(struct rbd_img_request),
5194                                         0, NULL);
5195         if (!rbd_img_request_cache)
5196                 return -ENOMEM;
5197
5198         rbd_assert(!rbd_obj_request_cache);
5199         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5200                                         sizeof (struct rbd_obj_request),
5201                                         __alignof__(struct rbd_obj_request),
5202                                         0, NULL);
5203         if (!rbd_obj_request_cache)
5204                 goto out_err;
5205
5206         rbd_assert(!rbd_segment_name_cache);
5207         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5208                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5209         if (rbd_segment_name_cache)
5210                 return 0;
5211 out_err:
5212         if (rbd_obj_request_cache) {
5213                 kmem_cache_destroy(rbd_obj_request_cache);
5214                 rbd_obj_request_cache = NULL;
5215         }
5216
5217         kmem_cache_destroy(rbd_img_request_cache);
5218         rbd_img_request_cache = NULL;
5219
5220         return -ENOMEM;
5221 }
5222
5223 static void rbd_slab_exit(void)
5224 {
5225         rbd_assert(rbd_segment_name_cache);
5226         kmem_cache_destroy(rbd_segment_name_cache);
5227         rbd_segment_name_cache = NULL;
5228
5229         rbd_assert(rbd_obj_request_cache);
5230         kmem_cache_destroy(rbd_obj_request_cache);
5231         rbd_obj_request_cache = NULL;
5232
5233         rbd_assert(rbd_img_request_cache);
5234         kmem_cache_destroy(rbd_img_request_cache);
5235         rbd_img_request_cache = NULL;
5236 }
5237
5238 static int __init rbd_init(void)
5239 {
5240         int rc;
5241
5242         if (!libceph_compatible(NULL)) {
5243                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5244
5245                 return -EINVAL;
5246         }
5247         rc = rbd_slab_init();
5248         if (rc)
5249                 return rc;
5250         rc = rbd_sysfs_init();
5251         if (rc)
5252                 rbd_slab_exit();
5253         else
5254                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5255
5256         return rc;
5257 }
5258
5259 static void __exit rbd_exit(void)
5260 {
5261         rbd_sysfs_cleanup();
5262         rbd_slab_exit();
5263 }
5264
5265 module_init(rbd_init);
5266 module_exit(rbd_exit);
5267
5268 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5269 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5270 MODULE_DESCRIPTION("rados block device");
5271
5272 /* following authorship retained from original osdblk.c */
5273 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5274
5275 MODULE_LICENSE("GPL");