/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}
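
/*
 * Illustrative note (not part of the original source): these helpers
 * suit reference counts that must saturate rather than wrap, e.g. a
 * hypothetical caller pinning a parent image:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		... parent pinned, safe to dereference rbd_dev->parent ...
 *
 * A count of 0 means "no longer usable"; atomic_inc_return_safe()
 * refuses to revive it, and -EINVAL signals overflow or underflow.
 */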

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
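
/*
 * Worked example (assuming the usual NAME_MAX of 255): sizeof
 * ("snap_") is 6, so the prefix accounts for 5 characters and
 * RBD_MAX_SNAP_NAME_LEN evaluates to 255 - 5 = 250.
 */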

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

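/*
 * Sanity check of the formula above (illustrative, not from the
 * original source): for a 4-byte int, (5 * 4) / 2 + 1 = 11, which
 * covers the 10 decimal digits of INT_MAX (2147483647) plus a sign
 * character.  In general 5/2 over-approximates log10(256) ~= 2.41
 * digits per byte, so the computed width is always sufficient.
 */
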
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable, so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}
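
/*
 * Illustrative note (not part of the original source): client sharing
 * means two mappings created against the same monitors with identical
 * options reuse a single ceph_client, and thus a single session.
 * Setting CEPH_OPT_NOSHARE in the options (libceph's "noshare" option)
 * bypasses the lookup above and forces a fresh client instead.
 */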

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
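
/*
 * For illustration (not part of the original source): an options
 * string such as "read_only" (or its alternate spelling "ro") makes
 * the parser above set rbd_opts->read_only, which in turn makes
 * rbd_open() reject FMODE_WRITE opens with -EROFS.
 */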

/*
 * Get a ceph client with specific addr and configuration; if one does
 * not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * The list lock is taken here, so the caller must not hold
 * rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}
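
/*
 * Rough sketch (illustrative; the real field details live in
 * rbd_types.h) of the format 1 on-disk header consumed above:
 *
 *	struct rbd_image_header_ondisk {
 *		... magic text, object_prefix, options, image_size ...
 *		struct rbd_image_snap_ondisk snaps[snap_count];
 *	};
 *	followed by snap_names_len bytes of NUL-terminated names
 *
 * which is why the copy of the names starts at
 * &ondisk->snaps[snap_count].
 */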

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
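
/*
 * Example (illustrative): with snapc->snaps = { 12, 7, 3 }, kept in
 * descending order as noted above, rbd_dev_snap_index() returns 1
 * for snap_id 7, and BAD_SNAP_INDEX for snap_id 9, which is not in
 * the array.
 */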

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* The name came from the slab cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}
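
/*
 * Example (illustrative values): with object_prefix "rb.0.1234.5678"
 * and the default object order of 22 (4 MiB objects), image offset
 * 0x1400000 lands in segment 5 and yields the object name
 * "rb.0.1234.5678.000000000005".
 */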

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
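
/*
 * Worked example (order 22, i.e. 4 MiB segments): for offset 0x3ff000
 * and length 0x3000, rbd_segment_offset() is 0x3ff000 and
 * rbd_segment_length() is 0x1000; the I/O is clipped at the segment
 * boundary and the remaining 0x2000 bytes belong to the next segment.
 */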

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        /* 1U avoids shifting into the sign bit for order 31 */
        return 1U << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
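
/*
 * Usage sketch (hypothetical caller, not from the original source):
 * to split a request that spans two objects, something like
 *
 *	struct bio *bio = rq_head;	(head of the source chain)
 *	unsigned int off = 0;
 *	struct bio *first;
 *
 *	first = bio_chain_clone_range(&bio, &off, seg_len, GFP_NOIO);
 *
 * leaves bio/off positioned at the first un-cloned byte, ready for
 * the next bio_chain_clone_range() call covering the next object.
 */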

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        if (img_request_child_test(img_request))
                kref_put(&img_request->kref, rbd_parent_request_destroy);
        else
                kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
        clear_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
        clear_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}
1598
1599 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1600 {
1601         dout("%s: obj %p\n", __func__, obj_request);
1602         obj_request_done_set(obj_request);
1603 }
1604
1605 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1606 {
1607         struct rbd_img_request *img_request = NULL;
1608         struct rbd_device *rbd_dev = NULL;
1609         bool layered = false;
1610
1611         if (obj_request_img_data_test(obj_request)) {
1612                 img_request = obj_request->img_request;
1613                 layered = img_request_layered_test(img_request);
1614                 rbd_dev = img_request->rbd_dev;
1615         }
1616
1617         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1618                 obj_request, img_request, obj_request->result,
1619                 obj_request->xferred, obj_request->length);
1620         if (layered && obj_request->result == -ENOENT &&
1621                         obj_request->img_offset < rbd_dev->parent_overlap)
1622                 rbd_img_parent_read(obj_request);
1623         else if (img_request)
1624                 rbd_img_obj_request_read_callback(obj_request);
1625         else
1626                 obj_request_done_set(obj_request);
1627 }
1628
1629 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1630 {
1631         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1632                 obj_request->result, obj_request->length);
1633         /*
1634          * There is no such thing as a successful short write.  Set
1635          * it to our originally-requested length.
1636          */
1637         obj_request->xferred = obj_request->length;
1638         obj_request_done_set(obj_request);
1639 }
1640
1641 /*
1642  * For a simple stat call there's nothing to do.  We'll do more if
1643  * this is part of a write sequence for a layered image.
1644  */
1645 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1646 {
1647         dout("%s: obj %p\n", __func__, obj_request);
1648         obj_request_done_set(obj_request);
1649 }
1650
1651 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1652                                 struct ceph_msg *msg)
1653 {
1654         struct rbd_obj_request *obj_request = osd_req->r_priv;
1655         u16 opcode;
1656
1657         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1658         rbd_assert(osd_req == obj_request->osd_req);
1659         if (obj_request_img_data_test(obj_request)) {
1660                 rbd_assert(obj_request->img_request);
1661                 rbd_assert(obj_request->which != BAD_WHICH);
1662         } else {
1663                 rbd_assert(obj_request->which == BAD_WHICH);
1664         }
1665
1666         if (osd_req->r_result < 0)
1667                 obj_request->result = osd_req->r_result;
1668
1669         BUG_ON(osd_req->r_num_ops > 2);
1670
1671         /*
1672          * We support a 64-bit length, but ultimately it has to be
1673          * passed to blk_end_request(), which takes an unsigned int.
1674          */
1675         obj_request->xferred = osd_req->r_reply_op_len[0];
1676         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1677         opcode = osd_req->r_ops[0].op;
1678         switch (opcode) {
1679         case CEPH_OSD_OP_READ:
1680                 rbd_osd_read_callback(obj_request);
1681                 break;
1682         case CEPH_OSD_OP_WRITE:
1683                 rbd_osd_write_callback(obj_request);
1684                 break;
1685         case CEPH_OSD_OP_STAT:
1686                 rbd_osd_stat_callback(obj_request);
1687                 break;
1688         case CEPH_OSD_OP_CALL:
1689         case CEPH_OSD_OP_NOTIFY_ACK:
1690         case CEPH_OSD_OP_WATCH:
1691                 rbd_osd_trivial_callback(obj_request);
1692                 break;
1693         default:
1694                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1695                         obj_request->object_name, (unsigned short) opcode);
1696                 break;
1697         }
1698
1699         if (obj_request_done_test(obj_request))
1700                 rbd_obj_request_complete(obj_request);
1701 }
1702
1703 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1704 {
1705         struct rbd_img_request *img_request = obj_request->img_request;
1706         struct ceph_osd_request *osd_req = obj_request->osd_req;
1707         u64 snap_id;
1708
1709         rbd_assert(osd_req != NULL);
1710
1711         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1712         ceph_osdc_build_request(osd_req, obj_request->offset,
1713                         NULL, snap_id, NULL);
1714 }
1715
1716 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1717 {
1718         struct rbd_img_request *img_request = obj_request->img_request;
1719         struct ceph_osd_request *osd_req = obj_request->osd_req;
1720         struct ceph_snap_context *snapc;
1721         struct timespec mtime = CURRENT_TIME;
1722
1723         rbd_assert(osd_req != NULL);
1724
1725         snapc = img_request ? img_request->snapc : NULL;
1726         ceph_osdc_build_request(osd_req, obj_request->offset,
1727                         snapc, CEPH_NOSNAP, &mtime);
1728 }
1729
1730 static struct ceph_osd_request *rbd_osd_req_create(
1731                                         struct rbd_device *rbd_dev,
1732                                         bool write_request,
1733                                         struct rbd_obj_request *obj_request)
1734 {
1735         struct ceph_snap_context *snapc = NULL;
1736         struct ceph_osd_client *osdc;
1737         struct ceph_osd_request *osd_req;
1738
1739         if (obj_request_img_data_test(obj_request)) {
1740                 struct rbd_img_request *img_request = obj_request->img_request;
1741
1742                 rbd_assert(write_request ==
1743                                 img_request_write_test(img_request));
1744                 if (write_request)
1745                         snapc = img_request->snapc;
1746         }
1747
1748         /* Allocate and initialize the request, for the single op */
1749
1750         osdc = &rbd_dev->rbd_client->client->osdc;
1751         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1752         if (!osd_req)
1753                 return NULL;    /* ENOMEM */
1754
1755         if (write_request)
1756                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1757         else
1758                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1759
1760         osd_req->r_callback = rbd_osd_req_callback;
1761         osd_req->r_priv = obj_request;
1762
1763         osd_req->r_oid_len = strlen(obj_request->object_name);
1764         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1765         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1766
1767         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1768
1769         return osd_req;
1770 }
1771
1772 /*
1773  * Create a copyup osd request based on the information in the
1774  * object request supplied.  A copyup request has two osd ops,
1775  * a copyup method call, and a "normal" write request.
1776  */
1777 static struct ceph_osd_request *
1778 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1779 {
1780         struct rbd_img_request *img_request;
1781         struct ceph_snap_context *snapc;
1782         struct rbd_device *rbd_dev;
1783         struct ceph_osd_client *osdc;
1784         struct ceph_osd_request *osd_req;
1785
1786         rbd_assert(obj_request_img_data_test(obj_request));
1787         img_request = obj_request->img_request;
1788         rbd_assert(img_request);
1789         rbd_assert(img_request_write_test(img_request));
1790
1791         /* Allocate and initialize the request, for the two ops */
1792
1793         snapc = img_request->snapc;
1794         rbd_dev = img_request->rbd_dev;
1795         osdc = &rbd_dev->rbd_client->client->osdc;
1796         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1797         if (!osd_req)
1798                 return NULL;    /* ENOMEM */
1799
1800         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1801         osd_req->r_callback = rbd_osd_req_callback;
1802         osd_req->r_priv = obj_request;
1803
1804         osd_req->r_oid_len = strlen(obj_request->object_name);
1805         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1806         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1807
1808         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1809
1810         return osd_req;
1811 }
1812
1814 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1815 {
1816         ceph_osdc_put_request(osd_req);
1817 }
1818
1819 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1820
1821 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1822                                                 u64 offset, u64 length,
1823                                                 enum obj_request_type type)
1824 {
1825         struct rbd_obj_request *obj_request;
1826         size_t size;
1827         char *name;
1828
1829         rbd_assert(obj_request_type_valid(type));
1830
1831         size = strlen(object_name) + 1;
1832         name = kmalloc(size, GFP_KERNEL);
1833         if (!name)
1834                 return NULL;
1835
1836         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1837         if (!obj_request) {
1838                 kfree(name);
1839                 return NULL;
1840         }
1841
1842         obj_request->object_name = memcpy(name, object_name, size);
1843         obj_request->offset = offset;
1844         obj_request->length = length;
1845         obj_request->flags = 0;
1846         obj_request->which = BAD_WHICH;
1847         obj_request->type = type;
1848         INIT_LIST_HEAD(&obj_request->links);
1849         init_completion(&obj_request->completion);
1850         kref_init(&obj_request->kref);
1851
1852         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1853                 offset, length, (int)type, obj_request);
1854
1855         return obj_request;
1856 }
1857
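/*
 * Illustrative fragment (not built): the usual lifecycle of an
 * object request, following the pattern rbd_obj_notify_ack() uses
 * below.  The request is created with a single kref; whoever holds
 * the last reference drops it with rbd_obj_request_put(), which
 * ends up in rbd_obj_request_destroy() below.
 */
#if 0
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
                                                OBJ_REQUEST_NODATA);
        if (!obj_request)
                return -ENOMEM;
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;       /* the out path does rbd_obj_request_put() */
        obj_request->callback = rbd_obj_request_put;
        ret = rbd_obj_request_submit(osdc, obj_request);
#endif
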
1858 static void rbd_obj_request_destroy(struct kref *kref)
1859 {
1860         struct rbd_obj_request *obj_request;
1861
1862         obj_request = container_of(kref, struct rbd_obj_request, kref);
1863
1864         dout("%s: obj %p\n", __func__, obj_request);
1865
1866         rbd_assert(obj_request->img_request == NULL);
1867         rbd_assert(obj_request->which == BAD_WHICH);
1868
1869         if (obj_request->osd_req)
1870                 rbd_osd_req_destroy(obj_request->osd_req);
1871
1872         rbd_assert(obj_request_type_valid(obj_request->type));
1873         switch (obj_request->type) {
1874         case OBJ_REQUEST_NODATA:
1875                 break;          /* Nothing to do */
1876         case OBJ_REQUEST_BIO:
1877                 if (obj_request->bio_list)
1878                         bio_chain_put(obj_request->bio_list);
1879                 break;
1880         case OBJ_REQUEST_PAGES:
1881                 if (obj_request->pages)
1882                         ceph_release_page_vector(obj_request->pages,
1883                                                 obj_request->page_count);
1884                 break;
1885         }
1886
1887         kfree(obj_request->object_name);
1888         obj_request->object_name = NULL;
1889         kmem_cache_free(rbd_obj_request_cache, obj_request);
1890 }
1891
1892 /* It's OK to call this for a device with no parent */
1893
1894 static void rbd_spec_put(struct rbd_spec *spec);
1895 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1896 {
1897         rbd_dev_remove_parent(rbd_dev);
1898         rbd_spec_put(rbd_dev->parent_spec);
1899         rbd_dev->parent_spec = NULL;
1900         rbd_dev->parent_overlap = 0;
1901 }
1902
1903 /*
1904  * Parent image reference counting is used to determine when an
1905  * image's parent fields can be safely torn down--after there are no
1906  * more in-flight requests to the parent image.  When the last
1907  * reference is dropped, cleaning them up is safe.
1908  */
1909 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1910 {
1911         int counter;
1912
1913         if (!rbd_dev->parent_spec)
1914                 return;
1915
1916         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1917         if (counter > 0)
1918                 return;
1919
1920         /* Last reference; clean up parent data structures */
1921
1922         if (!counter)
1923                 rbd_dev_unparent(rbd_dev);
1924         else
1925                 rbd_warn(rbd_dev, "parent reference underflow\n");
1926 }
1927
1928 /*
1929  * If an image has a non-zero parent overlap, get a reference to its
1930  * parent.
1931  *
1932  * Returns true if the rbd device has a parent with a non-zero
1933  * overlap and a reference for it was successfully taken, or
1934  * false otherwise.
1935  */
1936 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1937 {
1938         int counter;
1939
1940         if (!rbd_dev->parent_spec)
1941                 return false;
1942
1943         counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1944         if (counter > 0 && rbd_dev->parent_overlap)
1945                 return true;
1946
1947         /* Image was flattened, but parent is not yet torn down */
1948
1949         if (counter < 0)
1950                 rbd_warn(rbd_dev, "parent reference overflow\n");
1951
1952         return false;
1953 }
1954
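/*
 * A reference taken by rbd_dev_parent_get() above is paired with a
 * rbd_dev_parent_put(): the get happens in rbd_img_request_create()
 * when a layered request is set up, and the put happens in
 * rbd_img_request_destroy() when that request is torn down.
 */
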
1955 /*
1956  * Caller is responsible for filling in the list of object requests
1957  * that comprises the image request, and the Linux request pointer
1958  * (if there is one).
1959  */
1960 static struct rbd_img_request *rbd_img_request_create(
1961                                         struct rbd_device *rbd_dev,
1962                                         u64 offset, u64 length,
1963                                         bool write_request)
1964 {
1965         struct rbd_img_request *img_request;
1966
1967         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1968         if (!img_request)
1969                 return NULL;
1970
1971         if (write_request) {
1972                 down_read(&rbd_dev->header_rwsem);
1973                 ceph_get_snap_context(rbd_dev->header.snapc);
1974                 up_read(&rbd_dev->header_rwsem);
1975         }
1976
1977         img_request->rq = NULL;
1978         img_request->rbd_dev = rbd_dev;
1979         img_request->offset = offset;
1980         img_request->length = length;
1981         img_request->flags = 0;
1982         if (write_request) {
1983                 img_request_write_set(img_request);
1984                 img_request->snapc = rbd_dev->header.snapc;
1985         } else {
1986                 img_request->snap_id = rbd_dev->spec->snap_id;
1987         }
1988         if (rbd_dev_parent_get(rbd_dev))
1989                 img_request_layered_set(img_request);
1990         spin_lock_init(&img_request->completion_lock);
1991         img_request->next_completion = 0;
1992         img_request->callback = NULL;
1993         img_request->result = 0;
1994         img_request->obj_request_count = 0;
1995         INIT_LIST_HEAD(&img_request->obj_requests);
1996         kref_init(&img_request->kref);
1997
1998         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1999                 write_request ? "write" : "read", offset, length,
2000                 img_request);
2001
2002         return img_request;
2003 }
2004
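/*
 * Illustrative fragment (not built), modeled on the request function
 * at the bottom of this file: an image request is created, filled
 * with object requests cloned from the block request's bio chain,
 * and submitted; on any failure the creation reference is dropped.
 * "rq" is assumed to be the fetched struct request.
 */
#if 0
        img_request = rbd_img_request_create(rbd_dev, offset, length,
                                                write_request);
        if (!img_request)
                goto end_request;       /* -ENOMEM */
        img_request->rq = rq;

        result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
        if (!result)
                result = rbd_img_request_submit(img_request);
        if (result)
                rbd_img_request_put(img_request);
#endif
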
2005 static void rbd_img_request_destroy(struct kref *kref)
2006 {
2007         struct rbd_img_request *img_request;
2008         struct rbd_obj_request *obj_request;
2009         struct rbd_obj_request *next_obj_request;
2010
2011         img_request = container_of(kref, struct rbd_img_request, kref);
2012
2013         dout("%s: img %p\n", __func__, img_request);
2014
2015         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2016                 rbd_img_obj_request_del(img_request, obj_request);
2017         rbd_assert(img_request->obj_request_count == 0);
2018
2019         if (img_request_layered_test(img_request)) {
2020                 img_request_layered_clear(img_request);
2021                 rbd_dev_parent_put(img_request->rbd_dev);
2022         }
2023
2024         if (img_request_write_test(img_request))
2025                 ceph_put_snap_context(img_request->snapc);
2026
2027         kmem_cache_free(rbd_img_request_cache, img_request);
2028 }
2029
2030 static struct rbd_img_request *rbd_parent_request_create(
2031                                         struct rbd_obj_request *obj_request,
2032                                         u64 img_offset, u64 length)
2033 {
2034         struct rbd_img_request *parent_request;
2035         struct rbd_device *rbd_dev;
2036
2037         rbd_assert(obj_request->img_request);
2038         rbd_dev = obj_request->img_request->rbd_dev;
2039
2040         parent_request = rbd_img_request_create(rbd_dev->parent,
2041                                                 img_offset, length, false);
2042         if (!parent_request)
2043                 return NULL;
2044
2045         img_request_child_set(parent_request);
2046         rbd_obj_request_get(obj_request);
2047         parent_request->obj_request = obj_request;
2048
2049         return parent_request;
2050 }
2051
2052 static void rbd_parent_request_destroy(struct kref *kref)
2053 {
2054         struct rbd_img_request *parent_request;
2055         struct rbd_obj_request *orig_request;
2056
2057         parent_request = container_of(kref, struct rbd_img_request, kref);
2058         orig_request = parent_request->obj_request;
2059
2060         parent_request->obj_request = NULL;
2061         rbd_obj_request_put(orig_request);
2062         img_request_child_clear(parent_request);
2063
2064         rbd_img_request_destroy(kref);
2065 }
2066
2067 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2068 {
2069         struct rbd_img_request *img_request;
2070         unsigned int xferred;
2071         int result;
2072         bool more;
2073
2074         rbd_assert(obj_request_img_data_test(obj_request));
2075         img_request = obj_request->img_request;
2076
2077         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2078         xferred = (unsigned int)obj_request->xferred;
2079         result = obj_request->result;
2080         if (result) {
2081                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2082
2083                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2084                         img_request_write_test(img_request) ? "write" : "read",
2085                         obj_request->length, obj_request->img_offset,
2086                         obj_request->offset);
2087                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
2088                         result, xferred);
2089                 if (!img_request->result)
2090                         img_request->result = result;
2091         }
2092
2093         /* Image object requests don't own their page array */
2094
2095         if (obj_request->type == OBJ_REQUEST_PAGES) {
2096                 obj_request->pages = NULL;
2097                 obj_request->page_count = 0;
2098         }
2099
2100         if (img_request_child_test(img_request)) {
2101                 rbd_assert(img_request->obj_request != NULL);
2102                 more = obj_request->which < img_request->obj_request_count - 1;
2103         } else {
2104                 rbd_assert(img_request->rq != NULL);
2105                 more = blk_end_request(img_request->rq, result, xferred);
2106         }
2107
2108         return more;
2109 }
2110
2111 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2112 {
2113         struct rbd_img_request *img_request;
2114         u32 which = obj_request->which;
2115         bool more = true;
2116
2117         rbd_assert(obj_request_img_data_test(obj_request));
2118         img_request = obj_request->img_request;
2119
2120         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2121         rbd_assert(img_request != NULL);
2122         rbd_assert(img_request->obj_request_count > 0);
2123         rbd_assert(which != BAD_WHICH);
2124         rbd_assert(which < img_request->obj_request_count);
2125         rbd_assert(which >= img_request->next_completion);
2126
2127         spin_lock_irq(&img_request->completion_lock);
2128         if (which != img_request->next_completion)
2129                 goto out;
2130
2131         for_each_obj_request_from(img_request, obj_request) {
2132                 rbd_assert(more);
2133                 rbd_assert(which < img_request->obj_request_count);
2134
2135                 if (!obj_request_done_test(obj_request))
2136                         break;
2137                 more = rbd_img_obj_end_request(obj_request);
2138                 which++;
2139         }
2140
2141         rbd_assert(more ^ (which == img_request->obj_request_count));
2142         img_request->next_completion = which;
2143 out:
2144         spin_unlock_irq(&img_request->completion_lock);
2145
2146         if (!more)
2147                 rbd_img_request_complete(img_request);
2148 }
2149
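/*
 * Note that object requests can finish in any order, but completion
 * of the image request is reported in order: next_completion only
 * advances over the run of already-done requests at the front, and
 * a request finishing out of order just unlocks and returns above.
 */
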
2150 /*
2151  * Split up an image request into one or more object requests, each
2152  * to a different object.  The "type" parameter indicates whether
2153  * "data_desc" is the pointer to the head of a list of bio
2154  * structures, or the base of a page array.  In either case this
2155  * function assumes data_desc describes memory sufficient to hold
2156  * all data described by the image request.
2157  */
2158 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2159                                         enum obj_request_type type,
2160                                         void *data_desc)
2161 {
2162         struct rbd_device *rbd_dev = img_request->rbd_dev;
2163         struct rbd_obj_request *obj_request = NULL;
2164         struct rbd_obj_request *next_obj_request;
2165         bool write_request = img_request_write_test(img_request);
2166         struct bio *bio_list;
2167         unsigned int bio_offset = 0;
2168         struct page **pages;
2169         u64 img_offset;
2170         u64 resid;
2171         u16 opcode;
2172
2173         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2174                 (int)type, data_desc);
2175
2176         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2177         img_offset = img_request->offset;
2178         resid = img_request->length;
2179         rbd_assert(resid > 0);
2180
2181         if (type == OBJ_REQUEST_BIO) {
2182                 bio_list = data_desc;
2183                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2184         } else {
2185                 rbd_assert(type == OBJ_REQUEST_PAGES);
2186                 pages = data_desc;
2187         }
2188
2189         while (resid) {
2190                 struct ceph_osd_request *osd_req;
2191                 const char *object_name;
2192                 u64 offset;
2193                 u64 length;
2194
2195                 object_name = rbd_segment_name(rbd_dev, img_offset);
2196                 if (!object_name)
2197                         goto out_unwind;
2198                 offset = rbd_segment_offset(rbd_dev, img_offset);
2199                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2200                 obj_request = rbd_obj_request_create(object_name,
2201                                                 offset, length, type);
2202                 /* object request has its own copy of the object name */
2203                 rbd_segment_name_free(object_name);
2204                 if (!obj_request)
2205                         goto out_unwind;
2206
2207                 if (type == OBJ_REQUEST_BIO) {
2208                         unsigned int clone_size;
2209
2210                         rbd_assert(length <= (u64)UINT_MAX);
2211                         clone_size = (unsigned int)length;
2212                         obj_request->bio_list =
2213                                         bio_chain_clone_range(&bio_list,
2214                                                                 &bio_offset,
2215                                                                 clone_size,
2216                                                                 GFP_ATOMIC);
2217                         if (!obj_request->bio_list)
2218                                 goto out_partial;
2219                 } else {
2220                         unsigned int page_count;
2221
2222                         obj_request->pages = pages;
2223                         page_count = (u32)calc_pages_for(offset, length);
2224                         obj_request->page_count = page_count;
2225                         if ((offset + length) & ~PAGE_MASK)
2226                                 page_count--;   /* more on last page */
2227                         pages += page_count;
2228                 }
2229
2230                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2231                                                 obj_request);
2232                 if (!osd_req)
2233                         goto out_partial;
2234                 obj_request->osd_req = osd_req;
2235                 obj_request->callback = rbd_img_obj_callback;
2236
2237                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2238                                                 0, 0);
2239                 if (type == OBJ_REQUEST_BIO)
2240                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2241                                         obj_request->bio_list, length);
2242                 else
2243                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2244                                         obj_request->pages, length,
2245                                         offset & ~PAGE_MASK, false, false);
2246
2247                 if (write_request)
2248                         rbd_osd_req_format_write(obj_request);
2249                 else
2250                         rbd_osd_req_format_read(obj_request);
2251
2252                 obj_request->img_offset = img_offset;
2253                 rbd_img_obj_request_add(img_request, obj_request);
2254
2255                 img_offset += length;
2256                 resid -= length;
2257         }
2258
2259         return 0;
2260
2261 out_partial:
2262         rbd_obj_request_put(obj_request);
2263 out_unwind:
2264         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2265                 rbd_obj_request_put(obj_request);
2266
2267         return -ENOMEM;
2268 }
2269
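/*
 * Worked example of the segmentation done above (a sketch; the real
 * rbd_segment_*() helpers are defined earlier in this file).  With
 * the usual object order of 22 (4MB objects), an 8KB read at image
 * offset 0x3ff000 is split into two object requests: 4KB at offset
 * 0x3ff000 within object 0, then 4KB at offset 0 within object 1.
 */
#if 0
        u64 obj_size = (u64)1 << rbd_dev->header.obj_order;
        u64 offset = img_offset & (obj_size - 1);       /* within object */
        u64 length = min(resid, obj_size - offset);     /* clamp to object */
#endif
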
2270 static void
2271 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2272 {
2273         struct rbd_img_request *img_request;
2274         struct rbd_device *rbd_dev;
2275         struct page **pages;
2276         u32 page_count;
2277
2278         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2279         rbd_assert(obj_request_img_data_test(obj_request));
2280         img_request = obj_request->img_request;
2281         rbd_assert(img_request);
2282
2283         rbd_dev = img_request->rbd_dev;
2284         rbd_assert(rbd_dev);
2285
2286         pages = obj_request->copyup_pages;
2287         rbd_assert(pages != NULL);
2288         obj_request->copyup_pages = NULL;
2289         page_count = obj_request->copyup_page_count;
2290         rbd_assert(page_count);
2291         obj_request->copyup_page_count = 0;
2292         ceph_release_page_vector(pages, page_count);
2293
2294         /*
2295          * We want the transfer count to reflect the size of the
2296          * original write request.  There is no such thing as a
2297          * successful short write, so if the request was successful
2298          * we can just set it to the originally-requested length.
2299          */
2300         if (!obj_request->result)
2301                 obj_request->xferred = obj_request->length;
2302
2303         /* Finish up with the normal image object callback */
2304
2305         rbd_img_obj_callback(obj_request);
2306 }
2307
2308 static void
2309 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2310 {
2311         struct rbd_obj_request *orig_request;
2312         struct ceph_osd_request *osd_req;
2313         struct ceph_osd_client *osdc;
2314         struct rbd_device *rbd_dev;
2315         struct page **pages;
2316         u32 page_count;
2317         int result;
2318         u64 parent_length;
2319         u64 offset;
2320         u64 length;
2321
2322         rbd_assert(img_request_child_test(img_request));
2323
2324         /* First get what we need from the image request */
2325
2326         pages = img_request->copyup_pages;
2327         rbd_assert(pages != NULL);
2328         img_request->copyup_pages = NULL;
2329         page_count = img_request->copyup_page_count;
2330         rbd_assert(page_count);
2331         img_request->copyup_page_count = 0;
2332
2333         orig_request = img_request->obj_request;
2334         rbd_assert(orig_request != NULL);
2335         rbd_assert(obj_request_type_valid(orig_request->type));
2336         result = img_request->result;
2337         parent_length = img_request->length;
2338         rbd_assert(parent_length == img_request->xferred);
2339         rbd_img_request_put(img_request);
2340
2341         rbd_assert(orig_request->img_request);
2342         rbd_dev = orig_request->img_request->rbd_dev;
2343         rbd_assert(rbd_dev);
2344
2345         if (result)
2346                 goto out_err;
2347
2348         /*
2349  * The original osd request is of no use to us any more.
2350          * We need a new one that can hold the two ops in a copyup
2351          * request.  Allocate the new copyup osd request for the
2352          * original request, and release the old one.
2353          */
2354         result = -ENOMEM;
2355         osd_req = rbd_osd_req_create_copyup(orig_request);
2356         if (!osd_req)
2357                 goto out_err;
2358         rbd_osd_req_destroy(orig_request->osd_req);
2359         orig_request->osd_req = osd_req;
2360         orig_request->copyup_pages = pages;
2361         orig_request->copyup_page_count = page_count;
2362
2363         /* Initialize the copyup op */
2364
2365         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2366         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2367                                                 false, false);
2368
2369         /* Then the original write request op */
2370
2371         offset = orig_request->offset;
2372         length = orig_request->length;
2373         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2374                                         offset, length, 0, 0);
2375         if (orig_request->type == OBJ_REQUEST_BIO)
2376                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2377                                         orig_request->bio_list, length);
2378         else
2379                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2380                                         orig_request->pages, length,
2381                                         offset & ~PAGE_MASK, false, false);
2382
2383         rbd_osd_req_format_write(orig_request);
2384
2385         /* All set, send it off. */
2386
2387         orig_request->callback = rbd_img_obj_copyup_callback;
2388         osdc = &rbd_dev->rbd_client->client->osdc;
2389         result = rbd_obj_request_submit(osdc, orig_request);
2390         if (!result)
2391                 return;
2392 out_err:
2393         /* Record the error code and complete the request */
2394
2395         orig_request->result = result;
2396         orig_request->xferred = 0;
2397         obj_request_done_set(orig_request);
2398         rbd_obj_request_complete(orig_request);
2399 }
2400
2401 /*
2402  * Read from the parent image the range of data that covers the
2403  * entire target of the given object request.  This is used for
2404  * satisfying a layered image write request when the target object
2405  * of one of its object requests does not yet exist.
2406  *
2407  * A page array big enough to hold the returned data is allocated
2408  * and supplied to rbd_img_request_fill() as the "data descriptor."
2409  * When the read completes, this page array will be transferred to
2410  * the original object request for the copyup operation.
2411  *
2412  * If an error occurs, record it as the result of the original
2413  * object request and mark it done so it gets completed.
2414  */
2415 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2416 {
2417         struct rbd_img_request *img_request = NULL;
2418         struct rbd_img_request *parent_request = NULL;
2419         struct rbd_device *rbd_dev;
2420         u64 img_offset;
2421         u64 length;
2422         struct page **pages = NULL;
2423         u32 page_count;
2424         int result;
2425
2426         rbd_assert(obj_request_img_data_test(obj_request));
2427         rbd_assert(obj_request_type_valid(obj_request->type));
2428
2429         img_request = obj_request->img_request;
2430         rbd_assert(img_request != NULL);
2431         rbd_dev = img_request->rbd_dev;
2432         rbd_assert(rbd_dev->parent != NULL);
2433
2434         /*
2435          * Determine the byte range covered by the object in the
2436          * child image to which the original request was to be sent.
2437          */
2438         img_offset = obj_request->img_offset - obj_request->offset;
2439         length = (u64)1 << rbd_dev->header.obj_order;
2440
2441         /*
2442          * There is no defined parent data beyond the parent
2443          * overlap, so limit what we read at that boundary if
2444          * necessary.
2445          */
2446         if (img_offset + length > rbd_dev->parent_overlap) {
2447                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2448                 length = rbd_dev->parent_overlap - img_offset;
2449         }
2450
2451         /*
2452          * Allocate a page array big enough to receive the data read
2453          * from the parent.
2454          */
2455         page_count = (u32)calc_pages_for(0, length);
2456         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2457         if (IS_ERR(pages)) {
2458                 result = PTR_ERR(pages);
2459                 pages = NULL;
2460                 goto out_err;
2461         }
2462
2463         result = -ENOMEM;
2464         parent_request = rbd_parent_request_create(obj_request,
2465                                                 img_offset, length);
2466         if (!parent_request)
2467                 goto out_err;
2468
2469         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2470         if (result)
2471                 goto out_err;
2472         parent_request->copyup_pages = pages;
2473         parent_request->copyup_page_count = page_count;
2474
2475         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2476         result = rbd_img_request_submit(parent_request);
2477         if (!result)
2478                 return 0;
2479
2480         parent_request->copyup_pages = NULL;
2481         parent_request->copyup_page_count = 0;
2482         parent_request->obj_request = NULL;
2483         rbd_obj_request_put(obj_request);
2484 out_err:
2485         if (pages)
2486                 ceph_release_page_vector(pages, page_count);
2487         if (parent_request)
2488                 rbd_img_request_put(parent_request);
2489         obj_request->result = result;
2490         obj_request->xferred = 0;
2491         obj_request_done_set(obj_request);
2492
2493         return result;
2494 }
2495
2496 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2497 {
2498         struct rbd_obj_request *orig_request;
2499         int result;
2500
2501         rbd_assert(!obj_request_img_data_test(obj_request));
2502
2503         /*
2504          * All we need from the object request is the original
2505          * request and the result of the STAT op.  Grab those, then
2506          * we're done with the request.
2507          */
2508         orig_request = obj_request->obj_request;
2509         obj_request->obj_request = NULL;
2510         rbd_assert(orig_request);
2511         rbd_assert(orig_request->img_request);
2512
2513         result = obj_request->result;
2514         obj_request->result = 0;
2515
2516         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2517                 obj_request, orig_request, result,
2518                 obj_request->xferred, obj_request->length);
2519         rbd_obj_request_put(obj_request);
2523
2524         /*
2525          * Our only purpose here is to determine whether the object
2526          * exists, and we don't want to treat the non-existence as
2527          * an error.  If something else comes back, transfer the
2528          * error to the original request and complete it now.
2529          */
2530         if (!result) {
2531                 obj_request_existence_set(orig_request, true);
2532         } else if (result == -ENOENT) {
2533                 obj_request_existence_set(orig_request, false);
2534         } else {
2535                 orig_request->result = result;
2536                 goto out;
2537         }
2538
2539         /*
2540          * Resubmit the original request now that we have recorded
2541          * whether the target object exists.
2542          */
2543         orig_request->result = rbd_img_obj_request_submit(orig_request);
2544 out:
2545         if (orig_request->result)
2546                 rbd_obj_request_complete(orig_request);
2547         rbd_obj_request_put(orig_request);
2548 }
2549
2550 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2551 {
2552         struct rbd_obj_request *stat_request;
2553         struct rbd_device *rbd_dev;
2554         struct ceph_osd_client *osdc;
2555         struct page **pages = NULL;
2556         u32 page_count;
2557         size_t size;
2558         int ret;
2559
2560         /*
2561          * The response data for a STAT call consists of:
2562          *     le64 length;
2563          *     struct {
2564          *         le32 tv_sec;
2565          *         le32 tv_nsec;
2566          *     } mtime;
2567          */
2568         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2569         page_count = (u32)calc_pages_for(0, size);
2570         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2571         if (IS_ERR(pages))
2572                 return PTR_ERR(pages);
2573
2574         ret = -ENOMEM;
2575         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2576                                                         OBJ_REQUEST_PAGES);
2577         if (!stat_request)
2578                 goto out;
2579
2580         rbd_obj_request_get(obj_request);
2581         stat_request->obj_request = obj_request;
2582         stat_request->pages = pages;
2583         stat_request->page_count = page_count;
2584
2585         rbd_assert(obj_request->img_request);
2586         rbd_dev = obj_request->img_request->rbd_dev;
2587         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2588                                                 stat_request);
2589         if (!stat_request->osd_req)
2590                 goto out;
2591         stat_request->callback = rbd_img_obj_exists_callback;
2592
2593         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2594         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2595                                         false, false);
2596         rbd_osd_req_format_read(stat_request);
2597
2598         osdc = &rbd_dev->rbd_client->client->osdc;
2599         ret = rbd_obj_request_submit(osdc, stat_request);
2600 out:
2601         if (ret)
2602                 rbd_obj_request_put(obj_request);
2603
2604         return ret;
2605 }
2606
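/*
 * The STAT payload received above (length and mtime) is never
 * actually decoded; rbd_img_obj_exists_callback() keys only off the
 * op result--0 if the object exists, -ENOENT if it does not.  The
 * page vector simply gives the response somewhere to land.
 */
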
2607 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2608 {
2609         struct rbd_img_request *img_request;
2610         struct rbd_device *rbd_dev;
2611         bool known;
2612
2613         rbd_assert(obj_request_img_data_test(obj_request));
2614
2615         img_request = obj_request->img_request;
2616         rbd_assert(img_request);
2617         rbd_dev = img_request->rbd_dev;
2618
2619         /*
2620          * Only writes to layered images need special handling.
2621          * Reads and non-layered writes are simple object requests.
2622          * Layered writes that start beyond the end of the overlap
2623          * with the parent have no parent data, so they too are
2624          * simple object requests.  Finally, if the target object is
2625          * known to already exist, its parent data has already been
2626          * copied, so a write to the object can also be handled as a
2627          * simple object request.
2628          */
2629         if (!img_request_write_test(img_request) ||
2630                 !img_request_layered_test(img_request) ||
2631                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2632                 ((known = obj_request_known_test(obj_request)) &&
2633                         obj_request_exists_test(obj_request))) {
2634
2636                 struct ceph_osd_client *osdc;
2637
2638                 osdc = &rbd_dev->rbd_client->client->osdc;
2640
2641                 return rbd_obj_request_submit(osdc, obj_request);
2642         }
2643
2644         /*
2645          * It's a layered write.  The target object might exist but
2646          * we may not know that yet.  If we know it doesn't exist,
2647          * start by reading the data for the full target object from
2648          * the parent so we can use it for a copyup to the target.
2649          */
2650         if (known)
2651                 return rbd_img_obj_parent_read_full(obj_request);
2652
2653         /* We don't know whether the target exists.  Go find out. */
2654
2655         return rbd_img_obj_exists_submit(obj_request);
2656 }
2657
2658 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2659 {
2660         struct rbd_obj_request *obj_request;
2661         struct rbd_obj_request *next_obj_request;
2662
2663         dout("%s: img %p\n", __func__, img_request);
2664         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2665                 int ret;
2666
2667                 ret = rbd_img_obj_request_submit(obj_request);
2668                 if (ret)
2669                         return ret;
2670         }
2671
2672         return 0;
2673 }
2674
2675 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2676 {
2677         struct rbd_obj_request *obj_request;
2678         struct rbd_device *rbd_dev;
2679         u64 obj_end;
2680
2681         rbd_assert(img_request_child_test(img_request));
2682
2683         obj_request = img_request->obj_request;
2684         rbd_assert(obj_request);
2685         rbd_assert(obj_request->img_request);
2686
2687         obj_request->result = img_request->result;
2688         if (obj_request->result)
2689                 goto out;
2690
2691         /*
2692          * We need to zero anything beyond the parent overlap
2693          * boundary.  Since rbd_img_obj_request_read_callback()
2694          * will zero anything beyond the end of a short read, an
2695          * easy way to do this is to pretend the data from the
2696          * parent came up short--ending at the overlap boundary.
2697          */
2698         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2699         obj_end = obj_request->img_offset + obj_request->length;
2700         rbd_dev = obj_request->img_request->rbd_dev;
2701         if (obj_end > rbd_dev->parent_overlap) {
2702                 u64 xferred = 0;
2703
2704                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2705                         xferred = rbd_dev->parent_overlap -
2706                                         obj_request->img_offset;
2707
2708                 obj_request->xferred = min(img_request->xferred, xferred);
2709         } else {
2710                 obj_request->xferred = img_request->xferred;
2711         }
2712 out:
2713         rbd_img_request_put(img_request);
2714         rbd_img_obj_request_read_callback(obj_request);
2715         rbd_obj_request_complete(obj_request);
2716 }
2717
2718 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2719 {
2720         struct rbd_img_request *img_request;
2721         int result;
2722
2723         rbd_assert(obj_request_img_data_test(obj_request));
2724         rbd_assert(obj_request->img_request != NULL);
2725         rbd_assert(obj_request->result == (s32) -ENOENT);
2726         rbd_assert(obj_request_type_valid(obj_request->type));
2727
2729         img_request = rbd_parent_request_create(obj_request,
2730                                                 obj_request->img_offset,
2731                                                 obj_request->length);
2732         result = -ENOMEM;
2733         if (!img_request)
2734                 goto out_err;
2735
2736         if (obj_request->type == OBJ_REQUEST_BIO)
2737                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2738                                                 obj_request->bio_list);
2739         else
2740                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2741                                                 obj_request->pages);
2742         if (result)
2743                 goto out_err;
2744
2745         img_request->callback = rbd_img_parent_read_callback;
2746         result = rbd_img_request_submit(img_request);
2747         if (result)
2748                 goto out_err;
2749
2750         return;
2751 out_err:
2752         if (img_request)
2753                 rbd_img_request_put(img_request);
2754         obj_request->result = result;
2755         obj_request->xferred = 0;
2756         obj_request_done_set(obj_request);
2757 }
2758
2759 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2760 {
2761         struct rbd_obj_request *obj_request;
2762         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2763         int ret;
2764
2765         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2766                                                         OBJ_REQUEST_NODATA);
2767         if (!obj_request)
2768                 return -ENOMEM;
2769
2770         ret = -ENOMEM;
2771         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2772         if (!obj_request->osd_req)
2773                 goto out;
2774         obj_request->callback = rbd_obj_request_put;
2775
2776         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2777                                         notify_id, 0, 0);
2778         rbd_osd_req_format_read(obj_request);
2779
2780         ret = rbd_obj_request_submit(osdc, obj_request);
2781 out:
2782         if (ret)
2783                 rbd_obj_request_put(obj_request);
2784
2785         return ret;
2786 }
2787
2788 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2789 {
2790         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2791         int ret;
2792
2793         if (!rbd_dev)
2794                 return;
2795
2796         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2797                 rbd_dev->header_name, (unsigned long long)notify_id,
2798                 (unsigned int)opcode);
2799         ret = rbd_dev_refresh(rbd_dev);
2800         if (ret)
2801                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2802
2803         rbd_obj_notify_ack(rbd_dev, notify_id);
2804 }
2805
2806 /*
2807  * Request sync osd watch/unwatch.  The value of "start" determines
2808  * whether a watch request is being initiated or torn down.
2809  */
2810 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2811 {
2812         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2813         struct rbd_obj_request *obj_request;
2814         int ret;
2815
2816         rbd_assert(start ^ !!rbd_dev->watch_event);
2817         rbd_assert(start ^ !!rbd_dev->watch_request);
2818
2819         if (start) {
2820                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2821                                                 &rbd_dev->watch_event);
2822                 if (ret < 0)
2823                         return ret;
2824                 rbd_assert(rbd_dev->watch_event != NULL);
2825         }
2826
2827         ret = -ENOMEM;
2828         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2829                                                         OBJ_REQUEST_NODATA);
2830         if (!obj_request)
2831                 goto out_cancel;
2832
2833         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2834         if (!obj_request->osd_req)
2835                 goto out_cancel;
2836
2837         if (start)
2838                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2839         else
2840                 ceph_osdc_unregister_linger_request(osdc,
2841                                         rbd_dev->watch_request->osd_req);
2842
2843         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2844                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2845         rbd_osd_req_format_write(obj_request);
2846
2847         ret = rbd_obj_request_submit(osdc, obj_request);
2848         if (ret)
2849                 goto out_cancel;
2850         ret = rbd_obj_request_wait(obj_request);
2851         if (ret)
2852                 goto out_cancel;
2853         ret = obj_request->result;
2854         if (ret)
2855                 goto out_cancel;
2856
2857         /*
2858          * A watch request is set to linger, so the underlying osd
2859          * request won't go away until we unregister it.  We retain
2860          * a pointer to the object request during that time (in
2861          * rbd_dev->watch_request), so we'll keep a reference to
2862          * it.  We'll drop that reference (below) after we've
2863          * unregistered it.
2864          */
2865         if (start) {
2866                 rbd_dev->watch_request = obj_request;
2867
2868                 return 0;
2869         }
2870
2871         /* We have successfully torn down the watch request */
2872
2873         rbd_obj_request_put(rbd_dev->watch_request);
2874         rbd_dev->watch_request = NULL;
2875 out_cancel:
2876         /* Cancel the event if we're tearing down, or on error */
2877         ceph_osdc_cancel_event(rbd_dev->watch_event);
2878         rbd_dev->watch_event = NULL;
2879         if (obj_request)
2880                 rbd_obj_request_put(obj_request);
2881
2882         return ret;
2883 }
2884
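/*
 * Illustrative fragment (not built): a header watch is established
 * when an image is mapped and torn down again when it goes away.
 */
#if 0
        /* On map: */
        ret = rbd_dev_header_watch_sync(rbd_dev, true);
        /* ...and later, on unmap: */
        ret = rbd_dev_header_watch_sync(rbd_dev, false);
#endif
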
2885 /*
2886  * Synchronous osd object method call.  Returns the number of bytes
2887  * returned in the outbound buffer, or a negative error code.
2888  */
2889 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2890                              const char *object_name,
2891                              const char *class_name,
2892                              const char *method_name,
2893                              const void *outbound,
2894                              size_t outbound_size,
2895                              void *inbound,
2896                              size_t inbound_size)
2897 {
2898         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2899         struct rbd_obj_request *obj_request;
2900         struct page **pages;
2901         u32 page_count;
2902         int ret;
2903
2904         /*
2905          * Method calls are ultimately read operations.  The result
2906          * should be placed into the inbound buffer provided.  They
2907          * also supply outbound data--parameters for the object
2908          * method.  Currently if this is present it will be a
2909          * snapshot id.
2910          */
2911         page_count = (u32)calc_pages_for(0, inbound_size);
2912         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2913         if (IS_ERR(pages))
2914                 return PTR_ERR(pages);
2915
2916         ret = -ENOMEM;
2917         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2918                                                         OBJ_REQUEST_PAGES);
2919         if (!obj_request)
2920                 goto out;
2921
2922         obj_request->pages = pages;
2923         obj_request->page_count = page_count;
2924
2925         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2926         if (!obj_request->osd_req)
2927                 goto out;
2928
2929         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2930                                         class_name, method_name);
2931         if (outbound_size) {
2932                 struct ceph_pagelist *pagelist;
2933
2934                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2935                 if (!pagelist)
2936                         goto out;
2937
2938                 ceph_pagelist_init(pagelist);
2939                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2940                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2941                                                 pagelist);
2942         }
2943         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2944                                         obj_request->pages, inbound_size,
2945                                         0, false, false);
2946         rbd_osd_req_format_read(obj_request);
2947
2948         ret = rbd_obj_request_submit(osdc, obj_request);
2949         if (ret)
2950                 goto out;
2951         ret = rbd_obj_request_wait(obj_request);
2952         if (ret)
2953                 goto out;
2954
2955         ret = obj_request->result;
2956         if (ret < 0)
2957                 goto out;
2958
2959         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2960         ret = (int)obj_request->xferred;
2961         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2962 out:
2963         if (obj_request)
2964                 rbd_obj_request_put(obj_request);
2965         else
2966                 ceph_release_page_vector(pages, page_count);
2967
2968         return ret;
2969 }
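
/*
 * Illustrative sketch, not part of the driver: how a caller is
 * expected to use rbd_obj_method_sync().  The function name
 * rbd_method_sync_example() is hypothetical; the "rbd"/"get_size"
 * class method and its reply layout are the ones used by
 * _rbd_dev_v2_snap_size() further below.
 */
static int __maybe_unused rbd_method_sync_example(struct rbd_device *rbd_dev)
{
        __le64 snapid = cpu_to_le64(CEPH_NOSNAP);
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };
        int ret;

        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                &snapid, sizeof (snapid),
                                &size_buf, sizeof (size_buf));
        if (ret < 0)
                return ret;             /* submit/wait or osd error */
        if (ret < sizeof (size_buf))
                return -ERANGE;         /* truncated reply */

        dout("example: size %llu order %u\n",
                (unsigned long long)le64_to_cpu(size_buf.size),
                (unsigned int)size_buf.order);

        return 0;
}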
2970
2971 static void rbd_request_fn(struct request_queue *q)
2972                 __releases(q->queue_lock) __acquires(q->queue_lock)
2973 {
2974         struct rbd_device *rbd_dev = q->queuedata;
2975         bool read_only = rbd_dev->mapping.read_only;
2976         struct request *rq;
2977         int result;
2978
2979         while ((rq = blk_fetch_request(q))) {
2980                 bool write_request = rq_data_dir(rq) == WRITE;
2981                 struct rbd_img_request *img_request;
2982                 u64 offset;
2983                 u64 length;
2984
2985                 /* Ignore any non-FS requests that filter through. */
2986
2987                 if (rq->cmd_type != REQ_TYPE_FS) {
2988                         dout("%s: non-fs request type %d\n", __func__,
2989                                 (int) rq->cmd_type);
2990                         __blk_end_request_all(rq, 0);
2991                         continue;
2992                 }
2993
2994                 /* Ignore/skip any zero-length requests */
2995
2996                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2997                 length = (u64) blk_rq_bytes(rq);
2998
2999                 if (!length) {
3000                         dout("%s: zero-length request\n", __func__);
3001                         __blk_end_request_all(rq, 0);
3002                         continue;
3003                 }
3004
3005                 spin_unlock_irq(q->queue_lock);
3006
3007                 /* Disallow writes to a read-only device */
3008
3009                 if (write_request) {
3010                         result = -EROFS;
3011                         if (read_only)
3012                                 goto end_request;
3013                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3014                 }
3015
3016                 /*
3017                  * Quit early if the mapped snapshot no longer
3018                  * exists.  It's still possible the snapshot will
3019                  * have disappeared by the time our request arrives
3020                  * at the osd, but there's no sense in sending it if
3021                  * we already know.
3022                  */
3023                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3024                         dout("request for non-existent snapshot");
3025                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3026                         result = -ENXIO;
3027                         goto end_request;
3028                 }
3029
3030                 result = -EINVAL;
3031                 if (offset && length > U64_MAX - offset + 1) {
3032                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3033                                 offset, length);
3034                         goto end_request;       /* Shouldn't happen */
3035                 }
3036
3037                 result = -EIO;
3038                 if (offset + length > rbd_dev->mapping.size) {
3039                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3040                                 offset, length, rbd_dev->mapping.size);
3041                         goto end_request;
3042                 }
3043
3044                 result = -ENOMEM;
3045                 img_request = rbd_img_request_create(rbd_dev, offset, length,
3046                                                         write_request);
3047                 if (!img_request)
3048                         goto end_request;
3049
3050                 img_request->rq = rq;
3051
3052                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3053                                                 rq->bio);
3054                 if (!result)
3055                         result = rbd_img_request_submit(img_request);
3056                 if (result)
3057                         rbd_img_request_put(img_request);
3058 end_request:
3059                 spin_lock_irq(q->queue_lock);
3060                 if (result < 0) {
3061                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3062                                 write_request ? "write" : "read",
3063                                 length, offset, result);
3064
3065                         __blk_end_request_all(rq, result);
3066                 }
3067         }
3068 }
3069
3070 /*
3071  * a queue callback. Makes sure that we don't create a bio that spans across
3072  * multiple osd objects. One exception would be with single-page bios,
3073  * which we handle later at bio_chain_clone_range()
3074  */
3075 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3076                           struct bio_vec *bvec)
3077 {
3078         struct rbd_device *rbd_dev = q->queuedata;
3079         sector_t sector_offset;
3080         sector_t sectors_per_obj;
3081         sector_t obj_sector_offset;
3082         int ret;
3083
3084         /*
3085          * Find how far into its rbd object the bio's start sector
3086          * falls.  The start sector is partition-relative, so first
3087          * offset it to be relative to the enclosing device.
3088          */
3089         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3090         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3091         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3092
3093         /*
3094          * Compute the number of bytes from that offset to the end
3095          * of the object.  Account for what's already used by the bio.
3096          */
3097         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3098         if (ret > bmd->bi_size)
3099                 ret -= bmd->bi_size;
3100         else
3101                 ret = 0;
3102
3103         /*
3104          * Don't send back more than was asked for.  And if the bio
3105          * was empty, let the whole thing through because:  "Note
3106          * that a block device *must* allow a single page to be
3107          * added to an empty bio."
3108          */
3109         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3110         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3111                 ret = (int) bvec->bv_len;
3112
3113         return ret;
3114 }
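
/*
 * Worked example of the arithmetic above, as a sketch rather than
 * driver code (rbd_bytes_left_in_obj() is hypothetical): with the
 * default 4 MiB objects (obj_order 22) there are 8192 sectors per
 * object, so a bio starting at device sector 8190 with 512 bytes
 * already in it could accept at most 512 more bytes (1024 bytes to
 * the object boundary, minus the 512 already used).
 */
static int __maybe_unused rbd_bytes_left_in_obj(u8 obj_order,
                                                sector_t dev_sector)
{
        sector_t sectors_per_obj = 1 << (obj_order - SECTOR_SHIFT);

        /* Bytes from dev_sector to the end of its enclosing object */
        return (int) (sectors_per_obj -
                        (dev_sector & (sectors_per_obj - 1))) << SECTOR_SHIFT;
}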
3115
3116 static void rbd_free_disk(struct rbd_device *rbd_dev)
3117 {
3118         struct gendisk *disk = rbd_dev->disk;
3119
3120         if (!disk)
3121                 return;
3122
3123         rbd_dev->disk = NULL;
3124         if (disk->flags & GENHD_FL_UP) {
3125                 del_gendisk(disk);
3126                 if (disk->queue)
3127                         blk_cleanup_queue(disk->queue);
3128         }
3129         put_disk(disk);
3130 }
3131
3132 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3133                                 const char *object_name,
3134                                 u64 offset, u64 length, void *buf)
3135
3136 {
3137         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3138         struct rbd_obj_request *obj_request;
3139         struct page **pages = NULL;
3140         u32 page_count;
3141         size_t size;
3142         int ret;
3143
3144         page_count = (u32) calc_pages_for(offset, length);
3145         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3146         if (IS_ERR(pages))
3147                 return PTR_ERR(pages);
3148
3149         ret = -ENOMEM;
3150         obj_request = rbd_obj_request_create(object_name, offset, length,
3151                                                         OBJ_REQUEST_PAGES);
3152         if (!obj_request)
3153                 goto out;
3154
3155         obj_request->pages = pages;
3156         obj_request->page_count = page_count;
3157
3158         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3159         if (!obj_request->osd_req)
3160                 goto out;
3161
3162         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3163                                         offset, length, 0, 0);
3164         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3165                                         obj_request->pages,
3166                                         obj_request->length,
3167                                         obj_request->offset & ~PAGE_MASK,
3168                                         false, false);
3169         rbd_osd_req_format_read(obj_request);
3170
3171         ret = rbd_obj_request_submit(osdc, obj_request);
3172         if (ret)
3173                 goto out;
3174         ret = rbd_obj_request_wait(obj_request);
3175         if (ret)
3176                 goto out;
3177
3178         ret = obj_request->result;
3179         if (ret < 0)
3180                 goto out;
3181
3182         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3183         size = (size_t) obj_request->xferred;
3184         ceph_copy_from_page_vector(pages, buf, 0, size);
3185         rbd_assert(size <= (size_t)INT_MAX);
3186         ret = (int)size;
3187 out:
3188         if (obj_request)
3189                 rbd_obj_request_put(obj_request);
3190         else
3191                 ceph_release_page_vector(pages, page_count);
3192
3193         return ret;
3194 }
3195
3196 /*
3197  * Read the complete header for the given rbd device.  On successful
3198  * return, the rbd_dev->header field will contain up-to-date
3199  * information about the image.
3200  */
3201 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3202 {
3203         struct rbd_image_header_ondisk *ondisk = NULL;
3204         u32 snap_count = 0;
3205         u64 names_size = 0;
3206         u32 want_count;
3207         int ret;
3208
3209         /*
3210          * The complete header will include an array of its 64-bit
3211          * snapshot ids, followed by the names of those snapshots as
3212          * a contiguous block of NUL-terminated strings.  Note that
3213          * the number of snapshots could change by the time we read
3214          * it in, in which case we re-read it.
3215          */
3216         do {
3217                 size_t size;
3218
3219                 kfree(ondisk);
3220
3221                 size = sizeof (*ondisk);
3222                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3223                 size += names_size;
3224                 ondisk = kmalloc(size, GFP_KERNEL);
3225                 if (!ondisk)
3226                         return -ENOMEM;
3227
3228                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3229                                        0, size, ondisk);
3230                 if (ret < 0)
3231                         goto out;
3232                 if ((size_t)ret < size) {
3233                         ret = -ENXIO;
3234                 rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3235                                 size, ret);
3236                         goto out;
3237                 }
3238                 if (!rbd_dev_ondisk_valid(ondisk)) {
3239                         ret = -ENXIO;
3240                         rbd_warn(rbd_dev, "invalid header");
3241                         goto out;
3242                 }
3243
3244                 names_size = le64_to_cpu(ondisk->snap_names_len);
3245                 want_count = snap_count;
3246                 snap_count = le32_to_cpu(ondisk->snap_count);
3247         } while (snap_count != want_count);
3248
3249         ret = rbd_header_from_disk(rbd_dev, ondisk);
3250 out:
3251         kfree(ondisk);
3252
3253         return ret;
3254 }
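
/*
 * Sketch of the allocation arithmetic used by the re-read loop above
 * (rbd_v1_header_size() is hypothetical, not driver code).  On the
 * first pass snap_count and names_size are both zero, so only the
 * fixed-size header is read; the counts found in it size the next
 * pass, and the loop repeats until two passes agree.
 */
static size_t __maybe_unused rbd_v1_header_size(u32 snap_count,
                                                u64 names_size)
{
        return sizeof (struct rbd_image_header_ondisk) +
                snap_count * sizeof (struct rbd_image_snap_ondisk) +
                names_size;
}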
3255
3256 /*
3257  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3258  * has disappeared from the (just updated) snapshot context.
3259  */
3260 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3261 {
3262         u64 snap_id;
3263
3264         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3265                 return;
3266
3267         snap_id = rbd_dev->spec->snap_id;
3268         if (snap_id == CEPH_NOSNAP)
3269                 return;
3270
3271         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3272                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3273 }
3274
3275 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3276 {
3277         u64 mapping_size;
3278         int ret;
3279
3280         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3281         mapping_size = rbd_dev->mapping.size;
3282         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3283         if (rbd_dev->image_format == 1)
3284                 ret = rbd_dev_v1_header_info(rbd_dev);
3285         else
3286                 ret = rbd_dev_v2_header_info(rbd_dev);
3287
3288         /* If it's a mapped snapshot, validate its EXISTS flag */
3289
3290         rbd_exists_validate(rbd_dev);
3291         mutex_unlock(&ctl_mutex);
3292         if (mapping_size != rbd_dev->mapping.size) {
3293                 sector_t size;
3294
3295                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3296                 dout("setting size to %llu sectors", (unsigned long long)size);
3297                 set_capacity(rbd_dev->disk, size);
3298                 revalidate_disk(rbd_dev->disk);
3299         }
3300
3301         return ret;
3302 }
3303
3304 static int rbd_init_disk(struct rbd_device *rbd_dev)
3305 {
3306         struct gendisk *disk;
3307         struct request_queue *q;
3308         u64 segment_size;
3309
3310         /* create gendisk info */
3311         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3312         if (!disk)
3313                 return -ENOMEM;
3314
3315         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3316                  rbd_dev->dev_id);
3317         disk->major = rbd_dev->major;
3318         disk->first_minor = 0;
3319         disk->fops = &rbd_bd_ops;
3320         disk->private_data = rbd_dev;
3321
3322         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3323         if (!q)
3324                 goto out_disk;
3325
3326         /* We use the default size, but let's be explicit about it. */
3327         blk_queue_physical_block_size(q, SECTOR_SIZE);
3328
3329         /* set io sizes to object size */
3330         segment_size = rbd_obj_bytes(&rbd_dev->header);
3331         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3332         blk_queue_max_segment_size(q, segment_size);
3333         blk_queue_io_min(q, segment_size);
3334         blk_queue_io_opt(q, segment_size);
3335
3336         blk_queue_merge_bvec(q, rbd_merge_bvec);
3337         disk->queue = q;
3338
3339         q->queuedata = rbd_dev;
3340
3341         rbd_dev->disk = disk;
3342
3343         return 0;
3344 out_disk:
3345         put_disk(disk);
3346
3347         return -ENOMEM;
3348 }
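
/*
 * Worked example for the queue limits above (a sketch; the helper
 * name is hypothetical): with the default object order of 22, the
 * segment size is 4 MiB, so max_hw_sectors is 8192 512-byte sectors
 * and the minimum/optimal I/O sizes both equal one full object.
 */
static unsigned int __maybe_unused rbd_obj_order_to_hw_sectors(u8 obj_order)
{
        /* e.g. obj_order 22 -> (1 << 22) / 512 == 8192 sectors */
        return (unsigned int)((1ULL << obj_order) / SECTOR_SIZE);
}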
3349
3350 /*
3351   sysfs
3352 */
3353
3354 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3355 {
3356         return container_of(dev, struct rbd_device, dev);
3357 }
3358
3359 static ssize_t rbd_size_show(struct device *dev,
3360                              struct device_attribute *attr, char *buf)
3361 {
3362         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3363
3364         return sprintf(buf, "%llu\n",
3365                 (unsigned long long)rbd_dev->mapping.size);
3366 }
3367
3368 /*
3369  * Note this shows the features for whatever's mapped, which is not
3370  * necessarily the base image.
3371  */
3372 static ssize_t rbd_features_show(struct device *dev,
3373                              struct device_attribute *attr, char *buf)
3374 {
3375         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3376
3377         return sprintf(buf, "0x%016llx\n",
3378                         (unsigned long long)rbd_dev->mapping.features);
3379 }
3380
3381 static ssize_t rbd_major_show(struct device *dev,
3382                               struct device_attribute *attr, char *buf)
3383 {
3384         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3385
3386         if (rbd_dev->major)
3387                 return sprintf(buf, "%d\n", rbd_dev->major);
3388
3389         return sprintf(buf, "(none)\n");
3390
3391 }
3392
3393 static ssize_t rbd_client_id_show(struct device *dev,
3394                                   struct device_attribute *attr, char *buf)
3395 {
3396         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3397
3398         return sprintf(buf, "client%lld\n",
3399                         ceph_client_id(rbd_dev->rbd_client->client));
3400 }
3401
3402 static ssize_t rbd_pool_show(struct device *dev,
3403                              struct device_attribute *attr, char *buf)
3404 {
3405         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3406
3407         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3408 }
3409
3410 static ssize_t rbd_pool_id_show(struct device *dev,
3411                              struct device_attribute *attr, char *buf)
3412 {
3413         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3414
3415         return sprintf(buf, "%llu\n",
3416                         (unsigned long long) rbd_dev->spec->pool_id);
3417 }
3418
3419 static ssize_t rbd_name_show(struct device *dev,
3420                              struct device_attribute *attr, char *buf)
3421 {
3422         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3423
3424         if (rbd_dev->spec->image_name)
3425                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3426
3427         return sprintf(buf, "(unknown)\n");
3428 }
3429
3430 static ssize_t rbd_image_id_show(struct device *dev,
3431                              struct device_attribute *attr, char *buf)
3432 {
3433         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3434
3435         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3436 }
3437
3438 /*
3439  * Shows the name of the currently-mapped snapshot (or
3440  * RBD_SNAP_HEAD_NAME for the base image).
3441  */
3442 static ssize_t rbd_snap_show(struct device *dev,
3443                              struct device_attribute *attr,
3444                              char *buf)
3445 {
3446         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3447
3448         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3449 }
3450
3451 /*
3452  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3453  * for the parent image.  If there is no parent, simply shows
3454  * "(no parent image)".
3455  */
3456 static ssize_t rbd_parent_show(struct device *dev,
3457                              struct device_attribute *attr,
3458                              char *buf)
3459 {
3460         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3461         struct rbd_spec *spec = rbd_dev->parent_spec;
3462         int count;
3463         char *bufp = buf;
3464
3465         if (!spec)
3466                 return sprintf(buf, "(no parent image)\n");
3467
3468         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3469                         (unsigned long long) spec->pool_id, spec->pool_name);
3470         if (count < 0)
3471                 return count;
3472         bufp += count;
3473
3474         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3475                         spec->image_name ? spec->image_name : "(unknown)");
3476         if (count < 0)
3477                 return count;
3478         bufp += count;
3479
3480         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3481                         (unsigned long long) spec->snap_id, spec->snap_name);
3482         if (count < 0)
3483                 return count;
3484         bufp += count;
3485
3486         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3487         if (count < 0)
3488                 return count;
3489         bufp += count;
3490
3491         return (ssize_t) (bufp - buf);
3492 }
3493
3494 static ssize_t rbd_image_refresh(struct device *dev,
3495                                  struct device_attribute *attr,
3496                                  const char *buf,
3497                                  size_t size)
3498 {
3499         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3500         int ret;
3501
3502         ret = rbd_dev_refresh(rbd_dev);
3503         if (ret)
3504                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3505
3506         return ret < 0 ? ret : size;
3507 }
3508
3509 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3510 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3511 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3512 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3513 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3514 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3515 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3516 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3517 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3518 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3519 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3520
3521 static struct attribute *rbd_attrs[] = {
3522         &dev_attr_size.attr,
3523         &dev_attr_features.attr,
3524         &dev_attr_major.attr,
3525         &dev_attr_client_id.attr,
3526         &dev_attr_pool.attr,
3527         &dev_attr_pool_id.attr,
3528         &dev_attr_name.attr,
3529         &dev_attr_image_id.attr,
3530         &dev_attr_current_snap.attr,
3531         &dev_attr_parent.attr,
3532         &dev_attr_refresh.attr,
3533         NULL
3534 };
3535
3536 static struct attribute_group rbd_attr_group = {
3537         .attrs = rbd_attrs,
3538 };
3539
3540 static const struct attribute_group *rbd_attr_groups[] = {
3541         &rbd_attr_group,
3542         NULL
3543 };
3544
3545 static void rbd_sysfs_dev_release(struct device *dev)
3546 {
3547 }
3548
3549 static struct device_type rbd_device_type = {
3550         .name           = "rbd",
3551         .groups         = rbd_attr_groups,
3552         .release        = rbd_sysfs_dev_release,
3553 };
3554
3555 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3556 {
3557         kref_get(&spec->kref);
3558
3559         return spec;
3560 }
3561
3562 static void rbd_spec_free(struct kref *kref);
3563 static void rbd_spec_put(struct rbd_spec *spec)
3564 {
3565         if (spec)
3566                 kref_put(&spec->kref, rbd_spec_free);
3567 }
3568
3569 static struct rbd_spec *rbd_spec_alloc(void)
3570 {
3571         struct rbd_spec *spec;
3572
3573         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3574         if (!spec)
3575                 return NULL;
3576         kref_init(&spec->kref);
3577
3578         return spec;
3579 }
3580
3581 static void rbd_spec_free(struct kref *kref)
3582 {
3583         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3584
3585         kfree(spec->pool_name);
3586         kfree(spec->image_id);
3587         kfree(spec->image_name);
3588         kfree(spec->snap_name);
3589         kfree(spec);
3590 }
3591
3592 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3593                                 struct rbd_spec *spec)
3594 {
3595         struct rbd_device *rbd_dev;
3596
3597         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3598         if (!rbd_dev)
3599                 return NULL;
3600
3601         spin_lock_init(&rbd_dev->lock);
3602         rbd_dev->flags = 0;
3603         atomic_set(&rbd_dev->parent_ref, 0);
3604         INIT_LIST_HEAD(&rbd_dev->node);
3605         init_rwsem(&rbd_dev->header_rwsem);
3606
3607         rbd_dev->spec = spec;
3608         rbd_dev->rbd_client = rbdc;
3609
3610         /* Initialize the layout used for all rbd requests */
3611
3612         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3613         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3614         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3615         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3616
3617         return rbd_dev;
3618 }
3619
3620 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3621 {
3622         rbd_put_client(rbd_dev->rbd_client);
3623         rbd_spec_put(rbd_dev->spec);
3624         kfree(rbd_dev);
3625 }
3626
3627 /*
3628  * Get the size and object order for an image snapshot, or if
3629  * snap_id is CEPH_NOSNAP, gets this information for the base
3630  * image.
3631  */
3632 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3633                                 u8 *order, u64 *snap_size)
3634 {
3635         __le64 snapid = cpu_to_le64(snap_id);
3636         int ret;
3637         struct {
3638                 u8 order;
3639                 __le64 size;
3640         } __attribute__ ((packed)) size_buf = { 0 };
3641
3642         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3643                                 "rbd", "get_size",
3644                                 &snapid, sizeof (snapid),
3645                                 &size_buf, sizeof (size_buf));
3646         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3647         if (ret < 0)
3648                 return ret;
3649         if (ret < sizeof (size_buf))
3650                 return -ERANGE;
3651
3652         if (order) {
3653                 *order = size_buf.order;
3654                 dout("  order %u", (unsigned int)*order);
3655         }
3656         *snap_size = le64_to_cpu(size_buf.size);
3657         dout("  snap_id 0x%016llx snap_size = %llu\n",
3658                 (unsigned long long)snap_id, (unsigned long long)*snap_size);
3659
3660         return 0;
3661 }
3662
3663 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3664 {
3665         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3666                                         &rbd_dev->header.obj_order,
3667                                         &rbd_dev->header.image_size);
3668 }
3669
3670 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3671 {
3672         void *reply_buf;
3673         int ret;
3674         void *p;
3675
3676         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3677         if (!reply_buf)
3678                 return -ENOMEM;
3679
3680         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3681                                 "rbd", "get_object_prefix", NULL, 0,
3682                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3683         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3684         if (ret < 0)
3685                 goto out;
3686
3687         p = reply_buf;
3688         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3689                                                 p + ret, NULL, GFP_NOIO);
3690         if (IS_ERR(rbd_dev->header.object_prefix)) {
3691                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3692                 rbd_dev->header.object_prefix = NULL;
3693         } else {
3694                 ret = 0;
3695                 dout("  object_prefix = %s\n",
3696                         rbd_dev->header.object_prefix);
3697         }
3698 out:
3699         kfree(reply_buf);
3700
3701         return ret;
3702 }
3703
3704 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3705                 u64 *snap_features)
3706 {
3707         __le64 snapid = cpu_to_le64(snap_id);
3708         struct {
3709                 __le64 features;
3710                 __le64 incompat;
3711         } __attribute__ ((packed)) features_buf = { 0 };
3712         u64 incompat;
3713         int ret;
3714
3715         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3716                                 "rbd", "get_features",
3717                                 &snapid, sizeof (snapid),
3718                                 &features_buf, sizeof (features_buf));
3719         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3720         if (ret < 0)
3721                 return ret;
3722         if (ret < sizeof (features_buf))
3723                 return -ERANGE;
3724
3725         incompat = le64_to_cpu(features_buf.incompat);
3726         if (incompat & ~RBD_FEATURES_SUPPORTED)
3727                 return -ENXIO;
3728
3729         *snap_features = le64_to_cpu(features_buf.features);
3730
3731         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3732                 (unsigned long long)snap_id,
3733                 (unsigned long long)*snap_features,
3734                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3735
3736         return 0;
3737 }
3738
3739 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3740 {
3741         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3742                                                 &rbd_dev->header.features);
3743 }
3744
3745 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3746 {
3747         struct rbd_spec *parent_spec;
3748         size_t size;
3749         void *reply_buf = NULL;
3750         __le64 snapid;
3751         void *p;
3752         void *end;
3753         u64 pool_id;
3754         char *image_id;
3755         u64 overlap;
3756         int ret;
3757
3758         parent_spec = rbd_spec_alloc();
3759         if (!parent_spec)
3760                 return -ENOMEM;
3761
3762         size = sizeof (__le64) +                                /* pool_id */
3763                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3764                 sizeof (__le64) +                               /* snap_id */
3765                 sizeof (__le64);                                /* overlap */
3766         reply_buf = kmalloc(size, GFP_KERNEL);
3767         if (!reply_buf) {
3768                 ret = -ENOMEM;
3769                 goto out_err;
3770         }
3771
3772         snapid = cpu_to_le64(CEPH_NOSNAP);
3773         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3774                                 "rbd", "get_parent",
3775                                 &snapid, sizeof (snapid),
3776                                 reply_buf, size);
3777         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3778         if (ret < 0)
3779                 goto out_err;
3780
3781         p = reply_buf;
3782         end = reply_buf + ret;
3783         ret = -ERANGE;
3784         ceph_decode_64_safe(&p, end, pool_id, out_err);
3785         if (pool_id == CEPH_NOPOOL)
3786                 goto out;       /* No parent?  No problem. */
3787
3788         /* The ceph file layout needs to fit pool id in 32 bits */
3789
3790         ret = -EIO;
3791         if (pool_id > (u64)U32_MAX) {
3792                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3793                         (unsigned long long)pool_id, U32_MAX);
3794                 goto out_err;
3795         }
3796         parent_spec->pool_id = pool_id;
3797
3798         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3799         if (IS_ERR(image_id)) {
3800                 ret = PTR_ERR(image_id);
3801                 goto out_err;
3802         }
3803         parent_spec->image_id = image_id;
3804         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3805         ceph_decode_64_safe(&p, end, overlap, out_err);
3806
3807         if (overlap) {
3808                 rbd_spec_put(rbd_dev->parent_spec);
3809                 rbd_dev->parent_spec = parent_spec;
3810                 parent_spec = NULL;     /* rbd_dev now owns this */
3811                 rbd_dev->parent_overlap = overlap;
3812         } else {
3813                 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3814         }
3815 out:
3816         ret = 0;
3817 out_err:
3818         kfree(reply_buf);
3819         rbd_spec_put(parent_spec);
3820
3821         return ret;
3822 }
3823
3824 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3825 {
3826         struct {
3827                 __le64 stripe_unit;
3828                 __le64 stripe_count;
3829         } __attribute__ ((packed)) striping_info_buf = { 0 };
3830         size_t size = sizeof (striping_info_buf);
3831         void *p;
3832         u64 obj_size;
3833         u64 stripe_unit;
3834         u64 stripe_count;
3835         int ret;
3836
3837         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3838                                 "rbd", "get_stripe_unit_count", NULL, 0,
3839                                 (char *)&striping_info_buf, size);
3840         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3841         if (ret < 0)
3842                 return ret;
3843         if (ret < size)
3844                 return -ERANGE;
3845
3846         /*
3847          * We don't actually support the "fancy striping" feature
3848          * (STRIPINGV2) yet, but if the striping sizes are the
3849          * defaults the behavior is the same as before.  So find
3850          * out, and only fail if the image has non-default values.
3851          */
3852         ret = -EINVAL;
3853         obj_size = (u64)1 << rbd_dev->header.obj_order;
3854         p = &striping_info_buf;
3855         stripe_unit = ceph_decode_64(&p);
3856         if (stripe_unit != obj_size) {
3857                 rbd_warn(rbd_dev, "unsupported stripe unit "
3858                                 "(got %llu want %llu)",
3859                                 stripe_unit, obj_size);
3860                 return -EINVAL;
3861         }
3862         stripe_count = ceph_decode_64(&p);
3863         if (stripe_count != 1) {
3864                 rbd_warn(rbd_dev, "unsupported stripe count "
3865                                 "(got %llu want 1)", stripe_count);
3866                 return -EINVAL;
3867         }
3868         rbd_dev->header.stripe_unit = stripe_unit;
3869         rbd_dev->header.stripe_count = stripe_count;
3870
3871         return 0;
3872 }
3873
3874 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3875 {
3876         size_t image_id_size;
3877         char *image_id;
3878         void *p;
3879         void *end;
3880         size_t size;
3881         void *reply_buf = NULL;
3882         size_t len = 0;
3883         char *image_name = NULL;
3884         int ret;
3885
3886         rbd_assert(!rbd_dev->spec->image_name);
3887
3888         len = strlen(rbd_dev->spec->image_id);
3889         image_id_size = sizeof (__le32) + len;
3890         image_id = kmalloc(image_id_size, GFP_KERNEL);
3891         if (!image_id)
3892                 return NULL;
3893
3894         p = image_id;
3895         end = image_id + image_id_size;
3896         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3897
3898         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3899         reply_buf = kmalloc(size, GFP_KERNEL);
3900         if (!reply_buf)
3901                 goto out;
3902
3903         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3904                                 "rbd", "dir_get_name",
3905                                 image_id, image_id_size,
3906                                 reply_buf, size);
3907         if (ret < 0)
3908                 goto out;
3909         p = reply_buf;
3910         end = reply_buf + ret;
3911
3912         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3913         if (IS_ERR(image_name))
3914                 image_name = NULL;
3915         else
3916                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3917 out:
3918         kfree(reply_buf);
3919         kfree(image_id);
3920
3921         return image_name;
3922 }
3923
3924 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3925 {
3926         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3927         const char *snap_name;
3928         u32 which = 0;
3929
3930         /* Skip over names until we find the one we are looking for */
3931
3932         snap_name = rbd_dev->header.snap_names;
3933         while (which < snapc->num_snaps) {
3934                 if (!strcmp(name, snap_name))
3935                         return snapc->snaps[which];
3936                 snap_name += strlen(snap_name) + 1;
3937                 which++;
3938         }
3939         return CEPH_NOSNAP;
3940 }
3941
3942 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3943 {
3944         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3945         u32 which;
3946         bool found = false;
3947         u64 snap_id;
3948
3949         for (which = 0; !found && which < snapc->num_snaps; which++) {
3950                 const char *snap_name;
3951
3952                 snap_id = snapc->snaps[which];
3953                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3954                 if (IS_ERR(snap_name))
3955                         break;
3956                 found = !strcmp(name, snap_name);
3957                 kfree(snap_name);
3958         }
3959         return found ? snap_id : CEPH_NOSNAP;
3960 }
3961
3962 /*
3963  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3964  * no snapshot by that name is found, or if an error occurs.
3965  */
3966 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3967 {
3968         if (rbd_dev->image_format == 1)
3969                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3970
3971         return rbd_v2_snap_id_by_name(rbd_dev, name);
3972 }
3973
3974 /*
3975  * When an rbd image has a parent image, it is identified by the
3976  * pool, image, and snapshot ids (not names).  This function fills
3977  * in the names for those ids.  (It's OK if we can't figure out the
3978  * name for an image id, but the pool and snapshot ids should always
3979  * exist and have names.)  All names in an rbd spec are dynamically
3980  * allocated.
3981  *
3982  * When an image being mapped (not a parent) is probed, we have the
3983  * pool name and pool id, image name and image id, and the snapshot
3984  * name.  The only thing we're missing is the snapshot id.
3985  */
3986 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3987 {
3988         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3989         struct rbd_spec *spec = rbd_dev->spec;
3990         const char *pool_name;
3991         const char *image_name;
3992         const char *snap_name;
3993         int ret;
3994
3995         /*
3996          * An image being mapped will have the pool name (etc.), but
3997          * we need to look up the snapshot id.
3998          */
3999         if (spec->pool_name) {
4000                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4001                         u64 snap_id;
4002
4003                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4004                         if (snap_id == CEPH_NOSNAP)
4005                                 return -ENOENT;
4006                         spec->snap_id = snap_id;
4007                 } else {
4008                         spec->snap_id = CEPH_NOSNAP;
4009                 }
4010
4011                 return 0;
4012         }
4013
4014         /* Get the pool name; we have to make our own copy of this */
4015
4016         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4017         if (!pool_name) {
4018                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4019                 return -EIO;
4020         }
4021         pool_name = kstrdup(pool_name, GFP_KERNEL);
4022         if (!pool_name)
4023                 return -ENOMEM;
4024
4025         /* Fetch the image name; tolerate failure here */
4026
4027         image_name = rbd_dev_image_name(rbd_dev);
4028         if (!image_name)
4029                 rbd_warn(rbd_dev, "unable to get image name");
4030
4031         /* Look up the snapshot name, and make a copy */
4032
4033         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4034         if (!snap_name) {
4035                 ret = -ENOMEM;
4036                 goto out_err;
4037         }
4038
4039         spec->pool_name = pool_name;
4040         spec->image_name = image_name;
4041         spec->snap_name = snap_name;
4042
4043         return 0;
4044 out_err:
4045         kfree(image_name);
4046         kfree(pool_name);
4047
4048         return ret;
4049 }
4050
4051 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4052 {
4053         size_t size;
4054         int ret;
4055         void *reply_buf;
4056         void *p;
4057         void *end;
4058         u64 seq;
4059         u32 snap_count;
4060         struct ceph_snap_context *snapc;
4061         u32 i;
4062
4063         /*
4064          * We'll need room for the seq value (maximum snapshot id),
4065          * snapshot count, and array of that many snapshot ids.
4066          * For now we have a fixed upper limit on the number we're
4067          * prepared to receive.
4068          */
4069         size = sizeof (__le64) + sizeof (__le32) +
4070                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
4071         reply_buf = kzalloc(size, GFP_KERNEL);
4072         if (!reply_buf)
4073                 return -ENOMEM;
4074
4075         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4076                                 "rbd", "get_snapcontext", NULL, 0,
4077                                 reply_buf, size);
4078         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4079         if (ret < 0)
4080                 goto out;
4081
4082         p = reply_buf;
4083         end = reply_buf + ret;
4084         ret = -ERANGE;
4085         ceph_decode_64_safe(&p, end, seq, out);
4086         ceph_decode_32_safe(&p, end, snap_count, out);
4087
4088         /*
4089          * Make sure the reported number of snapshot ids wouldn't go
4090          * beyond the end of our buffer.  But before checking that,
4091          * make sure the computed size of the snapshot context we
4092          * allocate is representable in a size_t.
4093          */
4094         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4095                                  / sizeof (u64)) {
4096                 ret = -EINVAL;
4097                 goto out;
4098         }
4099         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4100                 goto out;
4101         ret = 0;
4102
4103         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4104         if (!snapc) {
4105                 ret = -ENOMEM;
4106                 goto out;
4107         }
4108         snapc->seq = seq;
4109         for (i = 0; i < snap_count; i++)
4110                 snapc->snaps[i] = ceph_decode_64(&p);
4111
4112         ceph_put_snap_context(rbd_dev->header.snapc);
4113         rbd_dev->header.snapc = snapc;
4114
4115         dout("  snap context seq = %llu, snap_count = %u\n",
4116                 (unsigned long long)seq, (unsigned int)snap_count);
4117 out:
4118         kfree(reply_buf);
4119
4120         return ret;
4121 }
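
/*
 * Sketch of the bounded decode pattern used above (not driver code;
 * rbd_decode_u64_pair() is hypothetical).  The ceph_decode_*_safe()
 * macros advance *p only when enough bytes remain before end, and
 * otherwise jump to the supplied label, which is how a short or
 * corrupt reply becomes -ERANGE instead of an out-of-bounds read.
 */
static int __maybe_unused rbd_decode_u64_pair(void *buf, size_t len,
                                                u64 *first, u64 *second)
{
        void *p = buf;
        void *end = buf + len;

        ceph_decode_64_safe(&p, end, *first, out_range);
        ceph_decode_64_safe(&p, end, *second, out_range);

        return 0;
out_range:
        return -ERANGE;
}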
4122
4123 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4124                                         u64 snap_id)
4125 {
4126         size_t size;
4127         void *reply_buf;
4128         __le64 snapid;
4129         int ret;
4130         void *p;
4131         void *end;
4132         char *snap_name;
4133
4134         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4135         reply_buf = kmalloc(size, GFP_KERNEL);
4136         if (!reply_buf)
4137                 return ERR_PTR(-ENOMEM);
4138
4139         snapid = cpu_to_le64(snap_id);
4140         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4141                                 "rbd", "get_snapshot_name",
4142                                 &snapid, sizeof (snapid),
4143                                 reply_buf, size);
4144         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4145         if (ret < 0) {
4146                 snap_name = ERR_PTR(ret);
4147                 goto out;
4148         }
4149
4150         p = reply_buf;
4151         end = reply_buf + ret;
4152         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4153         if (IS_ERR(snap_name))
4154                 goto out;
4155
4156         dout("  snap_id 0x%016llx snap_name = %s\n",
4157                 (unsigned long long)snap_id, snap_name);
4158 out:
4159         kfree(reply_buf);
4160
4161         return snap_name;
4162 }
4163
4164 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4165 {
4166         bool first_time = rbd_dev->header.object_prefix == NULL;
4167         int ret;
4168
4169         down_write(&rbd_dev->header_rwsem);
4170
4171         if (first_time) {
4172                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4173                 if (ret)
4174                         goto out;
4175         }
4176
4177         /*
4178          * If the image supports layering, get the parent info.  We
4179          * need to probe the first time regardless.  Thereafter we
4180          * only need to if there's a parent, to see if it has
4181          * disappeared due to the mapped image getting flattened.
4182          */
4183         if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4184                         (first_time || rbd_dev->parent_spec)) {
4185                 bool warn;
4186
4187                 ret = rbd_dev_v2_parent_info(rbd_dev);
4188                 if (ret)
4189                         goto out;
4190
4191                 /*
4192                  * Print a warning if this is the initial probe and
4193                  * the image has a parent.  Don't print it if the
4194                  * image now being probed is itself a parent.  We
4195                  * can tell at this point because we won't know its
4196                  * pool name yet (just its pool id).
4197                  */
4198                 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4199                 if (first_time && warn)
4200                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4201                                         "is EXPERIMENTAL!");
4202         }
4203
4204         ret = rbd_dev_v2_image_size(rbd_dev);
4205         if (ret)
4206                 goto out;
4207
4208         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4209             rbd_dev->mapping.size != rbd_dev->header.image_size)
4210                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4211
4212         ret = rbd_dev_v2_snap_context(rbd_dev);
4213         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4214 out:
4215         up_write(&rbd_dev->header_rwsem);
4216
4217         return ret;
4218 }
4219
4220 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4221 {
4222         struct device *dev;
4223         int ret;
4224
4225         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4226
4227         dev = &rbd_dev->dev;
4228         dev->bus = &rbd_bus_type;
4229         dev->type = &rbd_device_type;
4230         dev->parent = &rbd_root_dev;
4231         dev->release = rbd_dev_device_release;
4232         dev_set_name(dev, "%d", rbd_dev->dev_id);
4233         ret = device_register(dev);
4234
4235         mutex_unlock(&ctl_mutex);
4236
4237         return ret;
4238 }
4239
4240 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4241 {
4242         device_unregister(&rbd_dev->dev);
4243 }
4244
4245 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4246
4247 /*
4248  * Get a unique rbd identifier for the given new rbd_dev, and add
4249  * the rbd_dev to the global list.  The minimum rbd id is 1.
4250  */
4251 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4252 {
4253         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4254
4255         spin_lock(&rbd_dev_list_lock);
4256         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4257         spin_unlock(&rbd_dev_list_lock);
4258         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4259                 (unsigned long long) rbd_dev->dev_id);
4260 }
4261
4262 /*
4263  * Remove an rbd_dev from the global list, and record that its
4264  * identifier is no longer in use.
4265  */
4266 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4267 {
4268         struct list_head *tmp;
4269         int rbd_id = rbd_dev->dev_id;
4270         int max_id;
4271
4272         rbd_assert(rbd_id > 0);
4273
4274         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4275                 (unsigned long long) rbd_dev->dev_id);
4276         spin_lock(&rbd_dev_list_lock);
4277         list_del_init(&rbd_dev->node);
4278
4279         /*
4280          * If the id being "put" is not the current maximum, there
4281          * is nothing special we need to do.
4282          */
4283         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4284                 spin_unlock(&rbd_dev_list_lock);
4285                 return;
4286         }
4287
4288         /*
4289          * We need to update the current maximum id.  Search the
4290          * list to find out what it is.  We're more likely to find
4291          * the maximum at the end, so search the list backward.
4292          */
4293         max_id = 0;
4294         list_for_each_prev(tmp, &rbd_dev_list) {
4295                 struct rbd_device *rbd_dev;
4296
4297                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4298                 if (rbd_dev->dev_id > max_id)
4299                         max_id = rbd_dev->dev_id;
4300         }
4301         spin_unlock(&rbd_dev_list_lock);
4302
4303         /*
4304          * The max id could have been updated by rbd_dev_id_get(), in
4305          * which case it now accurately reflects the new maximum.
4306          * Be careful not to overwrite the maximum value in that
4307          * case.
4308          */
4309         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4310         dout("  max dev id has been reset\n");
4311 }
4312
4313 /*
4314  * Skips over white space at *buf, and updates *buf to point to the
4315  * first found non-space character (if any). Returns the length of
4316  * the token (string of non-white space characters) found.  Note
4317  * that *buf must be terminated with '\0'.
4318  */
4319 static inline size_t next_token(const char **buf)
4320 {
4321         /*
4322          * These are the characters that produce nonzero for
4323          * isspace() in the "C" and "POSIX" locales.
4324          */
4325         const char *spaces = " \f\n\r\t\v";
4326
4327         *buf += strspn(*buf, spaces);   /* Find start of token */
4328
4329         return strcspn(*buf, spaces);   /* Return token length */
4330 }
4331
4332 /*
4333  * Finds the next token in *buf, and if the provided token buffer is
4334  * big enough, copies the found token into it.  The result, if
4335  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4336  * must be terminated with '\0' on entry.
4337  *
4338  * Returns the length of the token found (not including the '\0').
4339  * Return value will be 0 if no token is found, and it will be >=
4340  * token_size if the token would not fit.
4341  *
4342  * The *buf pointer will be updated to point beyond the end of the
4343  * found token.  Note that this occurs even if the token buffer is
4344  * too small to hold it.
4345  */
4346 static inline size_t copy_token(const char **buf,
4347                                 char *token,
4348                                 size_t token_size)
4349 {
4350         size_t len;
4351
4352         len = next_token(buf);
4353         if (len < token_size) {
4354                 memcpy(token, *buf, len);
4355                 *(token + len) = '\0';
4356         }
4357         *buf += len;
4358
4359         return len;
4360 }
4361
4362 /*
4363  * Finds the next token in *buf, dynamically allocates a buffer big
4364  * enough to hold a copy of it, and copies the token into the new
4365  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4366  * that a duplicate buffer is created even for a zero-length token.
4367  *
4368  * Returns a pointer to the newly-allocated duplicate, or a null
4369  * pointer if memory for the duplicate was not available.  If
4370  * the lenp argument is a non-null pointer, the length of the token
4371  * (not including the '\0') is returned in *lenp.
4372  *
4373  * If successful, the *buf pointer will be updated to point beyond
4374  * the end of the found token.
4375  *
4376  * Note: uses GFP_KERNEL for allocation.
4377  */
4378 static inline char *dup_token(const char **buf, size_t *lenp)
4379 {
4380         char *dup;
4381         size_t len;
4382
4383         len = next_token(buf);
4384         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4385         if (!dup)
4386                 return NULL;
4387         *(dup + len) = '\0';
4388         *buf += len;
4389
4390         if (lenp)
4391                 *lenp = len;
4392
4393         return dup;
4394 }
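
/*
 * Usage sketch for the token helpers above (rbd_token_example() is
 * hypothetical, not driver code): walk a NUL-terminated buffer the
 * same way rbd_add_parse_args() below does, taking ownership of each
 * dynamically allocated token.
 */
static int __maybe_unused rbd_token_example(void)
{
        const char *buf = "1.2.3.4:6789 name=admin rbd myimage";
        char *token;

        while (next_token(&buf)) {              /* peek: any token left? */
                token = dup_token(&buf, NULL);  /* consume it */
                if (!token)
                        return -ENOMEM;
                dout("token: %s\n", token);
                kfree(token);
        }

        return 0;
}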
4395
4396 /*
4397  * Parse the options provided for an "rbd add" (i.e., rbd image
4398  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4399  * and the data written is passed here via a NUL-terminated buffer.
4400  * Returns 0 if successful or an error code otherwise.
4401  *
4402  * The information extracted from these options is recorded in
4403  * the other parameters which return dynamically-allocated
4404  * structures:
4405  *  ceph_opts
4406  *      The address of a pointer that will refer to a ceph options
4407  *      structure.  Caller must release the returned pointer using
4408  *      ceph_destroy_options() when it is no longer needed.
4409  *  rbd_opts
4410  *      Address of an rbd options pointer.  Fully initialized by
4411  *      this function; caller must release with kfree().
4412  *  spec
4413  *      Address of an rbd image specification pointer.  Fully
4414  *      initialized by this function based on parsed options.
4415  *      Caller must release with rbd_spec_put().
4416  *
4417  * The options passed take this form:
4418  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4419  * where:
4420  *  <mon_addrs>
4421  *      A comma-separated list of one or more monitor addresses.
4422  *      A monitor address is an ip address, optionally followed
4423  *      by a port number (separated by a colon).
4424  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4425  *  <options>
4426  *      A comma-separated list of ceph and/or rbd options.
4427  *  <pool_name>
4428  *      The name of the rados pool containing the rbd image.
4429  *  <image_name>
4430  *      The name of the image in that pool to map.
4431  *  <snap_name>
4432  *      An optional snapshot name.  If provided, the mapping will
4433  *      present data from the image at the time that snapshot was
4434  *      created.  The image head is used if no snapshot name is
4435  *      provided.  Snapshot mappings are always read-only.
4436  */
4437 static int rbd_add_parse_args(const char *buf,
4438                                 struct ceph_options **ceph_opts,
4439                                 struct rbd_options **opts,
4440                                 struct rbd_spec **rbd_spec)
4441 {
4442         size_t len;
4443         char *options;
4444         const char *mon_addrs;
4445         char *snap_name;
4446         size_t mon_addrs_size;
4447         struct rbd_spec *spec = NULL;
4448         struct rbd_options *rbd_opts = NULL;
4449         struct ceph_options *copts;
4450         int ret;
4451
4452         /* The first four tokens are required */
4453
4454         len = next_token(&buf);
4455         if (!len) {
4456                 rbd_warn(NULL, "no monitor address(es) provided");
4457                 return -EINVAL;
4458         }
4459         mon_addrs = buf;
4460         mon_addrs_size = len + 1;
4461         buf += len;
4462
4463         ret = -EINVAL;
4464         options = dup_token(&buf, NULL);
4465         if (!options)
4466                 return -ENOMEM;
4467         if (!*options) {
4468                 rbd_warn(NULL, "no options provided");
4469                 goto out_err;
4470         }
4471
4472         spec = rbd_spec_alloc();
4473         if (!spec)
4474                 goto out_mem;
4475
4476         spec->pool_name = dup_token(&buf, NULL);
4477         if (!spec->pool_name)
4478                 goto out_mem;
4479         if (!*spec->pool_name) {
4480                 rbd_warn(NULL, "no pool name provided");
4481                 goto out_err;
4482         }
4483
4484         spec->image_name = dup_token(&buf, NULL);
4485         if (!spec->image_name)
4486                 goto out_mem;
4487         if (!*spec->image_name) {
4488                 rbd_warn(NULL, "no image name provided");
4489                 goto out_err;
4490         }
4491
4492         /*
4493          * Snapshot name is optional; default is to use "-"
4494          * (indicating the head/no snapshot).
4495          */
4496         len = next_token(&buf);
4497         if (!len) {
4498                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4499                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4500         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4501                 ret = -ENAMETOOLONG;
4502                 goto out_err;
4503         }
4504         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4505         if (!snap_name)
4506                 goto out_mem;
        snap_name[len] = '\0';
4508         spec->snap_name = snap_name;
4509
4510         /* Initialize all rbd options to the defaults */
4511
4512         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4513         if (!rbd_opts)
4514                 goto out_mem;
4515
4516         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4517
4518         copts = ceph_parse_options(options, mon_addrs,
4519                                         mon_addrs + mon_addrs_size - 1,
4520                                         parse_rbd_opts_token, rbd_opts);
4521         if (IS_ERR(copts)) {
4522                 ret = PTR_ERR(copts);
4523                 goto out_err;
4524         }
4525         kfree(options);
4526
4527         *ceph_opts = copts;
4528         *opts = rbd_opts;
4529         *rbd_spec = spec;
4530
4531         return 0;
4532 out_mem:
4533         ret = -ENOMEM;
4534 out_err:
4535         kfree(rbd_opts);
4536         rbd_spec_put(spec);
4537         kfree(options);
4538
4539         return ret;
4540 }
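
/*
 * Illustrative example (addresses, key, and names are hypothetical):
 * writing the string
 *
 *   1.2.3.4:6789,1.2.3.5:6789 name=admin,secret=<key> rbd foo snap1
 *
 * to /sys/bus/rbd/add is parsed by rbd_add_parse_args() as two
 * monitor addresses, a ceph/rbd option list, pool "rbd", image "foo",
 * and snapshot "snap1".  Omitting the trailing "snap1" maps the image
 * head instead (writable, unless a read-only option was given).
 */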
4541
4542 /*
4543  * An rbd format 2 image has a unique identifier, distinct from the
4544  * name given to it by the user.  Internally, that identifier is
4545  * what's used to specify the names of objects related to the image.
4546  *
4547  * A special "rbd id" object is used to map an rbd image name to its
4548  * id.  If that object doesn't exist, then there is no v2 rbd image
4549  * with the supplied name.
4550  *
4551  * This function will record the given rbd_dev's image_id field if
4552  * it can be determined, and in that case will return 0.  If any
4553  * errors occur a negative errno will be returned and the rbd_dev's
4554  * image_id field will be unchanged (and should be NULL).
4555  */
4556 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4557 {
4558         int ret;
4559         size_t size;
4560         char *object_name;
4561         void *response;
4562         char *image_id;
4563
4564         /*
4565          * When probing a parent image, the image id is already
4566          * known (and the image name likely is not).  There's no
4567          * need to fetch the image id again in this case.  We
4568          * do still need to set the image format though.
4569          */
4570         if (rbd_dev->spec->image_id) {
4571                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4572
4573                 return 0;
4574         }
4575
4576         /*
4577          * First, see if the format 2 image id file exists, and if
4578          * so, get the image's persistent id from it.
4579          */
4580         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4581         object_name = kmalloc(size, GFP_NOIO);
4582         if (!object_name)
4583                 return -ENOMEM;
4584         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4585         dout("rbd id object name is %s\n", object_name);
4586
4587         /* Response will be an encoded string, which includes a length */
4588
4589         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4590         response = kzalloc(size, GFP_NOIO);
4591         if (!response) {
4592                 ret = -ENOMEM;
4593                 goto out;
4594         }
4595
4596         /* If it doesn't exist we'll assume it's a format 1 image */
4597
4598         ret = rbd_obj_method_sync(rbd_dev, object_name,
4599                                 "rbd", "get_id", NULL, 0,
4600                                 response, RBD_IMAGE_ID_LEN_MAX);
4601         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4602         if (ret == -ENOENT) {
4603                 image_id = kstrdup("", GFP_KERNEL);
4604                 ret = image_id ? 0 : -ENOMEM;
4605                 if (!ret)
4606                         rbd_dev->image_format = 1;
        } else if (ret > (int)sizeof (__le32)) {
                /* the cast keeps this comparison signed; any other
                 * negative error from the method call propagates
                 * instead of being treated as a huge unsigned length */
                void *p = response;

                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
                ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
                if (!ret)
                        rbd_dev->image_format = 2;
        } else if (ret >= 0) {
                /* a non-error reply too short to hold an encoded string */
                ret = -EINVAL;
        }
4618
4619         if (!ret) {
4620                 rbd_dev->spec->image_id = image_id;
4621                 dout("image_id is %s\n", image_id);
4622         }
4623 out:
4624         kfree(response);
4625         kfree(object_name);
4626
4627         return ret;
4628 }
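
/*
 * Sketch of what the probe above produces, with hypothetical values:
 * for an image named "foo", the id object is "rbd_id.foo"
 * (RBD_ID_PREFIX plus the image name).  If its "get_id" class method
 * replies with an encoded string such as "10052ae8944a", the image is
 * format 2 and that string becomes spec->image_id; if the object
 * doesn't exist, the image is format 1 and image_id is recorded as "".
 */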
4629
4630 /* Undo whatever state changes are made by v1 or v2 image probe */
4631
4632 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4633 {
4634         struct rbd_image_header *header;
4635
4636         rbd_dev_parent_put(rbd_dev);
4637
4638         /* Free dynamic fields from the header, then zero it out */
4639
4640         header = &rbd_dev->header;
4641         ceph_put_snap_context(header->snapc);
4642         kfree(header->snap_sizes);
4643         kfree(header->snap_names);
4644         kfree(header->object_prefix);
4645         memset(header, 0, sizeof (*header));
4646 }
4647
4648 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4649 {
4650         int ret;
4651
4652         ret = rbd_dev_v2_object_prefix(rbd_dev);
4653         if (ret)
4654                 goto out_err;
4655
        /*
         * Get and check the features for the image.  Currently the
         * features are assumed to never change.
         */
4660         ret = rbd_dev_v2_features(rbd_dev);
4661         if (ret)
4662                 goto out_err;
4663
4664         /* If the image supports fancy striping, get its parameters */
4665
4666         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4667                 ret = rbd_dev_v2_striping_info(rbd_dev);
4668                 if (ret < 0)
4669                         goto out_err;
4670         }

        /* Format 2 images using crypto or compression types are not supported */
4672
4673         return 0;
4674 out_err:
4675         rbd_dev->header.features = 0;
4676         kfree(rbd_dev->header.object_prefix);
4677         rbd_dev->header.object_prefix = NULL;
4678
4679         return ret;
4680 }
4681
4682 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4683 {
4684         struct rbd_device *parent = NULL;
4685         struct rbd_spec *parent_spec;
4686         struct rbd_client *rbdc;
4687         int ret;
4688
4689         if (!rbd_dev->parent_spec)
4690                 return 0;
4691         /*
4692          * We need to pass a reference to the client and the parent
4693          * spec when creating the parent rbd_dev.  Images related by
4694          * parent/child relationships always share both.
4695          */
4696         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4697         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4698
4699         ret = -ENOMEM;
4700         parent = rbd_dev_create(rbdc, parent_spec);
4701         if (!parent)
4702                 goto out_err;
4703
4704         ret = rbd_dev_image_probe(parent, false);
4705         if (ret < 0)
4706                 goto out_err;
4707         rbd_dev->parent = parent;
4708         atomic_set(&rbd_dev->parent_ref, 1);
4709
4710         return 0;
4711 out_err:
4712         if (parent) {
4713                 rbd_dev_unparent(rbd_dev);
                /*
                 * Note: rbd_dev->header_name must not be freed here;
                 * it still belongs to rbd_dev and is freed by our
                 * caller's error path.  Freeing it here (without
                 * clearing the pointer) would cause a double free.
                 */
4715                 rbd_dev_destroy(parent);
4716         } else {
4717                 rbd_put_client(rbdc);
4718                 rbd_spec_put(parent_spec);
4719         }
4720
4721         return ret;
4722 }
4723
4724 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4725 {
4726         int ret;
4727
4728         /* generate unique id: find highest unique id, add one */
4729         rbd_dev_id_get(rbd_dev);
4730
4731         /* Fill in the device name, now that we have its id. */
4732         BUILD_BUG_ON(DEV_NAME_LEN
4733                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4734         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4735
4736         /* Get our block major device number. */
4737
4738         ret = register_blkdev(0, rbd_dev->name);
4739         if (ret < 0)
4740                 goto err_out_id;
4741         rbd_dev->major = ret;
4742
4743         /* Set up the blkdev mapping. */
4744
4745         ret = rbd_init_disk(rbd_dev);
4746         if (ret)
4747                 goto err_out_blkdev;
4748
4749         ret = rbd_dev_mapping_set(rbd_dev);
4750         if (ret)
4751                 goto err_out_disk;
4752         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4753
4754         ret = rbd_bus_add_dev(rbd_dev);
4755         if (ret)
4756                 goto err_out_mapping;
4757
4758         /* Everything's ready.  Announce the disk to the world. */
4759
4760         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4761         add_disk(rbd_dev->disk);
4762
4763         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4764                 (unsigned long long) rbd_dev->mapping.size);
4765
4766         return ret;
4767
4768 err_out_mapping:
4769         rbd_dev_mapping_clear(rbd_dev);
4770 err_out_disk:
4771         rbd_free_disk(rbd_dev);
4772 err_out_blkdev:
4773         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4774 err_out_id:
4775         rbd_dev_id_put(rbd_dev);
4777
4778         return ret;
4779 }
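
/*
 * A concrete (hypothetical) illustration of the setup above: the
 * first mapped image would typically get dev_id 0, so the disk is
 * named "rbd0", register_blkdev() assigns it a dynamic major number,
 * and a 1 GiB mapping is announced with 2097152 512-byte sectors.
 */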
4780
4781 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4782 {
4783         struct rbd_spec *spec = rbd_dev->spec;
4784         size_t size;
4785
4786         /* Record the header object name for this rbd image. */
4787
4788         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4789
4790         if (rbd_dev->image_format == 1)
4791                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4792         else
4793                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4794
4795         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4796         if (!rbd_dev->header_name)
4797                 return -ENOMEM;
4798
4799         if (rbd_dev->image_format == 1)
4800                 sprintf(rbd_dev->header_name, "%s%s",
4801                         spec->image_name, RBD_SUFFIX);
4802         else
4803                 sprintf(rbd_dev->header_name, "%s%s",
4804                         RBD_HEADER_PREFIX, spec->image_id);
4805         return 0;
4806 }
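
/*
 * Example header object names (image name and id hypothetical): a
 * format 1 image "foo" uses "foo.rbd" (image name plus RBD_SUFFIX),
 * while a format 2 image with id "10052ae8944a" uses
 * "rbd_header.10052ae8944a" (RBD_HEADER_PREFIX plus image id).
 */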
4807
4808 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4809 {
4810         rbd_dev_unprobe(rbd_dev);
4811         kfree(rbd_dev->header_name);
4812         rbd_dev->header_name = NULL;
4813         rbd_dev->image_format = 0;
4814         kfree(rbd_dev->spec->image_id);
4815         rbd_dev->spec->image_id = NULL;
4816
4817         rbd_dev_destroy(rbd_dev);
4818 }
4819
4820 /*
4821  * Probe for the existence of the header object for the given rbd
4822  * device.  If this image is the one being mapped (i.e., not a
4823  * parent), initiate a watch on its header object before using that
4824  * object to get detailed information about the rbd image.
4825  */
4826 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4827 {
4828         int ret;
4829         int tmp;
4830
4831         /*
4832          * Get the id from the image id object.  If it's not a
4833          * format 2 image, we'll get ENOENT back, and we'll assume
4834          * it's a format 1 image.
4835          */
4836         ret = rbd_dev_image_id(rbd_dev);
4837         if (ret)
4838                 return ret;
4839         rbd_assert(rbd_dev->spec->image_id);
4840         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4841
4842         ret = rbd_dev_header_name(rbd_dev);
4843         if (ret)
4844                 goto err_out_format;
4845
4846         if (mapping) {
4847                 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4848                 if (ret)
4849                         goto out_header_name;
4850         }
4851
4852         if (rbd_dev->image_format == 1)
4853                 ret = rbd_dev_v1_header_info(rbd_dev);
4854         else
4855                 ret = rbd_dev_v2_header_info(rbd_dev);
4856         if (ret)
4857                 goto err_out_watch;
4858
4859         ret = rbd_dev_spec_update(rbd_dev);
4860         if (ret)
4861                 goto err_out_probe;
4862
4863         ret = rbd_dev_probe_parent(rbd_dev);
4864         if (ret)
4865                 goto err_out_probe;
4866
4867         dout("discovered format %u image, header name is %s\n",
4868                 rbd_dev->image_format, rbd_dev->header_name);
4869
4870         return 0;
4871 err_out_probe:
4872         rbd_dev_unprobe(rbd_dev);
4873 err_out_watch:
4874         if (mapping) {
4875                 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4876                 if (tmp)
                        rbd_warn(rbd_dev,
                                "unable to tear down watch request (%d)", tmp);
4879         }
4880 out_header_name:
4881         kfree(rbd_dev->header_name);
4882         rbd_dev->header_name = NULL;
4883 err_out_format:
4884         rbd_dev->image_format = 0;
4885         kfree(rbd_dev->spec->image_id);
4886         rbd_dev->spec->image_id = NULL;
4887
4888         dout("probe failed, returning %d\n", ret);
4889
4890         return ret;
4891 }
4892
4893 static ssize_t rbd_add(struct bus_type *bus,
4894                        const char *buf,
4895                        size_t count)
4896 {
4897         struct rbd_device *rbd_dev = NULL;
4898         struct ceph_options *ceph_opts = NULL;
4899         struct rbd_options *rbd_opts = NULL;
4900         struct rbd_spec *spec = NULL;
4901         struct rbd_client *rbdc;
4902         struct ceph_osd_client *osdc;
4903         bool read_only;
4904         int rc = -ENOMEM;
4905
4906         if (!try_module_get(THIS_MODULE))
4907                 return -ENODEV;
4908
4909         /* parse add command */
4910         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4911         if (rc < 0)
4912                 goto err_out_module;
4913         read_only = rbd_opts->read_only;
4914         kfree(rbd_opts);
4915         rbd_opts = NULL;        /* done with this */
4916
4917         rbdc = rbd_get_client(ceph_opts);
4918         if (IS_ERR(rbdc)) {
4919                 rc = PTR_ERR(rbdc);
4920                 goto err_out_args;
4921         }
4922         ceph_opts = NULL;       /* rbd_dev client now owns this */
4923
4924         /* pick the pool */
4925         osdc = &rbdc->client->osdc;
4926         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4927         if (rc < 0)
4928                 goto err_out_client;
4929         spec->pool_id = (u64)rc;
4930
4931         /* The ceph file layout needs to fit pool id in 32 bits */
4932
4933         if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)",
                                (unsigned long long)spec->pool_id, U32_MAX);
4936                 rc = -EIO;
4937                 goto err_out_client;
4938         }
4939
4940         rbd_dev = rbd_dev_create(rbdc, spec);
4941         if (!rbd_dev)
4942                 goto err_out_client;
4943         rbdc = NULL;            /* rbd_dev now owns this */
4944         spec = NULL;            /* rbd_dev now owns this */
4945
4946         rc = rbd_dev_image_probe(rbd_dev, true);
4947         if (rc < 0)
4948                 goto err_out_rbd_dev;
4949
4950         /* If we are mapping a snapshot it must be marked read-only */
4951
4952         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4953                 read_only = true;
4954         rbd_dev->mapping.read_only = read_only;
4955
4956         rc = rbd_dev_device_setup(rbd_dev);
4957         if (!rc)
4958                 return count;
4959
        /*
         * rbd_dev_image_release() destroys rbd_dev; jump past the
         * labels below so the freed device is not touched again.
         */
        rbd_dev_image_release(rbd_dev);
        goto err_out_module;
4961 err_out_rbd_dev:
4962         rbd_dev_destroy(rbd_dev);
4963 err_out_client:
4964         rbd_put_client(rbdc);
4965 err_out_args:
4966         if (ceph_opts)
4967                 ceph_destroy_options(ceph_opts);
4968         kfree(rbd_opts);
4969         rbd_spec_put(spec);
4970 err_out_module:
4971         module_put(THIS_MODULE);
4972
4973         dout("Error adding device %s\n", buf);
4974
4975         return (ssize_t)rc;
4976 }
4977
4978 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4979 {
4980         struct list_head *tmp;
4981         struct rbd_device *rbd_dev;
4982
4983         spin_lock(&rbd_dev_list_lock);
4984         list_for_each(tmp, &rbd_dev_list) {
4985                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4986                 if (rbd_dev->dev_id == dev_id) {
4987                         spin_unlock(&rbd_dev_list_lock);
4988                         return rbd_dev;
4989                 }
4990         }
4991         spin_unlock(&rbd_dev_list_lock);
4992         return NULL;
4993 }
4994
4995 static void rbd_dev_device_release(struct device *dev)
4996 {
4997         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4998
4999         rbd_free_disk(rbd_dev);
5000         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5001         rbd_dev_mapping_clear(rbd_dev);
5002         unregister_blkdev(rbd_dev->major, rbd_dev->name);
5003         rbd_dev->major = 0;
5004         rbd_dev_id_put(rbd_dev);
5006 }
5007
5008 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5009 {
5010         while (rbd_dev->parent) {
5011                 struct rbd_device *first = rbd_dev;
5012                 struct rbd_device *second = first->parent;
5013                 struct rbd_device *third;
5014
5015                 /*
5016                  * Follow to the parent with no grandparent and
5017                  * remove it.
5018                  */
5019                 while (second && (third = second->parent)) {
5020                         first = second;
5021                         second = third;
5022                 }
5023                 rbd_assert(second);
5024                 rbd_dev_image_release(second);
5025                 first->parent = NULL;
5026                 first->parent_overlap = 0;
5027
5028                 rbd_assert(first->parent_spec);
5029                 rbd_spec_put(first->parent_spec);
5030                 first->parent_spec = NULL;
5031         }
5032 }
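
/*
 * Teardown order sketch for a hypothetical clone chain mapped as
 * rbd_dev -> parent -> grandparent: the first pass of the loop above
 * releases the grandparent and detaches it from the parent, the next
 * pass releases the parent, and the loop ends once rbd_dev itself
 * has no parent left.
 */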
5033
5034 static ssize_t rbd_remove(struct bus_type *bus,
5035                           const char *buf,
5036                           size_t count)
5037 {
5038         struct rbd_device *rbd_dev = NULL;
5039         int target_id;
5040         unsigned long ul;
5041         int ret;
5042
        ret = kstrtoul(buf, 10, &ul);
5044         if (ret)
5045                 return ret;
5046
5047         /* convert to int; abort if we lost anything in the conversion */
5048         target_id = (int) ul;
5049         if (target_id != ul)
5050                 return -EINVAL;
5051
5052         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5053
5054         rbd_dev = __rbd_get_dev(target_id);
5055         if (!rbd_dev) {
5056                 ret = -ENOENT;
5057                 goto done;
5058         }
5059
5060         spin_lock_irq(&rbd_dev->lock);
5061         if (rbd_dev->open_count)
5062                 ret = -EBUSY;
5063         else
5064                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5065         spin_unlock_irq(&rbd_dev->lock);
5066         if (ret < 0)
5067                 goto done;
5068         rbd_bus_del_dev(rbd_dev);
5069         ret = rbd_dev_header_watch_sync(rbd_dev, false);
5070         if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
5072         rbd_dev_image_release(rbd_dev);
5073         module_put(THIS_MODULE);
5074         ret = count;
5075 done:
5076         mutex_unlock(&ctl_mutex);
5077
5078         return ret;
5079 }
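
/*
 * Example (device id hypothetical): writing "2" to /sys/bus/rbd/remove
 * unmaps /dev/rbd2 if nothing holds it open; a device with a nonzero
 * open_count is left intact and the write fails with -EBUSY.
 */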
5080
5081 /*
5082  * create control files in sysfs
5083  * /sys/bus/rbd/...
5084  */
5085 static int rbd_sysfs_init(void)
5086 {
5087         int ret;
5088
5089         ret = device_register(&rbd_root_dev);
5090         if (ret < 0)
5091                 return ret;
5092
5093         ret = bus_register(&rbd_bus_type);
5094         if (ret < 0)
5095                 device_unregister(&rbd_root_dev);
5096
5097         return ret;
5098 }
5099
5100 static void rbd_sysfs_cleanup(void)
5101 {
5102         bus_unregister(&rbd_bus_type);
5103         device_unregister(&rbd_root_dev);
5104 }
5105
5106 static int rbd_slab_init(void)
5107 {
5108         rbd_assert(!rbd_img_request_cache);
5109         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5110                                         sizeof (struct rbd_img_request),
5111                                         __alignof__(struct rbd_img_request),
5112                                         0, NULL);
5113         if (!rbd_img_request_cache)
5114                 return -ENOMEM;
5115
5116         rbd_assert(!rbd_obj_request_cache);
5117         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5118                                         sizeof (struct rbd_obj_request),
5119                                         __alignof__(struct rbd_obj_request),
5120                                         0, NULL);
5121         if (!rbd_obj_request_cache)
5122                 goto out_err;
5123
5124         rbd_assert(!rbd_segment_name_cache);
5125         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5126                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5127         if (rbd_segment_name_cache)
5128                 return 0;
5129 out_err:
5130         if (rbd_obj_request_cache) {
5131                 kmem_cache_destroy(rbd_obj_request_cache);
5132                 rbd_obj_request_cache = NULL;
5133         }
5134
5135         kmem_cache_destroy(rbd_img_request_cache);
5136         rbd_img_request_cache = NULL;
5137
5138         return -ENOMEM;
5139 }
5140
5141 static void rbd_slab_exit(void)
5142 {
5143         rbd_assert(rbd_segment_name_cache);
5144         kmem_cache_destroy(rbd_segment_name_cache);
5145         rbd_segment_name_cache = NULL;
5146
5147         rbd_assert(rbd_obj_request_cache);
5148         kmem_cache_destroy(rbd_obj_request_cache);
5149         rbd_obj_request_cache = NULL;
5150
5151         rbd_assert(rbd_img_request_cache);
5152         kmem_cache_destroy(rbd_img_request_cache);
5153         rbd_img_request_cache = NULL;
5154 }
5155
5156 static int __init rbd_init(void)
5157 {
5158         int rc;
5159
5160         if (!libceph_compatible(NULL)) {
5161                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5162
5163                 return -EINVAL;
5164         }
5165         rc = rbd_slab_init();
5166         if (rc)
5167                 return rc;
5168         rc = rbd_sysfs_init();
5169         if (rc)
5170                 rbd_slab_exit();
5171         else
5172                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5173
5174         return rc;
5175 }
5176
5177 static void __exit rbd_exit(void)
5178 {
5179         rbd_sysfs_cleanup();
5180         rbd_slab_exit();
5181 }
5182
5183 module_init(rbd_init);
5184 module_exit(rbd_exit);
5185
5186 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5187 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5188 MODULE_DESCRIPTION("rados block device");
5189
5190 /* following authorship retained from original osdblk.c */
5191 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5192
5193 MODULE_LICENSE("GPL");