/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
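/*
 * Note: (5 * sizeof (int)) / 2 + 1 over-estimates the decimal width
 * of an int: each byte needs at most log10(256) < 2.5 decimal
 * digits, so 5 digits per 2 bytes always suffices, and the extra
 * character leaves room for a leading '-'.
 */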

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

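/*
 * Open an rbd block device.  A write open of a read-only mapping
 * fails with -EROFS.  If removal of the mapping has already begun
 * the open fails with -ENOENT; otherwise open_count is bumped under
 * rbd_dev->lock, which is what lets removal detect that the device
 * is still in use.
 */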
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

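/*
 * Counterpart to rbd_open(): drops the open_count reference taken
 * at open time and puts the device reference.
 */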
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

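/*
 * Parse a single token of the options string supplied when mapping
 * an image.  The Opt_last_* sentinels in the enum above classify a
 * token as int, string or Boolean by its position, so its argument
 * (if any) can be decoded before the token itself is handled.
 */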
static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to remove the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u32 which;

        for (which = 0; which < snapc->num_snaps; which++)
                if (snapc->snaps[which] == snap_id)
                        return which;

        return BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

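/*
 * Record the size, features and read-only state for the requested
 * mapping.  Mapping the image head uses CEPH_NOSNAP; a named
 * snapshot is first resolved to its id, and mapping any snapshot
 * forces the mapping to be read-only.
 */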
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        const char *snap_name = rbd_dev->spec->snap_name;
        u64 snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
                snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
                if (snap_id == CEPH_NOSNAP)
                        return -ENOENT;
        } else {
                snap_id = CEPH_NOSNAP;
        }

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        /* If we are mapping a snapshot it must be marked read-only */

        if (snap_id != CEPH_NOSNAP)
                rbd_dev->mapping.read_only = true;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

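/*
 * Return the name of the RADOS object backing the segment that
 * contains the given image offset: the image's object prefix
 * followed by the segment number as 12 hex digits (so segment 5
 * becomes "<object_prefix>.000000000005").
 */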
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

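/*
 * Return how many bytes of a request starting at the given image
 * offset fall within a single segment; a request that would cross a
 * segment boundary is clipped at the end of that segment.
 */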
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

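/*
 * Insert an object request into an image request's list.  The image
 * request takes over the caller's reference to the object request,
 * and "which" records the object's position in the list.
 */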
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

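/*
 * Handle completion of an object read.  For a layered image, a read
 * that found no backing object (-ENOENT) within the parent overlap
 * is re-issued against the parent image instead of being completed
 * with a zero-filled result right away.
 */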
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

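/*
 * Completion callback for all osd requests issued by this driver.
 * Records the result and transfer count for the object request,
 * then dispatches on the opcode of the first op to the handler for
 * that kind of request.
 */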
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

1605 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1606 {
1607         struct rbd_img_request *img_request = obj_request->img_request;
1608         struct ceph_osd_request *osd_req = obj_request->osd_req;
1609         u64 snap_id;
1610
1611         rbd_assert(osd_req != NULL);
1612
1613         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1614         ceph_osdc_build_request(osd_req, obj_request->offset,
1615                         NULL, snap_id, NULL);
1616 }
1617
1618 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1619 {
1620         struct rbd_img_request *img_request = obj_request->img_request;
1621         struct ceph_osd_request *osd_req = obj_request->osd_req;
1622         struct ceph_snap_context *snapc;
1623         struct timespec mtime = CURRENT_TIME;
1624
1625         rbd_assert(osd_req != NULL);
1626
1627         snapc = img_request ? img_request->snapc : NULL;
1628         ceph_osdc_build_request(osd_req, obj_request->offset,
1629                         snapc, CEPH_NOSNAP, &mtime);
1630 }
1631
1632 static struct ceph_osd_request *rbd_osd_req_create(
1633                                         struct rbd_device *rbd_dev,
1634                                         bool write_request,
1635                                         struct rbd_obj_request *obj_request)
1636 {
1637         struct ceph_snap_context *snapc = NULL;
1638         struct ceph_osd_client *osdc;
1639         struct ceph_osd_request *osd_req;
1640
1641         if (obj_request_img_data_test(obj_request)) {
1642                 struct rbd_img_request *img_request = obj_request->img_request;
1643
1644                 rbd_assert(write_request ==
1645                                 img_request_write_test(img_request));
1646                 if (write_request)
1647                         snapc = img_request->snapc;
1648         }
1649
1650         /* Allocate and initialize the request, for the single op */
1651
1652         osdc = &rbd_dev->rbd_client->client->osdc;
1653         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1654         if (!osd_req)
1655                 return NULL;    /* ENOMEM */
1656
1657         if (write_request)
1658                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1659         else
1660                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1661
1662         osd_req->r_callback = rbd_osd_req_callback;
1663         osd_req->r_priv = obj_request;
1664
1665         osd_req->r_oid_len = strlen(obj_request->object_name);
1666         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1667         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1668
1669         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1670
1671         return osd_req;
1672 }
1673
1674 /*
1675  * Create a copyup osd request based on the information in the
1676  * object request supplied.  A copyup request has two osd ops:
1677  * a copyup method call and a "normal" write request.
1678  */
1679 static struct ceph_osd_request *
1680 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1681 {
1682         struct rbd_img_request *img_request;
1683         struct ceph_snap_context *snapc;
1684         struct rbd_device *rbd_dev;
1685         struct ceph_osd_client *osdc;
1686         struct ceph_osd_request *osd_req;
1687
1688         rbd_assert(obj_request_img_data_test(obj_request));
1689         img_request = obj_request->img_request;
1690         rbd_assert(img_request);
1691         rbd_assert(img_request_write_test(img_request));
1692
1693         /* Allocate and initialize the request, for the two ops */
1694
1695         snapc = img_request->snapc;
1696         rbd_dev = img_request->rbd_dev;
1697         osdc = &rbd_dev->rbd_client->client->osdc;
1698         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1699         if (!osd_req)
1700                 return NULL;    /* ENOMEM */
1701
1702         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1703         osd_req->r_callback = rbd_osd_req_callback;
1704         osd_req->r_priv = obj_request;
1705
1706         osd_req->r_oid_len = strlen(obj_request->object_name);
1707         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1708         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1709
1710         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1711
1712         return osd_req;
1713 }
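
/*
 * Sketch of how a caller fills in the two ops of a copyup request
 * (modeled on rbd_img_obj_parent_read_full_callback() below;
 * illustrative only):
 *
 *	osd_req = rbd_osd_req_create_copyup(orig_request);
 *
 *	op 0 is the "copyup" class method call carrying parent data:
 *	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL,
 *					"rbd", "copyup");
 *	osd_req_op_cls_request_data_pages(osd_req, 0, pages,
 *					obj_size, 0, false, false);
 *
 *	op 1 is the original write, applied over the copied-up data:
 *	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
 *					offset, length, 0, 0);
 */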
1714
1715
1716 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1717 {
1718         ceph_osdc_put_request(osd_req);
1719 }
1720
1721 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1722
1723 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1724                                                 u64 offset, u64 length,
1725                                                 enum obj_request_type type)
1726 {
1727         struct rbd_obj_request *obj_request;
1728         size_t size;
1729         char *name;
1730
1731         rbd_assert(obj_request_type_valid(type));
1732
1733         size = strlen(object_name) + 1;
1734         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1735         if (!obj_request)
1736                 return NULL;
1737
1738         name = (char *)(obj_request + 1);
1739         obj_request->object_name = memcpy(name, object_name, size);
1740         obj_request->offset = offset;
1741         obj_request->length = length;
1742         obj_request->flags = 0;
1743         obj_request->which = BAD_WHICH;
1744         obj_request->type = type;
1745         INIT_LIST_HEAD(&obj_request->links);
1746         init_completion(&obj_request->completion);
1747         kref_init(&obj_request->kref);
1748
1749         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1750                 offset, length, (int)type, obj_request);
1751
1752         return obj_request;
1753 }
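
/*
 * Note on the allocation above (illustrative): the object name is
 * stored in the same allocation as the request, immediately after
 * the structure itself:
 *
 *	+------------------------+------------------------+
 *	| struct rbd_obj_request | object name + NUL byte |
 *	+------------------------+------------------------+
 *	^ obj_request            ^ (char *)(obj_request + 1)
 *
 * A single kzalloc() therefore covers both, and the single kfree()
 * in rbd_obj_request_destroy() releases both.
 */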
1754
1755 static void rbd_obj_request_destroy(struct kref *kref)
1756 {
1757         struct rbd_obj_request *obj_request;
1758
1759         obj_request = container_of(kref, struct rbd_obj_request, kref);
1760
1761         dout("%s: obj %p\n", __func__, obj_request);
1762
1763         rbd_assert(obj_request->img_request == NULL);
1764         rbd_assert(obj_request->which == BAD_WHICH);
1765
1766         if (obj_request->osd_req)
1767                 rbd_osd_req_destroy(obj_request->osd_req);
1768
1769         rbd_assert(obj_request_type_valid(obj_request->type));
1770         switch (obj_request->type) {
1771         case OBJ_REQUEST_NODATA:
1772                 break;          /* Nothing to do */
1773         case OBJ_REQUEST_BIO:
1774                 if (obj_request->bio_list)
1775                         bio_chain_put(obj_request->bio_list);
1776                 break;
1777         case OBJ_REQUEST_PAGES:
1778                 if (obj_request->pages)
1779                         ceph_release_page_vector(obj_request->pages,
1780                                                 obj_request->page_count);
1781                 break;
1782         }
1783
1784         kfree(obj_request);
1785 }
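
/*
 * Minimal sketch of the refcounting pattern, assuming the usual
 * kref wrappers defined earlier in this file (illustrative only):
 *
 *	obj_request = rbd_obj_request_create(name, 0, 0,
 *						OBJ_REQUEST_NODATA);
 *	rbd_obj_request_get(obj_request);	(extra reference)
 *	...
 *	rbd_obj_request_put(obj_request);	(drops the extra one)
 *	rbd_obj_request_put(obj_request);	(last put invokes
 *						 rbd_obj_request_destroy())
 */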
1786
1787 /*
1788  * Caller is responsible for filling in the list of object requests
1789  * that comprises the image request, and the Linux request pointer
1790  * (if there is one).
1791  */
1792 static struct rbd_img_request *rbd_img_request_create(
1793                                         struct rbd_device *rbd_dev,
1794                                         u64 offset, u64 length,
1795                                         bool write_request,
1796                                         bool child_request)
1797 {
1798         struct rbd_img_request *img_request;
1799
1800         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1801         if (!img_request)
1802                 return NULL;
1803
1804         if (write_request) {
1805                 down_read(&rbd_dev->header_rwsem);
1806                 ceph_get_snap_context(rbd_dev->header.snapc);
1807                 up_read(&rbd_dev->header_rwsem);
1808         }
1809
1810         img_request->rq = NULL;
1811         img_request->rbd_dev = rbd_dev;
1812         img_request->offset = offset;
1813         img_request->length = length;
1814         img_request->flags = 0;
1815         if (write_request) {
1816                 img_request_write_set(img_request);
1817                 img_request->snapc = rbd_dev->header.snapc;
1818         } else {
1819                 img_request->snap_id = rbd_dev->spec->snap_id;
1820         }
1821         if (child_request)
1822                 img_request_child_set(img_request);
1823         if (rbd_dev->parent_spec)
1824                 img_request_layered_set(img_request);
1825         spin_lock_init(&img_request->completion_lock);
1826         img_request->next_completion = 0;
1827         img_request->callback = NULL;
1828         img_request->result = 0;
1829         img_request->obj_request_count = 0;
1830         INIT_LIST_HEAD(&img_request->obj_requests);
1831         kref_init(&img_request->kref);
1832
1833         rbd_img_request_get(img_request);       /* Avoid a warning */
1834         rbd_img_request_put(img_request);       /* TEMPORARY */
1835
1836         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1837                 write_request ? "write" : "read", offset, length,
1838                 img_request);
1839
1840         return img_request;
1841 }
1842
1843 static void rbd_img_request_destroy(struct kref *kref)
1844 {
1845         struct rbd_img_request *img_request;
1846         struct rbd_obj_request *obj_request;
1847         struct rbd_obj_request *next_obj_request;
1848
1849         img_request = container_of(kref, struct rbd_img_request, kref);
1850
1851         dout("%s: img %p\n", __func__, img_request);
1852
1853         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1854                 rbd_img_obj_request_del(img_request, obj_request);
1855         rbd_assert(img_request->obj_request_count == 0);
1856
1857         if (img_request_write_test(img_request))
1858                 ceph_put_snap_context(img_request->snapc);
1859
1860         if (img_request_child_test(img_request))
1861                 rbd_obj_request_put(img_request->obj_request);
1862
1863         kfree(img_request);
1864 }
1865
1866 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1867 {
1868         struct rbd_img_request *img_request;
1869         unsigned int xferred;
1870         int result;
1871         bool more;
1872
1873         rbd_assert(obj_request_img_data_test(obj_request));
1874         img_request = obj_request->img_request;
1875
1876         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1877         xferred = (unsigned int)obj_request->xferred;
1878         result = obj_request->result;
1879         if (result) {
1880                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1881
1882                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1883                         img_request_write_test(img_request) ? "write" : "read",
1884                         obj_request->length, obj_request->img_offset,
1885                         obj_request->offset);
1886                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1887                         result, xferred);
1888                 if (!img_request->result)
1889                         img_request->result = result;
1890         }
1891
1892         /* Image object requests don't own their page array */
1893
1894         if (obj_request->type == OBJ_REQUEST_PAGES) {
1895                 obj_request->pages = NULL;
1896                 obj_request->page_count = 0;
1897         }
1898
1899         if (img_request_child_test(img_request)) {
1900                 rbd_assert(img_request->obj_request != NULL);
1901                 more = obj_request->which < img_request->obj_request_count - 1;
1902         } else {
1903                 rbd_assert(img_request->rq != NULL);
1904                 more = blk_end_request(img_request->rq, result, xferred);
1905         }
1906
1907         return more;
1908 }
1909
1910 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1911 {
1912         struct rbd_img_request *img_request;
1913         u32 which = obj_request->which;
1914         bool more = true;
1915
1916         rbd_assert(obj_request_img_data_test(obj_request));
1917         img_request = obj_request->img_request;
1918
1919         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1920         rbd_assert(img_request != NULL);
1921         rbd_assert(img_request->obj_request_count > 0);
1922         rbd_assert(which != BAD_WHICH);
1923         rbd_assert(which < img_request->obj_request_count);
1924         rbd_assert(which >= img_request->next_completion);
1925
1926         spin_lock_irq(&img_request->completion_lock);
1927         if (which != img_request->next_completion)
1928                 goto out;
1929
1930         for_each_obj_request_from(img_request, obj_request) {
1931                 rbd_assert(more);
1932                 rbd_assert(which < img_request->obj_request_count);
1933
1934                 if (!obj_request_done_test(obj_request))
1935                         break;
1936                 more = rbd_img_obj_end_request(obj_request);
1937                 which++;
1938         }
1939
1940         rbd_assert(more ^ (which == img_request->obj_request_count));
1941         img_request->next_completion = which;
1942 out:
1943         spin_unlock_irq(&img_request->completion_lock);
1944
1945         if (!more)
1946                 rbd_img_request_complete(img_request);
1947 }
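
/*
 * Worked example of the in-order completion logic above
 * (illustrative only).  Say an image request has three object
 * requests, 0..2, whose replies land in the order 2, 0, 1:
 *
 *	reply 2: which (2) != next_completion (0), so nothing
 *		 completes yet
 *	reply 0: ends request 0; request 1 isn't done, so the scan
 *		 stops and next_completion becomes 1
 *	reply 1: ends request 1, finds request 2 already done and
 *		 ends it too; next_completion reaches 3
 *
 * Only when "more" goes false is the image request completed.
 */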
1948
1949 /*
1950  * Split up an image request into one or more object requests, each
1951  * to a different object.  The "type" parameter indicates whether
1952  * "data_desc" is the pointer to the head of a list of bio
1953  * structures, or the base of a page array.  In either case this
1954  * function assumes data_desc describes memory sufficient to hold
1955  * all data described by the image request.
1956  */
1957 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1958                                         enum obj_request_type type,
1959                                         void *data_desc)
1960 {
1961         struct rbd_device *rbd_dev = img_request->rbd_dev;
1962         struct rbd_obj_request *obj_request = NULL;
1963         struct rbd_obj_request *next_obj_request;
1964         bool write_request = img_request_write_test(img_request);
1965         struct bio *bio_list;
1966         unsigned int bio_offset = 0;
1967         struct page **pages;
1968         u64 img_offset;
1969         u64 resid;
1970         u16 opcode;
1971
1972         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1973                 (int)type, data_desc);
1974
1975         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1976         img_offset = img_request->offset;
1977         resid = img_request->length;
1978         rbd_assert(resid > 0);
1979
1980         if (type == OBJ_REQUEST_BIO) {
1981                 bio_list = data_desc;
1982                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1983         } else {
1984                 rbd_assert(type == OBJ_REQUEST_PAGES);
1985                 pages = data_desc;
1986         }
1987
1988         while (resid) {
1989                 struct ceph_osd_request *osd_req;
1990                 const char *object_name;
1991                 u64 offset;
1992                 u64 length;
1993
1994                 object_name = rbd_segment_name(rbd_dev, img_offset);
1995                 if (!object_name)
1996                         goto out_unwind;
1997                 offset = rbd_segment_offset(rbd_dev, img_offset);
1998                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1999                 obj_request = rbd_obj_request_create(object_name,
2000                                                 offset, length, type);
2001                 kfree(object_name);     /* object request has its own copy */
2002                 if (!obj_request)
2003                         goto out_unwind;
2004
2005                 if (type == OBJ_REQUEST_BIO) {
2006                         unsigned int clone_size;
2007
2008                         rbd_assert(length <= (u64)UINT_MAX);
2009                         clone_size = (unsigned int)length;
2010                         obj_request->bio_list =
2011                                         bio_chain_clone_range(&bio_list,
2012                                                                 &bio_offset,
2013                                                                 clone_size,
2014                                                                 GFP_ATOMIC);
2015                         if (!obj_request->bio_list)
2016                                 goto out_partial;
2017                 } else {
2018                         unsigned int page_count;
2019
2020                         obj_request->pages = pages;
2021                         page_count = (u32)calc_pages_for(offset, length);
2022                         obj_request->page_count = page_count;
2023                         if ((offset + length) & ~PAGE_MASK)
2024                                 page_count--;   /* more on last page */
2025                         pages += page_count;
2026                 }
2027
2028                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2029                                                 obj_request);
2030                 if (!osd_req)
2031                         goto out_partial;
2032                 obj_request->osd_req = osd_req;
2033                 obj_request->callback = rbd_img_obj_callback;
2034
2035                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2036                                                 0, 0);
2037                 if (type == OBJ_REQUEST_BIO)
2038                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2039                                         obj_request->bio_list, length);
2040                 else
2041                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2042                                         obj_request->pages, length,
2043                                         offset & ~PAGE_MASK, false, false);
2044
2045                 if (write_request)
2046                         rbd_osd_req_format_write(obj_request);
2047                 else
2048                         rbd_osd_req_format_read(obj_request);
2049
2050                 obj_request->img_offset = img_offset;
2051                 rbd_img_obj_request_add(img_request, obj_request);
2052
2053                 img_offset += length;
2054                 resid -= length;
2055         }
2056
2057         return 0;
2058
2059 out_partial:
2060         rbd_obj_request_put(obj_request);
2061 out_unwind:
2062         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2063                 rbd_obj_request_put(obj_request);
2064
2065         return -ENOMEM;
2066 }
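
/*
 * Worked example of the segmentation above, assuming the default
 * object order of 22 (4 MiB objects); the rbd_segment_*() helpers
 * are defined earlier in this file.  For an image request with
 * offset 6 MiB and length 4 MiB:
 *
 *	pass 1: img_offset 6 MiB falls in object 1 (6 MiB >> 22);
 *		the offset within that object is 2 MiB, so length
 *		is clamped to the 2 MiB left in the object
 *	pass 2: img_offset 8 MiB maps to object 2 at offset 0,
 *		covering the final 2 MiB of the request
 *
 * Each pass creates one object request carrying one osd op.
 */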
2067
2068 static void
2069 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2070 {
2071         struct rbd_img_request *img_request;
2072         struct rbd_device *rbd_dev;
2073         u64 length;
2074         u32 page_count;
2075
2076         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2077         rbd_assert(obj_request_img_data_test(obj_request));
2078         img_request = obj_request->img_request;
2079         rbd_assert(img_request);
2080
2081         rbd_dev = img_request->rbd_dev;
2082         rbd_assert(rbd_dev);
2083         length = (u64)1 << rbd_dev->header.obj_order;
2084         page_count = (u32)calc_pages_for(0, length);
2085
2086         rbd_assert(obj_request->copyup_pages);
2087         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2088         obj_request->copyup_pages = NULL;
2089
2090         /*
2091          * We want the transfer count to reflect the size of the
2092          * original write request.  There is no such thing as a
2093          * successful short write, so if the request was successful
2094          * we can just set it to the originally-requested length.
2095          */
2096         if (!obj_request->result)
2097                 obj_request->xferred = obj_request->length;
2098
2099         /* Finish up with the normal image object callback */
2100
2101         rbd_img_obj_callback(obj_request);
2102 }
2103
2104 static void
2105 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2106 {
2107         struct rbd_obj_request *orig_request;
2108         struct ceph_osd_request *osd_req;
2109         struct ceph_osd_client *osdc;
2110         struct rbd_device *rbd_dev;
2111         struct page **pages;
2112         int result;
2113         u64 obj_size;
2114         u64 xferred;
2115
2116         rbd_assert(img_request_child_test(img_request));
2117
2118         /* First get what we need from the image request */
2119
2120         pages = img_request->copyup_pages;
2121         rbd_assert(pages != NULL);
2122         img_request->copyup_pages = NULL;
2123
2124         orig_request = img_request->obj_request;
2125         rbd_assert(orig_request != NULL);
2126         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2127         result = img_request->result;
2128         obj_size = img_request->length;
2129         xferred = img_request->xferred;
2130
2131         rbd_dev = img_request->rbd_dev;
2132         rbd_assert(rbd_dev);
2133         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2134
2135         rbd_img_request_put(img_request);
2136
2137         if (result)
2138                 goto out_err;
2139
2140         /* Allocate the new copyup osd request for the original request */
2141
2142         result = -ENOMEM;
2143         rbd_assert(!orig_request->osd_req);
2144         osd_req = rbd_osd_req_create_copyup(orig_request);
2145         if (!osd_req)
2146                 goto out_err;
2147         orig_request->osd_req = osd_req;
2148         orig_request->copyup_pages = pages;
2149
2150         /* Initialize the copyup op */
2151
2152         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2153         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2154                                                 false, false);
2155
2156         /* Then the original write request op */
2157
2158         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2159                                         orig_request->offset,
2160                                         orig_request->length, 0, 0);
2161         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2162                                         orig_request->length);
2163
2164         rbd_osd_req_format_write(orig_request);
2165
2166         /* All set, send it off. */
2167
2168         orig_request->callback = rbd_img_obj_copyup_callback;
2169         osdc = &rbd_dev->rbd_client->client->osdc;
2170         result = rbd_obj_request_submit(osdc, orig_request);
2171         if (!result)
2172                 return;
2173 out_err:
2174         /* Record the error code and complete the request */
2175
2176         orig_request->result = result;
2177         orig_request->xferred = 0;
2178         obj_request_done_set(orig_request);
2179         rbd_obj_request_complete(orig_request);
2180 }
2181
2182 /*
2183  * Read from the parent image the range of data that covers the
2184  * entire target of the given object request.  This is used for
2185  * satisfying a layered image write request when the target of an
2186  * object request from the image request does not exist.
2187  *
2188  * A page array big enough to hold the returned data is allocated
2189  * and supplied to rbd_img_request_fill() as the "data descriptor."
2190  * When the read completes, this page array will be transferred to
2191  * the original object request for the copyup operation.
2192  *
2193  * If an error occurs, record it as the result of the original
2194  * object request and mark it done so it gets completed.
2195  */
2196 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2197 {
2198         struct rbd_img_request *img_request = NULL;
2199         struct rbd_img_request *parent_request = NULL;
2200         struct rbd_device *rbd_dev;
2201         u64 img_offset;
2202         u64 length;
2203         struct page **pages = NULL;
2204         u32 page_count;
2205         int result;
2206
2207         rbd_assert(obj_request_img_data_test(obj_request));
2208         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2209
2210         img_request = obj_request->img_request;
2211         rbd_assert(img_request != NULL);
2212         rbd_dev = img_request->rbd_dev;
2213         rbd_assert(rbd_dev->parent != NULL);
2214
2215         /*
2216          * First things first.  The original osd request is of no
2217          * use to us any more; we'll need a new one that can hold
2218          * the two ops in a copyup request.  We'll get that later,
2219          * but for now we can release the old one.
2220          */
2221         rbd_osd_req_destroy(obj_request->osd_req);
2222         obj_request->osd_req = NULL;
2223
2224         /*
2225          * Determine the byte range covered by the object in the
2226          * child image to which the original request was to be sent.
2227          */
2228         img_offset = obj_request->img_offset - obj_request->offset;
2229         length = (u64)1 << rbd_dev->header.obj_order;
2230
2231         /*
2232          * There is no defined parent data beyond the parent
2233          * overlap, so limit what we read at that boundary if
2234          * necessary.
2235          */
2236         if (img_offset + length > rbd_dev->parent_overlap) {
2237                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2238                 length = rbd_dev->parent_overlap - img_offset;
2239         }
2240
2241         /*
2242          * Allocate a page array big enough to receive the data read
2243          * from the parent.
2244          */
2245         page_count = (u32)calc_pages_for(0, length);
2246         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2247         if (IS_ERR(pages)) {
2248                 result = PTR_ERR(pages);
2249                 pages = NULL;
2250                 goto out_err;
2251         }
2252
2253         result = -ENOMEM;
2254         parent_request = rbd_img_request_create(rbd_dev->parent,
2255                                                 img_offset, length,
2256                                                 false, true);
2257         if (!parent_request)
2258                 goto out_err;
2259         rbd_obj_request_get(obj_request);
2260         parent_request->obj_request = obj_request;
2261
2262         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2263         if (result)
2264                 goto out_err;
2265         parent_request->copyup_pages = pages;
2266
2267         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2268         result = rbd_img_request_submit(parent_request);
2269         if (!result)
2270                 return 0;
2271
2272         parent_request->copyup_pages = NULL;
2273         parent_request->obj_request = NULL;
2274         rbd_obj_request_put(obj_request);
2275 out_err:
2276         if (pages)
2277                 ceph_release_page_vector(pages, page_count);
2278         if (parent_request)
2279                 rbd_img_request_put(parent_request);
2280         obj_request->result = result;
2281         obj_request->xferred = 0;
2282         obj_request_done_set(obj_request);
2283
2284         return result;
2285 }
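
/*
 * Numeric illustration of the overlap clamping above (values are
 * assumed).  With 4 MiB objects and a parent overlap of 6 MiB, a
 * copyup for the object covering image range [4 MiB, 8 MiB) reads
 * only [4 MiB, 6 MiB) from the parent:
 *
 *	img_offset (4 MiB) + length (4 MiB) > parent_overlap (6 MiB)
 *	length = parent_overlap - img_offset = 2 MiB
 *
 * The rest of the object has no defined parent data and is simply
 * not read.
 */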
2286
2287 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2288 {
2289         struct rbd_obj_request *orig_request;
2290         int result;
2291
2292         rbd_assert(!obj_request_img_data_test(obj_request));
2293
2294         /*
2295          * All we need from the object request is the original
2296          * request and the result of the STAT op.  Grab those, then
2297          * we're done with the request.
2298          */
2299         orig_request = obj_request->obj_request;
2300         obj_request->obj_request = NULL;
2301         rbd_assert(orig_request);
2302         rbd_assert(orig_request->img_request);
2303
2304         result = obj_request->result;
2305         obj_request->result = 0;
2306
2307         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2308                 obj_request, orig_request, result,
2309                 obj_request->xferred, obj_request->length);
2310         rbd_obj_request_put(obj_request);
2311
2312         rbd_assert(orig_request);
2313         rbd_assert(orig_request->img_request);
2314
2315         /*
2316          * Our only purpose here is to determine whether the object
2317          * exists, and we don't want to treat the non-existence as
2318          * an error.  If something else comes back, transfer the
2319          * error to the original request and complete it now.
2320          */
2321         if (!result) {
2322                 obj_request_existence_set(orig_request, true);
2323         } else if (result == -ENOENT) {
2324                 obj_request_existence_set(orig_request, false);
2325         } else if (result) {
2326                 orig_request->result = result;
2327                 goto out;
2328         }
2329
2330         /*
2331          * Resubmit the original request now that we have recorded
2332          * whether the target object exists.
2333          */
2334         orig_request->result = rbd_img_obj_request_submit(orig_request);
2335 out:
2336         if (orig_request->result)
2337                 rbd_obj_request_complete(orig_request);
2338         rbd_obj_request_put(orig_request);
2339 }
2340
2341 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2342 {
2343         struct rbd_obj_request *stat_request;
2344         struct rbd_device *rbd_dev;
2345         struct ceph_osd_client *osdc;
2346         struct page **pages = NULL;
2347         u32 page_count;
2348         size_t size;
2349         int ret;
2350
2351         /*
2352          * The response data for a STAT call consists of:
2353          *     le64 length;
2354          *     struct {
2355          *         le32 tv_sec;
2356          *         le32 tv_nsec;
2357          *     } mtime;
2358          */
2359         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2360         page_count = (u32)calc_pages_for(0, size);
2361         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2362         if (IS_ERR(pages))
2363                 return PTR_ERR(pages);
2364
2365         ret = -ENOMEM;
2366         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2367                                                         OBJ_REQUEST_PAGES);
2368         if (!stat_request)
2369                 goto out;
2370
2371         rbd_obj_request_get(obj_request);
2372         stat_request->obj_request = obj_request;
2373         stat_request->pages = pages;
2374         stat_request->page_count = page_count;
2375
2376         rbd_assert(obj_request->img_request);
2377         rbd_dev = obj_request->img_request->rbd_dev;
2378         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2379                                                 stat_request);
2380         if (!stat_request->osd_req)
2381                 goto out;
2382         stat_request->callback = rbd_img_obj_exists_callback;
2383
2384         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2385         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2386                                         false, false);
2387         rbd_osd_req_format_read(stat_request);
2388
2389         osdc = &rbd_dev->rbd_client->client->osdc;
2390         ret = rbd_obj_request_submit(osdc, stat_request);
2391 out:
2392         if (ret)
2393                 rbd_obj_request_put(obj_request);
2394
2395         return ret;
2396 }
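
/*
 * The STAT reply described above could be decoded as follows (a
 * hypothetical sketch; this driver never looks at the payload,
 * only at the op's result):
 *
 *	void *p = page_address(pages[0]);
 *	u64 size = le64_to_cpu(*(__le64 *)p);
 *	u32 tv_sec = le32_to_cpu(*(__le32 *)(p + 8));
 *	u32 tv_nsec = le32_to_cpu(*(__le32 *)(p + 12));
 *
 * Existence is inferred purely from result 0 vs. -ENOENT in
 * rbd_img_obj_exists_callback() above.
 */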
2397
2398 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2399 {
2400         struct rbd_img_request *img_request;
2401         struct rbd_device *rbd_dev;
2402         bool known;
2403
2404         rbd_assert(obj_request_img_data_test(obj_request));
2405
2406         img_request = obj_request->img_request;
2407         rbd_assert(img_request);
2408         rbd_dev = img_request->rbd_dev;
2409
2410         /*
2411          * Only writes to layered images need special handling.
2412          * Reads and non-layered writes are simple object requests.
2413          * Layered writes that start beyond the end of the overlap
2414          * with the parent have no parent data, so they too are
2415          * simple object requests.  Finally, if the target object is
2416          * known to already exist, its parent data has already been
2417          * copied, so a write to the object can also be handled as a
2418          * simple object request.
2419          */
2420         if (!img_request_write_test(img_request) ||
2421                 !img_request_layered_test(img_request) ||
2422                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2423                 ((known = obj_request_known_test(obj_request)) &&
2424                         obj_request_exists_test(obj_request))) {
2425
2426                 struct rbd_device *rbd_dev;
2427                 struct ceph_osd_client *osdc;
2428
2429                 rbd_dev = obj_request->img_request->rbd_dev;
2430                 osdc = &rbd_dev->rbd_client->client->osdc;
2431
2432                 return rbd_obj_request_submit(osdc, obj_request);
2433         }
2434
2435         /*
2436          * It's a layered write.  The target object might exist but
2437          * we may not know that yet.  If we know it doesn't exist,
2438          * start by reading the data for the full target object from
2439          * the parent so we can use it for a copyup to the target.
2440          */
2441         if (known)
2442                 return rbd_img_obj_parent_read_full(obj_request);
2443
2444         /* We don't know whether the target exists.  Go find out. */
2445
2446         return rbd_img_obj_exists_submit(obj_request);
2447 }
2448
2449 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2450 {
2451         struct rbd_obj_request *obj_request;
2452         struct rbd_obj_request *next_obj_request;
2453
2454         dout("%s: img %p\n", __func__, img_request);
2455         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2456                 int ret;
2457
2458                 ret = rbd_img_obj_request_submit(obj_request);
2459                 if (ret)
2460                         return ret;
2461         }
2462
2463         return 0;
2464 }
2465
2466 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2467 {
2468         struct rbd_obj_request *obj_request;
2469         struct rbd_device *rbd_dev;
2470         u64 obj_end;
2471
2472         rbd_assert(img_request_child_test(img_request));
2473
2474         obj_request = img_request->obj_request;
2475         rbd_assert(obj_request);
2476         rbd_assert(obj_request->img_request);
2477
2478         obj_request->result = img_request->result;
2479         if (obj_request->result)
2480                 goto out;
2481
2482         /*
2483          * We need to zero anything beyond the parent overlap
2484          * boundary.  Since rbd_img_obj_request_read_callback()
2485          * will zero anything beyond the end of a short read, an
2486          * easy way to do this is to pretend the data from the
2487          * parent came up short--ending at the overlap boundary.
2488          */
2489         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2490         obj_end = obj_request->img_offset + obj_request->length;
2491         rbd_dev = obj_request->img_request->rbd_dev;
2492         if (obj_end > rbd_dev->parent_overlap) {
2493                 u64 xferred = 0;
2494
2495                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2496                         xferred = rbd_dev->parent_overlap -
2497                                         obj_request->img_offset;
2498
2499                 obj_request->xferred = min(img_request->xferred, xferred);
2500         } else {
2501                 obj_request->xferred = img_request->xferred;
2502         }
2503 out:
2504         rbd_img_obj_request_read_callback(obj_request);
2505         rbd_obj_request_complete(obj_request);
2506 }
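
/*
 * Numeric example of the short-read pretense above (values are
 * assumed).  With parent_overlap = 4 MiB, an object request for
 * image range [3 MiB, 5 MiB) extends past the overlap:
 *
 *	obj_end (5 MiB) > parent_overlap (4 MiB)
 *	xferred = parent_overlap - img_offset = 1 MiB
 *
 * The read callback then zero-fills [1 MiB, 2 MiB) of the request's
 * data, exactly as it would for an ordinary short read.
 */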
2507
2508 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2509 {
2510         struct rbd_device *rbd_dev;
2511         struct rbd_img_request *img_request;
2512         int result;
2513
2514         rbd_assert(obj_request_img_data_test(obj_request));
2515         rbd_assert(obj_request->img_request != NULL);
2516         rbd_assert(obj_request->result == (s32) -ENOENT);
2517         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2518
2519         rbd_dev = obj_request->img_request->rbd_dev;
2520         rbd_assert(rbd_dev->parent != NULL);
2521         /* rbd_read_finish(obj_request, obj_request->length); */
2522         img_request = rbd_img_request_create(rbd_dev->parent,
2523                                                 obj_request->img_offset,
2524                                                 obj_request->length,
2525                                                 false, true);
2526         result = -ENOMEM;
2527         if (!img_request)
2528                 goto out_err;
2529
2530         rbd_obj_request_get(obj_request);
2531         img_request->obj_request = obj_request;
2532
2533         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2534                                         obj_request->bio_list);
2535         if (result)
2536                 goto out_err;
2537
2538         img_request->callback = rbd_img_parent_read_callback;
2539         result = rbd_img_request_submit(img_request);
2540         if (result)
2541                 goto out_err;
2542
2543         return;
2544 out_err:
2545         if (img_request)
2546                 rbd_img_request_put(img_request);
2547         obj_request->result = result;
2548         obj_request->xferred = 0;
2549         obj_request_done_set(obj_request);
2550 }
2551
2552 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2553 {
2554         struct rbd_obj_request *obj_request;
2555         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2556         int ret;
2557
2558         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2559                                                         OBJ_REQUEST_NODATA);
2560         if (!obj_request)
2561                 return -ENOMEM;
2562
2563         ret = -ENOMEM;
2564         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2565         if (!obj_request->osd_req)
2566                 goto out;
2567         obj_request->callback = rbd_obj_request_put;
2568
2569         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2570                                         notify_id, 0, 0);
2571         rbd_osd_req_format_read(obj_request);
2572
2573         ret = rbd_obj_request_submit(osdc, obj_request);
2574 out:
2575         if (ret)
2576                 rbd_obj_request_put(obj_request);
2577
2578         return ret;
2579 }
2580
2581 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2582 {
2583         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2584
2585         if (!rbd_dev)
2586                 return;
2587
2588         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2589                 rbd_dev->header_name, (unsigned long long)notify_id,
2590                 (unsigned int)opcode);
2591         (void)rbd_dev_refresh(rbd_dev);
2592
2593         rbd_obj_notify_ack(rbd_dev, notify_id);
2594 }
2595
2596 /*
2597  * Request sync osd watch/unwatch.  The value of "start" determines
2598  * whether a watch request is being initiated or torn down.
2599  */
2600 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2601 {
2602         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2603         struct rbd_obj_request *obj_request;
2604         int ret;
2605
2606         rbd_assert(start ^ !!rbd_dev->watch_event);
2607         rbd_assert(start ^ !!rbd_dev->watch_request);
2608
2609         if (start) {
2610                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2611                                                 &rbd_dev->watch_event);
2612                 if (ret < 0)
2613                         return ret;
2614                 rbd_assert(rbd_dev->watch_event != NULL);
2615         }
2616
2617         ret = -ENOMEM;
2618         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2619                                                         OBJ_REQUEST_NODATA);
2620         if (!obj_request)
2621                 goto out_cancel;
2622
2623         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2624         if (!obj_request->osd_req)
2625                 goto out_cancel;
2626
2627         if (start)
2628                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2629         else
2630                 ceph_osdc_unregister_linger_request(osdc,
2631                                         rbd_dev->watch_request->osd_req);
2632
2633         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2634                                 rbd_dev->watch_event->cookie, 0, start);
2635         rbd_osd_req_format_write(obj_request);
2636
2637         ret = rbd_obj_request_submit(osdc, obj_request);
2638         if (ret)
2639                 goto out_cancel;
2640         ret = rbd_obj_request_wait(obj_request);
2641         if (ret)
2642                 goto out_cancel;
2643         ret = obj_request->result;
2644         if (ret)
2645                 goto out_cancel;
2646
2647         /*
2648          * A watch request is set to linger, so the underlying osd
2649          * request won't go away until we unregister it.  We retain
2650          * a pointer to the object request during that time (in
2651          * rbd_dev->watch_request), so we'll keep a reference to
2652          * it.  We'll drop that reference (below) after we've
2653          * unregistered it.
2654          */
2655         if (start) {
2656                 rbd_dev->watch_request = obj_request;
2657
2658                 return 0;
2659         }
2660
2661         /* We have successfully torn down the watch request */
2662
2663         rbd_obj_request_put(rbd_dev->watch_request);
2664         rbd_dev->watch_request = NULL;
2665 out_cancel:
2666         /* Cancel the event if we're tearing down, or on error */
2667         ceph_osdc_cancel_event(rbd_dev->watch_event);
2668         rbd_dev->watch_event = NULL;
2669         if (obj_request)
2670                 rbd_obj_request_put(obj_request);
2671
2672         return ret;
2673 }
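
/*
 * Minimal usage sketch (illustrative; the real call sites live
 * elsewhere in this file):
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
 *		...header-change notifications now arrive via
 *		rbd_watch_cb()...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
 *
 * Note the asserts at the top: starting requires that no watch is
 * registered yet, and tearing down requires that one exists.
 */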
2674
2675 /*
2676  * Synchronous osd object method call.  Returns the number of bytes
2677  * returned in the outbound buffer, or a negative error code.
2678  */
2679 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2680                              const char *object_name,
2681                              const char *class_name,
2682                              const char *method_name,
2683                              const void *outbound,
2684                              size_t outbound_size,
2685                              void *inbound,
2686                              size_t inbound_size)
2687 {
2688         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2689         struct rbd_obj_request *obj_request;
2690         struct page **pages;
2691         u32 page_count;
2692         int ret;
2693
2694         /*
2695          * Method calls are ultimately read operations.  The result
2696          * should be placed into the inbound buffer provided.  They
2697          * may also supply outbound data--parameters for the object
2698          * method.  Currently, if outbound data is present, it will
2699          * be a snapshot id.
2700          */
2701         page_count = (u32)calc_pages_for(0, inbound_size);
2702         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2703         if (IS_ERR(pages))
2704                 return PTR_ERR(pages);
2705
2706         ret = -ENOMEM;
2707         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2708                                                         OBJ_REQUEST_PAGES);
2709         if (!obj_request)
2710                 goto out;
2711
2712         obj_request->pages = pages;
2713         obj_request->page_count = page_count;
2714
2715         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2716         if (!obj_request->osd_req)
2717                 goto out;
2718
2719         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2720                                         class_name, method_name);
2721         if (outbound_size) {
2722                 struct ceph_pagelist *pagelist;
2723
2724                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2725                 if (!pagelist)
2726                         goto out;
2727
2728                 ceph_pagelist_init(pagelist);
2729                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2730                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2731                                                 pagelist);
2732         }
2733         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2734                                         obj_request->pages, inbound_size,
2735                                         0, false, false);
2736         rbd_osd_req_format_read(obj_request);
2737
2738         ret = rbd_obj_request_submit(osdc, obj_request);
2739         if (ret)
2740                 goto out;
2741         ret = rbd_obj_request_wait(obj_request);
2742         if (ret)
2743                 goto out;
2744
2745         ret = obj_request->result;
2746         if (ret < 0)
2747                 goto out;
2748
2749         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2750         ret = (int)obj_request->xferred;
2751         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2752 out:
2753         if (obj_request)
2754                 rbd_obj_request_put(obj_request);
2755         else
2756                 ceph_release_page_vector(pages, page_count);
2757
2758         return ret;
2759 }
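
/*
 * Hypothetical call sketch showing the in/out buffer convention,
 * modeled on the v2 image queries later in this file (the reply
 * buffer here is made up):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	char reply[64];
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				reply, sizeof (reply));
 *
 * On success, ret is the number of bytes copied into "reply".
 */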
2760
2761 static void rbd_request_fn(struct request_queue *q)
2762                 __releases(q->queue_lock) __acquires(q->queue_lock)
2763 {
2764         struct rbd_device *rbd_dev = q->queuedata;
2765         bool read_only = rbd_dev->mapping.read_only;
2766         struct request *rq;
2767         int result;
2768
2769         while ((rq = blk_fetch_request(q))) {
2770                 bool write_request = rq_data_dir(rq) == WRITE;
2771                 struct rbd_img_request *img_request;
2772                 u64 offset;
2773                 u64 length;
2774
2775                 /* Ignore any non-FS requests that filter through. */
2776
2777                 if (rq->cmd_type != REQ_TYPE_FS) {
2778                         dout("%s: non-fs request type %d\n", __func__,
2779                                 (int) rq->cmd_type);
2780                         __blk_end_request_all(rq, 0);
2781                         continue;
2782                 }
2783
2784                 /* Ignore/skip any zero-length requests */
2785
2786                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2787                 length = (u64) blk_rq_bytes(rq);
2788
2789                 if (!length) {
2790                         dout("%s: zero-length request\n", __func__);
2791                         __blk_end_request_all(rq, 0);
2792                         continue;
2793                 }
2794
2795                 spin_unlock_irq(q->queue_lock);
2796
2797                 /* Disallow writes to a read-only device */
2798
2799                 if (write_request) {
2800                         result = -EROFS;
2801                         if (read_only)
2802                                 goto end_request;
2803                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2804                 }
2805
2806                 /*
2807                  * Quit early if the mapped snapshot no longer
2808                  * exists.  It's still possible the snapshot will
2809                  * have disappeared by the time our request arrives
2810                  * at the osd, but there's no sense in sending it if
2811                  * we already know.
2812                  */
2813                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2814                         dout("request for non-existent snapshot\n");
2815                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2816                         result = -ENXIO;
2817                         goto end_request;
2818                 }
2819
2820                 result = -EINVAL;
2821                 if (offset && length > U64_MAX - offset + 1) {
2822                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2823                                 offset, length);
2824                         goto end_request;       /* Shouldn't happen */
2825                 }
2826
2827                 result = -ENOMEM;
2828                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2829                                                         write_request, false);
2830                 if (!img_request)
2831                         goto end_request;
2832
2833                 img_request->rq = rq;
2834
2835                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2836                                                 rq->bio);
2837                 if (!result)
2838                         result = rbd_img_request_submit(img_request);
2839                 if (result)
2840                         rbd_img_request_put(img_request);
2841 end_request:
2842                 spin_lock_irq(q->queue_lock);
2843                 if (result < 0) {
2844                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2845                                 write_request ? "write" : "read",
2846                                 length, offset, result);
2847
2848                         __blk_end_request_all(rq, result);
2849                 }
2850         }
2851 }
2852
2853 /*
2854  * A queue callback.  Makes sure that we don't create a bio that spans
2855  * multiple osd objects.  One exception would be a single-page bio,
2856  * which we handle later in bio_chain_clone_range().
2857  */
2858 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2859                           struct bio_vec *bvec)
2860 {
2861         struct rbd_device *rbd_dev = q->queuedata;
2862         sector_t sector_offset;
2863         sector_t sectors_per_obj;
2864         sector_t obj_sector_offset;
2865         int ret;
2866
2867         /*
2868          * Find how far into its rbd object the bio's start sector
2869          * falls.  The partition-relative start sector is offset by
2870          * the partition's start to make it relative to the device.
2871          */
2872         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2873         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2874         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2875
2876         /*
2877          * Compute the number of bytes from that offset to the end
2878          * of the object.  Account for what's already used by the bio.
2879          */
2880         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2881         if (ret > bmd->bi_size)
2882                 ret -= bmd->bi_size;
2883         else
2884                 ret = 0;
2885
2886         /*
2887          * Don't send back more than was asked for.  And if the bio
2888          * was empty, let the whole thing through because:  "Note
2889          * that a block device *must* allow a single page to be
2890          * added to an empty bio."
2891          */
2892         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2893         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2894                 ret = (int) bvec->bv_len;
2895
2896         return ret;
2897 }
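
/*
 * Worked numbers for the calculation above, assuming obj_order 22
 * (4 MiB objects, i.e. 8192 512-byte sectors per object).  For a
 * bio starting 100 sectors before an object boundary with 40 KiB
 * already in it:
 *
 *	obj_sector_offset = 8192 - 100 = 8092
 *	bytes to object end = (8192 - 8092) << 9 = 51200
 *	minus bi_size (40960) leaves 10240
 *
 * The bvec is accepted only up to those 10240 bytes (and an empty
 * bio is always allowed a single page).
 */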
2898
2899 static void rbd_free_disk(struct rbd_device *rbd_dev)
2900 {
2901         struct gendisk *disk = rbd_dev->disk;
2902
2903         if (!disk)
2904                 return;
2905
2906         rbd_dev->disk = NULL;
2907         if (disk->flags & GENHD_FL_UP) {
2908                 del_gendisk(disk);
2909                 if (disk->queue)
2910                         blk_cleanup_queue(disk->queue);
2911         }
2912         put_disk(disk);
2913 }
2914
2915 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2916                                 const char *object_name,
2917                                 u64 offset, u64 length, void *buf)
2918
2919 {
2920         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2921         struct rbd_obj_request *obj_request;
2922         struct page **pages = NULL;
2923         u32 page_count;
2924         size_t size;
2925         int ret;
2926
2927         page_count = (u32) calc_pages_for(offset, length);
2928         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2929         if (IS_ERR(pages))
2930                 return PTR_ERR(pages);
2931
2932         ret = -ENOMEM;
2933         obj_request = rbd_obj_request_create(object_name, offset, length,
2934                                                         OBJ_REQUEST_PAGES);
2935         if (!obj_request)
2936                 goto out;
2937
2938         obj_request->pages = pages;
2939         obj_request->page_count = page_count;
2940
2941         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2942         if (!obj_request->osd_req)
2943                 goto out;
2944
2945         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2946                                         offset, length, 0, 0);
2947         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2948                                         obj_request->pages,
2949                                         obj_request->length,
2950                                         obj_request->offset & ~PAGE_MASK,
2951                                         false, false);
2952         rbd_osd_req_format_read(obj_request);
2953
2954         ret = rbd_obj_request_submit(osdc, obj_request);
2955         if (ret)
2956                 goto out;
2957         ret = rbd_obj_request_wait(obj_request);
2958         if (ret)
2959                 goto out;
2960
2961         ret = obj_request->result;
2962         if (ret < 0)
2963                 goto out;
2964
2965         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2966         size = (size_t) obj_request->xferred;
2967         ceph_copy_from_page_vector(pages, buf, 0, size);
2968         rbd_assert(size <= (size_t)INT_MAX);
2969         ret = (int)size;
2970 out:
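        /*
         * Once the page vector has been attached to the object
         * request it is freed by rbd_obj_request_put(); until then
         * we must release it ourselves.
         */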
2971         if (obj_request)
2972                 rbd_obj_request_put(obj_request);
2973         else
2974                 ceph_release_page_vector(pages, page_count);
2975
2976         return ret;
2977 }
2978
2979 /*
2980  * Read the complete header for the given rbd device.
2981  *
2982  * Returns a pointer to a dynamically-allocated buffer containing
2983  * the complete and validated header.  Caller can pass the address
2984  * of a variable that will be filled in with the version of the
2985  * header object at the time it was read.
2986  *
2987  * Returns a pointer-coded errno if a failure occurs.
2988  */
2989 static struct rbd_image_header_ondisk *
2990 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2991 {
2992         struct rbd_image_header_ondisk *ondisk = NULL;
2993         u32 snap_count = 0;
2994         u64 names_size = 0;
2995         u32 want_count;
2996         int ret;
2997
2998         /*
2999          * The complete header will include an array of its 64-bit
3000          * snapshot ids, followed by the names of those snapshots as
3001          * a contiguous block of NUL-terminated strings.  Note that
3002          * the number of snapshots could change by the time we read
3003          * it in, in which case we re-read it.
3004          */
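        /*
         * (On the first pass snap_count and names_size are both
         * zero, so only the fixed-size portion of the header is
         * read; the counts learned from it size any further pass.)
         */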
3005         do {
3006                 size_t size;
3007
3008                 kfree(ondisk);
3009
3010                 size = sizeof (*ondisk);
3011                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3012                 size += names_size;
3013                 ondisk = kmalloc(size, GFP_KERNEL);
3014                 if (!ondisk)
3015                         return ERR_PTR(-ENOMEM);
3016
3017                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3018                                        0, size, ondisk);
3019                 if (ret < 0)
3020                         goto out_err;
3021                 if ((size_t)ret < size) {
3022                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3023                                 size, ret);
3024                         ret = -ENXIO;
3025                         goto out_err;
3026                 }
3027                 if (!rbd_dev_ondisk_valid(ondisk)) {
3028                         ret = -ENXIO;
3029                         rbd_warn(rbd_dev, "invalid header");
3030                         goto out_err;
3031                 }
3032
3033                 names_size = le64_to_cpu(ondisk->snap_names_len);
3034                 want_count = snap_count;
3035                 snap_count = le32_to_cpu(ondisk->snap_count);
3036         } while (snap_count != want_count);
3037
3038         return ondisk;
3039
3040 out_err:
3041         kfree(ondisk);
3042
3043         return ERR_PTR(ret);
3044 }
3045
3046 /*
3047  * Read the on-disk header and convert it to the in-core format
3048  */
3049 static int rbd_read_header(struct rbd_device *rbd_dev,
3050                            struct rbd_image_header *header)
3051 {
3052         struct rbd_image_header_ondisk *ondisk;
3053         int ret;
3054
3055         ondisk = rbd_dev_v1_header_read(rbd_dev);
3056         if (IS_ERR(ondisk))
3057                 return PTR_ERR(ondisk);
3058         ret = rbd_header_from_disk(header, ondisk);
3059         kfree(ondisk);
3060
3061         return ret;
3062 }
3063
3064 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3065 {
3066         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3067                 return;
3068
3069         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3070                 sector_t size;
3071
3072                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3073                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3074                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3075                 set_capacity(rbd_dev->disk, size);
3076         }
3077 }
3078
3079 /*
3080  * Re-read the v1 on-disk header and update the in-core copy of it
3081  */
3082 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3083 {
3084         int ret;
3085         struct rbd_image_header h;
3086
3087         ret = rbd_read_header(rbd_dev, &h);
3088         if (ret < 0)
3089                 return ret;
3090
3091         down_write(&rbd_dev->header_rwsem);
3092
3093         /* Update image size, and check for resize of mapped image */
3094         rbd_dev->header.image_size = h.image_size;
3095         rbd_update_mapping_size(rbd_dev);
3096
3097         /* rbd_dev->header.object_prefix shouldn't change */
3098         kfree(rbd_dev->header.snap_sizes);
3099         kfree(rbd_dev->header.snap_names);
3100         /* osd requests may still refer to snapc */
3101         ceph_put_snap_context(rbd_dev->header.snapc);
3102
3104         rbd_dev->header.snapc = h.snapc;
3105         rbd_dev->header.snap_names = h.snap_names;
3106         rbd_dev->header.snap_sizes = h.snap_sizes;
3107         /* Free the extra copy of the object prefix */
3108         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3109                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3110         kfree(h.object_prefix);
3111
3112         up_write(&rbd_dev->header_rwsem);
3113
3114         return ret;
3115 }
3116
3117 /*
3118  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3119  * has disappeared from the (just updated) snapshot context.
3120  */
3121 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3122 {
3123         u64 snap_id;
3124
3125         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3126                 return;
3127
3128         snap_id = rbd_dev->spec->snap_id;
3129         if (snap_id == CEPH_NOSNAP)
3130                 return;
3131
3132         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3133                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3134 }
3135
3136 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3137 {
3138         u64 image_size;
3139         int ret;
3140
3141         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3142         image_size = rbd_dev->header.image_size;
3143         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3144         if (rbd_dev->image_format == 1)
3145                 ret = rbd_dev_v1_refresh(rbd_dev);
3146         else
3147                 ret = rbd_dev_v2_refresh(rbd_dev);
3148
3149         /* If it's a mapped snapshot, validate its EXISTS flag */
3150
3151         rbd_exists_validate(rbd_dev);
3152         mutex_unlock(&ctl_mutex);
3153         if (ret)
3154                 rbd_warn(rbd_dev, "got notification but failed to "
3155                            "update snaps: %d", ret);
3156         if (image_size != rbd_dev->header.image_size)
3157                 revalidate_disk(rbd_dev->disk);
3158
3159         return ret;
3160 }
3161
3162 static int rbd_init_disk(struct rbd_device *rbd_dev)
3163 {
3164         struct gendisk *disk;
3165         struct request_queue *q;
3166         u64 segment_size;
3167
3168         /* create gendisk info */
3169         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3170         if (!disk)
3171                 return -ENOMEM;
3172
3173         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3174                  rbd_dev->dev_id);
3175         disk->major = rbd_dev->major;
3176         disk->first_minor = 0;
3177         disk->fops = &rbd_bd_ops;
3178         disk->private_data = rbd_dev;
3179
3180         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3181         if (!q)
3182                 goto out_disk;
3183
3184         /* We use the default size, but let's be explicit about it. */
3185         blk_queue_physical_block_size(q, SECTOR_SIZE);
3186
3187         /* set io sizes to object size */
3188         segment_size = rbd_obj_bytes(&rbd_dev->header);
3189         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3190         blk_queue_max_segment_size(q, segment_size);
3191         blk_queue_io_min(q, segment_size);
3192         blk_queue_io_opt(q, segment_size);
3193
3194         blk_queue_merge_bvec(q, rbd_merge_bvec);
3195         disk->queue = q;
3196
3197         q->queuedata = rbd_dev;
3198
3199         rbd_dev->disk = disk;
3200
3201         return 0;
3202 out_disk:
3203         put_disk(disk);
3204
3205         return -ENOMEM;
3206 }
3207
3208 /* sysfs */
3211
3212 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3213 {
3214         return container_of(dev, struct rbd_device, dev);
3215 }
3216
3217 static ssize_t rbd_size_show(struct device *dev,
3218                              struct device_attribute *attr, char *buf)
3219 {
3220         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3221
3222         return sprintf(buf, "%llu\n",
3223                 (unsigned long long)rbd_dev->mapping.size);
3224 }
3225
3226 /*
3227  * Note this shows the features for whatever's mapped, which is not
3228  * necessarily the base image.
3229  */
3230 static ssize_t rbd_features_show(struct device *dev,
3231                              struct device_attribute *attr, char *buf)
3232 {
3233         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3234
3235         return sprintf(buf, "0x%016llx\n",
3236                         (unsigned long long)rbd_dev->mapping.features);
3237 }
3238
3239 static ssize_t rbd_major_show(struct device *dev,
3240                               struct device_attribute *attr, char *buf)
3241 {
3242         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3243
3244         if (rbd_dev->major)
3245                 return sprintf(buf, "%d\n", rbd_dev->major);
3246
3247         return sprintf(buf, "(none)\n");
3249 }
3250
3251 static ssize_t rbd_client_id_show(struct device *dev,
3252                                   struct device_attribute *attr, char *buf)
3253 {
3254         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3255
3256         return sprintf(buf, "client%lld\n",
3257                         ceph_client_id(rbd_dev->rbd_client->client));
3258 }
3259
3260 static ssize_t rbd_pool_show(struct device *dev,
3261                              struct device_attribute *attr, char *buf)
3262 {
3263         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3264
3265         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3266 }
3267
3268 static ssize_t rbd_pool_id_show(struct device *dev,
3269                              struct device_attribute *attr, char *buf)
3270 {
3271         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3272
3273         return sprintf(buf, "%llu\n",
3274                         (unsigned long long) rbd_dev->spec->pool_id);
3275 }
3276
3277 static ssize_t rbd_name_show(struct device *dev,
3278                              struct device_attribute *attr, char *buf)
3279 {
3280         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3281
3282         if (rbd_dev->spec->image_name)
3283                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3284
3285         return sprintf(buf, "(unknown)\n");
3286 }
3287
3288 static ssize_t rbd_image_id_show(struct device *dev,
3289                              struct device_attribute *attr, char *buf)
3290 {
3291         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3292
3293         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3294 }
3295
3296 /*
3297  * Shows the name of the currently-mapped snapshot (or
3298  * RBD_SNAP_HEAD_NAME for the base image).
3299  */
3300 static ssize_t rbd_snap_show(struct device *dev,
3301                              struct device_attribute *attr,
3302                              char *buf)
3303 {
3304         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3305
3306         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3307 }
3308
3309 /*
3310  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3311  * for the parent image.  If there is no parent, simply shows
3312  * "(no parent image)".
3313  */
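/*
 * Example output (a sketch; all values are illustrative):
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 10086b8b4567
 *   image_name parent-image
 *   snap_id 4
 *   snap_name base
 *   overlap 10737418240
 */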
3314 static ssize_t rbd_parent_show(struct device *dev,
3315                              struct device_attribute *attr,
3316                              char *buf)
3317 {
3318         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3319         struct rbd_spec *spec = rbd_dev->parent_spec;
3320         int count;
3321         char *bufp = buf;
3322
3323         if (!spec)
3324                 return sprintf(buf, "(no parent image)\n");
3325
3326         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3327                         (unsigned long long) spec->pool_id, spec->pool_name);
3328         if (count < 0)
3329                 return count;
3330         bufp += count;
3331
3332         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3333                         spec->image_name ? spec->image_name : "(unknown)");
3334         if (count < 0)
3335                 return count;
3336         bufp += count;
3337
3338         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3339                         (unsigned long long) spec->snap_id, spec->snap_name);
3340         if (count < 0)
3341                 return count;
3342         bufp += count;
3343
3344         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3345         if (count < 0)
3346                 return count;
3347         bufp += count;
3348
3349         return (ssize_t) (bufp - buf);
3350 }
3351
3352 static ssize_t rbd_image_refresh(struct device *dev,
3353                                  struct device_attribute *attr,
3354                                  const char *buf,
3355                                  size_t size)
3356 {
3357         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3358         int ret;
3359
3360         ret = rbd_dev_refresh(rbd_dev);
3361
3362         return ret < 0 ? ret : size;
3363 }
3364
3365 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3366 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3367 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3368 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3369 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3370 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3371 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3372 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3373 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3374 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3375 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3376
3377 static struct attribute *rbd_attrs[] = {
3378         &dev_attr_size.attr,
3379         &dev_attr_features.attr,
3380         &dev_attr_major.attr,
3381         &dev_attr_client_id.attr,
3382         &dev_attr_pool.attr,
3383         &dev_attr_pool_id.attr,
3384         &dev_attr_name.attr,
3385         &dev_attr_image_id.attr,
3386         &dev_attr_current_snap.attr,
3387         &dev_attr_parent.attr,
3388         &dev_attr_refresh.attr,
3389         NULL
3390 };
3391
3392 static struct attribute_group rbd_attr_group = {
3393         .attrs = rbd_attrs,
3394 };
3395
3396 static const struct attribute_group *rbd_attr_groups[] = {
3397         &rbd_attr_group,
3398         NULL
3399 };
3400
3401 static void rbd_sysfs_dev_release(struct device *dev)
3402 {
3403 }
3404
3405 static struct device_type rbd_device_type = {
3406         .name           = "rbd",
3407         .groups         = rbd_attr_groups,
3408         .release        = rbd_sysfs_dev_release,
3409 };
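/*
 * Once a device is mapped, these attributes appear under
 * /sys/bus/rbd/devices/<dev_id>/.  For example (illustrative
 * values):
 *
 *   $ cat /sys/bus/rbd/devices/0/pool
 *   rbd
 *   $ cat /sys/bus/rbd/devices/0/size
 *   10737418240
 */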
3410
3411 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3412 {
3413         kref_get(&spec->kref);
3414
3415         return spec;
3416 }
3417
3418 static void rbd_spec_free(struct kref *kref);
3419 static void rbd_spec_put(struct rbd_spec *spec)
3420 {
3421         if (spec)
3422                 kref_put(&spec->kref, rbd_spec_free);
3423 }
3424
3425 static struct rbd_spec *rbd_spec_alloc(void)
3426 {
3427         struct rbd_spec *spec;
3428
3429         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3430         if (!spec)
3431                 return NULL;
3432         kref_init(&spec->kref);
3433
3434         return spec;
3435 }
3436
3437 static void rbd_spec_free(struct kref *kref)
3438 {
3439         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3440
3441         kfree(spec->pool_name);
3442         kfree(spec->image_id);
3443         kfree(spec->image_name);
3444         kfree(spec->snap_name);
3445         kfree(spec);
3446 }
3447
3448 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3449                                 struct rbd_spec *spec)
3450 {
3451         struct rbd_device *rbd_dev;
3452
3453         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3454         if (!rbd_dev)
3455                 return NULL;
3456
3457         spin_lock_init(&rbd_dev->lock);
3458         rbd_dev->flags = 0;
3459         INIT_LIST_HEAD(&rbd_dev->node);
3460         init_rwsem(&rbd_dev->header_rwsem);
3461
3462         rbd_dev->spec = spec;
3463         rbd_dev->rbd_client = rbdc;
3464
3465         /* Initialize the layout used for all rbd requests */
3466
3467         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3468         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3469         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3470         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3471
3472         return rbd_dev;
3473 }
3474
3475 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3476 {
3477         rbd_put_client(rbd_dev->rbd_client);
3478         rbd_spec_put(rbd_dev->spec);
3479         kfree(rbd_dev);
3480 }
3481
3482 /*
3483  * Get the size and object order for an image snapshot, or if
3484  * snap_id is CEPH_NOSNAP, gets this information for the base
3485  * image.
3486  */
3487 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3488                                 u8 *order, u64 *snap_size)
3489 {
3490         __le64 snapid = cpu_to_le64(snap_id);
3491         int ret;
3492         struct {
3493                 u8 order;
3494                 __le64 size;
3495         } __attribute__ ((packed)) size_buf = { 0 };
3496
3497         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3498                                 "rbd", "get_size",
3499                                 &snapid, sizeof (snapid),
3500                                 &size_buf, sizeof (size_buf));
3501         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3502         if (ret < 0)
3503                 return ret;
3504         if (ret < sizeof (size_buf))
3505                 return -ERANGE;
3506
3507         if (order)
3508                 *order = size_buf.order;
3509         *snap_size = le64_to_cpu(size_buf.size);
3510
3511         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3512                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3513                 (unsigned long long)*snap_size);
3514
3515         return 0;
3516 }
3517
3518 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3519 {
3520         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3521                                         &rbd_dev->header.obj_order,
3522                                         &rbd_dev->header.image_size);
3523 }
3524
3525 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3526 {
3527         void *reply_buf;
3528         int ret;
3529         void *p;
3530
3531         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3532         if (!reply_buf)
3533                 return -ENOMEM;
3534
3535         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3536                                 "rbd", "get_object_prefix", NULL, 0,
3537                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3538         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3539         if (ret < 0)
3540                 goto out;
3541
3542         p = reply_buf;
3543         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3544                                                 p + ret, NULL, GFP_NOIO);
3545         ret = 0;
3546
3547         if (IS_ERR(rbd_dev->header.object_prefix)) {
3548                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3549                 rbd_dev->header.object_prefix = NULL;
3550         } else {
3551                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3552         }
3553 out:
3554         kfree(reply_buf);
3555
3556         return ret;
3557 }
3558
3559 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3560                 u64 *snap_features)
3561 {
3562         __le64 snapid = cpu_to_le64(snap_id);
3563         struct {
3564                 __le64 features;
3565                 __le64 incompat;
3566         } __attribute__ ((packed)) features_buf = { 0 };
3567         u64 incompat;
3568         int ret;
3569
3570         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3571                                 "rbd", "get_features",
3572                                 &snapid, sizeof (snapid),
3573                                 &features_buf, sizeof (features_buf));
3574         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3575         if (ret < 0)
3576                 return ret;
3577         if (ret < sizeof (features_buf))
3578                 return -ERANGE;
3579
3580         incompat = le64_to_cpu(features_buf.incompat);
3581         if (incompat & ~RBD_FEATURES_SUPPORTED)
3582                 return -ENXIO;
3583
3584         *snap_features = le64_to_cpu(features_buf.features);
3585
3586         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3587                 (unsigned long long)snap_id,
3588                 (unsigned long long)*snap_features,
3589                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3590
3591         return 0;
3592 }
3593
3594 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3595 {
3596         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3597                                                 &rbd_dev->header.features);
3598 }
3599
3600 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3601 {
3602         struct rbd_spec *parent_spec;
3603         size_t size;
3604         void *reply_buf = NULL;
3605         __le64 snapid;
3606         void *p;
3607         void *end;
3608         char *image_id;
3609         u64 overlap;
3610         int ret;
3611
3612         parent_spec = rbd_spec_alloc();
3613         if (!parent_spec)
3614                 return -ENOMEM;
3615
3616         size = sizeof (__le64) +                                /* pool_id */
3617                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3618                 sizeof (__le64) +                               /* snap_id */
3619                 sizeof (__le64);                                /* overlap */
3620         reply_buf = kmalloc(size, GFP_KERNEL);
3621         if (!reply_buf) {
3622                 ret = -ENOMEM;
3623                 goto out_err;
3624         }
3625
3626         snapid = cpu_to_le64(CEPH_NOSNAP);
3627         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3628                                 "rbd", "get_parent",
3629                                 &snapid, sizeof (snapid),
3630                                 reply_buf, size);
3631         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3632         if (ret < 0)
3633                 goto out_err;
3634
3635         p = reply_buf;
3636         end = reply_buf + ret;
3637         ret = -ERANGE;
3638         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3639         if (parent_spec->pool_id == CEPH_NOPOOL)
3640                 goto out;       /* No parent?  No problem. */
3641
3642         /* The ceph file layout needs to fit pool id in 32 bits */
3643
3644         ret = -EIO;
3645         if (parent_spec->pool_id > (u64)U32_MAX) {
3646                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
3647                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3648                 goto out_err;
3649         }
3650
3651         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3652         if (IS_ERR(image_id)) {
3653                 ret = PTR_ERR(image_id);
3654                 goto out_err;
3655         }
3656         parent_spec->image_id = image_id;
3657         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3658         ceph_decode_64_safe(&p, end, overlap, out_err);
3659
3660         rbd_dev->parent_overlap = overlap;
3661         rbd_dev->parent_spec = parent_spec;
3662         parent_spec = NULL;     /* rbd_dev now owns this */
3663 out:
3664         ret = 0;
3665 out_err:
3666         kfree(reply_buf);
3667         rbd_spec_put(parent_spec);
3668
3669         return ret;
3670 }
3671
3672 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3673 {
3674         struct {
3675                 __le64 stripe_unit;
3676                 __le64 stripe_count;
3677         } __attribute__ ((packed)) striping_info_buf = { 0 };
3678         size_t size = sizeof (striping_info_buf);
3679         void *p;
3680         u64 obj_size;
3681         u64 stripe_unit;
3682         u64 stripe_count;
3683         int ret;
3684
3685         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3686                                 "rbd", "get_stripe_unit_count", NULL, 0,
3687                                 (char *)&striping_info_buf, size);
3688         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3689         if (ret < 0)
3690                 return ret;
3691         if (ret < size)
3692                 return -ERANGE;
3693
3694         /*
3695          * We don't actually support the "fancy striping" feature
3696          * (STRIPINGV2) yet, but if the striping sizes are the
3697          * defaults the behavior is the same as before.  So find
3698          * out, and only fail if the image has non-default values.
3699          */
3701         obj_size = (u64)1 << rbd_dev->header.obj_order;
3702         p = &striping_info_buf;
3703         stripe_unit = ceph_decode_64(&p);
3704         if (stripe_unit != obj_size) {
3705                 rbd_warn(rbd_dev, "unsupported stripe unit "
3706                                 "(got %llu want %llu)",
3707                                 stripe_unit, obj_size);
3708                 return -EINVAL;
3709         }
3710         stripe_count = ceph_decode_64(&p);
3711         if (stripe_count != 1) {
3712                 rbd_warn(rbd_dev, "unsupported stripe count "
3713                                 "(got %llu want 1)", stripe_count);
3714                 return -EINVAL;
3715         }
3716         rbd_dev->header.stripe_unit = stripe_unit;
3717         rbd_dev->header.stripe_count = stripe_count;
3718
3719         return 0;
3720 }
3721
3722 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3723 {
3724         size_t image_id_size;
3725         char *image_id;
3726         void *p;
3727         void *end;
3728         size_t size;
3729         void *reply_buf = NULL;
3730         size_t len = 0;
3731         char *image_name = NULL;
3732         int ret;
3733
3734         rbd_assert(!rbd_dev->spec->image_name);
3735
3736         len = strlen(rbd_dev->spec->image_id);
3737         image_id_size = sizeof (__le32) + len;
3738         image_id = kmalloc(image_id_size, GFP_KERNEL);
3739         if (!image_id)
3740                 return NULL;
3741
3742         p = image_id;
3743         end = image_id + image_id_size;
3744         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3745
3746         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3747         reply_buf = kmalloc(size, GFP_KERNEL);
3748         if (!reply_buf)
3749                 goto out;
3750
3751         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3752                                 "rbd", "dir_get_name",
3753                                 image_id, image_id_size,
3754                                 reply_buf, size);
3755         if (ret < 0)
3756                 goto out;
3757         p = reply_buf;
3758         end = reply_buf + ret;
3759
3760         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3761         if (IS_ERR(image_name))
3762                 image_name = NULL;
3763         else
3764                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3765 out:
3766         kfree(reply_buf);
3767         kfree(image_id);
3768
3769         return image_name;
3770 }
3771
3772 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3773 {
3774         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3775         const char *snap_name;
3776         u32 which = 0;
3777
3778         /* Skip over names until we find the one we are looking for */
3779
3780         snap_name = rbd_dev->header.snap_names;
3781         while (which < snapc->num_snaps) {
3782                 if (!strcmp(name, snap_name))
3783                         return snapc->snaps[which];
3784                 snap_name += strlen(snap_name) + 1;
3785                 which++;
3786         }
3787         return CEPH_NOSNAP;
3788 }
3789
3790 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3791 {
3792         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3793         u32 which;
3794         bool found = false;
3795         u64 snap_id;
3796
3797         for (which = 0; !found && which < snapc->num_snaps; which++) {
3798                 const char *snap_name;
3799
3800                 snap_id = snapc->snaps[which];
3801                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3802                 if (IS_ERR(snap_name))
3803                         break;
3804                 found = !strcmp(name, snap_name);
3805                 kfree(snap_name);
3806         }
3807         return found ? snap_id : CEPH_NOSNAP;
3808 }
3809
3810 /*
3811  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3812  * no snapshot by that name is found, or if an error occurs.
3813  */
3814 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3815 {
3816         if (rbd_dev->image_format == 1)
3817                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3818
3819         return rbd_v2_snap_id_by_name(rbd_dev, name);
3820 }
3821
3822 /*
3823  * When an rbd image has a parent image, it is identified by the
3824  * pool, image, and snapshot ids (not names).  This function fills
3825  * in the names for those ids.  (It's OK if we can't figure out the
3826  * name for an image id, but the pool and snapshot ids should always
3827  * exist and have names.)  All names in an rbd spec are dynamically
3828  * allocated.
3829  *
3830  * When an image being mapped (not a parent) is probed, we have the
3831  * pool name and pool id, image name and image id, and the snapshot
3832  * name.  The only thing we're missing is the snapshot id.
3833  */
3834 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3835 {
3836         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3837         struct rbd_spec *spec = rbd_dev->spec;
3838         const char *pool_name;
3839         const char *image_name;
3840         const char *snap_name;
3841         int ret;
3842
3843         /*
3844          * An image being mapped will have the pool name (etc.), but
3845          * we need to look up the snapshot id.
3846          */
3847         if (spec->pool_name) {
3848                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3849                         u64 snap_id;
3850
3851                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3852                         if (snap_id == CEPH_NOSNAP)
3853                                 return -ENOENT;
3854                         spec->snap_id = snap_id;
3855                 } else {
3856                         spec->snap_id = CEPH_NOSNAP;
3857                 }
3858
3859                 return 0;
3860         }
3861
3862         /* Get the pool name; we have to make our own copy of this */
3863
3864         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3865         if (!pool_name) {
3866                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3867                 return -EIO;
3868         }
3869         pool_name = kstrdup(pool_name, GFP_KERNEL);
3870         if (!pool_name)
3871                 return -ENOMEM;
3872
3873         /* Fetch the image name; tolerate failure here */
3874
3875         image_name = rbd_dev_image_name(rbd_dev);
3876         if (!image_name)
3877                 rbd_warn(rbd_dev, "unable to get image name");
3878
3879         /* Look up the snapshot name, and make a copy */
3880
3881         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3882         if (!snap_name) {
3883                 ret = -ENOMEM;
3884                 goto out_err;
3885         }
3886
3887         spec->pool_name = pool_name;
3888         spec->image_name = image_name;
3889         spec->snap_name = snap_name;
3890
3891         return 0;
3892 out_err:
3893         kfree(image_name);
3894         kfree(pool_name);
3895
3896         return ret;
3897 }
3898
3899 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3900 {
3901         size_t size;
3902         int ret;
3903         void *reply_buf;
3904         void *p;
3905         void *end;
3906         u64 seq;
3907         u32 snap_count;
3908         struct ceph_snap_context *snapc;
3909         u32 i;
3910
3911         /*
3912          * We'll need room for the seq value (maximum snapshot id),
3913          * snapshot count, and array of that many snapshot ids.
3914          * For now we have a fixed upper limit on the number we're
3915          * prepared to receive.
3916          */
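        /*
         * (With RBD_MAX_SNAP_COUNT of 510 this works out to
         * 8 + 4 + 510 * 8 = 4092 bytes, which fits in one 4 KB
         * page.)
         */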
3917         size = sizeof (__le64) + sizeof (__le32) +
3918                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3919         reply_buf = kzalloc(size, GFP_KERNEL);
3920         if (!reply_buf)
3921                 return -ENOMEM;
3922
3923         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3924                                 "rbd", "get_snapcontext", NULL, 0,
3925                                 reply_buf, size);
3926         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3927         if (ret < 0)
3928                 goto out;
3929
3930         p = reply_buf;
3931         end = reply_buf + ret;
3932         ret = -ERANGE;
3933         ceph_decode_64_safe(&p, end, seq, out);
3934         ceph_decode_32_safe(&p, end, snap_count, out);
3935
3936         /*
3937          * Make sure the reported number of snapshot ids wouldn't go
3938          * beyond the end of our buffer.  But before checking that,
3939          * make sure the computed size of the snapshot context we
3940          * allocate is representable in a size_t.
3941          */
3942         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3943                                  / sizeof (u64)) {
3944                 ret = -EINVAL;
3945                 goto out;
3946         }
3947         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3948                 goto out;
3949         ret = 0;
3950
3951         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3952         if (!snapc) {
3953                 ret = -ENOMEM;
3954                 goto out;
3955         }
3956         snapc->seq = seq;
3957         for (i = 0; i < snap_count; i++)
3958                 snapc->snaps[i] = ceph_decode_64(&p);
3959
3960         rbd_dev->header.snapc = snapc;
3961
3962         dout("  snap context seq = %llu, snap_count = %u\n",
3963                 (unsigned long long)seq, (unsigned int)snap_count);
3964 out:
3965         kfree(reply_buf);
3966
3967         return ret;
3968 }
3969
3970 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3971                                         u64 snap_id)
3972 {
3973         size_t size;
3974         void *reply_buf;
3975         __le64 snapid;
3976         int ret;
3977         void *p;
3978         void *end;
3979         char *snap_name;
3980
3981         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3982         reply_buf = kmalloc(size, GFP_KERNEL);
3983         if (!reply_buf)
3984                 return ERR_PTR(-ENOMEM);
3985
3986         snapid = cpu_to_le64(snap_id);
3987         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3988                                 "rbd", "get_snapshot_name",
3989                                 &snapid, sizeof (snapid),
3990                                 reply_buf, size);
3991         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3992         if (ret < 0) {
3993                 snap_name = ERR_PTR(ret);
3994                 goto out;
3995         }
3996
3997         p = reply_buf;
3998         end = reply_buf + ret;
3999         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4000         if (IS_ERR(snap_name))
4001                 goto out;
4002
4003         dout("  snap_id 0x%016llx snap_name = %s\n",
4004                 (unsigned long long)snap_id, snap_name);
4005 out:
4006         kfree(reply_buf);
4007
4008         return snap_name;
4009 }
4010
4011 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4012 {
4013         int ret;
4014
4015         down_write(&rbd_dev->header_rwsem);
4016
4017         ret = rbd_dev_v2_image_size(rbd_dev);
4018         if (ret)
4019                 goto out;
4020         rbd_update_mapping_size(rbd_dev);
4021
4022         ret = rbd_dev_v2_snap_context(rbd_dev);
4023         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4026 out:
4027         up_write(&rbd_dev->header_rwsem);
4028
4029         return ret;
4030 }
4031
4032 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4033 {
4034         struct device *dev;
4035         int ret;
4036
4037         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4038
4039         dev = &rbd_dev->dev;
4040         dev->bus = &rbd_bus_type;
4041         dev->type = &rbd_device_type;
4042         dev->parent = &rbd_root_dev;
4043         dev->release = rbd_dev_device_release;
4044         dev_set_name(dev, "%d", rbd_dev->dev_id);
4045         ret = device_register(dev);
4046
4047         mutex_unlock(&ctl_mutex);
4048
4049         return ret;
4050 }
4051
4052 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4053 {
4054         device_unregister(&rbd_dev->dev);
4055 }
4056
4057 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4058
4059 /*
4060  * Get a unique rbd identifier for the given new rbd_dev, and add
4061  * the rbd_dev to the global list.  The minimum rbd id is 1.
4062  */
4063 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4064 {
4065         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4066
4067         spin_lock(&rbd_dev_list_lock);
4068         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4069         spin_unlock(&rbd_dev_list_lock);
4070         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4071                 (unsigned long long) rbd_dev->dev_id);
4072 }
4073
4074 /*
4075  * Remove an rbd_dev from the global list, and record that its
4076  * identifier is no longer in use.
4077  */
4078 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4079 {
4080         struct list_head *tmp;
4081         int rbd_id = rbd_dev->dev_id;
4082         int max_id;
4083
4084         rbd_assert(rbd_id > 0);
4085
4086         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4087                 (unsigned long long) rbd_dev->dev_id);
4088         spin_lock(&rbd_dev_list_lock);
4089         list_del_init(&rbd_dev->node);
4090
4091         /*
4092          * If the id being "put" is not the current maximum, there
4093          * is nothing special we need to do.
4094          */
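        /*
         * (For example, with ids 1, 2 and 3 in use, putting id 2
         * leaves the maximum at 3; putting id 3 triggers the rescan
         * below, which resets the maximum to 2.)
         */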
4095         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4096                 spin_unlock(&rbd_dev_list_lock);
4097                 return;
4098         }
4099
4100         /*
4101          * We need to update the current maximum id.  Search the
4102          * list to find out what it is.  We're more likely to find
4103          * the maximum at the end, so search the list backward.
4104          */
4105         max_id = 0;
4106         list_for_each_prev(tmp, &rbd_dev_list) {
4107                 struct rbd_device *rbd_dev;
4108
4109                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4110                 if (rbd_dev->dev_id > max_id)
4111                         max_id = rbd_dev->dev_id;
4112         }
4113         spin_unlock(&rbd_dev_list_lock);
4114
4115         /*
4116          * The max id could have been updated by rbd_dev_id_get(), in
4117          * which case it now accurately reflects the new maximum.
4118          * Be careful not to overwrite the maximum value in that
4119          * case.
4120          */
4121         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4122         dout("  max dev id has been reset\n");
4123 }
4124
4125 /*
4126  * Skips over white space at *buf, and updates *buf to point to the
4127  * first found non-space character (if any). Returns the length of
4128  * the token (string of non-white space characters) found.  Note
4129  * that *buf must be terminated with '\0'.
4130  */
4131 static inline size_t next_token(const char **buf)
4132 {
4133         /*
4134         * These are the characters that produce nonzero for
4135         * isspace() in the "C" and "POSIX" locales.
4136         */
4137         const char *spaces = " \f\n\r\t\v";
4138
4139         *buf += strspn(*buf, spaces);   /* Find start of token */
4140
4141         return strcspn(*buf, spaces);   /* Return token length */
4142 }
4143
4144 /*
4145  * Finds the next token in *buf, and if the provided token buffer is
4146  * big enough, copies the found token into it.  The result, if
4147  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4148  * must be terminated with '\0' on entry.
4149  *
4150  * Returns the length of the token found (not including the '\0').
4151  * Return value will be 0 if no token is found, and it will be >=
4152  * token_size if the token would not fit.
4153  *
4154  * The *buf pointer will be updated to point beyond the end of the
4155  * found token.  Note that this occurs even if the token buffer is
4156  * too small to hold it.
4157  */
4158 static inline size_t copy_token(const char **buf,
4159                                 char *token,
4160                                 size_t token_size)
4161 {
4162         size_t len;
4163
4164         len = next_token(buf);
4165         if (len < token_size) {
4166                 memcpy(token, *buf, len);
4167                 *(token + len) = '\0';
4168         }
4169         *buf += len;
4170
4171         return len;
4172 }
4173
4174 /*
4175  * Finds the next token in *buf, dynamically allocates a buffer big
4176  * enough to hold a copy of it, and copies the token into the new
4177  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4178  * that a duplicate buffer is created even for a zero-length token.
4179  *
4180  * Returns a pointer to the newly-allocated duplicate, or a null
4181  * pointer if memory for the duplicate was not available.  If
4182  * the lenp argument is a non-null pointer, the length of the token
4183  * (not including the '\0') is returned in *lenp.
4184  *
4185  * If successful, the *buf pointer will be updated to point beyond
4186  * the end of the found token.
4187  *
4188  * Note: uses GFP_KERNEL for allocation.
4189  */
4190 static inline char *dup_token(const char **buf, size_t *lenp)
4191 {
4192         char *dup;
4193         size_t len;
4194
4195         len = next_token(buf);
4196         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4197         if (!dup)
4198                 return NULL;
4199         *(dup + len) = '\0';
4200         *buf += len;
4201
4202         if (lenp)
4203                 *lenp = len;
4204
4205         return dup;
4206 }
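/*
 * For example, with *buf pointing at "  pool image", a first
 * dup_token() call returns a copy of "pool" (length 4) and leaves
 * *buf pointing at " image"; a second call returns a copy of
 * "image".
 */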
4207
4208 /*
4209  * Parse the options provided for an "rbd add" (i.e., rbd image
4210  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4211  * and the data written is passed here via a NUL-terminated buffer.
4212  * Returns 0 if successful or an error code otherwise.
4213  *
4214  * The information extracted from these options is recorded in
4215  * the other parameters which return dynamically-allocated
4216  * structures:
4217  *  ceph_opts
4218  *      The address of a pointer that will refer to a ceph options
4219  *      structure.  Caller must release the returned pointer using
4220  *      ceph_destroy_options() when it is no longer needed.
4221  *  rbd_opts
4222  *      Address of an rbd options pointer.  Fully initialized by
4223  *      this function; caller must release with kfree().
4224  *  spec
4225  *      Address of an rbd image specification pointer.  Fully
4226  *      initialized by this function based on parsed options.
4227  *      Caller must release with rbd_spec_put().
4228  *
4229  * The options passed take this form:
4230  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4231  * where:
4232  *  <mon_addrs>
4233  *      A comma-separated list of one or more monitor addresses.
4234  *      A monitor address is an ip address, optionally followed
4235  *      by a port number (separated by a colon).
4236  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4237  *  <options>
4238  *      A comma-separated list of ceph and/or rbd options.
4239  *  <pool_name>
4240  *      The name of the rados pool containing the rbd image.
4241  *  <image_name>
4242  *      The name of the image in that pool to map.
4243  *  <snap_name>
4244  *      An optional snapshot name.  If provided, the mapping will
4245  *      present data from the image at the time that snapshot was
4246  *      created.  The image head is used if no snapshot name is
4247  *      provided.  Snapshot mappings are always read-only.
4248  */
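/*
 * For example, a map request might be written as (a sketch; the
 * monitor address, key, and names are illustrative):
 *
 *   # echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo -" \
 *       > /sys/bus/rbd/add
 */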
4249 static int rbd_add_parse_args(const char *buf,
4250                                 struct ceph_options **ceph_opts,
4251                                 struct rbd_options **opts,
4252                                 struct rbd_spec **rbd_spec)
4253 {
4254         size_t len;
4255         char *options;
4256         const char *mon_addrs;
4257         char *snap_name;
4258         size_t mon_addrs_size;
4259         struct rbd_spec *spec = NULL;
4260         struct rbd_options *rbd_opts = NULL;
4261         struct ceph_options *copts;
4262         int ret;
4263
4264         /* The first four tokens are required */
4265
4266         len = next_token(&buf);
4267         if (!len) {
4268                 rbd_warn(NULL, "no monitor address(es) provided");
4269                 return -EINVAL;
4270         }
4271         mon_addrs = buf;
4272         mon_addrs_size = len + 1;
4273         buf += len;
4274
4275         ret = -EINVAL;
4276         options = dup_token(&buf, NULL);
4277         if (!options)
4278                 return -ENOMEM;
4279         if (!*options) {
4280                 rbd_warn(NULL, "no options provided");
4281                 goto out_err;
4282         }
4283
4284         spec = rbd_spec_alloc();
4285         if (!spec)
4286                 goto out_mem;
4287
4288         spec->pool_name = dup_token(&buf, NULL);
4289         if (!spec->pool_name)
4290                 goto out_mem;
4291         if (!*spec->pool_name) {
4292                 rbd_warn(NULL, "no pool name provided");
4293                 goto out_err;
4294         }
4295
4296         spec->image_name = dup_token(&buf, NULL);
4297         if (!spec->image_name)
4298                 goto out_mem;
4299         if (!*spec->image_name) {
4300                 rbd_warn(NULL, "no image name provided");
4301                 goto out_err;
4302         }
4303
4304         /*
4305          * Snapshot name is optional; default is to use "-"
4306          * (indicating the head/no snapshot).
4307          */
4308         len = next_token(&buf);
4309         if (!len) {
4310                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4311                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4312         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4313                 ret = -ENAMETOOLONG;
4314                 goto out_err;
4315         }
4316         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4317         if (!snap_name)
4318                 goto out_mem;
4319         *(snap_name + len) = '\0';
4320         spec->snap_name = snap_name;
4321
4322         /* Initialize all rbd options to the defaults */
4323
4324         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4325         if (!rbd_opts)
4326                 goto out_mem;
4327
4328         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4329
4330         copts = ceph_parse_options(options, mon_addrs,
4331                                         mon_addrs + mon_addrs_size - 1,
4332                                         parse_rbd_opts_token, rbd_opts);
4333         if (IS_ERR(copts)) {
4334                 ret = PTR_ERR(copts);
4335                 goto out_err;
4336         }
4337         kfree(options);
4338
4339         *ceph_opts = copts;
4340         *opts = rbd_opts;
4341         *rbd_spec = spec;
4342
4343         return 0;
4344 out_mem:
4345         ret = -ENOMEM;
4346 out_err:
4347         kfree(rbd_opts);
4348         rbd_spec_put(spec);
4349         kfree(options);
4350
4351         return ret;
4352 }
4353
4354 /*
4355  * An rbd format 2 image has a unique identifier, distinct from the
4356  * name given to it by the user.  Internally, that identifier is
4357  * what's used to specify the names of objects related to the image.
4358  *
4359  * A special "rbd id" object is used to map an rbd image name to its
4360  * id.  If that object doesn't exist, then there is no v2 rbd image
4361  * with the supplied name.
4362  *
4363  * This function will record the given rbd_dev's image_id field if
4364  * it can be determined, and in that case will return 0.  If any
4365  * errors occur a negative errno will be returned and the rbd_dev's
4366  * image_id field will be unchanged (and should be NULL).
4367  */
4368 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4369 {
4370         int ret;
4371         size_t size;
4372         char *object_name;
4373         void *response;
4374         char *image_id;
4375
4376         /*
4377          * When probing a parent image, the image id is already
4378          * known (and the image name likely is not).  There's no
4379          * need to fetch the image id again in this case.  We
4380          * do still need to set the image format though.
4381          */
4382         if (rbd_dev->spec->image_id) {
4383                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4384
4385                 return 0;
4386         }
4387
4388         /*
4389          * First, see if the format 2 image id file exists, and if
4390          * so, get the image's persistent id from it.
4391          */
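        /*
         * (For an image named "foo" the id object is "rbd_id.foo";
         * its contents encode the image id that names all of the
         * image's other objects.)
         */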
4392         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4393         object_name = kmalloc(size, GFP_NOIO);
4394         if (!object_name)
4395                 return -ENOMEM;
4396         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4397         dout("rbd id object name is %s\n", object_name);
4398
4399         /* Response will be an encoded string, which includes a length */
4400
4401         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4402         response = kzalloc(size, GFP_NOIO);
4403         if (!response) {
4404                 ret = -ENOMEM;
4405                 goto out;
4406         }
4407
4408         /* If it doesn't exist we'll assume it's a format 1 image */
4409
4410         ret = rbd_obj_method_sync(rbd_dev, object_name,
4411                                 "rbd", "get_id", NULL, 0,
4412                                 response, size);
4413         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4414         if (ret == -ENOENT) {
4415                 image_id = kstrdup("", GFP_KERNEL);
4416                 ret = image_id ? 0 : -ENOMEM;
4417                 if (!ret)
4418                         rbd_dev->image_format = 1;
4419         } else if (ret > (int) sizeof (__le32)) {
4420                 void *p = response;
4421
4422                 image_id = ceph_extract_encoded_string(&p, p + ret,
4423                                                 NULL, GFP_NOIO);
4424                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4425                 if (!ret)
4426                         rbd_dev->image_format = 2;
4427         } else {
4428                 ret = -EINVAL;
4429         }
4430
4431         if (!ret) {
4432                 rbd_dev->spec->image_id = image_id;
4433                 dout("image_id is %s\n", image_id);
4434         }
4435 out:
4436         kfree(response);
4437         kfree(object_name);
4438
4439         return ret;
4440 }
4441
4442 /* Undo whatever state changes are made by v1 or v2 image probe */
4443
4444 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4445 {
4446         struct rbd_image_header *header;
4447
4448         rbd_dev_remove_parent(rbd_dev);
4449         rbd_spec_put(rbd_dev->parent_spec);
4450         rbd_dev->parent_spec = NULL;
4451         rbd_dev->parent_overlap = 0;
4452
4453         /* Free dynamic fields from the header, then zero it out */
4454
4455         header = &rbd_dev->header;
4456         ceph_put_snap_context(header->snapc);
4457         kfree(header->snap_sizes);
4458         kfree(header->snap_names);
4459         kfree(header->object_prefix);
4460         memset(header, 0, sizeof (*header));
4461 }
4462
4463 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4464 {
4465         int ret;
4466
4467         /* Populate rbd image metadata */
4468
4469         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4470         if (ret < 0)
4471                 goto out_err;
4472
4473         /* Version 1 images have no parent (no layering) */
4474
4475         rbd_dev->parent_spec = NULL;
4476         rbd_dev->parent_overlap = 0;
4477
4478         dout("discovered version 1 image, header name is %s\n",
4479                 rbd_dev->header_name);
4480
4481         return 0;
4482
4483 out_err:
4484         kfree(rbd_dev->header_name);
4485         rbd_dev->header_name = NULL;
4486         kfree(rbd_dev->spec->image_id);
4487         rbd_dev->spec->image_id = NULL;
4488
4489         return ret;
4490 }
4491
4492 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4493 {
4494         int ret;
4495
4496         ret = rbd_dev_v2_image_size(rbd_dev);
4497         if (ret)
4498                 goto out_err;
4499
4500         /* Get the object prefix (a.k.a. block_name) for the image */
4501
4502         ret = rbd_dev_v2_object_prefix(rbd_dev);
4503         if (ret)
4504                 goto out_err;
4505
4506         /* Get and check the features for the image */
4507
4508         ret = rbd_dev_v2_features(rbd_dev);
4509         if (ret)
4510                 goto out_err;
4511
4512         /* If the image supports layering, get the parent info */
4513
4514         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4515                 ret = rbd_dev_v2_parent_info(rbd_dev);
4516                 if (ret)
4517                         goto out_err;
4518
4519                 /*
4520                  * Don't print a warning for parent images.  We can
4521                  * tell we are handling a parent at this point because
4522                  * we won't know its pool name yet (just its pool id).
4523                  */
4524                 if (rbd_dev->spec->pool_name)
4525                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4526                                         "is EXPERIMENTAL!");
4527         }
4528
4529         /* If the image supports fancy striping, get its parameters */
4530
4531         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4532                 ret = rbd_dev_v2_striping_info(rbd_dev);
4533                 if (ret < 0)
4534                         goto out_err;
4535         }
4536
4537         /* crypto and compression types aren't (yet) supported for v2 images */
4538
4539         rbd_dev->header.crypt_type = 0;
4540         rbd_dev->header.comp_type = 0;
4541
4542         /* Get the snapshot context, plus the header version */
4543
4544         ret = rbd_dev_v2_snap_context(rbd_dev);
4545         if (ret)
4546                 goto out_err;
4547
4548         dout("discovered version 2 image, header name is %s\n",
4549                 rbd_dev->header_name);
4550
4551         return 0;
4552 out_err:
4553         rbd_dev->parent_overlap = 0;
4554         rbd_spec_put(rbd_dev->parent_spec);
4555         rbd_dev->parent_spec = NULL;
4556         kfree(rbd_dev->header_name);
4557         rbd_dev->header_name = NULL;
4558         kfree(rbd_dev->header.object_prefix);
4559         rbd_dev->header.object_prefix = NULL;
4560
4561         return ret;
4562 }
4563
4564 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4565 {
4566         struct rbd_device *parent = NULL;
4567         struct rbd_spec *parent_spec;
4568         struct rbd_client *rbdc;
4569         int ret;
4570
4571         if (!rbd_dev->parent_spec)
4572                 return 0;
4573         /*
4574          * We need to pass a reference to the client and the parent
4575          * spec when creating the parent rbd_dev.  Images related by
4576          * parent/child relationships always share both.
4577          */
4578         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4579         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4580
4581         ret = -ENOMEM;
4582         parent = rbd_dev_create(rbdc, parent_spec);
4583         if (!parent)
4584                 goto out_err;
4585
4586         ret = rbd_dev_image_probe(parent);
4587         if (ret < 0)
4588                 goto out_err;
4589         rbd_dev->parent = parent;
4590
4591         return 0;
4592 out_err:
4593         if (parent) {
4594                 rbd_spec_put(rbd_dev->parent_spec);
4595                 kfree(rbd_dev->header_name);
4596                 rbd_dev_destroy(parent);
4597         } else {
4598                 rbd_put_client(rbdc);
4599                 rbd_spec_put(parent_spec);
4600         }
4601
4602         return ret;
4603 }
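
/*
 * Illustrative sketch only: mapping a clone of a clone yields a chain
 * of rbd_device structures,
 *
 *      rbd_dev -> parent -> grandparent -> NULL
 *
 * built by the recursive rbd_dev_image_probe() call above.  Every
 * level takes its own reference on the shared client and on its
 * parent's spec, so each can be torn down independently.
 */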
4604
4605 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4606 {
4607         int ret;
4608
4609         ret = rbd_dev_mapping_set(rbd_dev);
4610         if (ret)
4611                 return ret;
4612
4613         /* generate unique id: find highest unique id, add one */
4614         rbd_dev_id_get(rbd_dev);
4615
4616         /* Fill in the device name, now that we have its id. */
4617         BUILD_BUG_ON(DEV_NAME_LEN
4618                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
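        /*
         * For a 4-byte int, MAX_INT_FORMAT_WIDTH is (5 * 4) / 2 + 1,
         * i.e. 11 characters, enough for the ten digits of INT_MAX
         * plus a sign; sizeof (RBD_DRV_NAME) already counts the
         * terminating NUL.
         */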
4619         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4620
4621         /* Get our block major device number. */
4622
4623         ret = register_blkdev(0, rbd_dev->name);
4624         if (ret < 0)
4625                 goto err_out_id;
4626         rbd_dev->major = ret;
4627
4628         /* Set up the blkdev mapping. */
4629
4630         ret = rbd_init_disk(rbd_dev);
4631         if (ret)
4632                 goto err_out_blkdev;
4633
4634         ret = rbd_bus_add_dev(rbd_dev);
4635         if (ret)
4636                 goto err_out_disk;
4637
4638         /* Everything's ready.  Announce the disk to the world. */
4639
4640         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4641         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4642         add_disk(rbd_dev->disk);
4643
4644         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4645                 (unsigned long long) rbd_dev->mapping.size);
4646
4647         return ret;
4648
4649 err_out_disk:
4650         rbd_free_disk(rbd_dev);
4651 err_out_blkdev:
4652         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4653 err_out_id:
4654         rbd_dev_id_put(rbd_dev);
4655         rbd_dev_mapping_clear(rbd_dev);
4656
4657         return ret;
4658 }
4659
4660 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4661 {
4662         struct rbd_spec *spec = rbd_dev->spec;
4663         size_t size;
4664
4665         /* Record the header object name for this rbd image. */
4666
4667         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4668
4669         if (rbd_dev->image_format == 1)
4670                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4671         else
4672                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4673
4674         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4675         if (!rbd_dev->header_name)
4676                 return -ENOMEM;
4677
4678         if (rbd_dev->image_format == 1)
4679                 sprintf(rbd_dev->header_name, "%s%s",
4680                         spec->image_name, RBD_SUFFIX);
4681         else
4682                 sprintf(rbd_dev->header_name, "%s%s",
4683                         RBD_HEADER_PREFIX, spec->image_id);
4684         return 0;
4685 }
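
/*
 * For example, using the definitions from rbd_types.h: a format 1
 * image named "foo" is described by the header object "foo.rbd",
 * while a format 2 image whose id is "1014b2aeec31" (a made-up id)
 * is described by "rbd_header.1014b2aeec31".
 */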
4686
4687 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4688 {
4689         int ret;
4690
4691         rbd_dev_unprobe(rbd_dev);
4692         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4693         if (ret)
4694                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4695         kfree(rbd_dev->header_name);
4696         rbd_dev->header_name = NULL;
4697         rbd_dev->image_format = 0;
4698         kfree(rbd_dev->spec->image_id);
4699         rbd_dev->spec->image_id = NULL;
4700
4701         rbd_dev_destroy(rbd_dev);
4702 }
4703
4704 /*
4705  * Probe for the existence of the header object for the given rbd
4706  * device.  For format 2 images this includes determining the image
4707  * id.
4708  */
4709 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4710 {
4711         int ret;
4712         int tmp;
4713
4714         /*
4715          * Get the id from the image id object.  If it's not a
4716          * format 2 image, we'll get ENOENT back, and we'll assume
4717          * it's a format 1 image.
4718          */
4719         ret = rbd_dev_image_id(rbd_dev);
4720         if (ret)
4721                 return ret;
4722         rbd_assert(rbd_dev->spec->image_id);
4723         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4724
4725         ret = rbd_dev_header_name(rbd_dev);
4726         if (ret)
4727                 goto err_out_format;
4728
4729         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4730         if (ret)
4731                 goto out_header_name;
4732
4733         if (rbd_dev->image_format == 1)
4734                 ret = rbd_dev_v1_probe(rbd_dev);
4735         else
4736                 ret = rbd_dev_v2_probe(rbd_dev);
4737         if (ret)
4738                 goto err_out_watch;
4739
4740         ret = rbd_dev_spec_update(rbd_dev);
4741         if (ret)
4742                 goto err_out_probe;
4743
4744         ret = rbd_dev_probe_parent(rbd_dev);
4745         if (!ret)
4746                 return 0;
4747
4748 err_out_probe:
4749         rbd_dev_unprobe(rbd_dev);
4750 err_out_watch:
4751         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4752         if (tmp)
4753                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4754 out_header_name:
4755         kfree(rbd_dev->header_name);
4756         rbd_dev->header_name = NULL;
4757 err_out_format:
4758         rbd_dev->image_format = 0;
4759         kfree(rbd_dev->spec->image_id);
4760         rbd_dev->spec->image_id = NULL;
4761
4762         dout("probe failed, returning %d\n", ret);
4763
4764         return ret;
4765 }
4766
4767 static ssize_t rbd_add(struct bus_type *bus,
4768                        const char *buf,
4769                        size_t count)
4770 {
4771         struct rbd_device *rbd_dev = NULL;
4772         struct ceph_options *ceph_opts = NULL;
4773         struct rbd_options *rbd_opts = NULL;
4774         struct rbd_spec *spec = NULL;
4775         struct rbd_client *rbdc;
4776         struct ceph_osd_client *osdc;
4777         int rc = -ENOMEM;
4778
4779         if (!try_module_get(THIS_MODULE))
4780                 return -ENODEV;
4781
4782         /* parse add command */
4783         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4784         if (rc < 0)
4785                 goto err_out_module;
4786
4787         rbdc = rbd_get_client(ceph_opts);
4788         if (IS_ERR(rbdc)) {
4789                 rc = PTR_ERR(rbdc);
4790                 goto err_out_args;
4791         }
4792         ceph_opts = NULL;       /* rbd_dev client now owns this */
4793
4794         /* pick the pool */
4795         osdc = &rbdc->client->osdc;
4796         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4797         if (rc < 0)
4798                 goto err_out_client;
4799         spec->pool_id = (u64)rc;
4800
4801         /* The ceph file layout needs the pool id to fit in 32 bits */
4802
4803         if (spec->pool_id > (u64)U32_MAX) {
4804                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4805                                 (unsigned long long)spec->pool_id, U32_MAX);
4806                 rc = -EIO;
4807                 goto err_out_client;
4808         }
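
        /*
         * The 32-bit limit comes from struct ceph_file_layout, whose
         * pool field is a __le32; a larger pool id could not be
         * encoded in the layout carried by each object request.
         */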
4809
4810         rbd_dev = rbd_dev_create(rbdc, spec);
4811         if (!rbd_dev)
4812                 goto err_out_client;
4813         rbdc = NULL;            /* rbd_dev now owns this */
4814         spec = NULL;            /* rbd_dev now owns this */
4815
4816         rbd_dev->mapping.read_only = rbd_opts->read_only;
4817         kfree(rbd_opts);
4818         rbd_opts = NULL;        /* done with this */
4819
4820         rc = rbd_dev_image_probe(rbd_dev);
4821         if (rc < 0)
4822                 goto err_out_rbd_dev;
4823
4824         rc = rbd_dev_device_setup(rbd_dev);
4825         if (!rc)
4826                 return count;
4827
4828         rbd_dev_image_release(rbd_dev);
4829 err_out_rbd_dev:
4830         rbd_dev_destroy(rbd_dev);
4831 err_out_client:
4832         rbd_put_client(rbdc);
4833 err_out_args:
4834         if (ceph_opts)
4835                 ceph_destroy_options(ceph_opts);
4836         kfree(rbd_opts);
4837         rbd_spec_put(spec);
4838 err_out_module:
4839         module_put(THIS_MODULE);
4840
4841         dout("Error adding device %s\n", buf);
4842
4843         return (ssize_t)rc;
4844 }
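
/*
 * Example usage, per Documentation/ABI/testing/sysfs-bus-rbd, with
 * made-up monitor address, credentials, pool and image names:
 *
 *      $ echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo" \
 *              > /sys/bus/rbd/add
 *
 * On success this maps image "foo" from pool "rbd" and announces a
 * block device such as /dev/rbd0.
 */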
4845
4846 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4847 {
4848         struct list_head *tmp;
4849         struct rbd_device *rbd_dev;
4850
4851         spin_lock(&rbd_dev_list_lock);
4852         list_for_each(tmp, &rbd_dev_list) {
4853                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4854                 if (rbd_dev->dev_id == dev_id) {
4855                         spin_unlock(&rbd_dev_list_lock);
4856                         return rbd_dev;
4857                 }
4858         }
4859         spin_unlock(&rbd_dev_list_lock);
4860         return NULL;
4861 }
4862
4863 static void rbd_dev_device_release(struct device *dev)
4864 {
4865         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4866
4867         rbd_free_disk(rbd_dev);
4868         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4869         rbd_dev_clear_mapping(rbd_dev);
4870         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4871         rbd_dev->major = 0;
4872         rbd_dev_id_put(rbd_dev);
4873         rbd_dev_mapping_clear(rbd_dev);
4874 }
4875
4876 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4877 {
4878         while (rbd_dev->parent) {
4879                 struct rbd_device *first = rbd_dev;
4880                 struct rbd_device *second = first->parent;
4881                 struct rbd_device *third;
4882
4883                 /*
4884                  * Walk down to the deepest parent (the one with
4885                  * no grandparent) and remove it.
4886                  */
4887                 while (second && (third = second->parent)) {
4888                         first = second;
4889                         second = third;
4890                 }
4891                 rbd_assert(second);
4892                 rbd_dev_image_release(second);
4893                 first->parent = NULL;
4894                 first->parent_overlap = 0;
4895
4896                 rbd_assert(first->parent_spec);
4897                 rbd_spec_put(first->parent_spec);
4898                 first->parent_spec = NULL;
4899         }
4900 }
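
/*
 * Illustrative walk, not executed code: given the chain
 *
 *      rbd_dev -> P1 -> P2
 *
 * the inner loop leaves first == P1 and second == P2, so P2 is
 * released first; the next pass releases P1, and the outer loop exits
 * once rbd_dev->parent is NULL.  Ancestors are therefore torn down
 * deepest-first, before the images that were cloned from them.
 */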
4901
4902 static ssize_t rbd_remove(struct bus_type *bus,
4903                           const char *buf,
4904                           size_t count)
4905 {
4906         struct rbd_device *rbd_dev = NULL;
4907         int target_id;
4908         unsigned long ul;
4909         int ret;
4910
4911         ret = strict_strtoul(buf, 10, &ul);
4912         if (ret)
4913                 return ret;
4914
4915         /* convert to int; abort if we lost anything in the conversion */
4916         target_id = (int) ul;
4917         if (target_id != ul)
4918                 return -EINVAL;
4919
4920         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4921
4922         rbd_dev = __rbd_get_dev(target_id);
4923         if (!rbd_dev) {
4924                 ret = -ENOENT;
4925                 goto done;
4926         }
4927
4928         spin_lock_irq(&rbd_dev->lock);
4929         if (rbd_dev->open_count)
4930                 ret = -EBUSY;
4931         else
4932                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4933         spin_unlock_irq(&rbd_dev->lock);
4934         if (ret < 0)
4935                 goto done;
4936         ret = count;
4937         rbd_bus_del_dev(rbd_dev);
4938         rbd_dev_image_release(rbd_dev);
4939         module_put(THIS_MODULE);
4940 done:
4941         mutex_unlock(&ctl_mutex);
4942
4943         return ret;
4944 }
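
/*
 * Example usage: if the mapped device is /dev/rbd1, its id is 1 and
 *
 *      $ echo 1 > /sys/bus/rbd/remove
 *
 * unmaps it.  The write fails with -EBUSY while the device is still
 * open.
 */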
4945
4946 /*
4947  * create control files in sysfs
4948  * /sys/bus/rbd/...
4949  */
4950 static int rbd_sysfs_init(void)
4951 {
4952         int ret;
4953
4954         ret = device_register(&rbd_root_dev);
4955         if (ret < 0)
4956                 return ret;
4957
4958         ret = bus_register(&rbd_bus_type);
4959         if (ret < 0)
4960                 device_unregister(&rbd_root_dev);
4961
4962         return ret;
4963 }
4964
4965 static void rbd_sysfs_cleanup(void)
4966 {
4967         bus_unregister(&rbd_bus_type);
4968         device_unregister(&rbd_root_dev);
4969 }
4970
4971 static int __init rbd_init(void)
4972 {
4973         int rc;
4974
4975         if (!libceph_compatible(NULL)) {
4976                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4977
4978                 return -EINVAL;
4979         }
4980         rc = rbd_sysfs_init();
4981         if (rc)
4982                 return rc;
4983         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4984         return 0;
4985 }
4986
4987 static void __exit rbd_exit(void)
4988 {
4989         rbd_sysfs_cleanup();
4990 }
4991
4992 module_init(rbd_init);
4993 module_exit(rbd_exit);
4994
4995 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4996 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4997 MODULE_DESCRIPTION("rados block device");
4998
4999 /* following authorship retained from original osdblk.c */
5000 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5001
5002 MODULE_LICENSE("GPL");