1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
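/*
 * A sketch of the arithmetic behind the 510 limit (spelled out here
 * as an assumption, not taken from the original comment): each
 * snapshot id in a ceph_snap_context is a u64, so 510 ids occupy
 * 510 * 8 = 4080 bytes, leaving the last 16 bytes of a 4 KB
 * allocation for the context's bookkeeping fields.
 */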
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by an OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
88  * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
89  * enough to hold all possible device names.
90  */
91 #define DEV_NAME_LEN            32
92 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
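/*
 * Why (5 * sizeof (int)) / 2 + 1 is enough (a worked check, not part
 * of the original comment): each byte of an int contributes
 * log10(256) ~= 2.41 decimal digits, which 5/2 = 2.5 digits per byte
 * over-approximates; the +1 leaves room for a leading '-'.  For
 * 4-byte ints that gives 11 characters, exactly enough for
 * "-2147483648".
 */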
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 stripe_unit;
112         u64 stripe_count;
113
114         u64 obj_version;
115 };
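/*
 * Note on the layout above (descriptive, based on how the v1 code
 * below fills these in): snap_names is a single buffer holding all
 * snapshot names back to back, each NUL-terminated, in the same
 * order as snapc->snaps[] and snap_sizes[].
 */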
116
117 /*
118  * An rbd image specification.
119  *
120  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
121  * identify an image.  Each rbd_dev structure includes a pointer to
122  * an rbd_spec structure that encapsulates this identity.
123  *
124  * Each of the id's in an rbd_spec has an associated name.  For a
125  * user-mapped image, the names are supplied and the id's associated
126  * with them are looked up.  For a layered image, a parent image is
127  * defined by the tuple, and the names are looked up.
128  *
129  * An rbd_dev structure contains a parent_spec pointer which is
130  * non-null if the image it represents is a child in a layered
131  * image.  This pointer will refer to the rbd_spec structure used
132  * by the parent rbd_dev for its own identity (i.e., the structure
133  * is shared between the parent and child).
134  *
135  * Since these structures are populated once, during the discovery
136  * phase of image construction, they are effectively immutable so
137  * we make no effort to synchronize access to them.
138  *
139  * Note that code herein does not assume the image name is known (it
140  * could be a null pointer).
141  */
142 struct rbd_spec {
143         u64             pool_id;
144         const char      *pool_name;
145
146         const char      *image_id;
147         const char      *image_name;
148
149         u64             snap_id;
150         const char      *snap_name;
151
152         struct kref     kref;
153 };
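/*
 * Example with hypothetical values: mapping image "foo" in pool
 * "rbd" at its head revision would yield a spec along the lines of
 *
 *	pool_id = 2,           pool_name = "rbd",
 *	image_id = "37a0b6",   image_name = "foo",
 *	snap_id = CEPH_NOSNAP, snap_name = RBD_SNAP_HEAD_NAME ("-")
 */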
154
155 /*
156  * an instance of the client.  multiple devices may share an rbd client.
157  */
158 struct rbd_client {
159         struct ceph_client      *client;
160         struct kref             kref;
161         struct list_head        node;
162 };
163
164 struct rbd_img_request;
165 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
166
167 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
168
169 struct rbd_obj_request;
170 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
171
172 enum obj_request_type {
173         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
174 };
175
176 enum obj_req_flags {
177         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
178         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
179         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
180         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
181 };
182
183 struct rbd_obj_request {
184         const char              *object_name;
185         u64                     offset;         /* object start byte */
186         u64                     length;         /* bytes from offset */
187         unsigned long           flags;
188
189         /*
190          * An object request associated with an image will have its
191          * img_data flag set; a standalone object request will not.
192          *
193          * A standalone object request will have which == BAD_WHICH
194          * and a null obj_request pointer.
195          *
196          * An object request initiated in support of a layered image
197          * object (to check for its existence before a write) will
198          * have which == BAD_WHICH and a non-null obj_request pointer.
199          *
200          * Finally, an object request for rbd image data will have
201          * which != BAD_WHICH, and will have a non-null img_request
202          * pointer.  The value of which will be in the range
203          * 0..(img_request->obj_request_count-1).
204          */
205         union {
206                 struct rbd_obj_request  *obj_request;   /* STAT op */
207                 struct {
208                         struct rbd_img_request  *img_request;
209                         u64                     img_offset;
210                         /* links for img_request->obj_requests list */
211                         struct list_head        links;
212                 };
213         };
214         u32                     which;          /* posn in image request list */
215
216         enum obj_request_type   type;
217         union {
218                 struct bio      *bio_list;
219                 struct {
220                         struct page     **pages;
221                         u32             page_count;
222                 };
223         };
224         struct page             **copyup_pages;
225
226         struct ceph_osd_request *osd_req;
227
228         u64                     xferred;        /* bytes transferred */
229         u64                     version;
230         int                     result;
231
232         rbd_obj_callback_t      callback;
233         struct completion       completion;
234
235         struct kref             kref;
236 };
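/*
 * Summary of the three object request flavors described in the
 * comment above:
 *
 *	flavor			which		pointer member
 *	standalone		BAD_WHICH	obj_request == NULL
 *	layered existence check	BAD_WHICH	obj_request != NULL
 *	image data		0..count-1	img_request != NULL
 */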
237
238 enum img_req_flags {
239         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
240         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
241         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
242 };
243
244 struct rbd_img_request {
245         struct rbd_device       *rbd_dev;
246         u64                     offset; /* starting image byte offset */
247         u64                     length; /* byte count from offset */
248         unsigned long           flags;
249         union {
250                 u64                     snap_id;        /* for reads */
251                 struct ceph_snap_context *snapc;        /* for writes */
252         };
253         union {
254                 struct request          *rq;            /* block request */
255                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
256         };
257         struct page             **copyup_pages;
258         spinlock_t              completion_lock;/* protects next_completion */
259         u32                     next_completion;
260         rbd_img_callback_t      callback;
261         u64                     xferred;/* aggregate bytes transferred */
262         int                     result; /* first nonzero obj_request result */
263
264         u32                     obj_request_count;
265         struct list_head        obj_requests;   /* rbd_obj_request structs */
266
267         struct kref             kref;
268 };
269
270 #define for_each_obj_request(ireq, oreq) \
271         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
272 #define for_each_obj_request_from(ireq, oreq) \
273         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
274 #define for_each_obj_request_safe(ireq, oreq, n) \
275         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
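/*
 * Usage sketch for the iterators above (hypothetical caller):
 *
 *	struct rbd_obj_request *obj_request;
 *	struct rbd_obj_request *next_obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		dout("obj_request %u\n", obj_request->which);
 *
 *	for_each_obj_request_safe(img_request, obj_request,
 *					next_obj_request)
 *		rbd_img_obj_request_del(img_request, obj_request);
 */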
276
277 struct rbd_snap {
278         const char              *name;
279         u64                     size;
280         struct list_head        node;
281         u64                     id;
282         u64                     features;
283 };
284
285 struct rbd_mapping {
286         u64                     size;
287         u64                     features;
288         bool                    read_only;
289 };
290
291 /*
292  * a single device
293  */
294 struct rbd_device {
295         int                     dev_id;         /* blkdev unique id */
296
297         int                     major;          /* blkdev assigned major */
298         struct gendisk          *disk;          /* blkdev's gendisk and rq */
299
300         u32                     image_format;   /* Either 1 or 2 */
301         struct rbd_client       *rbd_client;
302
303         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
304
305         spinlock_t              lock;           /* queue, flags, open_count */
306
307         struct rbd_image_header header;
308         unsigned long           flags;          /* possibly lock protected */
309         struct rbd_spec         *spec;
310
311         char                    *header_name;
312
313         struct ceph_file_layout layout;
314
315         struct ceph_osd_event   *watch_event;
316         struct rbd_obj_request  *watch_request;
317
318         struct rbd_spec         *parent_spec;
319         u64                     parent_overlap;
320         struct rbd_device       *parent;
321
322         /* protects updating the header */
323         struct rw_semaphore     header_rwsem;
324
325         struct rbd_mapping      mapping;
326
327         struct list_head        node;
328
329         /* list of snapshots */
330         struct list_head        snaps;
331
332         /* sysfs related */
333         struct device           dev;
334         unsigned long           open_count;     /* protected by lock */
335 };
336
337 /*
338  * Flag bits for rbd_dev->flags.  If atomicity is required,
339  * rbd_dev->lock is used to protect access.
340  *
341  * Currently, only the "removing" flag (which is coupled with the
342  * "open_count" field) requires atomic access.
343  */
344 enum rbd_dev_flags {
345         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
346         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
347 };
348
349 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
350
351 static LIST_HEAD(rbd_dev_list);    /* devices */
352 static DEFINE_SPINLOCK(rbd_dev_list_lock);
353
354 static LIST_HEAD(rbd_client_list);              /* clients */
355 static DEFINE_SPINLOCK(rbd_client_list_lock);
356
357 static int rbd_img_request_submit(struct rbd_img_request *img_request);
358
359 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
360
361 static void rbd_dev_device_release(struct device *dev);
362 static void rbd_snap_destroy(struct rbd_snap *snap);
363
364 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
365                        size_t count);
366 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
367                           size_t count);
368 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
369
370 static struct bus_attribute rbd_bus_attrs[] = {
371         __ATTR(add, S_IWUSR, NULL, rbd_add),
372         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
373         __ATTR_NULL
374 };
375
376 static struct bus_type rbd_bus_type = {
377         .name           = "rbd",
378         .bus_attrs      = rbd_bus_attrs,
379 };
380
381 static void rbd_root_dev_release(struct device *dev)
382 {
383 }
384
385 static struct device rbd_root_dev = {
386         .init_name =    "rbd",
387         .release =      rbd_root_dev_release,
388 };
389
390 static __printf(2, 3)
391 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
392 {
393         struct va_format vaf;
394         va_list args;
395
396         va_start(args, fmt);
397         vaf.fmt = fmt;
398         vaf.va = &args;
399
400         if (!rbd_dev)
401                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
402         else if (rbd_dev->disk)
403                 printk(KERN_WARNING "%s: %s: %pV\n",
404                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
405         else if (rbd_dev->spec && rbd_dev->spec->image_name)
406                 printk(KERN_WARNING "%s: image %s: %pV\n",
407                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
408         else if (rbd_dev->spec && rbd_dev->spec->image_id)
409                 printk(KERN_WARNING "%s: id %s: %pV\n",
410                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
411         else    /* punt */
412                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
413                         RBD_DRV_NAME, rbd_dev, &vaf);
414         va_end(args);
415 }
416
417 #ifdef RBD_DEBUG
418 #define rbd_assert(expr)                                                \
419                 do { if (unlikely(!(expr))) {                           \
420                         printk(KERN_ERR "\nAssertion failure in %s() "  \
421                                                 "at line %d:\n\n"       \
422                                         "\trbd_assert(%s);\n\n",        \
423                                         __func__, __LINE__, #expr);     \
424                         BUG();                                          \
425                 } } while (0)
426 #else /* !RBD_DEBUG */
427 #  define rbd_assert(expr)      ((void) 0)
428 #endif /* !RBD_DEBUG */
429
430 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
431 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
432 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
433
434 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
435 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
436
437 static int rbd_open(struct block_device *bdev, fmode_t mode)
438 {
439         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
440         bool removing = false;
441
442         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
443                 return -EROFS;
444
445         spin_lock_irq(&rbd_dev->lock);
446         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
447                 removing = true;
448         else
449                 rbd_dev->open_count++;
450         spin_unlock_irq(&rbd_dev->lock);
451         if (removing)
452                 return -ENOENT;
453
454         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
455         (void) get_device(&rbd_dev->dev);
456         set_device_ro(bdev, rbd_dev->mapping.read_only);
457         mutex_unlock(&ctl_mutex);
458
459         return 0;
460 }
461
462 static int rbd_release(struct gendisk *disk, fmode_t mode)
463 {
464         struct rbd_device *rbd_dev = disk->private_data;
465         unsigned long open_count_before;
466
467         spin_lock_irq(&rbd_dev->lock);
468         open_count_before = rbd_dev->open_count--;
469         spin_unlock_irq(&rbd_dev->lock);
470         rbd_assert(open_count_before > 0);
471
472         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
473         put_device(&rbd_dev->dev);
474         mutex_unlock(&ctl_mutex);
475
476         return 0;
477 }
478
479 static const struct block_device_operations rbd_bd_ops = {
480         .owner                  = THIS_MODULE,
481         .open                   = rbd_open,
482         .release                = rbd_release,
483 };
484
485 /*
486  * Initialize an rbd client instance.
487  * We own *ceph_opts.
488  */
489 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
490 {
491         struct rbd_client *rbdc;
492         int ret = -ENOMEM;
493
494         dout("%s:\n", __func__);
495         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
496         if (!rbdc)
497                 goto out_opt;
498
499         kref_init(&rbdc->kref);
500         INIT_LIST_HEAD(&rbdc->node);
501
502         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
503
504         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
505         if (IS_ERR(rbdc->client))
506                 goto out_mutex;
507         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
508
509         ret = ceph_open_session(rbdc->client);
510         if (ret < 0)
511                 goto out_err;
512
513         spin_lock(&rbd_client_list_lock);
514         list_add_tail(&rbdc->node, &rbd_client_list);
515         spin_unlock(&rbd_client_list_lock);
516
517         mutex_unlock(&ctl_mutex);
518         dout("%s: rbdc %p\n", __func__, rbdc);
519
520         return rbdc;
521
522 out_err:
523         ceph_destroy_client(rbdc->client);
524 out_mutex:
525         mutex_unlock(&ctl_mutex);
526         kfree(rbdc);
527 out_opt:
528         if (ceph_opts)
529                 ceph_destroy_options(ceph_opts);
530         dout("%s: error %d\n", __func__, ret);
531
532         return ERR_PTR(ret);
533 }
534
535 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
536 {
537         kref_get(&rbdc->kref);
538
539         return rbdc;
540 }
541
542 /*
543  * Find a ceph client with specific addr and configuration.  If
544  * found, bump its reference count.
545  */
546 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
547 {
548         struct rbd_client *client_node;
549         bool found = false;
550
551         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
552                 return NULL;
553
554         spin_lock(&rbd_client_list_lock);
555         list_for_each_entry(client_node, &rbd_client_list, node) {
556                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
557                         __rbd_get_client(client_node);
558
559                         found = true;
560                         break;
561                 }
562         }
563         spin_unlock(&rbd_client_list_lock);
564
565         return found ? client_node : NULL;
566 }
567
568 /*
569  * mount options
570  */
571 enum {
572         Opt_last_int,
573         /* int args above */
574         Opt_last_string,
575         /* string args above */
576         Opt_read_only,
577         Opt_read_write,
578         /* Boolean args above */
579         Opt_last_bool,
580 };
581
582 static match_table_t rbd_opts_tokens = {
583         /* int args above */
584         /* string args above */
585         {Opt_read_only, "read_only"},
586         {Opt_read_only, "ro"},          /* Alternate spelling */
587         {Opt_read_write, "read_write"},
588         {Opt_read_write, "rw"},         /* Alternate spelling */
589         /* Boolean args above */
590         {-1, NULL}
591 };
592
593 struct rbd_options {
594         bool    read_only;
595 };
596
597 #define RBD_READ_ONLY_DEFAULT   false
598
599 static int parse_rbd_opts_token(char *c, void *private)
600 {
601         struct rbd_options *rbd_opts = private;
602         substring_t argstr[MAX_OPT_ARGS];
603         int token, intval, ret;
604
605         token = match_token(c, rbd_opts_tokens, argstr);
606         if (token < 0)
607                 return -EINVAL;
608
609         if (token < Opt_last_int) {
610                 ret = match_int(&argstr[0], &intval);
611                 if (ret < 0) {
612                         pr_err("bad mount option arg (not int) "
613                                "at '%s'\n", c);
614                         return ret;
615                 }
616                 dout("got int token %d val %d\n", token, intval);
617         } else if (token > Opt_last_int && token < Opt_last_string) {
618                 dout("got string token %d val %s\n", token,
619                      argstr[0].from);
620         } else if (token > Opt_last_string && token < Opt_last_bool) {
621                 dout("got Boolean token %d\n", token);
622         } else {
623                 dout("got token %d\n", token);
624         }
625
626         switch (token) {
627         case Opt_read_only:
628                 rbd_opts->read_only = true;
629                 break;
630         case Opt_read_write:
631                 rbd_opts->read_only = false;
632                 break;
633         default:
634                 rbd_assert(false);
635                 break;
636         }
637         return 0;
638 }
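/*
 * For example (illustrative): the option strings "read_only" and
 * "ro" both match Opt_read_only above and set rbd_opts->read_only
 * to true, while "read_write"/"rw" clear it again.  An unrecognized
 * token makes match_token() return -1, so the whole parse fails
 * with -EINVAL.
 */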
639
640 /*
641  * Get a ceph client with specific addr and configuration, if one does
642  * not exist create it.
643  */
644 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
645 {
646         struct rbd_client *rbdc;
647
648         rbdc = rbd_client_find(ceph_opts);
649         if (rbdc)       /* using an existing client */
650                 ceph_destroy_options(ceph_opts);
651         else
652                 rbdc = rbd_client_create(ceph_opts);
653
654         return rbdc;
655 }
656
657 /*
658  * Destroy ceph client
659  *
660  * Caller must hold rbd_client_list_lock.
661  */
662 static void rbd_client_release(struct kref *kref)
663 {
664         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
665
666         dout("%s: rbdc %p\n", __func__, rbdc);
667         spin_lock(&rbd_client_list_lock);
668         list_del(&rbdc->node);
669         spin_unlock(&rbd_client_list_lock);
670
671         ceph_destroy_client(rbdc->client);
672         kfree(rbdc);
673 }
674
675 /*
676  * Drop reference to ceph client node. If it's not referenced anymore, release
677  * it.
678  */
679 static void rbd_put_client(struct rbd_client *rbdc)
680 {
681         if (rbdc)
682                 kref_put(&rbdc->kref, rbd_client_release);
683 }
684
685 static bool rbd_image_format_valid(u32 image_format)
686 {
687         return image_format == 1 || image_format == 2;
688 }
689
690 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
691 {
692         size_t size;
693         u32 snap_count;
694
695         /* The header has to start with the magic rbd header text */
696         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
697                 return false;
698
699         /* The bio layer requires at least sector-sized I/O */
700
701         if (ondisk->options.order < SECTOR_SHIFT)
702                 return false;
703
704         /* If we use u64 in a few spots we may be able to loosen this */
705
706         if (ondisk->options.order > 8 * sizeof (int) - 1)
707                 return false;
708
709         /*
710          * The size of a snapshot header has to fit in a size_t, and
711          * that limits the number of snapshots.
712          */
713         snap_count = le32_to_cpu(ondisk->snap_count);
714         size = SIZE_MAX - sizeof (struct ceph_snap_context);
715         if (snap_count > size / sizeof (__le64))
716                 return false;
717
718         /*
719  * Not only that, but the size of the entire snapshot
720          * header must also be representable in a size_t.
721          */
722         size -= snap_count * sizeof (__le64);
723         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
724                 return false;
725
726         return true;
727 }
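/*
 * To make the overflow checks above concrete (a worked example,
 * assuming a 64-bit size_t): snap_count may not exceed roughly
 * (SIZE_MAX - sizeof (struct ceph_snap_context)) / 8, about 2^60
 * ids, so on 64-bit builds RBD_MAX_SNAP_COUNT is the limit that
 * bites first; these checks mainly protect 32-bit builds, where
 * SIZE_MAX is only 2^32 - 1.
 */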
728
729 /*
730  * Create a new header structure, translate header format from the on-disk
731  * header.
732  */
733 static int rbd_header_from_disk(struct rbd_image_header *header,
734                                  struct rbd_image_header_ondisk *ondisk)
735 {
736         u32 snap_count;
737         size_t len;
738         size_t size;
739         u32 i;
740
741         memset(header, 0, sizeof (*header));
742
743         snap_count = le32_to_cpu(ondisk->snap_count);
744
745         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
746         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
747         if (!header->object_prefix)
748                 return -ENOMEM;
749         memcpy(header->object_prefix, ondisk->object_prefix, len);
750         header->object_prefix[len] = '\0';
751
752         if (snap_count) {
753                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
754
755                 /* Save a copy of the snapshot names */
756
757                 if (snap_names_len > (u64) SIZE_MAX) {
758                         kfree(header->object_prefix);
                            header->object_prefix = NULL;
                            return -EIO;    /* don't leak object_prefix */
                    }
759                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
760                 if (!header->snap_names)
761                         goto out_err;
762                 /*
763                  * Note that rbd_dev_v1_header_read() guarantees
764                  * the ondisk buffer we're working with has
765                  * snap_names_len bytes beyond the end of the
766                  * snapshot id array, so this memcpy() is safe.
767                  */
768                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
769                         snap_names_len);
770
771                 /* Record each snapshot's size */
772
773                 size = snap_count * sizeof (*header->snap_sizes);
774                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
775                 if (!header->snap_sizes)
776                         goto out_err;
777                 for (i = 0; i < snap_count; i++)
778                         header->snap_sizes[i] =
779                                 le64_to_cpu(ondisk->snaps[i].image_size);
780         } else {
781                 header->snap_names = NULL;
782                 header->snap_sizes = NULL;
783         }
784
785         header->features = 0;   /* No feature support in v1 images */
786         header->obj_order = ondisk->options.order;
787         header->crypt_type = ondisk->options.crypt_type;
788         header->comp_type = ondisk->options.comp_type;
789
790         /* Allocate and fill in the snapshot context */
791
792         header->image_size = le64_to_cpu(ondisk->image_size);
793
794         header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
795         if (!header->snapc)
796                 goto out_err;
797         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
798         for (i = 0; i < snap_count; i++)
799                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
800
801         return 0;
802
803 out_err:
804         kfree(header->snap_sizes);
805         header->snap_sizes = NULL;
806         kfree(header->snap_names);
807         header->snap_names = NULL;
808         kfree(header->object_prefix);
809         header->object_prefix = NULL;
810
811         return -ENOMEM;
812 }
813
814 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
815 {
816         struct rbd_snap *snap;
817
818         if (snap_id == CEPH_NOSNAP)
819                 return RBD_SNAP_HEAD_NAME;
820
821         list_for_each_entry(snap, &rbd_dev->snaps, node)
822                 if (snap_id == snap->id)
823                         return snap->name;
824
825         return NULL;
826 }
827
828 static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
829                                         const char *snap_name)
830 {
831         struct rbd_snap *snap;
832
833         list_for_each_entry(snap, &rbd_dev->snaps, node)
834                 if (!strcmp(snap_name, snap->name))
835                         return snap;
836
837         return NULL;
838 }
839
840 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
841 {
842         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
843                     sizeof (RBD_SNAP_HEAD_NAME))) {
844                 rbd_dev->mapping.size = rbd_dev->header.image_size;
845                 rbd_dev->mapping.features = rbd_dev->header.features;
846         } else {
847                 struct rbd_snap *snap;
848
849                 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
850                 if (!snap)
851                         return -ENOENT;
852                 rbd_dev->mapping.size = snap->size;
853                 rbd_dev->mapping.features = snap->features;
854                 rbd_dev->mapping.read_only = true;
855         }
856
857         return 0;
858 }
859
860 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
861 {
862         rbd_dev->mapping.size = 0;
863         rbd_dev->mapping.features = 0;
864         rbd_dev->mapping.read_only = true;
865 }
866
867 static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
868 {
869         rbd_dev->mapping.size = 0;
870         rbd_dev->mapping.features = 0;
871         rbd_dev->mapping.read_only = true;
872 }
873
874 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
875 {
876         char *name;
877         u64 segment;
878         int ret;
879
880         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
881         if (!name)
882                 return NULL;
883         segment = offset >> rbd_dev->header.obj_order;
884         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
885                         rbd_dev->header.object_prefix, segment);
886         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
887                 pr_err("error formatting segment name for #%llu (%d)\n",
888                         segment, ret);
889                 kfree(name);
890                 name = NULL;
891         }
892
893         return name;
894 }
895
896 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
897 {
898         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
899
900         return offset & (segment_size - 1);
901 }
902
903 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
904                                 u64 offset, u64 length)
905 {
906         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
907
908         offset &= segment_size - 1;
909
910         rbd_assert(length <= U64_MAX - offset);
911         if (offset + length > segment_size)
912                 length = segment_size - offset;
913
914         return length;
915 }
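/*
 * Worked example for the segment helpers above, assuming the common
 * obj_order of 22 (4 MB objects) and an object_prefix of "rb.0.1"
 * (both hypothetical): image offset 0x500000 (5 MB) falls in segment
 * 5 MB >> 22 = 1, i.e. object "rb.0.1.000000000001", at segment
 * offset 5 MB & (4 MB - 1) = 1 MB; a 4 MB request starting there is
 * truncated by rbd_segment_length() to the 3 MB remaining in that
 * segment.
 */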
916
917 /*
918  * returns the size of an object in the image
919  */
920 static u64 rbd_obj_bytes(struct rbd_image_header *header)
921 {
922         return 1ULL << header->obj_order;       /* 1ULL: order may be up to 31 */
923 }
924
925 /*
926  * bio helpers
927  */
928
929 static void bio_chain_put(struct bio *chain)
930 {
931         struct bio *tmp;
932
933         while (chain) {
934                 tmp = chain;
935                 chain = chain->bi_next;
936                 bio_put(tmp);
937         }
938 }
939
940 /*
941  * zeros a bio chain, starting at a specific offset
942  */
943 static void zero_bio_chain(struct bio *chain, int start_ofs)
944 {
945         struct bio_vec *bv;
946         unsigned long flags;
947         void *buf;
948         int i;
949         int pos = 0;
950
951         while (chain) {
952                 bio_for_each_segment(bv, chain, i) {
953                         if (pos + bv->bv_len > start_ofs) {
954                                 int remainder = max(start_ofs - pos, 0);
955                                 buf = bvec_kmap_irq(bv, &flags);
956                                 memset(buf + remainder, 0,
957                                        bv->bv_len - remainder);
958                                 bvec_kunmap_irq(buf, &flags);
959                         }
960                         pos += bv->bv_len;
961                 }
962
963                 chain = chain->bi_next;
964         }
965 }
966
967 /*
968  * similar to zero_bio_chain(), zeros data defined by a page array,
969  * starting at the given byte offset from the start of the array and
970  * continuing up to the given end offset.  The pages array is
971  * assumed to be big enough to hold all bytes up to the end.
972  */
973 static void zero_pages(struct page **pages, u64 offset, u64 end)
974 {
975         struct page **page = &pages[offset >> PAGE_SHIFT];
976
977         rbd_assert(end > offset);
978         rbd_assert(end - offset <= (u64)SIZE_MAX);
979         while (offset < end) {
980                 size_t page_offset;
981                 size_t length;
982                 unsigned long flags;
983                 void *kaddr;
984
985                 page_offset = (size_t)(offset & ~PAGE_MASK);
986                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
987                 local_irq_save(flags);
988                 kaddr = kmap_atomic(*page);
989                 memset(kaddr + page_offset, 0, length);
990                 kunmap_atomic(kaddr);
991                 local_irq_restore(flags);
992
993                 offset += length;
994                 page++;
995         }
996 }
997
998 /*
999  * Clone a portion of a bio, starting at the given byte offset
1000  * and continuing for the number of bytes indicated.
1001  */
1002 static struct bio *bio_clone_range(struct bio *bio_src,
1003                                         unsigned int offset,
1004                                         unsigned int len,
1005                                         gfp_t gfpmask)
1006 {
1007         struct bio_vec *bv;
1008         unsigned int resid;
1009         unsigned short idx;
1010         unsigned int voff;
1011         unsigned short end_idx;
1012         unsigned short vcnt;
1013         struct bio *bio;
1014
1015         /* Handle the easy case for the caller */
1016
1017         if (!offset && len == bio_src->bi_size)
1018                 return bio_clone(bio_src, gfpmask);
1019
1020         if (WARN_ON_ONCE(!len))
1021                 return NULL;
1022         if (WARN_ON_ONCE(len > bio_src->bi_size))
1023                 return NULL;
1024         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1025                 return NULL;
1026
1027         /* Find first affected segment... */
1028
1029         resid = offset;
1030         __bio_for_each_segment(bv, bio_src, idx, 0) {
1031                 if (resid < bv->bv_len)
1032                         break;
1033                 resid -= bv->bv_len;
1034         }
1035         voff = resid;
1036
1037         /* ...and the last affected segment */
1038
1039         resid += len;
1040         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1041                 if (resid <= bv->bv_len)
1042                         break;
1043                 resid -= bv->bv_len;
1044         }
1045         vcnt = end_idx - idx + 1;
1046
1047         /* Build the clone */
1048
1049         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1050         if (!bio)
1051                 return NULL;    /* ENOMEM */
1052
1053         bio->bi_bdev = bio_src->bi_bdev;
1054         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1055         bio->bi_rw = bio_src->bi_rw;
1056         bio->bi_flags |= 1 << BIO_CLONED;
1057
1058         /*
1059          * Copy over our part of the bio_vec, then update the first
1060          * and last (or only) entries.
1061          */
1062         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1063                         vcnt * sizeof (struct bio_vec));
1064         bio->bi_io_vec[0].bv_offset += voff;
1065         if (vcnt > 1) {
1066                 bio->bi_io_vec[0].bv_len -= voff;
1067                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1068         } else {
1069                 bio->bi_io_vec[0].bv_len = len;
1070         }
1071
1072         bio->bi_vcnt = vcnt;
1073         bio->bi_size = len;
1074         bio->bi_idx = 0;
1075
1076         return bio;
1077 }
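/*
 * Worked example (hypothetical numbers): cloning offset 6144 for
 * len 4096 from a chain of 4096-byte bio_vecs.  The first loop ends
 * with idx = 1 and voff = 2048; the second with end_idx = 2 and
 * resid = 2048, so vcnt = 2.  Entry 0 keeps its last 2048 bytes
 * (bv_offset += 2048, bv_len -= 2048) and entry 1 is trimmed to
 * bv_len = 2048, for a clone totalling exactly 4096 bytes.
 */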
1078
1079 /*
1080  * Clone a portion of a bio chain, starting at the given byte offset
1081  * into the first bio in the source chain and continuing for the
1082  * number of bytes indicated.  The result is another bio chain of
1083  * exactly the given length, or a null pointer on error.
1084  *
1085  * The bio_src and offset parameters are both in-out.  On entry they
1086  * refer to the first source bio and the offset into that bio where
1087  * the start of data to be cloned is located.
1088  *
1089  * On return, bio_src is updated to refer to the bio in the source
1090  * chain that contains the first un-cloned byte, and *offset will
1091  * contain the offset of that byte within that bio.
1092  */
1093 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1094                                         unsigned int *offset,
1095                                         unsigned int len,
1096                                         gfp_t gfpmask)
1097 {
1098         struct bio *bi = *bio_src;
1099         unsigned int off = *offset;
1100         struct bio *chain = NULL;
1101         struct bio **end;
1102
1103         /* Build up a chain of clone bios up to the limit */
1104
1105         if (!bi || off >= bi->bi_size || !len)
1106                 return NULL;            /* Nothing to clone */
1107
1108         end = &chain;
1109         while (len) {
1110                 unsigned int bi_size;
1111                 struct bio *bio;
1112
1113                 if (!bi) {
1114                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1115                         goto out_err;   /* EINVAL; ran out of bio's */
1116                 }
1117                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1118                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1119                 if (!bio)
1120                         goto out_err;   /* ENOMEM */
1121
1122                 *end = bio;
1123                 end = &bio->bi_next;
1124
1125                 off += bi_size;
1126                 if (off == bi->bi_size) {
1127                         bi = bi->bi_next;
1128                         off = 0;
1129                 }
1130                 len -= bi_size;
1131         }
1132         *bio_src = bi;
1133         *offset = off;
1134
1135         return chain;
1136 out_err:
1137         bio_chain_put(chain);
1138
1139         return NULL;
1140 }
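/*
 * Usage sketch (modeled on the image-request fill code later in
 * this file): walk a request's bio chain, peeling off one clone per
 * object segment, with bio_list and bio_offset carrying the
 * un-cloned remainder between iterations:
 *
 *	length = rbd_segment_length(rbd_dev, img_offset, resid);
 *	clone = bio_chain_clone_range(&bio_list, &bio_offset,
 *					(unsigned int) length,
 *					GFP_ATOMIC);
 */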
1141
1142 /*
1143  * The default/initial value for all object request flags is 0.  For
1144  * each flag, once its value is set to 1 it is never reset to 0
1145  * again.
1146  */
1147 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1148 {
1149         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1150                 struct rbd_device *rbd_dev;
1151
1152                 rbd_dev = obj_request->img_request->rbd_dev;
1153                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1154                         obj_request);
1155         }
1156 }
1157
1158 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1159 {
1160         smp_mb();
1161         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1162 }
1163
1164 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1165 {
1166         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1167                 struct rbd_device *rbd_dev = NULL;
1168
1169                 if (obj_request_img_data_test(obj_request))
1170                         rbd_dev = obj_request->img_request->rbd_dev;
1171                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1172                         obj_request);
1173         }
1174 }
1175
1176 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1177 {
1178         smp_mb();
1179         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1180 }
1181
1182 /*
1183  * This sets the KNOWN flag after (possibly) setting the EXISTS
1184  * flag.  The latter is set based on the "exists" value provided.
1185  *
1186  * Note that for our purposes once an object exists it never goes
1187  * away again.  It's possible that the responses from two existence
1188  * checks are separated by the creation of the target object, and
1189  * the first ("doesn't exist") response arrives *after* the second
1190  * ("does exist").  In that case we ignore the second one.
1191  */
1192 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1193                                 bool exists)
1194 {
1195         if (exists)
1196                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1197         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1198         smp_mb();
1199 }
1200
1201 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1202 {
1203         smp_mb();
1204         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1205 }
1206
1207 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1208 {
1209         smp_mb();
1210         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1211 }
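/*
 * The resulting flag states, for reference:
 *
 *	KNOWN = 0		no existence check has completed yet
 *	KNOWN = 1, EXISTS = 0	object known not to exist (when checked)
 *	KNOWN = 1, EXISTS = 1	object known to exist (sticky; see the
 *				note above obj_request_existence_set())
 */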
1212
1213 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1214 {
1215         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1216                 atomic_read(&obj_request->kref.refcount));
1217         kref_get(&obj_request->kref);
1218 }
1219
1220 static void rbd_obj_request_destroy(struct kref *kref);
1221 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1222 {
1223         rbd_assert(obj_request != NULL);
1224         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1225                 atomic_read(&obj_request->kref.refcount));
1226         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1227 }
1228
1229 static void rbd_img_request_get(struct rbd_img_request *img_request)
1230 {
1231         dout("%s: img %p (was %d)\n", __func__, img_request,
1232                 atomic_read(&img_request->kref.refcount));
1233         kref_get(&img_request->kref);
1234 }
1235
1236 static void rbd_img_request_destroy(struct kref *kref);
1237 static void rbd_img_request_put(struct rbd_img_request *img_request)
1238 {
1239         rbd_assert(img_request != NULL);
1240         dout("%s: img %p (was %d)\n", __func__, img_request,
1241                 atomic_read(&img_request->kref.refcount));
1242         kref_put(&img_request->kref, rbd_img_request_destroy);
1243 }
1244
1245 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1246                                         struct rbd_obj_request *obj_request)
1247 {
1248         rbd_assert(obj_request->img_request == NULL);
1249
1250         /* Image request now owns object's original reference */
1251         obj_request->img_request = img_request;
1252         obj_request->which = img_request->obj_request_count;
1253         rbd_assert(!obj_request_img_data_test(obj_request));
1254         obj_request_img_data_set(obj_request);
1255         rbd_assert(obj_request->which != BAD_WHICH);
1256         img_request->obj_request_count++;
1257         list_add_tail(&obj_request->links, &img_request->obj_requests);
1258         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1259                 obj_request->which);
1260 }
1261
1262 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1263                                         struct rbd_obj_request *obj_request)
1264 {
1265         rbd_assert(obj_request->which != BAD_WHICH);
1266
1267         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1268                 obj_request->which);
1269         list_del(&obj_request->links);
1270         rbd_assert(img_request->obj_request_count > 0);
1271         img_request->obj_request_count--;
1272         rbd_assert(obj_request->which == img_request->obj_request_count);
1273         obj_request->which = BAD_WHICH;
1274         rbd_assert(obj_request_img_data_test(obj_request));
1275         rbd_assert(obj_request->img_request == img_request);
1276         obj_request->img_request = NULL;
1277         obj_request->callback = NULL;
1278         rbd_obj_request_put(obj_request);
1279 }
1280
1281 static bool obj_request_type_valid(enum obj_request_type type)
1282 {
1283         switch (type) {
1284         case OBJ_REQUEST_NODATA:
1285         case OBJ_REQUEST_BIO:
1286         case OBJ_REQUEST_PAGES:
1287                 return true;
1288         default:
1289                 return false;
1290         }
1291 }
1292
1293 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1294                                 struct rbd_obj_request *obj_request)
1295 {
1296         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1297
1298         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1299 }
1300
1301 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1302 {
1303
1304         dout("%s: img %p\n", __func__, img_request);
1305
1306         /*
1307          * If no error occurred, compute the aggregate transfer
1308          * count for the image request.  We could instead use
1309          * atomic64_cmpxchg() to update it as each object request
1310          * completes; not clear which way is better off hand.
1311          */
1312         if (!img_request->result) {
1313                 struct rbd_obj_request *obj_request;
1314                 u64 xferred = 0;
1315
1316                 for_each_obj_request(img_request, obj_request)
1317                         xferred += obj_request->xferred;
1318                 img_request->xferred = xferred;
1319         }
1320
1321         if (img_request->callback)
1322                 img_request->callback(img_request);
1323         else
1324                 rbd_img_request_put(img_request);
1325 }
1326
1327 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1328
1329 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1330 {
1331         dout("%s: obj %p\n", __func__, obj_request);
1332
1333         return wait_for_completion_interruptible(&obj_request->completion);
1334 }
1335
1336 /*
1337  * The default/initial value for all image request flags is 0.  Each
1338  * is conditionally set to 1 at image request initialization time
1339  * and currently never changes thereafter.
1340  */
1341 static void img_request_write_set(struct rbd_img_request *img_request)
1342 {
1343         set_bit(IMG_REQ_WRITE, &img_request->flags);
1344         smp_mb();
1345 }
1346
1347 static bool img_request_write_test(struct rbd_img_request *img_request)
1348 {
1349         smp_mb();
1350         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1351 }
1352
1353 static void img_request_child_set(struct rbd_img_request *img_request)
1354 {
1355         set_bit(IMG_REQ_CHILD, &img_request->flags);
1356         smp_mb();
1357 }
1358
1359 static bool img_request_child_test(struct rbd_img_request *img_request)
1360 {
1361         smp_mb();
1362         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1363 }
1364
1365 static void img_request_layered_set(struct rbd_img_request *img_request)
1366 {
1367         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1368         smp_mb();
1369 }
1370
1371 static bool img_request_layered_test(struct rbd_img_request *img_request)
1372 {
1373         smp_mb();
1374         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1375 }
1376
1377 static void
1378 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1379 {
1380         u64 xferred = obj_request->xferred;
1381         u64 length = obj_request->length;
1382
1383         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1384                 obj_request, obj_request->img_request, obj_request->result,
1385                 xferred, length);
1386         /*
1387          * ENOENT means a hole in the image.  We zero-fill the
1388          * entire length of the request.  A short read also implies
1389          * zero-fill to the end of the request.  Either way we
1390          * update the xferred count to indicate the whole request
1391          * was satisfied.
1392          */
1393         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1394         if (obj_request->result == -ENOENT) {
1395                 if (obj_request->type == OBJ_REQUEST_BIO)
1396                         zero_bio_chain(obj_request->bio_list, 0);
1397                 else
1398                         zero_pages(obj_request->pages, 0, length);
1399                 obj_request->result = 0;
1400                 obj_request->xferred = length;
1401         } else if (xferred < length && !obj_request->result) {
1402                 if (obj_request->type == OBJ_REQUEST_BIO)
1403                         zero_bio_chain(obj_request->bio_list, xferred);
1404                 else
1405                         zero_pages(obj_request->pages, xferred, length);
1406                 obj_request->xferred = length;
1407         }
1408         obj_request_done_set(obj_request);
1409 }
1410
1411 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1412 {
1413         dout("%s: obj %p cb %p\n", __func__, obj_request,
1414                 obj_request->callback);
1415         if (obj_request->callback)
1416                 obj_request->callback(obj_request);
1417         else
1418                 complete_all(&obj_request->completion);
1419 }
1420
1421 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1422 {
1423         dout("%s: obj %p\n", __func__, obj_request);
1424         obj_request_done_set(obj_request);
1425 }
1426
1427 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1428 {
1429         struct rbd_img_request *img_request = NULL;
1430         struct rbd_device *rbd_dev = NULL;
1431         bool layered = false;
1432
1433         if (obj_request_img_data_test(obj_request)) {
1434                 img_request = obj_request->img_request;
1435                 layered = img_request && img_request_layered_test(img_request);
1436                 rbd_dev = img_request->rbd_dev;
1437         }
1438
1439         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1440                 obj_request, img_request, obj_request->result,
1441                 obj_request->xferred, obj_request->length);
1442         if (layered && obj_request->result == -ENOENT &&
1443                         obj_request->img_offset < rbd_dev->parent_overlap)
1444                 rbd_img_parent_read(obj_request);
1445         else if (img_request)
1446                 rbd_img_obj_request_read_callback(obj_request);
1447         else
1448                 obj_request_done_set(obj_request);
1449 }
1450
1451 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1452 {
1453         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1454                 obj_request->result, obj_request->length);
1455         /*
1456          * There is no such thing as a successful short write.  Set
1457          * the transfer count to our originally-requested length.
1458          */
1459         obj_request->xferred = obj_request->length;
1460         obj_request_done_set(obj_request);
1461 }
1462
1463 /*
1464  * For a simple stat call there's nothing to do.  We'll do more if
1465  * this is part of a write sequence for a layered image.
1466  */
1467 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1468 {
1469         dout("%s: obj %p\n", __func__, obj_request);
1470         obj_request_done_set(obj_request);
1471 }
1472
1473 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1474                                 struct ceph_msg *msg)
1475 {
1476         struct rbd_obj_request *obj_request = osd_req->r_priv;
1477         u16 opcode;
1478
1479         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1480         rbd_assert(osd_req == obj_request->osd_req);
1481         if (obj_request_img_data_test(obj_request)) {
1482                 rbd_assert(obj_request->img_request);
1483                 rbd_assert(obj_request->which != BAD_WHICH);
1484         } else {
1485                 rbd_assert(obj_request->which == BAD_WHICH);
1486         }
1487
1488         if (osd_req->r_result < 0)
1489                 obj_request->result = osd_req->r_result;
1490         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1491
1492         BUG_ON(osd_req->r_num_ops > 2);
1493
1494         /*
1495          * We support a 64-bit length, but ultimately it has to be
1496          * passed to blk_end_request(), which takes an unsigned int.
1497          */
1498         obj_request->xferred = osd_req->r_reply_op_len[0];
1499         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1500         opcode = osd_req->r_ops[0].op;
1501         switch (opcode) {
1502         case CEPH_OSD_OP_READ:
1503                 rbd_osd_read_callback(obj_request);
1504                 break;
1505         case CEPH_OSD_OP_WRITE:
1506                 rbd_osd_write_callback(obj_request);
1507                 break;
1508         case CEPH_OSD_OP_STAT:
1509                 rbd_osd_stat_callback(obj_request);
1510                 break;
1511         case CEPH_OSD_OP_CALL:
1512         case CEPH_OSD_OP_NOTIFY_ACK:
1513         case CEPH_OSD_OP_WATCH:
1514                 rbd_osd_trivial_callback(obj_request);
1515                 break;
1516         default:
1517                 rbd_warn(NULL, "%s: unsupported op %hu",
1518                         obj_request->object_name, (unsigned short) opcode);
1519                 break;
1520         }
1521
1522         if (obj_request_done_test(obj_request))
1523                 rbd_obj_request_complete(obj_request);
1524 }
1525
1526 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1527 {
1528         struct rbd_img_request *img_request = obj_request->img_request;
1529         struct ceph_osd_request *osd_req = obj_request->osd_req;
1530         u64 snap_id;
1531
1532         rbd_assert(osd_req != NULL);
1533
1534         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1535         ceph_osdc_build_request(osd_req, obj_request->offset,
1536                         NULL, snap_id, NULL);
1537 }
1538
1539 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1540 {
1541         struct rbd_img_request *img_request = obj_request->img_request;
1542         struct ceph_osd_request *osd_req = obj_request->osd_req;
1543         struct ceph_snap_context *snapc;
1544         struct timespec mtime = CURRENT_TIME;
1545
1546         rbd_assert(osd_req != NULL);
1547
1548         snapc = img_request ? img_request->snapc : NULL;
1549         ceph_osdc_build_request(osd_req, obj_request->offset,
1550                         snapc, CEPH_NOSNAP, &mtime);
1551 }
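
     /*
      * Note the asymmetry above: reads are tagged with the snapshot id
      * to read from (CEPH_NOSNAP for the head), while writes instead
      * carry the snapshot context and a modification time, which the
      * osd uses to preserve snapshotted object data before overwriting.
      */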
1552
1553 static struct ceph_osd_request *rbd_osd_req_create(
1554                                         struct rbd_device *rbd_dev,
1555                                         bool write_request,
1556                                         struct rbd_obj_request *obj_request)
1557 {
1558         struct ceph_snap_context *snapc = NULL;
1559         struct ceph_osd_client *osdc;
1560         struct ceph_osd_request *osd_req;
1561
1562         if (obj_request_img_data_test(obj_request)) {
1563                 struct rbd_img_request *img_request = obj_request->img_request;
1564
1565                 rbd_assert(write_request ==
1566                                 img_request_write_test(img_request));
1567                 if (write_request)
1568                         snapc = img_request->snapc;
1569         }
1570
1571         /* Allocate and initialize the request, for the single op */
1572
1573         osdc = &rbd_dev->rbd_client->client->osdc;
1574         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1575         if (!osd_req)
1576                 return NULL;    /* ENOMEM */
1577
1578         if (write_request)
1579                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1580         else
1581                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1582
1583         osd_req->r_callback = rbd_osd_req_callback;
1584         osd_req->r_priv = obj_request;
1585
1586         osd_req->r_oid_len = strlen(obj_request->object_name);
1587         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1588         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1589
1590         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1591
1592         return osd_req;
1593 }
1594
1595 /*
1596  * Create a copyup osd request based on the information in the
1597  * object request supplied.  A copyup request has two osd ops:
1598  * a copyup method call and a "normal" write request.
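      * Op 0 becomes the "copyup" class method call that carries the
      * parent data, and op 1 the original write; both are filled in
      * later, in rbd_img_obj_parent_read_full_callback().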
1599  */
1600 static struct ceph_osd_request *
1601 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1602 {
1603         struct rbd_img_request *img_request;
1604         struct ceph_snap_context *snapc;
1605         struct rbd_device *rbd_dev;
1606         struct ceph_osd_client *osdc;
1607         struct ceph_osd_request *osd_req;
1608
1609         rbd_assert(obj_request_img_data_test(obj_request));
1610         img_request = obj_request->img_request;
1611         rbd_assert(img_request);
1612         rbd_assert(img_request_write_test(img_request));
1613
1614         /* Allocate and initialize the request, for the two ops */
1615
1616         snapc = img_request->snapc;
1617         rbd_dev = img_request->rbd_dev;
1618         osdc = &rbd_dev->rbd_client->client->osdc;
1619         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1620         if (!osd_req)
1621                 return NULL;    /* ENOMEM */
1622
1623         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1624         osd_req->r_callback = rbd_osd_req_callback;
1625         osd_req->r_priv = obj_request;
1626
1627         osd_req->r_oid_len = strlen(obj_request->object_name);
1628         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1629         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1630
1631         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1632
1633         return osd_req;
1634 }
1635
1636
1637 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1638 {
1639         ceph_osdc_put_request(osd_req);
1640 }
1641
1642 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1643
1644 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1645                                                 u64 offset, u64 length,
1646                                                 enum obj_request_type type)
1647 {
1648         struct rbd_obj_request *obj_request;
1649         size_t size;
1650         char *name;
1651
1652         rbd_assert(obj_request_type_valid(type));
1653
1654         size = strlen(object_name) + 1;
1655         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1656         if (!obj_request)
1657                 return NULL;
1658
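             /*
              * The object name lives in the same allocation, right
              * after the request structure (hence the "+ size" in the
              * kzalloc() above), so the single kfree() in
              * rbd_obj_request_destroy() frees both.
              */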
1659         name = (char *)(obj_request + 1);
1660         obj_request->object_name = memcpy(name, object_name, size);
1661         obj_request->offset = offset;
1662         obj_request->length = length;
1663         obj_request->flags = 0;
1664         obj_request->which = BAD_WHICH;
1665         obj_request->type = type;
1666         INIT_LIST_HEAD(&obj_request->links);
1667         init_completion(&obj_request->completion);
1668         kref_init(&obj_request->kref);
1669
1670         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1671                 offset, length, (int)type, obj_request);
1672
1673         return obj_request;
1674 }
1675
1676 static void rbd_obj_request_destroy(struct kref *kref)
1677 {
1678         struct rbd_obj_request *obj_request;
1679
1680         obj_request = container_of(kref, struct rbd_obj_request, kref);
1681
1682         dout("%s: obj %p\n", __func__, obj_request);
1683
1684         rbd_assert(obj_request->img_request == NULL);
1685         rbd_assert(obj_request->which == BAD_WHICH);
1686
1687         if (obj_request->osd_req)
1688                 rbd_osd_req_destroy(obj_request->osd_req);
1689
1690         rbd_assert(obj_request_type_valid(obj_request->type));
1691         switch (obj_request->type) {
1692         case OBJ_REQUEST_NODATA:
1693                 break;          /* Nothing to do */
1694         case OBJ_REQUEST_BIO:
1695                 if (obj_request->bio_list)
1696                         bio_chain_put(obj_request->bio_list);
1697                 break;
1698         case OBJ_REQUEST_PAGES:
1699                 if (obj_request->pages)
1700                         ceph_release_page_vector(obj_request->pages,
1701                                                 obj_request->page_count);
1702                 break;
1703         }
1704
1705         kfree(obj_request);
1706 }
1707
1708 /*
1709  * Caller is responsible for filling in the list of object requests
1710  * that make up the image request, and the Linux request pointer
1711  * (if there is one).
1712  */
1713 static struct rbd_img_request *rbd_img_request_create(
1714                                         struct rbd_device *rbd_dev,
1715                                         u64 offset, u64 length,
1716                                         bool write_request,
1717                                         bool child_request)
1718 {
1719         struct rbd_img_request *img_request;
1720
1721         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1722         if (!img_request)
1723                 return NULL;
1724
1725         if (write_request) {
1726                 down_read(&rbd_dev->header_rwsem);
1727                 ceph_get_snap_context(rbd_dev->header.snapc);
1728                 up_read(&rbd_dev->header_rwsem);
1729         }
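
             /*
              * The snap context reference taken above (for writes) keeps
              * the context from going away while this image request uses
              * it; it is dropped again in rbd_img_request_destroy().
              */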
1730
1731         img_request->rq = NULL;
1732         img_request->rbd_dev = rbd_dev;
1733         img_request->offset = offset;
1734         img_request->length = length;
1735         img_request->flags = 0;
1736         if (write_request) {
1737                 img_request_write_set(img_request);
1738                 img_request->snapc = rbd_dev->header.snapc;
1739         } else {
1740                 img_request->snap_id = rbd_dev->spec->snap_id;
1741         }
1742         if (child_request)
1743                 img_request_child_set(img_request);
1744         if (rbd_dev->parent_spec)
1745                 img_request_layered_set(img_request);
1746         spin_lock_init(&img_request->completion_lock);
1747         img_request->next_completion = 0;
1748         img_request->callback = NULL;
1749         img_request->result = 0;
1750         img_request->obj_request_count = 0;
1751         INIT_LIST_HEAD(&img_request->obj_requests);
1752         kref_init(&img_request->kref);
1753
1754         rbd_img_request_get(img_request);       /* Avoid a warning */
1755         rbd_img_request_put(img_request);       /* TEMPORARY */
1756
1757         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1758                 write_request ? "write" : "read", offset, length,
1759                 img_request);
1760
1761         return img_request;
1762 }
1763
1764 static void rbd_img_request_destroy(struct kref *kref)
1765 {
1766         struct rbd_img_request *img_request;
1767         struct rbd_obj_request *obj_request;
1768         struct rbd_obj_request *next_obj_request;
1769
1770         img_request = container_of(kref, struct rbd_img_request, kref);
1771
1772         dout("%s: img %p\n", __func__, img_request);
1773
1774         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1775                 rbd_img_obj_request_del(img_request, obj_request);
1776         rbd_assert(img_request->obj_request_count == 0);
1777
1778         if (img_request_write_test(img_request))
1779                 ceph_put_snap_context(img_request->snapc);
1780
1781         if (img_request_child_test(img_request))
1782                 rbd_obj_request_put(img_request->obj_request);
1783
1784         kfree(img_request);
1785 }
1786
1787 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1788 {
1789         struct rbd_img_request *img_request;
1790         unsigned int xferred;
1791         int result;
1792         bool more;
1793
1794         rbd_assert(obj_request_img_data_test(obj_request));
1795         img_request = obj_request->img_request;
1796
1797         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1798         xferred = (unsigned int)obj_request->xferred;
1799         result = obj_request->result;
1800         if (result) {
1801                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1802
1803                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1804                         img_request_write_test(img_request) ? "write" : "read",
1805                         obj_request->length, obj_request->img_offset,
1806                         obj_request->offset);
1807                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1808                         result, xferred);
1809                 if (!img_request->result)
1810                         img_request->result = result;
1811         }
1812
1813         /* Image object requests don't own their page array */
1814
1815         if (obj_request->type == OBJ_REQUEST_PAGES) {
1816                 obj_request->pages = NULL;
1817                 obj_request->page_count = 0;
1818         }
1819
1820         if (img_request_child_test(img_request)) {
1821                 rbd_assert(img_request->obj_request != NULL);
1822                 more = obj_request->which < img_request->obj_request_count - 1;
1823         } else {
1824                 rbd_assert(img_request->rq != NULL);
1825                 more = blk_end_request(img_request->rq, result, xferred);
1826         }
1827
1828         return more;
1829 }
1830
1831 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1832 {
1833         struct rbd_img_request *img_request;
1834         u32 which = obj_request->which;
1835         bool more = true;
1836
1837         rbd_assert(obj_request_img_data_test(obj_request));
1838         img_request = obj_request->img_request;
1839
1840         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1841         rbd_assert(img_request != NULL);
1842         rbd_assert(img_request->obj_request_count > 0);
1843         rbd_assert(which != BAD_WHICH);
1844         rbd_assert(which < img_request->obj_request_count);
1845         rbd_assert(which >= img_request->next_completion);
1846
1847         spin_lock_irq(&img_request->completion_lock);
1848         if (which != img_request->next_completion)
1849                 goto out;
1850
1851         for_each_obj_request_from(img_request, obj_request) {
1852                 rbd_assert(more);
1853                 rbd_assert(which < img_request->obj_request_count);
1854
1855                 if (!obj_request_done_test(obj_request))
1856                         break;
1857                 more = rbd_img_obj_end_request(obj_request);
1858                 which++;
1859         }
1860
1861         rbd_assert(more ^ (which == img_request->obj_request_count));
1862         img_request->next_completion = which;
1863 out:
1864         spin_unlock_irq(&img_request->completion_lock);
1865
1866         if (!more)
1867                 rbd_img_request_complete(img_request);
1868 }
1869
1870 /*
1871  * Split up an image request into one or more object requests, each
1872  * to a different object.  The "type" parameter indicates whether
1873  * "data_desc" is the pointer to the head of a list of bio
1874  * structures, or the base of a page array.  In either case this
1875  * function assumes data_desc describes memory sufficient to hold
1876  * all data described by the image request.
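      *
      * For illustration, assuming the default 4 MB (order 22)
      * objects: an 8 MB request starting 1 MB into the image is
      * split into three object requests of 3 MB, 4 MB and 1 MB,
      * each confined to one object by rbd_segment_offset() and
      * rbd_segment_length().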
1877  */
1878 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1879                                         enum obj_request_type type,
1880                                         void *data_desc)
1881 {
1882         struct rbd_device *rbd_dev = img_request->rbd_dev;
1883         struct rbd_obj_request *obj_request = NULL;
1884         struct rbd_obj_request *next_obj_request;
1885         bool write_request = img_request_write_test(img_request);
1886         struct bio *bio_list;
1887         unsigned int bio_offset = 0;
1888         struct page **pages;
1889         u64 img_offset;
1890         u64 resid;
1891         u16 opcode;
1892
1893         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1894                 (int)type, data_desc);
1895
1896         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1897         img_offset = img_request->offset;
1898         resid = img_request->length;
1899         rbd_assert(resid > 0);
1900
1901         if (type == OBJ_REQUEST_BIO) {
1902                 bio_list = data_desc;
1903                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1904         } else {
1905                 rbd_assert(type == OBJ_REQUEST_PAGES);
1906                 pages = data_desc;
1907         }
1908
1909         while (resid) {
1910                 struct ceph_osd_request *osd_req;
1911                 const char *object_name;
1912                 u64 offset;
1913                 u64 length;
1914
1915                 object_name = rbd_segment_name(rbd_dev, img_offset);
1916                 if (!object_name)
1917                         goto out_unwind;
1918                 offset = rbd_segment_offset(rbd_dev, img_offset);
1919                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1920                 obj_request = rbd_obj_request_create(object_name,
1921                                                 offset, length, type);
1922                 kfree(object_name);     /* object request has its own copy */
1923                 if (!obj_request)
1924                         goto out_unwind;
1925
1926                 if (type == OBJ_REQUEST_BIO) {
1927                         unsigned int clone_size;
1928
1929                         rbd_assert(length <= (u64)UINT_MAX);
1930                         clone_size = (unsigned int)length;
1931                         obj_request->bio_list =
1932                                         bio_chain_clone_range(&bio_list,
1933                                                                 &bio_offset,
1934                                                                 clone_size,
1935                                                                 GFP_ATOMIC);
1936                         if (!obj_request->bio_list)
1937                                 goto out_partial;
1938                 } else {
1939                         unsigned int page_count;
1940
1941                         obj_request->pages = pages;
1942                         page_count = (u32)calc_pages_for(offset, length);
1943                         obj_request->page_count = page_count;
1944                         if ((offset + length) & ~PAGE_MASK)
1945                                 page_count--;   /* more on last page */
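                             /*
                              * When the segment ends partway through a
                              * page, the next segment's data begins on
                              * that same page, so don't advance past it.
                              */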
1946                         pages += page_count;
1947                 }
1948
1949                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1950                                                 obj_request);
1951                 if (!osd_req)
1952                         goto out_partial;
1953                 obj_request->osd_req = osd_req;
1954                 obj_request->callback = rbd_img_obj_callback;
1955
1956                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1957                                                 0, 0);
1958                 if (type == OBJ_REQUEST_BIO)
1959                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1960                                         obj_request->bio_list, length);
1961                 else
1962                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1963                                         obj_request->pages, length,
1964                                         offset & ~PAGE_MASK, false, false);
1965
1966                 if (write_request)
1967                         rbd_osd_req_format_write(obj_request);
1968                 else
1969                         rbd_osd_req_format_read(obj_request);
1970
1971                 obj_request->img_offset = img_offset;
1972                 rbd_img_obj_request_add(img_request, obj_request);
1973
1974                 img_offset += length;
1975                 resid -= length;
1976         }
1977
1978         return 0;
1979
1980 out_partial:
1981         rbd_obj_request_put(obj_request);
1982 out_unwind:
1983         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1984                 rbd_obj_request_put(obj_request);
1985
1986         return -ENOMEM;
1987 }
1988
1989 static void
1990 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1991 {
1992         struct rbd_img_request *img_request;
1993         struct rbd_device *rbd_dev;
1994         u64 length;
1995         u32 page_count;
1996
1997         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1998         rbd_assert(obj_request_img_data_test(obj_request));
1999         img_request = obj_request->img_request;
2000         rbd_assert(img_request);
2001
2002         rbd_dev = img_request->rbd_dev;
2003         rbd_assert(rbd_dev);
2004         length = (u64)1 << rbd_dev->header.obj_order;
2005         page_count = (u32)calc_pages_for(0, length);
2006
2007         rbd_assert(obj_request->copyup_pages);
2008         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2009         obj_request->copyup_pages = NULL;
2010
2011         /*
2012          * We want the transfer count to reflect the size of the
2013          * original write request.  There is no such thing as a
2014          * successful short write, so if the request was successful
2015          * we can just set it to the originally-requested length.
2016          */
2017         if (!obj_request->result)
2018                 obj_request->xferred = obj_request->length;
2019
2020         /* Finish up with the normal image object callback */
2021
2022         rbd_img_obj_callback(obj_request);
2023 }
2024
2025 static void
2026 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2027 {
2028         struct rbd_obj_request *orig_request;
2029         struct ceph_osd_request *osd_req;
2030         struct ceph_osd_client *osdc;
2031         struct rbd_device *rbd_dev;
2032         struct page **pages;
2033         int result;
2034         u64 obj_size;
2035         u64 xferred;
2036
2037         rbd_assert(img_request_child_test(img_request));
2038
2039         /* First get what we need from the image request */
2040
2041         pages = img_request->copyup_pages;
2042         rbd_assert(pages != NULL);
2043         img_request->copyup_pages = NULL;
2044
2045         orig_request = img_request->obj_request;
2046         rbd_assert(orig_request != NULL);
2047         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2048         result = img_request->result;
2049         obj_size = img_request->length;
2050         xferred = img_request->xferred;
2051
2052         rbd_dev = img_request->rbd_dev;
2053         rbd_assert(rbd_dev);
2054         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2055
2056         rbd_img_request_put(img_request);
2057
2058         if (result)
2059                 goto out_err;
2060
2061         /* Allocate the new copyup osd request for the original request */
2062
2063         result = -ENOMEM;
2064         rbd_assert(!orig_request->osd_req);
2065         osd_req = rbd_osd_req_create_copyup(orig_request);
2066         if (!osd_req)
2067                 goto out_err;
2068         orig_request->osd_req = osd_req;
2069         orig_request->copyup_pages = pages;
2070
2071         /* Initialize the copyup op */
2072
2073         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2074         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2075                                                 false, false);
2076
2077         /* Then the original write request op */
2078
2079         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2080                                         orig_request->offset,
2081                                         orig_request->length, 0, 0);
2082         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2083                                         orig_request->length);
2084
2085         rbd_osd_req_format_write(orig_request);
2086
2087         /* All set, send it off. */
2088
2089         orig_request->callback = rbd_img_obj_copyup_callback;
2090         osdc = &rbd_dev->rbd_client->client->osdc;
2091         result = rbd_obj_request_submit(osdc, orig_request);
2092         if (!result)
2093                 return;
2094 out_err:
2095         /* Record the error code and complete the request */
2096
2097         orig_request->result = result;
2098         orig_request->xferred = 0;
2099         obj_request_done_set(orig_request);
2100         rbd_obj_request_complete(orig_request);
2101 }
2102
2103 /*
2104  * Read from the parent image the range of data that covers the
2105  * entire target of the given object request.  This is used for
2106  * satisfying a layered image write request when the target of an
2107  * object request from the image request does not exist.
2108  *
2109  * A page array big enough to hold the returned data is allocated
2110  * and supplied to rbd_img_request_fill() as the "data descriptor."
2111  * When the read completes, this page array will be transferred to
2112  * the original object request for the copyup operation.
2113  *
2114  * If an error occurs, record it as the result of the original
2115  * object request and mark it done so it gets completed.
2116  */
2117 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2118 {
2119         struct rbd_img_request *img_request = NULL;
2120         struct rbd_img_request *parent_request = NULL;
2121         struct rbd_device *rbd_dev;
2122         u64 img_offset;
2123         u64 length;
2124         struct page **pages = NULL;
2125         u32 page_count;
2126         int result;
2127
2128         rbd_assert(obj_request_img_data_test(obj_request));
2129         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2130
2131         img_request = obj_request->img_request;
2132         rbd_assert(img_request != NULL);
2133         rbd_dev = img_request->rbd_dev;
2134         rbd_assert(rbd_dev->parent != NULL);
2135
2136         /*
2137          * First things first.  The original osd request is of no
2138          * use to us any more; we'll need a new one that can hold
2139          * the two ops in a copyup request.  We'll get that later,
2140          * but for now we can release the old one.
2141          */
2142         rbd_osd_req_destroy(obj_request->osd_req);
2143         obj_request->osd_req = NULL;
2144
2145         /*
2146          * Determine the byte range covered by the object in the
2147          * child image to which the original request was to be sent.
2148          */
2149         img_offset = obj_request->img_offset - obj_request->offset;
2150         length = (u64)1 << rbd_dev->header.obj_order;
2151
2152         /*
2153          * There is no defined parent data beyond the parent
2154          * overlap, so limit what we read at that boundary if
2155          * necessary.
2156          */
2157         if (img_offset + length > rbd_dev->parent_overlap) {
2158                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2159                 length = rbd_dev->parent_overlap - img_offset;
2160         }
2161
2162         /*
2163          * Allocate a page array big enough to receive the data read
2164          * from the parent.
2165          */
2166         page_count = (u32)calc_pages_for(0, length);
2167         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2168         if (IS_ERR(pages)) {
2169                 result = PTR_ERR(pages);
2170                 pages = NULL;
2171                 goto out_err;
2172         }
2173
2174         result = -ENOMEM;
2175         parent_request = rbd_img_request_create(rbd_dev->parent,
2176                                                 img_offset, length,
2177                                                 false, true);
2178         if (!parent_request)
2179                 goto out_err;
2180         rbd_obj_request_get(obj_request);
2181         parent_request->obj_request = obj_request;
2182
2183         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2184         if (result)
2185                 goto out_err;
2186         parent_request->copyup_pages = pages;
2187
2188         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2189         result = rbd_img_request_submit(parent_request);
2190         if (!result)
2191                 return 0;
2192
2193         parent_request->copyup_pages = NULL;
2194         parent_request->obj_request = NULL;
2195         rbd_obj_request_put(obj_request);
2196 out_err:
2197         if (pages)
2198                 ceph_release_page_vector(pages, page_count);
2199         if (parent_request)
2200                 rbd_img_request_put(parent_request);
2201         obj_request->result = result;
2202         obj_request->xferred = 0;
2203         obj_request_done_set(obj_request);
2204
2205         return result;
2206 }
2207
2208 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2209 {
2210         struct rbd_obj_request *orig_request;
2211         int result;
2212
2213         rbd_assert(!obj_request_img_data_test(obj_request));
2214
2215         /*
2216          * All we need from the object request is the original
2217          * request and the result of the STAT op.  Grab those, then
2218          * we're done with the request.
2219          */
2220         orig_request = obj_request->obj_request;
2221         obj_request->obj_request = NULL;
2222         rbd_assert(orig_request);
2223         rbd_assert(orig_request->img_request);
2224
2225         result = obj_request->result;
2226         obj_request->result = 0;
2227
2228         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2229                 obj_request, orig_request, result,
2230                 obj_request->xferred, obj_request->length);
2231         rbd_obj_request_put(obj_request);
2232
2236         /*
2237          * Our only purpose here is to determine whether the object
2238          * exists, and we don't want to treat the non-existence as
2239          * an error.  If something else comes back, transfer the
2240          * error to the original request and complete it now.
2241          */
2242         if (!result) {
2243                 obj_request_existence_set(orig_request, true);
2244         } else if (result == -ENOENT) {
2245                 obj_request_existence_set(orig_request, false);
2246         } else {
2247                 orig_request->result = result;
2248                 goto out;
2249         }
2250
2251         /*
2252          * Resubmit the original request now that we have recorded
2253          * whether the target object exists.
2254          */
2255         orig_request->result = rbd_img_obj_request_submit(orig_request);
2256 out:
2257         if (orig_request->result)
2258                 rbd_obj_request_complete(orig_request);
2259         rbd_obj_request_put(orig_request);
2260 }
2261
2262 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2263 {
2264         struct rbd_obj_request *stat_request;
2265         struct rbd_device *rbd_dev;
2266         struct ceph_osd_client *osdc;
2267         struct page **pages = NULL;
2268         u32 page_count;
2269         size_t size;
2270         int ret;
2271
2272         /*
2273          * The response data for a STAT call consists of:
2274          *     le64 length;
2275          *     struct {
2276          *         le32 tv_sec;
2277          *         le32 tv_nsec;
2278          *     } mtime;
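              *
              * That is 16 bytes in all, so the page vector allocated
              * below always amounts to a single page.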
2279          */
2280         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2281         page_count = (u32)calc_pages_for(0, size);
2282         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2283         if (IS_ERR(pages))
2284                 return PTR_ERR(pages);
2285
2286         ret = -ENOMEM;
2287         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2288                                                         OBJ_REQUEST_PAGES);
2289         if (!stat_request)
2290                 goto out;
2291
2292         rbd_obj_request_get(obj_request);
2293         stat_request->obj_request = obj_request;
2294         stat_request->pages = pages;
2295         stat_request->page_count = page_count;
2296
2297         rbd_assert(obj_request->img_request);
2298         rbd_dev = obj_request->img_request->rbd_dev;
2299         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2300                                                 stat_request);
2301         if (!stat_request->osd_req)
2302                 goto out;
2303         stat_request->callback = rbd_img_obj_exists_callback;
2304
2305         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2306         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2307                                         false, false);
2308         rbd_osd_req_format_read(stat_request);
2309
2310         osdc = &rbd_dev->rbd_client->client->osdc;
2311         ret = rbd_obj_request_submit(osdc, stat_request);
2312 out:
2313         if (ret)
2314                 rbd_obj_request_put(obj_request);
2315
2316         return ret;
2317 }
2318
2319 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2320 {
2321         struct rbd_img_request *img_request;
2322         struct rbd_device *rbd_dev;
2323         bool known;
2324
2325         rbd_assert(obj_request_img_data_test(obj_request));
2326
2327         img_request = obj_request->img_request;
2328         rbd_assert(img_request);
2329         rbd_dev = img_request->rbd_dev;
2330
2331         /*
2332          * Only writes to layered images need special handling.
2333          * Reads and non-layered writes are simple object requests.
2334          * Layered writes that start beyond the end of the overlap
2335          * with the parent have no parent data, so they too are
2336          * simple object requests.  Finally, if the target object is
2337          * known to already exist, its parent data has already been
2338          * copied, so a write to the object can also be handled as a
2339          * simple object request.
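              *
              * In short:
              *     read                           -> simple request
              *     write, non-layered image      -> simple request
              *     write beyond parent overlap   -> simple request
              *     write, target known to exist  -> simple request
              *     write, target known missing   -> copyup (parent read first)
              *     write, existence unknown      -> STAT the target first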
2340          */
2341         if (!img_request_write_test(img_request) ||
2342                 !img_request_layered_test(img_request) ||
2343                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2344                 ((known = obj_request_known_test(obj_request)) &&
2345                         obj_request_exists_test(obj_request))) {
2346
2347                 struct rbd_device *rbd_dev;
2348                 struct ceph_osd_client *osdc;
2349
2350                 rbd_dev = obj_request->img_request->rbd_dev;
2351                 osdc = &rbd_dev->rbd_client->client->osdc;
2352
2353                 return rbd_obj_request_submit(osdc, obj_request);
2354         }
2355
2356         /*
2357          * It's a layered write.  The target object might exist but
2358          * we may not know that yet.  If we know it doesn't exist,
2359          * start by reading the data for the full target object from
2360          * the parent so we can use it for a copyup to the target.
2361          */
2362         if (known)
2363                 return rbd_img_obj_parent_read_full(obj_request);
2364
2365         /* We don't know whether the target exists.  Go find out. */
2366
2367         return rbd_img_obj_exists_submit(obj_request);
2368 }
2369
2370 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2371 {
2372         struct rbd_obj_request *obj_request;
2373         struct rbd_obj_request *next_obj_request;
2374
2375         dout("%s: img %p\n", __func__, img_request);
2376         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2377                 int ret;
2378
2379                 ret = rbd_img_obj_request_submit(obj_request);
2380                 if (ret)
2381                         return ret;
2382         }
2383
2384         return 0;
2385 }
2386
2387 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2388 {
2389         struct rbd_obj_request *obj_request;
2390         struct rbd_device *rbd_dev;
2391         u64 obj_end;
2392
2393         rbd_assert(img_request_child_test(img_request));
2394
2395         obj_request = img_request->obj_request;
2396         rbd_assert(obj_request);
2397         rbd_assert(obj_request->img_request);
2398
2399         obj_request->result = img_request->result;
2400         if (obj_request->result)
2401                 goto out;
2402
2403         /*
2404          * We need to zero anything beyond the parent overlap
2405          * boundary.  Since rbd_img_obj_request_read_callback()
2406          * will zero anything beyond the end of a short read, an
2407          * easy way to do this is to pretend the data from the
2408          * parent came up short--ending at the overlap boundary.
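              *
              * For example, with a 3 MB parent overlap, a 1 MB read at
              * image offset 2.5 MB has its xferred clamped to at most
              * 0.5 MB here; the read callback then zeroes whatever
              * lies beyond.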
2409          */
2410         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2411         obj_end = obj_request->img_offset + obj_request->length;
2412         rbd_dev = obj_request->img_request->rbd_dev;
2413         if (obj_end > rbd_dev->parent_overlap) {
2414                 u64 xferred = 0;
2415
2416                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2417                         xferred = rbd_dev->parent_overlap -
2418                                         obj_request->img_offset;
2419
2420                 obj_request->xferred = min(img_request->xferred, xferred);
2421         } else {
2422                 obj_request->xferred = img_request->xferred;
2423         }
2424 out:
2425         rbd_img_obj_request_read_callback(obj_request);
2426         rbd_obj_request_complete(obj_request);
2427 }
2428
2429 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2430 {
2431         struct rbd_device *rbd_dev;
2432         struct rbd_img_request *img_request;
2433         int result;
2434
2435         rbd_assert(obj_request_img_data_test(obj_request));
2436         rbd_assert(obj_request->img_request != NULL);
2437         rbd_assert(obj_request->result == (s32) -ENOENT);
2438         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2439
2440         rbd_dev = obj_request->img_request->rbd_dev;
2441         rbd_assert(rbd_dev->parent != NULL);
2442         /* rbd_read_finish(obj_request, obj_request->length); */
2443         img_request = rbd_img_request_create(rbd_dev->parent,
2444                                                 obj_request->img_offset,
2445                                                 obj_request->length,
2446                                                 false, true);
2447         result = -ENOMEM;
2448         if (!img_request)
2449                 goto out_err;
2450
2451         rbd_obj_request_get(obj_request);
2452         img_request->obj_request = obj_request;
2453
2454         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2455                                         obj_request->bio_list);
2456         if (result)
2457                 goto out_err;
2458
2459         img_request->callback = rbd_img_parent_read_callback;
2460         result = rbd_img_request_submit(img_request);
2461         if (result)
2462                 goto out_err;
2463
2464         return;
2465 out_err:
2466         if (img_request)
2467                 rbd_img_request_put(img_request);
2468         obj_request->result = result;
2469         obj_request->xferred = 0;
2470         obj_request_done_set(obj_request);
2471 }
2472
2473 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2474                                    u64 ver, u64 notify_id)
2475 {
2476         struct rbd_obj_request *obj_request;
2477         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2478         int ret;
2479
2480         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2481                                                         OBJ_REQUEST_NODATA);
2482         if (!obj_request)
2483                 return -ENOMEM;
2484
2485         ret = -ENOMEM;
2486         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2487         if (!obj_request->osd_req)
2488                 goto out;
2489         obj_request->callback = rbd_obj_request_put;
2490
2491         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2492                                         notify_id, ver, 0);
2493         rbd_osd_req_format_read(obj_request);
2494
2495         ret = rbd_obj_request_submit(osdc, obj_request);
2496 out:
2497         if (ret)
2498                 rbd_obj_request_put(obj_request);
2499
2500         return ret;
2501 }
2502
2503 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2504 {
2505         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2506         u64 hver;
2507
2508         if (!rbd_dev)
2509                 return;
2510
2511         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2512                 rbd_dev->header_name, (unsigned long long) notify_id,
2513                 (unsigned int) opcode);
2514         (void)rbd_dev_refresh(rbd_dev, &hver);
2515
2516         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2517 }
2518
2519 /*
2520  * Request sync osd watch/unwatch.  The value of "start" determines
2521  * whether a watch request is being initiated or torn down.
2522  */
2523 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2524 {
2525         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2526         struct rbd_obj_request *obj_request;
2527         int ret;
2528
2529         rbd_assert(start ^ !!rbd_dev->watch_event);
2530         rbd_assert(start ^ !!rbd_dev->watch_request);
2531
2532         if (start) {
2533                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2534                                                 &rbd_dev->watch_event);
2535                 if (ret < 0)
2536                         return ret;
2537                 rbd_assert(rbd_dev->watch_event != NULL);
2538         }
2539
2540         ret = -ENOMEM;
2541         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2542                                                         OBJ_REQUEST_NODATA);
2543         if (!obj_request)
2544                 goto out_cancel;
2545
2546         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2547         if (!obj_request->osd_req)
2548                 goto out_cancel;
2549
2550         if (start)
2551                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2552         else
2553                 ceph_osdc_unregister_linger_request(osdc,
2554                                         rbd_dev->watch_request->osd_req);
2555
2556         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2557                                 rbd_dev->watch_event->cookie,
2558                                 rbd_dev->header.obj_version, start);
2559         rbd_osd_req_format_write(obj_request);
2560
2561         ret = rbd_obj_request_submit(osdc, obj_request);
2562         if (ret)
2563                 goto out_cancel;
2564         ret = rbd_obj_request_wait(obj_request);
2565         if (ret)
2566                 goto out_cancel;
2567         ret = obj_request->result;
2568         if (ret)
2569                 goto out_cancel;
2570
2571         /*
2572          * A watch request is set to linger, so the underlying osd
2573          * request won't go away until we unregister it.  We retain
2574          * a pointer to the object request during that time (in
2575          * rbd_dev->watch_request), so we'll keep a reference to
2576          * it.  We'll drop that reference (below) after we've
2577          * unregistered it.
2578          */
2579         if (start) {
2580                 rbd_dev->watch_request = obj_request;
2581
2582                 return 0;
2583         }
2584
2585         /* We have successfully torn down the watch request */
2586
2587         rbd_obj_request_put(rbd_dev->watch_request);
2588         rbd_dev->watch_request = NULL;
2589 out_cancel:
2590         /* Cancel the event if we're tearing down, or on error */
2591         ceph_osdc_cancel_event(rbd_dev->watch_event);
2592         rbd_dev->watch_event = NULL;
2593         if (obj_request)
2594                 rbd_obj_request_put(obj_request);
2595
2596         return ret;
2597 }
2598
2599 /*
2600  * Synchronous osd object method call.  Returns the number of bytes
2601  * returned in the inbound buffer, or a negative error code.
2602  */
2603 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2604                              const char *object_name,
2605                              const char *class_name,
2606                              const char *method_name,
2607                              const void *outbound,
2608                              size_t outbound_size,
2609                              void *inbound,
2610                              size_t inbound_size,
2611                              u64 *version)
2612 {
2613         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2614         struct rbd_obj_request *obj_request;
2615         struct page **pages;
2616         u32 page_count;
2617         int ret;
2618
2619         /*
2620          * Method calls are ultimately read operations.  The result
2621          * should be placed into the inbound buffer provided.  They
2622          * also supply outbound data--parameters for the object
2623          * method.  Currently if this is present it will be a
2624          * snapshot id.
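              * (The "rbd" class "get_size" method call made by the v2
              * format code elsewhere in this file is one such caller.)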
2625          */
2626         page_count = (u32)calc_pages_for(0, inbound_size);
2627         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2628         if (IS_ERR(pages))
2629                 return PTR_ERR(pages);
2630
2631         ret = -ENOMEM;
2632         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2633                                                         OBJ_REQUEST_PAGES);
2634         if (!obj_request)
2635                 goto out;
2636
2637         obj_request->pages = pages;
2638         obj_request->page_count = page_count;
2639
2640         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2641         if (!obj_request->osd_req)
2642                 goto out;
2643
2644         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2645                                         class_name, method_name);
2646         if (outbound_size) {
2647                 struct ceph_pagelist *pagelist;
2648
2649                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2650                 if (!pagelist)
2651                         goto out;
2652
2653                 ceph_pagelist_init(pagelist);
2654                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2655                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2656                                                 pagelist);
2657         }
2658         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2659                                         obj_request->pages, inbound_size,
2660                                         0, false, false);
2661         rbd_osd_req_format_read(obj_request);
2662
2663         ret = rbd_obj_request_submit(osdc, obj_request);
2664         if (ret)
2665                 goto out;
2666         ret = rbd_obj_request_wait(obj_request);
2667         if (ret)
2668                 goto out;
2669
2670         ret = obj_request->result;
2671         if (ret < 0)
2672                 goto out;
2673
2674         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2675         ret = (int)obj_request->xferred;
2676         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2677         if (version)
2678                 *version = obj_request->version;
2679 out:
2680         if (obj_request)
2681                 rbd_obj_request_put(obj_request);
2682         else
2683                 ceph_release_page_vector(pages, page_count);
2684
2685         return ret;
2686 }
2687
2688 static void rbd_request_fn(struct request_queue *q)
2689                 __releases(q->queue_lock) __acquires(q->queue_lock)
2690 {
2691         struct rbd_device *rbd_dev = q->queuedata;
2692         bool read_only = rbd_dev->mapping.read_only;
2693         struct request *rq;
2694         int result;
2695
2696         while ((rq = blk_fetch_request(q))) {
2697                 bool write_request = rq_data_dir(rq) == WRITE;
2698                 struct rbd_img_request *img_request;
2699                 u64 offset;
2700                 u64 length;
2701
2702                 /* Ignore any non-FS requests that filter through. */
2703
2704                 if (rq->cmd_type != REQ_TYPE_FS) {
2705                         dout("%s: non-fs request type %d\n", __func__,
2706                                 (int) rq->cmd_type);
2707                         __blk_end_request_all(rq, 0);
2708                         continue;
2709                 }
2710
2711                 /* Ignore/skip any zero-length requests */
2712
2713                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2714                 length = (u64) blk_rq_bytes(rq);
2715
2716                 if (!length) {
2717                         dout("%s: zero-length request\n", __func__);
2718                         __blk_end_request_all(rq, 0);
2719                         continue;
2720                 }
2721
2722                 spin_unlock_irq(q->queue_lock);
2723
2724                 /* Disallow writes to a read-only device */
2725
2726                 if (write_request) {
2727                         result = -EROFS;
2728                         if (read_only)
2729                                 goto end_request;
2730                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2731                 }
2732
2733                 /*
2734                  * Quit early if the mapped snapshot no longer
2735                  * exists.  It's still possible the snapshot will
2736                  * have disappeared by the time our request arrives
2737                  * at the osd, but there's no sense in sending it if
2738                  * we already know.
2739                  */
2740                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2741                         dout("request for non-existent snapshot\n");
2742                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2743                         result = -ENXIO;
2744                         goto end_request;
2745                 }
2746
2747                 result = -EINVAL;
2748                 if (offset && length > U64_MAX - offset + 1) {
2749                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2750                                 offset, length);
2751                         goto end_request;       /* Shouldn't happen */
2752                 }
2753
2754                 result = -ENOMEM;
2755                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2756                                                         write_request, false);
2757                 if (!img_request)
2758                         goto end_request;
2759
2760                 img_request->rq = rq;
2761
2762                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2763                                                 rq->bio);
2764                 if (!result)
2765                         result = rbd_img_request_submit(img_request);
2766                 if (result)
2767                         rbd_img_request_put(img_request);
2768 end_request:
2769                 spin_lock_irq(q->queue_lock);
2770                 if (result < 0) {
2771                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2772                                 write_request ? "write" : "read",
2773                                 length, offset, result);
2774
2775                         __blk_end_request_all(rq, result);
2776                 }
2777         }
2778 }
2779
2780 /*
2781  * A queue callback.  Makes sure that we don't create a bio that spans
2782  * multiple osd objects.  One exception would be single-page bios,
2783  * which we handle later in bio_chain_clone_range().
2784  */
2785 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2786                           struct bio_vec *bvec)
2787 {
2788         struct rbd_device *rbd_dev = q->queuedata;
2789         sector_t sector_offset;
2790         sector_t sectors_per_obj;
2791         sector_t obj_sector_offset;
2792         int ret;
2793
2794         /*
2795          * Convert the partition-relative bio start sector into an
2796          * offset relative to the enclosing device, then find how far
2797          * into its rbd object that offset falls.
2798          */
2799         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2800         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2801         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
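             /*
              * With the default 4 MB (order 22) objects, for example,
              * sectors_per_obj is 8192, so the mask above keeps the low
              * 13 bits of the device-relative sector number.
              */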
2802
2803         /*
2804          * Compute the number of bytes from that offset to the end
2805          * of the object.  Account for what's already used by the bio.
2806          */
2807         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2808         if (ret > bmd->bi_size)
2809                 ret -= bmd->bi_size;
2810         else
2811                 ret = 0;
2812
2813         /*
2814          * Don't send back more than was asked for.  And if the bio
2815          * was empty, let the whole thing through because:  "Note
2816          * that a block device *must* allow a single page to be
2817          * added to an empty bio."
2818          */
2819         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2820         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2821                 ret = (int) bvec->bv_len;
2822
2823         return ret;
2824 }
2825
2826 static void rbd_free_disk(struct rbd_device *rbd_dev)
2827 {
2828         struct gendisk *disk = rbd_dev->disk;
2829
2830         if (!disk)
2831                 return;
2832
2833         rbd_dev->disk = NULL;
2834         if (disk->flags & GENHD_FL_UP) {
2835                 del_gendisk(disk);
2836                 if (disk->queue)
2837                         blk_cleanup_queue(disk->queue);
2838         }
2839         put_disk(disk);
2840 }
2841
2842 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2843                                 const char *object_name,
2844                                 u64 offset, u64 length,
2845                                 void *buf, u64 *version)
2846
2847 {
2848         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2849         struct rbd_obj_request *obj_request;
2850         struct page **pages = NULL;
2851         u32 page_count;
2852         size_t size;
2853         int ret;
2854
2855         page_count = (u32) calc_pages_for(offset, length);
2856         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2857         if (IS_ERR(pages))
2858                 return PTR_ERR(pages);
2859
2860         ret = -ENOMEM;
2861         obj_request = rbd_obj_request_create(object_name, offset, length,
2862                                                         OBJ_REQUEST_PAGES);
2863         if (!obj_request)
2864                 goto out;
2865
2866         obj_request->pages = pages;
2867         obj_request->page_count = page_count;
2868
2869         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2870         if (!obj_request->osd_req)
2871                 goto out;
2872
2873         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2874                                         offset, length, 0, 0);
2875         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2876                                         obj_request->pages,
2877                                         obj_request->length,
2878                                         obj_request->offset & ~PAGE_MASK,
2879                                         false, false);
2880         rbd_osd_req_format_read(obj_request);
2881
2882         ret = rbd_obj_request_submit(osdc, obj_request);
2883         if (ret)
2884                 goto out;
2885         ret = rbd_obj_request_wait(obj_request);
2886         if (ret)
2887                 goto out;
2888
2889         ret = obj_request->result;
2890         if (ret < 0)
2891                 goto out;
2892
2893         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2894         size = (size_t) obj_request->xferred;
2895         ceph_copy_from_page_vector(pages, buf, 0, size);
2896         rbd_assert(size <= (size_t) INT_MAX);
2897         ret = (int) size;
2898         if (version)
2899                 *version = obj_request->version;
2900 out:
2901         if (obj_request)
2902                 rbd_obj_request_put(obj_request);
2903         else
2904                 ceph_release_page_vector(pages, page_count);
2905
2906         return ret;
2907 }
2908
2909 /*
2910  * Read the complete header for the given rbd device.
2911  *
2912  * Returns a pointer to a dynamically-allocated buffer containing
2913  * the complete and validated header.  Caller can pass the address
2914  * of a variable that will be filled in with the version of the
2915  * header object at the time it was read.
2916  *
2917  * Returns a pointer-coded errno if a failure occurs.
2918  */
2919 static struct rbd_image_header_ondisk *
2920 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2921 {
2922         struct rbd_image_header_ondisk *ondisk = NULL;
2923         u32 snap_count = 0;
2924         u64 names_size = 0;
2925         u32 want_count;
2926         int ret;
2927
2928         /*
2929          * The complete header will include an array of its 64-bit
2930          * snapshot ids, followed by the names of those snapshots as
2931          * a contiguous block of NUL-terminated strings.  Note that
2932          * the number of snapshots could change by the time we read
2933          * it in, in which case we re-read it.
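      *
      * The first pass (snap_count and names_size both zero) fetches
      * just the fixed-size header; each later pass sizes the buffer
      * from the previous read, looping until two consecutive reads
      * agree on the snapshot count.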
2934          */
2935         do {
2936                 size_t size;
2937
2938                 kfree(ondisk);
2939
2940                 size = sizeof (*ondisk);
2941                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2942                 size += names_size;
2943                 ondisk = kmalloc(size, GFP_KERNEL);
2944                 if (!ondisk)
2945                         return ERR_PTR(-ENOMEM);
2946
2947                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2948                                        0, size, ondisk, version);
2949                 if (ret < 0)
2950                         goto out_err;
2951                 if ((size_t)ret < size) {
2952                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2953                                 size, ret);
2954                         ret = -ENXIO;
2955                         goto out_err;
2956                 }
2957                 if (!rbd_dev_ondisk_valid(ondisk)) {
2958                         ret = -ENXIO;
2959                         rbd_warn(rbd_dev, "invalid header");
2960                         goto out_err;
2961                 }
2962
2963                 names_size = le64_to_cpu(ondisk->snap_names_len);
2964                 want_count = snap_count;
2965                 snap_count = le32_to_cpu(ondisk->snap_count);
2966         } while (snap_count != want_count);
2967
2968         return ondisk;
2969
2970 out_err:
2971         kfree(ondisk);
2972
2973         return ERR_PTR(ret);
2974 }
2975
2976 /*
2977  * Read the on-disk image header and convert it to the in-memory format.
2978  */
2979 static int rbd_read_header(struct rbd_device *rbd_dev,
2980                            struct rbd_image_header *header)
2981 {
2982         struct rbd_image_header_ondisk *ondisk;
2983         u64 ver = 0;
2984         int ret;
2985
2986         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2987         if (IS_ERR(ondisk))
2988                 return PTR_ERR(ondisk);
2989         ret = rbd_header_from_disk(header, ondisk);
2990         if (ret >= 0)
2991                 header->obj_version = ver;
2992         kfree(ondisk);
2993
2994         return ret;
2995 }
2996
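/*
 * Tear down the in-memory snapshot list for an rbd device,
 * destroying each entry along the way.
 */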
2997 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2998 {
2999         struct rbd_snap *snap;
3000         struct rbd_snap *next;
3001
3002         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3003                 list_del(&snap->node);
3004                 rbd_snap_destroy(snap);
3005         }
3006 }
3007
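/*
 * If the base image (not a snapshot) is mapped and its on-disk
 * size has changed, propagate the new size to the mapping and to
 * the block device capacity.
 */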
3008 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3009 {
3010         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3011                 return;
3012
3013         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3014                 sector_t size;
3015
3016                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3017                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3018                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3019                 set_capacity(rbd_dev->disk, size);
3020         }
3021 }
3022
3023 /*
3024  * Re-read the v1 image header and update the in-memory image and snapshot info
3025  */
3026 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3027 {
3028         int ret;
3029         struct rbd_image_header h;
3030
3031         ret = rbd_read_header(rbd_dev, &h);
3032         if (ret < 0)
3033                 return ret;
3034
3035         down_write(&rbd_dev->header_rwsem);
3036
3037         /* Update image size, and check for resize of mapped image */
3038         rbd_dev->header.image_size = h.image_size;
3039         rbd_update_mapping_size(rbd_dev);
3040
3041         /* rbd_dev->header.object_prefix shouldn't change */
3042         kfree(rbd_dev->header.snap_sizes);
3043         kfree(rbd_dev->header.snap_names);
3044         /* osd requests may still refer to snapc */
3045         ceph_put_snap_context(rbd_dev->header.snapc);
3046
3047         if (hver)
3048                 *hver = h.obj_version;
3049         rbd_dev->header.obj_version = h.obj_version;
3051         rbd_dev->header.snapc = h.snapc;
3052         rbd_dev->header.snap_names = h.snap_names;
3053         rbd_dev->header.snap_sizes = h.snap_sizes;
3054         /* Free the extra copy of the object prefix */
3055         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3056                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3057         kfree(h.object_prefix);
3058
3059         ret = rbd_dev_snaps_update(rbd_dev);
3060
3061         up_write(&rbd_dev->header_rwsem);
3062
3063         return ret;
3064 }
3065
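/*
 * Refresh the image header using the format-appropriate routine,
 * then revalidate the disk if the image size changed.
 */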
3066 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3067 {
3068         u64 image_size;
3069         int ret;
3070
3071         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3072         image_size = rbd_dev->header.image_size;
3073         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3074         if (rbd_dev->image_format == 1)
3075                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3076         else
3077                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3078         mutex_unlock(&ctl_mutex);
3079         if (ret)
3080                 rbd_warn(rbd_dev, "got notification but failed to "
3081                            "update snaps: %d\n", ret);
3082         if (image_size != rbd_dev->header.image_size)
3083                 revalidate_disk(rbd_dev->disk);
3084
3085         return ret;
3086 }
3087
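/*
 * Set up the gendisk and request queue for an rbd device,
 * capping I/O sizes at the size of a backing object.
 */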
3088 static int rbd_init_disk(struct rbd_device *rbd_dev)
3089 {
3090         struct gendisk *disk;
3091         struct request_queue *q;
3092         u64 segment_size;
3093
3094         /* create gendisk info */
3095         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3096         if (!disk)
3097                 return -ENOMEM;
3098
3099         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3100                  rbd_dev->dev_id);
3101         disk->major = rbd_dev->major;
3102         disk->first_minor = 0;
3103         disk->fops = &rbd_bd_ops;
3104         disk->private_data = rbd_dev;
3105
3106         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3107         if (!q)
3108                 goto out_disk;
3109
3110         /* We use the default size, but let's be explicit about it. */
3111         blk_queue_physical_block_size(q, SECTOR_SIZE);
3112
3113         /* set io sizes to object size */
3114         segment_size = rbd_obj_bytes(&rbd_dev->header);
3115         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3116         blk_queue_max_segment_size(q, segment_size);
3117         blk_queue_io_min(q, segment_size);
3118         blk_queue_io_opt(q, segment_size);
3119
3120         blk_queue_merge_bvec(q, rbd_merge_bvec);
3121         disk->queue = q;
3122
3123         q->queuedata = rbd_dev;
3124
3125         rbd_dev->disk = disk;
3126
3127         return 0;
3128 out_disk:
3129         put_disk(disk);
3130
3131         return -ENOMEM;
3132 }
3133
3134 /*
3135  * sysfs
3136  */
3137
3138 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3139 {
3140         return container_of(dev, struct rbd_device, dev);
3141 }
3142
3143 static ssize_t rbd_size_show(struct device *dev,
3144                              struct device_attribute *attr, char *buf)
3145 {
3146         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3147
3148         return sprintf(buf, "%llu\n",
3149                 (unsigned long long)rbd_dev->mapping.size);
3150 }
3151
3152 /*
3153  * Note this shows the features for whatever's mapped, which is not
3154  * necessarily the base image.
3155  */
3156 static ssize_t rbd_features_show(struct device *dev,
3157                              struct device_attribute *attr, char *buf)
3158 {
3159         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3160
3161         return sprintf(buf, "0x%016llx\n",
3162                         (unsigned long long)rbd_dev->mapping.features);
3163 }
3164
3165 static ssize_t rbd_major_show(struct device *dev,
3166                               struct device_attribute *attr, char *buf)
3167 {
3168         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3169
3170         if (rbd_dev->major)
3171                 return sprintf(buf, "%d\n", rbd_dev->major);
3172
3173         return sprintf(buf, "(none)\n");
3175 }
3176
3177 static ssize_t rbd_client_id_show(struct device *dev,
3178                                   struct device_attribute *attr, char *buf)
3179 {
3180         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3181
3182         return sprintf(buf, "client%lld\n",
3183                         ceph_client_id(rbd_dev->rbd_client->client));
3184 }
3185
3186 static ssize_t rbd_pool_show(struct device *dev,
3187                              struct device_attribute *attr, char *buf)
3188 {
3189         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3190
3191         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3192 }
3193
3194 static ssize_t rbd_pool_id_show(struct device *dev,
3195                              struct device_attribute *attr, char *buf)
3196 {
3197         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3198
3199         return sprintf(buf, "%llu\n",
3200                         (unsigned long long) rbd_dev->spec->pool_id);
3201 }
3202
3203 static ssize_t rbd_name_show(struct device *dev,
3204                              struct device_attribute *attr, char *buf)
3205 {
3206         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3207
3208         if (rbd_dev->spec->image_name)
3209                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3210
3211         return sprintf(buf, "(unknown)\n");
3212 }
3213
3214 static ssize_t rbd_image_id_show(struct device *dev,
3215                              struct device_attribute *attr, char *buf)
3216 {
3217         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3218
3219         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3220 }
3221
3222 /*
3223  * Shows the name of the currently-mapped snapshot (or
3224  * RBD_SNAP_HEAD_NAME for the base image).
3225  */
3226 static ssize_t rbd_snap_show(struct device *dev,
3227                              struct device_attribute *attr,
3228                              char *buf)
3229 {
3230         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3231
3232         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3233 }
3234
3235 /*
3236  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3237  * for the parent image.  If there is no parent, simply shows
3238  * "(no parent image)".
3239  */
3240 static ssize_t rbd_parent_show(struct device *dev,
3241                              struct device_attribute *attr,
3242                              char *buf)
3243 {
3244         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3245         struct rbd_spec *spec = rbd_dev->parent_spec;
3246         int count;
3247         char *bufp = buf;
3248
3249         if (!spec)
3250                 return sprintf(buf, "(no parent image)\n");
3251
3252         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3253                         (unsigned long long) spec->pool_id, spec->pool_name);
3254         if (count < 0)
3255                 return count;
3256         bufp += count;
3257
3258         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3259                         spec->image_name ? spec->image_name : "(unknown)");
3260         if (count < 0)
3261                 return count;
3262         bufp += count;
3263
3264         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3265                         (unsigned long long) spec->snap_id, spec->snap_name);
3266         if (count < 0)
3267                 return count;
3268         bufp += count;
3269
3270         count = sprintf(bufp, "overlap %llu\n", (unsigned long long)rbd_dev->parent_overlap);
3271         if (count < 0)
3272                 return count;
3273         bufp += count;
3274
3275         return (ssize_t) (bufp - buf);
3276 }
3277
3278 static ssize_t rbd_image_refresh(struct device *dev,
3279                                  struct device_attribute *attr,
3280                                  const char *buf,
3281                                  size_t size)
3282 {
3283         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3284         int ret;
3285
3286         ret = rbd_dev_refresh(rbd_dev, NULL);
3287
3288         return ret < 0 ? ret : size;
3289 }
3290
3291 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3292 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3293 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3294 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3295 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3296 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3297 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3298 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3299 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3300 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3301 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3302
3303 static struct attribute *rbd_attrs[] = {
3304         &dev_attr_size.attr,
3305         &dev_attr_features.attr,
3306         &dev_attr_major.attr,
3307         &dev_attr_client_id.attr,
3308         &dev_attr_pool.attr,
3309         &dev_attr_pool_id.attr,
3310         &dev_attr_name.attr,
3311         &dev_attr_image_id.attr,
3312         &dev_attr_current_snap.attr,
3313         &dev_attr_parent.attr,
3314         &dev_attr_refresh.attr,
3315         NULL
3316 };
3317
3318 static struct attribute_group rbd_attr_group = {
3319         .attrs = rbd_attrs,
3320 };
3321
3322 static const struct attribute_group *rbd_attr_groups[] = {
3323         &rbd_attr_group,
3324         NULL
3325 };
3326
3327 static void rbd_sysfs_dev_release(struct device *dev)
3328 {
3329 }
3330
3331 static struct device_type rbd_device_type = {
3332         .name           = "rbd",
3333         .groups         = rbd_attr_groups,
3334         .release        = rbd_sysfs_dev_release,
3335 };
3336
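/*
 * An rbd_spec is reference-counted; it is freed (along with the
 * names and ids it holds) when the last reference is dropped.
 */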
3337 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3338 {
3339         kref_get(&spec->kref);
3340
3341         return spec;
3342 }
3343
3344 static void rbd_spec_free(struct kref *kref);
3345 static void rbd_spec_put(struct rbd_spec *spec)
3346 {
3347         if (spec)
3348                 kref_put(&spec->kref, rbd_spec_free);
3349 }
3350
3351 static struct rbd_spec *rbd_spec_alloc(void)
3352 {
3353         struct rbd_spec *spec;
3354
3355         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3356         if (!spec)
3357                 return NULL;
3358         kref_init(&spec->kref);
3359
3360         return spec;
3361 }
3362
3363 static void rbd_spec_free(struct kref *kref)
3364 {
3365         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3366
3367         kfree(spec->pool_name);
3368         kfree(spec->image_id);
3369         kfree(spec->image_name);
3370         kfree(spec->snap_name);
3371         kfree(spec);
3372 }
3373
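/*
 * Allocate and initialize a new rbd_device.  The new device takes
 * ownership of the client and spec references passed in; both are
 * dropped again by rbd_dev_destroy().
 */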
3374 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3375                                 struct rbd_spec *spec)
3376 {
3377         struct rbd_device *rbd_dev;
3378
3379         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3380         if (!rbd_dev)
3381                 return NULL;
3382
3383         spin_lock_init(&rbd_dev->lock);
3384         rbd_dev->flags = 0;
3385         INIT_LIST_HEAD(&rbd_dev->node);
3386         INIT_LIST_HEAD(&rbd_dev->snaps);
3387         init_rwsem(&rbd_dev->header_rwsem);
3388
3389         rbd_dev->spec = spec;
3390         rbd_dev->rbd_client = rbdc;
3391
3392         /* Initialize the layout used for all rbd requests */
3393
3394         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3395         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3396         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3397         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3398
3399         return rbd_dev;
3400 }
3401
3402 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3403 {
3404         rbd_put_client(rbd_dev->rbd_client);
3405         rbd_spec_put(rbd_dev->spec);
3406         kfree(rbd_dev);
3407 }
3408
3409 static void rbd_snap_destroy(struct rbd_snap *snap)
3410 {
3411         kfree(snap->name);
3412         kfree(snap);
3413 }
3414
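/*
 * Create a new snapshot list entry.  The snapshot name is assumed
 * to be dynamically allocated; it is freed along with the entry
 * by rbd_snap_destroy().
 */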
3415 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3416                                                 const char *snap_name,
3417                                                 u64 snap_id, u64 snap_size,
3418                                                 u64 snap_features)
3419 {
3420         struct rbd_snap *snap;
3421
3422         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3423         if (!snap)
3424                 return ERR_PTR(-ENOMEM);
3425
3426         snap->name = snap_name;
3427         snap->id = snap_id;
3428         snap->size = snap_size;
3429         snap->features = snap_features;
3430
3431         return snap;
3432 }
3433
3434 /*
3435  * Returns a dynamically-allocated snapshot name if successful, or a
3436  * pointer-coded error otherwise.
3437  */
3438 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3439                 u64 *snap_size, u64 *snap_features)
3440 {
3441         const char *snap_name;
3442         u32 i;
3443
3444         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3445
3446         /* Skip over names until we find the one we are looking for */
3447
3448         snap_name = rbd_dev->header.snap_names;
3449         for (i = 0; i < which; i++)
3450                 snap_name += strlen(snap_name) + 1;
3451
3452         snap_name = kstrdup(snap_name, GFP_KERNEL);
3453         if (!snap_name)
3454                 return ERR_PTR(-ENOMEM);
3455
3456         *snap_size = rbd_dev->header.snap_sizes[which];
3457         *snap_features = 0;     /* No features for v1 */
3458
3459         return snap_name;
3460 }
3461
3462 /*
3463  * Get the size and object order for an image snapshot, or if
3464  * snap_id is CEPH_NOSNAP, gets this information for the base
3465  * image.
3466  */
3467 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3468                                 u8 *order, u64 *snap_size)
3469 {
3470         __le64 snapid = cpu_to_le64(snap_id);
3471         int ret;
3472         struct {
3473                 u8 order;
3474                 __le64 size;
3475         } __attribute__ ((packed)) size_buf = { 0 };
3476
3477         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3478                                 "rbd", "get_size",
3479                                 &snapid, sizeof (snapid),
3480                                 &size_buf, sizeof (size_buf), NULL);
3481         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3482         if (ret < 0)
3483                 return ret;
3484         if (ret < sizeof (size_buf))
3485                 return -ERANGE;
3486
3487         if (order)
3488                 *order = size_buf.order;
3489         *snap_size = le64_to_cpu(size_buf.size);
3490
3491         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3492                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3493                 (unsigned long long)*snap_size);
3494
3495         return 0;
3496 }
3497
3498 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3499 {
3500         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3501                                         &rbd_dev->header.obj_order,
3502                                         &rbd_dev->header.image_size);
3503 }
3504
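/*
 * Fetch the object name prefix for a format 2 image via the
 * "get_object_prefix" class method and record it in the image
 * header.
 */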
3505 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3506 {
3507         void *reply_buf;
3508         int ret;
3509         void *p;
3510
3511         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3512         if (!reply_buf)
3513                 return -ENOMEM;
3514
3515         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3516                                 "rbd", "get_object_prefix", NULL, 0,
3517                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3518         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3519         if (ret < 0)
3520                 goto out;
3521
3522         p = reply_buf;
3523         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3524                                                 p + ret, NULL, GFP_NOIO);
3525         ret = 0;
3526
3527         if (IS_ERR(rbd_dev->header.object_prefix)) {
3528                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3529                 rbd_dev->header.object_prefix = NULL;
3530         } else {
3531                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3532         }
3533 out:
3534         kfree(reply_buf);
3535
3536         return ret;
3537 }
3538
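/*
 * Get the feature bits for an image snapshot, or for the base
 * image if snap_id is CEPH_NOSNAP.  Fails with -ENXIO if the
 * image uses incompatible features this client doesn't support.
 */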
3539 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3540                 u64 *snap_features)
3541 {
3542         __le64 snapid = cpu_to_le64(snap_id);
3543         struct {
3544                 __le64 features;
3545                 __le64 incompat;
3546         } __attribute__ ((packed)) features_buf = { 0 };
3547         u64 incompat;
3548         int ret;
3549
3550         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3551                                 "rbd", "get_features",
3552                                 &snapid, sizeof (snapid),
3553                                 &features_buf, sizeof (features_buf), NULL);
3554         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3555         if (ret < 0)
3556                 return ret;
3557         if (ret < sizeof (features_buf))
3558                 return -ERANGE;
3559
3560         incompat = le64_to_cpu(features_buf.incompat);
3561         if (incompat & ~RBD_FEATURES_SUPPORTED)
3562                 return -ENXIO;
3563
3564         *snap_features = le64_to_cpu(features_buf.features);
3565
3566         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3567                 (unsigned long long)snap_id,
3568                 (unsigned long long)*snap_features,
3569                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3570
3571         return 0;
3572 }
3573
3574 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3575 {
3576         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3577                                                 &rbd_dev->header.features);
3578 }
3579
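/*
 * Query a format 2 image for layering information about its
 * parent: the parent pool id, image id, and snapshot id, plus
 * the overlap with the parent.  A pool id of CEPH_NOPOOL
 * indicates the image has no parent.
 */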
3580 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3581 {
3582         struct rbd_spec *parent_spec;
3583         size_t size;
3584         void *reply_buf = NULL;
3585         __le64 snapid;
3586         void *p;
3587         void *end;
3588         char *image_id;
3589         u64 overlap;
3590         int ret;
3591
3592         parent_spec = rbd_spec_alloc();
3593         if (!parent_spec)
3594                 return -ENOMEM;
3595
3596         size = sizeof (__le64) +                                /* pool_id */
3597                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3598                 sizeof (__le64) +                               /* snap_id */
3599                 sizeof (__le64);                                /* overlap */
3600         reply_buf = kmalloc(size, GFP_KERNEL);
3601         if (!reply_buf) {
3602                 ret = -ENOMEM;
3603                 goto out_err;
3604         }
3605
3606         snapid = cpu_to_le64(CEPH_NOSNAP);
3607         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3608                                 "rbd", "get_parent",
3609                                 &snapid, sizeof (snapid),
3610                                 reply_buf, size, NULL);
3611         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3612         if (ret < 0)
3613                 goto out_err;
3614
3615         p = reply_buf;
3616         end = reply_buf + ret;
3617         ret = -ERANGE;
3618         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3619         if (parent_spec->pool_id == CEPH_NOPOOL)
3620                 goto out;       /* No parent?  No problem. */
3621
3622         /* The ceph file layout needs to fit pool id in 32 bits */
3623
3624         ret = -EIO;
3625         if (parent_spec->pool_id > (u64)U32_MAX) {
3626                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3627                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3628                 goto out_err;
3629         }
3630
3631         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3632         if (IS_ERR(image_id)) {
3633                 ret = PTR_ERR(image_id);
3634                 goto out_err;
3635         }
3636         parent_spec->image_id = image_id;
3637         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3638         ceph_decode_64_safe(&p, end, overlap, out_err);
3639
3640         rbd_dev->parent_overlap = overlap;
3641         rbd_dev->parent_spec = parent_spec;
3642         parent_spec = NULL;     /* rbd_dev now owns this */
3643 out:
3644         ret = 0;
3645 out_err:
3646         kfree(reply_buf);
3647         rbd_spec_put(parent_spec);
3648
3649         return ret;
3650 }
3651
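/*
 * Fetch and validate the stripe unit and stripe count for a
 * format 2 image.
 */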
3652 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3653 {
3654         struct {
3655                 __le64 stripe_unit;
3656                 __le64 stripe_count;
3657         } __attribute__ ((packed)) striping_info_buf = { 0 };
3658         size_t size = sizeof (striping_info_buf);
3659         void *p;
3660         u64 obj_size;
3661         u64 stripe_unit;
3662         u64 stripe_count;
3663         int ret;
3664
3665         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3666                                 "rbd", "get_stripe_unit_count", NULL, 0,
3667                                 (char *)&striping_info_buf, size, NULL);
3668         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3669         if (ret < 0)
3670                 return ret;
3671         if (ret < size)
3672                 return -ERANGE;
3673
3674         /*
3675          * We don't actually support the "fancy striping" feature
3676          * (STRIPINGV2) yet, but if the striping sizes are the
3677          * defaults the behavior is the same as before.  So find
3678          * out, and only fail if the image has non-default values.
3679          */
3681         obj_size = (u64)1 << rbd_dev->header.obj_order;
3682         p = &striping_info_buf;
3683         stripe_unit = ceph_decode_64(&p);
3684         if (stripe_unit != obj_size) {
3685                 rbd_warn(rbd_dev, "unsupported stripe unit "
3686                                 "(got %llu want %llu)",
3687                                 stripe_unit, obj_size);
3688                 return -EINVAL;
3689         }
3690         stripe_count = ceph_decode_64(&p);
3691         if (stripe_count != 1) {
3692                 rbd_warn(rbd_dev, "unsupported stripe count "
3693                                 "(got %llu want 1)", stripe_count);
3694                 return -EINVAL;
3695         }
3696         rbd_dev->header.stripe_unit = stripe_unit;
3697         rbd_dev->header.stripe_count = stripe_count;
3698
3699         return 0;
3700 }
3701
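/*
 * Look up a format 2 image's name given its id by consulting the
 * pool's rbd directory object.  Returns a dynamically-allocated
 * name, or a null pointer if the name can't be determined.
 */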
3702 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3703 {
3704         size_t image_id_size;
3705         char *image_id;
3706         void *p;
3707         void *end;
3708         size_t size;
3709         void *reply_buf = NULL;
3710         size_t len = 0;
3711         char *image_name = NULL;
3712         int ret;
3713
3714         rbd_assert(!rbd_dev->spec->image_name);
3715
3716         len = strlen(rbd_dev->spec->image_id);
3717         image_id_size = sizeof (__le32) + len;
3718         image_id = kmalloc(image_id_size, GFP_KERNEL);
3719         if (!image_id)
3720                 return NULL;
3721
3722         p = image_id;
3723         end = image_id + image_id_size;
3724         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3725
3726         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3727         reply_buf = kmalloc(size, GFP_KERNEL);
3728         if (!reply_buf)
3729                 goto out;
3730
3731         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3732                                 "rbd", "dir_get_name",
3733                                 image_id, image_id_size,
3734                                 reply_buf, size, NULL);
3735         if (ret < 0)
3736                 goto out;
3737         p = reply_buf;
3738         end = reply_buf + ret;
3739
3740         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3741         if (IS_ERR(image_name))
3742                 image_name = NULL;
3743         else
3744                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3745 out:
3746         kfree(reply_buf);
3747         kfree(image_id);
3748
3749         return image_name;
3750 }
3751
3752 /*
3753  * When an rbd image has a parent image, it is identified by the
3754  * pool, image, and snapshot ids (not names).  This function fills
3755  * in the names for those ids.  (It's OK if we can't figure out the
3756  * name for an image id, but the pool and snapshot ids should always
3757  * exist and have names.)  All names in an rbd spec are dynamically
3758  * allocated.
3759  *
3760  * When an image being mapped (not a parent) is probed, we have the
3761  * pool name and pool id, image name and image id, and the snapshot
3762  * name.  The only thing we're missing is the snapshot id.
3763  *
3764  * The set of snapshots for an image is not known until they have
3765  * been read by rbd_dev_snaps_update(), so we can't completely fill
3766  * in this information until after that has been called.
3767  */
3768 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3769 {
3770         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3771         struct rbd_spec *spec = rbd_dev->spec;
3772         const char *pool_name;
3773         const char *image_name;
3774         const char *snap_name;
3775         int ret;
3776
3777         /*
3778          * An image being mapped will have the pool name (etc.), but
3779          * we need to look up the snapshot id.
3780          */
3781         if (spec->pool_name) {
3782                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3783                         struct rbd_snap *snap;
3784
3785                         snap = snap_by_name(rbd_dev, spec->snap_name);
3786                         if (!snap)
3787                                 return -ENOENT;
3788                         spec->snap_id = snap->id;
3789                 } else {
3790                         spec->snap_id = CEPH_NOSNAP;
3791                 }
3792
3793                 return 0;
3794         }
3795
3796         /* Get the pool name; we have to make our own copy of this */
3797
3798         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3799         if (!pool_name) {
3800                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3801                 return -EIO;
3802         }
3803         pool_name = kstrdup(pool_name, GFP_KERNEL);
3804         if (!pool_name)
3805                 return -ENOMEM;
3806
3807         /* Fetch the image name; tolerate failure here */
3808
3809         image_name = rbd_dev_image_name(rbd_dev);
3810         if (!image_name)
3811                 rbd_warn(rbd_dev, "unable to get image name");
3812
3813         /* Look up the snapshot name, and make a copy */
3814
3815         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3816         if (!snap_name) {
3817                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3818                 ret = -EIO;
3819                 goto out_err;
3820         }
3821         snap_name = kstrdup(snap_name, GFP_KERNEL);
3822         if (!snap_name) {
3823                 ret = -ENOMEM;
3824                 goto out_err;
3825         }
3826
3827         spec->pool_name = pool_name;
3828         spec->image_name = image_name;
3829         spec->snap_name = snap_name;
3830
3831         return 0;
3832 out_err:
3833         kfree(image_name);
3834         kfree(pool_name);
3835
3836         return ret;
3837 }
3838
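/*
 * Fetch the snapshot context (sequence number plus the array of
 * snapshot ids) for a format 2 image and install it in the image
 * header.
 */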
3839 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3840 {
3841         size_t size;
3842         int ret;
3843         void *reply_buf;
3844         void *p;
3845         void *end;
3846         u64 seq;
3847         u32 snap_count;
3848         struct ceph_snap_context *snapc;
3849         u32 i;
3850
3851         /*
3852          * We'll need room for the seq value (maximum snapshot id),
3853          * snapshot count, and array of that many snapshot ids.
3854          * For now we have a fixed upper limit on the number we're
3855          * prepared to receive.
3856          */
3857         size = sizeof (__le64) + sizeof (__le32) +
3858                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3859         reply_buf = kzalloc(size, GFP_KERNEL);
3860         if (!reply_buf)
3861                 return -ENOMEM;
3862
3863         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3864                                 "rbd", "get_snapcontext", NULL, 0,
3865                                 reply_buf, size, ver);
3866         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3867         if (ret < 0)
3868                 goto out;
3869
3870         p = reply_buf;
3871         end = reply_buf + ret;
3872         ret = -ERANGE;
3873         ceph_decode_64_safe(&p, end, seq, out);
3874         ceph_decode_32_safe(&p, end, snap_count, out);
3875
3876         /*
3877          * Make sure the reported number of snapshot ids wouldn't go
3878          * beyond the end of our buffer.  But before checking that,
3879          * make sure the computed size of the snapshot context we
3880          * allocate is representable in a size_t.
3881          */
3882         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3883                                  / sizeof (u64)) {
3884                 ret = -EINVAL;
3885                 goto out;
3886         }
3887         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3888                 goto out;
3889         ret = 0;
3890
3891         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3892         if (!snapc) {
3893                 ret = -ENOMEM;
3894                 goto out;
3895         }
3896         snapc->seq = seq;
3897         for (i = 0; i < snap_count; i++)
3898                 snapc->snaps[i] = ceph_decode_64(&p);
3899         ceph_put_snap_context(rbd_dev->header.snapc);
3900         rbd_dev->header.snapc = snapc;
3901
3902         dout("  snap context seq = %llu, snap_count = %u\n",
3903                 (unsigned long long)seq, (unsigned int)snap_count);
3904 out:
3905         kfree(reply_buf);
3906
3907         return ret;
3908 }
3909
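/*
 * Get the name of the snapshot at the given position in the
 * snapshot context.  Returns a dynamically-allocated name, or a
 * pointer-coded errno on failure.
 */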
3910 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3911 {
3912         size_t size;
3913         void *reply_buf;
3914         __le64 snap_id;
3915         int ret;
3916         void *p;
3917         void *end;
3918         char *snap_name;
3919
3920         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3921         reply_buf = kmalloc(size, GFP_KERNEL);
3922         if (!reply_buf)
3923                 return ERR_PTR(-ENOMEM);
3924
3925         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3926         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3927         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3928                                 "rbd", "get_snapshot_name",
3929                                 &snap_id, sizeof (snap_id),
3930                                 reply_buf, size, NULL);
3931         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3932         if (ret < 0) {
3933                 snap_name = ERR_PTR(ret);
3934                 goto out;
3935         }
3936
3937         p = reply_buf;
3938         end = reply_buf + ret;
3939         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3940         if (IS_ERR(snap_name))
3941                 goto out;
3942
3943         dout("  snap_id 0x%016llx snap_name = %s\n",
3944                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3945 out:
3946         kfree(reply_buf);
3947
3948         return snap_name;
3949 }
3950
3951 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3952                 u64 *snap_size, u64 *snap_features)
3953 {
3954         u64 snap_id;
3955         u64 size;
3956         u64 features;
3957         const char *snap_name;
3958         int ret;
3959
3960         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3961         snap_id = rbd_dev->header.snapc->snaps[which];
3962         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3963         if (ret)
3964                 goto out_err;
3965
3966         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3967         if (ret)
3968                 goto out_err;
3969
3970         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3971         if (!IS_ERR(snap_name)) {
3972                 *snap_size = size;
3973                 *snap_features = features;
3974         }
3975
3976         return snap_name;
3977 out_err:
3978         return ERR_PTR(ret);
3979 }
3980
3981 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3982                 u64 *snap_size, u64 *snap_features)
3983 {
3984         if (rbd_dev->image_format == 1)
3985                 return rbd_dev_v1_snap_info(rbd_dev, which,
3986                                         snap_size, snap_features);
3987         if (rbd_dev->image_format == 2)
3988                 return rbd_dev_v2_snap_info(rbd_dev, which,
3989                                         snap_size, snap_features);
3990         return ERR_PTR(-EINVAL);
3991 }
3992
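/*
 * Re-read the size and snapshot context for a format 2 image,
 * then bring the in-memory snapshot list up to date.
 */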
3993 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3994 {
3995         int ret;
3996
3997         down_write(&rbd_dev->header_rwsem);
3998
3999         ret = rbd_dev_v2_image_size(rbd_dev);
4000         if (ret)
4001                 goto out;
4002         rbd_update_mapping_size(rbd_dev);
4003
4004         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4005         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4006         if (ret)
4007                 goto out;
4008         ret = rbd_dev_snaps_update(rbd_dev);
4009         dout("rbd_dev_snaps_update returned %d\n", ret);
4012 out:
4013         up_write(&rbd_dev->header_rwsem);
4014
4015         return ret;
4016 }
4017
4018 /*
4019  * Scan the rbd device's current snapshot list and compare it to the
4020  * newly-received snapshot context.  Remove any existing snapshots
4021  * not present in the new snapshot context.  Add a new snapshot for
4022  * any snapshots in the snapshot context not in the current list.
4023  * And verify there are no changes to snapshots we already know
4024  * about.
4025  *
4026  * Assumes the snapshots in the snapshot context are sorted by
4027  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4028  * are also maintained in that order.)
4029  *
4030  * Note that any error that occurs while updating the snapshot list
4031  * aborts the update, and the entire list is cleared.  The snapshot
4032  * list becomes inconsistent at that point anyway, so it might as
4033  * well be empty.
4034  */
4035 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4036 {
4037         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4038         const u32 snap_count = snapc->num_snaps;
4039         struct list_head *head = &rbd_dev->snaps;
4040         struct list_head *links = head->next;
4041         u32 index = 0;
4042         int ret = 0;
4043
4044         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4045         while (index < snap_count || links != head) {
4046                 u64 snap_id;
4047                 struct rbd_snap *snap;
4048                 const char *snap_name;
4049                 u64 snap_size = 0;
4050                 u64 snap_features = 0;
4051
4052                 snap_id = index < snap_count ? snapc->snaps[index]
4053                                              : CEPH_NOSNAP;
4054                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4055                                      : NULL;
4056                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4057
4058                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4059                         struct list_head *next = links->next;
4060
4061                         /*
4062                          * A previously-existing snapshot is not in
4063                          * the new snap context.
4064                          *
4065                          * If the now-missing snapshot is the one
4066                          * the image represents, clear its existence
4067                          * flag so we can avoid sending any more
4068                          * requests to it.
4069                          */
4070                         if (rbd_dev->spec->snap_id == snap->id)
4071                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4072                         dout("removing %ssnap id %llu\n",
4073                                 rbd_dev->spec->snap_id == snap->id ?
4074                                                         "mapped " : "",
4075                                 (unsigned long long)snap->id);
4076
4077                         list_del(&snap->node);
4078                         rbd_snap_destroy(snap);
4079
4080                         /* Done with this list entry; advance */
4081
4082                         links = next;
4083                         continue;
4084                 }
4085
4086                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4087                                         &snap_size, &snap_features);
4088                 if (IS_ERR(snap_name)) {
4089                         ret = PTR_ERR(snap_name);
4090                         dout("failed to get snap info, error %d\n", ret);
4091                         goto out_err;
4092                 }
4093
4094                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4095                         (unsigned long long)snap_id);
4096                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4097                         struct rbd_snap *new_snap;
4098
4099                         /* We haven't seen this snapshot before */
4100
4101                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4102                                         snap_id, snap_size, snap_features);
4103                         if (IS_ERR(new_snap)) {
4104                                 ret = PTR_ERR(new_snap);
4105                                 dout("  failed to add dev, error %d\n", ret);
4106                                 goto out_err;
4107                         }
4108
4109                         /* New goes before existing, or at end of list */
4110
4111                         dout("  added dev%s\n", snap ? "" : " at end");
4112                         if (snap)
4113                                 list_add_tail(&new_snap->node, &snap->node);
4114                         else
4115                                 list_add_tail(&new_snap->node, head);
4116                 } else {
4117                         /* Already have this one */
4118
4119                         dout("  already present\n");
4120
4121                         rbd_assert(snap->size == snap_size);
4122                         rbd_assert(!strcmp(snap->name, snap_name));
4123                         rbd_assert(snap->features == snap_features);
4124                         kfree(snap_name); /* existing entry keeps its own copy */
4125                         /* Done with this list entry; advance */
4126
4127                         links = links->next;
4128                 }
4129
4130                 /* Advance to the next entry in the snapshot context */
4131
4132                 index++;
4133         }
4134         dout("%s: done\n", __func__);
4135
4136         return 0;
4137 out_err:
4138         rbd_remove_all_snaps(rbd_dev);
4139
4140         return ret;
4141 }
4142
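/*
 * Register the rbd device in sysfs (under the rbd bus, so it
 * shows up as /sys/bus/rbd/devices/<id>).
 */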
4143 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4144 {
4145         struct device *dev;
4146         int ret;
4147
4148         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4149
4150         dev = &rbd_dev->dev;
4151         dev->bus = &rbd_bus_type;
4152         dev->type = &rbd_device_type;
4153         dev->parent = &rbd_root_dev;
4154         dev->release = rbd_dev_device_release;
4155         dev_set_name(dev, "%d", rbd_dev->dev_id);
4156         ret = device_register(dev);
4157
4158         mutex_unlock(&ctl_mutex);
4159
4160         return ret;
4161 }
4162
4163 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4164 {
4165         device_unregister(&rbd_dev->dev);
4166 }
4167
4168 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4169
4170 /*
4171  * Get a unique rbd identifier for the given new rbd_dev, and add
4172  * the rbd_dev to the global list.  The minimum rbd id is 1.
4173  */
4174 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4175 {
4176         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4177
4178         spin_lock(&rbd_dev_list_lock);
4179         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4180         spin_unlock(&rbd_dev_list_lock);
4181         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4182                 (unsigned long long) rbd_dev->dev_id);
4183 }
4184
4185 /*
4186  * Remove an rbd_dev from the global list, and record that its
4187  * identifier is no longer in use.
4188  */
4189 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4190 {
4191         struct list_head *tmp;
4192         int rbd_id = rbd_dev->dev_id;
4193         int max_id;
4194
4195         rbd_assert(rbd_id > 0);
4196
4197         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4198                 (unsigned long long) rbd_dev->dev_id);
4199         spin_lock(&rbd_dev_list_lock);
4200         list_del_init(&rbd_dev->node);
4201
4202         /*
4203          * If the id being "put" is not the current maximum, there
4204          * is nothing special we need to do.
4205          */
4206         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4207                 spin_unlock(&rbd_dev_list_lock);
4208                 return;
4209         }
4210
4211         /*
4212          * We need to update the current maximum id.  Search the
4213          * list to find out what it is.  We're more likely to find
4214          * the maximum at the end, so search the list backward.
4215          */
4216         max_id = 0;
4217         list_for_each_prev(tmp, &rbd_dev_list) {
4218                 struct rbd_device *rbd_dev;
4219
4220                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4221                 if (rbd_dev->dev_id > max_id)
4222                         max_id = rbd_dev->dev_id;
4223         }
4224         spin_unlock(&rbd_dev_list_lock);
4225
4226         /*
4227          * The max id could have been updated by rbd_dev_id_get(), in
4228          * which case it now accurately reflects the new maximum.
4229          * Be careful not to overwrite the maximum value in that
4230          * case.
4231          */
4232         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4233         dout("  max dev id has been reset\n");
4234 }
4235
4236 /*
4237  * Skips over white space at *buf, and updates *buf to point to the
4238  * first found non-space character (if any). Returns the length of
4239  * the token (string of non-white space characters) found.  Note
4240  * that *buf must be terminated with '\0'.
4241  */
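/*
 * For example, given *buf pointing at "  foo bar", next_token()
 * advances *buf to "foo bar" and returns 3 (the length of "foo").
 */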
4242 static inline size_t next_token(const char **buf)
4243 {
4244         /*
4245          * These are the characters that produce nonzero for
4246          * isspace() in the "C" and "POSIX" locales.
4247          */
4248         const char *spaces = " \f\n\r\t\v";
4249
4250         *buf += strspn(*buf, spaces);   /* Find start of token */
4251
4252         return strcspn(*buf, spaces);   /* Return token length */
4253 }
4254
4255 /*
4256  * Finds the next token in *buf, and if the provided token buffer is
4257  * big enough, copies the found token into it.  The result, if
4258  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4259  * must be terminated with '\0' on entry.
4260  *
4261  * Returns the length of the token found (not including the '\0').
4262  * Return value will be 0 if no token is found, and it will be >=
4263  * token_size if the token would not fit.
4264  *
4265  * The *buf pointer will be updated to point beyond the end of the
4266  * found token.  Note that this occurs even if the token buffer is
4267  * too small to hold it.
4268  */
4269 static inline size_t copy_token(const char **buf,
4270                                 char *token,
4271                                 size_t token_size)
4272 {
4273         size_t len;
4274
4275         len = next_token(buf);
4276         if (len < token_size) {
4277                 memcpy(token, *buf, len);
4278                 *(token + len) = '\0';
4279         }
4280         *buf += len;
4281
4282         return len;
4283 }
4284
4285 /*
4286  * Finds the next token in *buf, dynamically allocates a buffer big
4287  * enough to hold a copy of it, and copies the token into the new
4288  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4289  * that a duplicate buffer is created even for a zero-length token.
4290  *
4291  * Returns a pointer to the newly-allocated duplicate, or a null
4292  * pointer if memory for the duplicate was not available.  If
4293  * the lenp argument is a non-null pointer, the length of the token
4294  * (not including the '\0') is returned in *lenp.
4295  *
4296  * If successful, the *buf pointer will be updated to point beyond
4297  * the end of the found token.
4298  *
4299  * Note: uses GFP_KERNEL for allocation.
4300  */
4301 static inline char *dup_token(const char **buf, size_t *lenp)
4302 {
4303         char *dup;
4304         size_t len;
4305
4306         len = next_token(buf);
4307         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4308         if (!dup)
4309                 return NULL;
4310         *(dup + len) = '\0';
4311         *buf += len;
4312
4313         if (lenp)
4314                 *lenp = len;
4315
4316         return dup;
4317 }
4318
4319 /*
4320  * Parse the options provided for an "rbd add" (i.e., rbd image
4321  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4322  * and the data written is passed here via a NUL-terminated buffer.
4323  * Returns 0 if successful or an error code otherwise.
4324  *
4325  * The information extracted from these options is recorded in
4326  * the other parameters which return dynamically-allocated
4327  * structures:
4328  *  ceph_opts
4329  *      The address of a pointer that will refer to a ceph options
4330  *      structure.  Caller must release the returned pointer using
4331  *      ceph_destroy_options() when it is no longer needed.
4332  *  rbd_opts
4333  *      Address of an rbd options pointer.  Fully initialized by
4334  *      this function; caller must release with kfree().
4335  *  spec
4336  *      Address of an rbd image specification pointer.  Fully
4337  *      initialized by this function based on parsed options.
4338  *      Caller must release with rbd_spec_put().
4339  *
4340  * The options passed take this form:
4341  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4342  * where:
4343  *  <mon_addrs>
4344  *      A comma-separated list of one or more monitor addresses.
4345  *      A monitor address is an ip address, optionally followed
4346  *      by a port number (separated by a colon).
4347  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4348  *  <options>
4349  *      A comma-separated list of ceph and/or rbd options.
4350  *  <pool_name>
4351  *      The name of the rados pool containing the rbd image.
4352  *  <image_name>
4353  *      The name of the image in that pool to map.
4354  *  <snap_name>
4355  *      An optional snapshot name.  If provided, the mapping will
4356  *      present data from the image at the time that snapshot was
4357  *      created.  The image head is used if no snapshot name is
4358  *      provided.  Snapshot mappings are always read-only.
4359  */
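/*
 * For example (hypothetical monitor address and key), an image
 * could be mapped with:
 *
 *   echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *              > /sys/bus/rbd/add
 */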
4360 static int rbd_add_parse_args(const char *buf,
4361                                 struct ceph_options **ceph_opts,
4362                                 struct rbd_options **opts,
4363                                 struct rbd_spec **rbd_spec)
4364 {
4365         size_t len;
4366         char *options;
4367         const char *mon_addrs;
4368         char *snap_name;
4369         size_t mon_addrs_size;
4370         struct rbd_spec *spec = NULL;
4371         struct rbd_options *rbd_opts = NULL;
4372         struct ceph_options *copts;
4373         int ret;
4374
4375         /* The first four tokens are required */
4376
4377         len = next_token(&buf);
4378         if (!len) {
4379                 rbd_warn(NULL, "no monitor address(es) provided");
4380                 return -EINVAL;
4381         }
4382         mon_addrs = buf;
4383         mon_addrs_size = len + 1;
4384         buf += len;
4385
4386         ret = -EINVAL;
4387         options = dup_token(&buf, NULL);
4388         if (!options)
4389                 return -ENOMEM;
4390         if (!*options) {
4391                 rbd_warn(NULL, "no options provided");
4392                 goto out_err;
4393         }
4394
4395         spec = rbd_spec_alloc();
4396         if (!spec)
4397                 goto out_mem;
4398
4399         spec->pool_name = dup_token(&buf, NULL);
4400         if (!spec->pool_name)
4401                 goto out_mem;
4402         if (!*spec->pool_name) {
4403                 rbd_warn(NULL, "no pool name provided");
4404                 goto out_err;
4405         }
4406
4407         spec->image_name = dup_token(&buf, NULL);
4408         if (!spec->image_name)
4409                 goto out_mem;
4410         if (!*spec->image_name) {
4411                 rbd_warn(NULL, "no image name provided");
4412                 goto out_err;
4413         }
4414
4415         /*
4416          * Snapshot name is optional; default is to use "-"
4417          * (indicating the head/no snapshot).
4418          */
4419         len = next_token(&buf);
4420         if (!len) {
4421                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4422                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4423         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4424                 ret = -ENAMETOOLONG;
4425                 goto out_err;
4426         }
4427         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4428         if (!snap_name)
4429                 goto out_mem;
4430         *(snap_name + len) = '\0';
4431         spec->snap_name = snap_name;
4432
4433         /* Initialize all rbd options to the defaults */
4434
4435         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4436         if (!rbd_opts)
4437                 goto out_mem;
4438
4439         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4440
4441         copts = ceph_parse_options(options, mon_addrs,
4442                                         mon_addrs + mon_addrs_size - 1,
4443                                         parse_rbd_opts_token, rbd_opts);
4444         if (IS_ERR(copts)) {
4445                 ret = PTR_ERR(copts);
4446                 goto out_err;
4447         }
4448         kfree(options);
4449
4450         *ceph_opts = copts;
4451         *opts = rbd_opts;
4452         *rbd_spec = spec;
4453
4454         return 0;
4455 out_mem:
4456         ret = -ENOMEM;
4457 out_err:
4458         kfree(rbd_opts);
4459         rbd_spec_put(spec);
4460         kfree(options);
4461
4462         return ret;
4463 }
4464
4465 /*
4466  * An rbd format 2 image has a unique identifier, distinct from the
4467  * name given to it by the user.  Internally, that identifier is
4468  * what's used to specify the names of objects related to the image.
4469  *
4470  * A special "rbd id" object is used to map an rbd image name to its
4471  * id.  If that object doesn't exist, then there is no v2 rbd image
4472  * with the supplied name.
4473  *
4474  * This function will fill in the given rbd_dev's image_id field if
4475  * the id can be determined, and in that case will return 0.  If any
4476  * errors occur a negative errno will be returned and the rbd_dev's
4477  * image_id field will be unchanged (and should be NULL).
4478  */
4479 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4480 {
4481         int ret;
4482         size_t size;
4483         char *object_name;
4484         void *response;
4485         char *image_id;
4486
4487         /*
4488          * When probing a parent image, the image id is already
4489          * known (and the image name likely is not).  There's no
4490          * need to fetch the image id again in this case.  We
4491          * do still need to set the image format though.
4492          */
4493         if (rbd_dev->spec->image_id) {
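                     /*
                      * A zero-length image id is how a format 1 image,
                      * which has no id, is recorded; see below.
                      */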
4494                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4495
4496                 return 0;
4497         }
4498
4499         /*
4500          * First, see if the format 2 image id object exists, and if
4501          * so, get the image's persistent id from it.
4502          */
4503         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4504         object_name = kmalloc(size, GFP_NOIO);
4505         if (!object_name)
4506                 return -ENOMEM;
4507         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4508         dout("rbd id object name is %s\n", object_name);
4509
4510         /* Response will be an encoded string, which includes a length */
4511
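             /*
              * That is, a __le32 byte count followed by the bytes
              * themselves; "abcd" encodes as 04 00 00 00 'a' 'b' 'c' 'd'.
              */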
4512         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4513         response = kzalloc(size, GFP_NOIO);
4514         if (!response) {
4515                 ret = -ENOMEM;
4516                 goto out;
4517         }
4518
4519         /* If it doesn't exist we'll assume it's a format 1 image */
4520
4521         ret = rbd_obj_method_sync(rbd_dev, object_name,
4522                                 "rbd", "get_id", NULL, 0,
4523                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4524         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4525         if (ret == -ENOENT) {
4526                 image_id = kstrdup("", GFP_KERNEL);
4527                 ret = image_id ? 0 : -ENOMEM;
4528                 if (!ret)
4529                         rbd_dev->image_format = 1;
4530         } else if (ret > (int) sizeof (__le32)) {
4531                 void *p = response;
4532
4533                 image_id = ceph_extract_encoded_string(&p, p + ret,
4534                                                 NULL, GFP_NOIO);
4535                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4536                 if (!ret)
4537                         rbd_dev->image_format = 2;
4538         } else if (ret >= 0) {
4539                 ret = -EINVAL;  /* too short to hold an encoded string */
4540         }
4541
4542         if (!ret) {
4543                 rbd_dev->spec->image_id = image_id;
4544                 dout("image_id is %s\n", image_id);
4545         }
4546 out:
4547         kfree(response);
4548         kfree(object_name);
4549
4550         return ret;
4551 }
4552
4553 /* Undo whatever state changes are made by v1 or v2 image probe */
4554
4555 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4556 {
4557         struct rbd_image_header *header;
4558
4559         rbd_dev_remove_parent(rbd_dev);
4560         rbd_spec_put(rbd_dev->parent_spec);
4561         rbd_dev->parent_spec = NULL;
4562         rbd_dev->parent_overlap = 0;
4563
4564         /* Free dynamic fields from the header, then zero it out */
4565
4566         header = &rbd_dev->header;
4567         ceph_put_snap_context(header->snapc);
4568         kfree(header->snap_sizes);
4569         kfree(header->snap_names);
4570         kfree(header->object_prefix);
4571         memset(header, 0, sizeof (*header));
4572 }
4573
4574 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4575 {
4576         int ret;
4577
4578         /* Populate rbd image metadata */
4579
4580         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4581         if (ret < 0)
4582                 goto out_err;
4583
4584         /* Version 1 images have no parent (no layering) */
4585
4586         rbd_dev->parent_spec = NULL;
4587         rbd_dev->parent_overlap = 0;
4588
4589         dout("discovered version 1 image, header name is %s\n",
4590                 rbd_dev->header_name);
4591
4592         return 0;
4593
4594 out_err:
4595         kfree(rbd_dev->header_name);
4596         rbd_dev->header_name = NULL;
4597         kfree(rbd_dev->spec->image_id);
4598         rbd_dev->spec->image_id = NULL;
4599
4600         return ret;
4601 }
4602
4603 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4604 {
4605         int ret;
4606         u64 ver = 0;
4607
4608         ret = rbd_dev_v2_image_size(rbd_dev);
4609         if (ret)
4610                 goto out_err;
4611
4612         /* Get the object prefix (a.k.a. block_name) for the image */
4613
4614         ret = rbd_dev_v2_object_prefix(rbd_dev);
4615         if (ret)
4616                 goto out_err;
4617
4618         /* Get and check the features for the image */
4619
4620         ret = rbd_dev_v2_features(rbd_dev);
4621         if (ret)
4622                 goto out_err;
4623
4624         /* If the image supports layering, get the parent info */
4625
4626         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4627                 ret = rbd_dev_v2_parent_info(rbd_dev);
4628                 if (ret)
4629                         goto out_err;
4630
4631                 /*
4632                  * Don't print a warning for parent images.  We can
4633                  * tell we have a parent at this point because we won't
4634                  * yet know its pool name (only its pool id).
4635                  */
4636                 if (rbd_dev->spec->pool_name)
4637                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4638                                         "is EXPERIMENTAL!");
4639         }
4640
4641         /* If the image supports fancy striping, get its parameters */
4642
4643         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4644                 ret = rbd_dev_v2_striping_info(rbd_dev);
4645                 if (ret < 0)
4646                         goto out_err;
4647         }
4648
4649         /* crypto and compression type aren't (yet) supported for v2 images */
4650
4651         rbd_dev->header.crypt_type = 0;
4652         rbd_dev->header.comp_type = 0;
4653
4654         /* Get the snapshot context, plus the header version */
4655
4656         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4657         if (ret)
4658                 goto out_err;
4659         rbd_dev->header.obj_version = ver;
4660
4661         dout("discovered version 2 image, header name is %s\n",
4662                 rbd_dev->header_name);
4663
4664         return 0;
4665 out_err:
4666         rbd_dev->parent_overlap = 0;
4667         rbd_spec_put(rbd_dev->parent_spec);
4668         rbd_dev->parent_spec = NULL;
4669         kfree(rbd_dev->header_name);
4670         rbd_dev->header_name = NULL;
4671         kfree(rbd_dev->header.object_prefix);
4672         rbd_dev->header.object_prefix = NULL;
4673
4674         return ret;
4675 }
4676
4677 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4678 {
4679         struct rbd_device *parent = NULL;
4680         struct rbd_spec *parent_spec;
4681         struct rbd_client *rbdc;
4682         int ret;
4683
4684         if (!rbd_dev->parent_spec)
4685                 return 0;
4686         /*
4687          * We need to pass a reference to the client and the parent
4688          * spec when creating the parent rbd_dev.  Images related by
4689          * parent/child relationships always share both.
4690          */
4691         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4692         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4693
4694         ret = -ENOMEM;
4695         parent = rbd_dev_create(rbdc, parent_spec);
4696         if (!parent)
4697                 goto out_err;
4698
4699         ret = rbd_dev_image_probe(parent);
4700         if (ret < 0)
4701                 goto out_err;
4702         rbd_dev->parent = parent;
4703
4704         return 0;
4705 out_err:
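             /*
              * rbd_dev_destroy() is assumed to drop the parent device's
              * client and spec references; if no parent device was
              * created, drop what was acquired above directly.
              */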
4706         if (parent) {
4707                 rbd_spec_put(rbd_dev->parent_spec);
4708                 kfree(rbd_dev->header_name);
4709                 rbd_dev_destroy(parent);
4710         } else {
4711                 rbd_put_client(rbdc);
4712                 rbd_spec_put(parent_spec);
4713         }
4714
4715         return ret;
4716 }
4717
4718 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4719 {
4720         int ret;
4721
4722         ret = rbd_dev_mapping_set(rbd_dev);
4723         if (ret)
4724                 return ret;
4725
4726         /* Generate a unique id: find the highest id in use and add one */
4727         rbd_dev_id_get(rbd_dev);
4728
4729         /* Fill in the device name, now that we have its id. */
4730         BUILD_BUG_ON(DEV_NAME_LEN
4731                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4732         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4733
4734         /* Get our block major device number. */
4735
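             /* A major of 0 asks register_blkdev() to allocate one dynamically */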
4736         ret = register_blkdev(0, rbd_dev->name);
4737         if (ret < 0)
4738                 goto err_out_id;
4739         rbd_dev->major = ret;
4740
4741         /* Set up the blkdev mapping. */
4742
4743         ret = rbd_init_disk(rbd_dev);
4744         if (ret)
4745                 goto err_out_blkdev;
4746
4747         ret = rbd_bus_add_dev(rbd_dev);
4748         if (ret)
4749                 goto err_out_disk;
4750
4751         /* Everything's ready.  Announce the disk to the world. */
4752
4753         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4754         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4755         add_disk(rbd_dev->disk);
4756
4757         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4758                 (unsigned long long) rbd_dev->mapping.size);
4759
4760         return ret;
4761
4762 err_out_disk:
4763         rbd_free_disk(rbd_dev);
4764 err_out_blkdev:
4765         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4766 err_out_id:
4767         rbd_dev_id_put(rbd_dev);
4768         rbd_dev_mapping_clear(rbd_dev);
4769
4770         return ret;
4771 }
4772
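     /*
      * Per rbd_types.h, format 1 headers are objects named
      * "<image_name>.rbd" (RBD_SUFFIX), while format 2 headers are
      * named "rbd_header.<image_id>" (RBD_HEADER_PREFIX).
      */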
4773 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4774 {
4775         struct rbd_spec *spec = rbd_dev->spec;
4776         size_t size;
4777
4778         /* Record the header object name for this rbd image. */
4779
4780         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4781
4782         if (rbd_dev->image_format == 1)
4783                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4784         else
4785                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4786
4787         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4788         if (!rbd_dev->header_name)
4789                 return -ENOMEM;
4790
4791         if (rbd_dev->image_format == 1)
4792                 sprintf(rbd_dev->header_name, "%s%s",
4793                         spec->image_name, RBD_SUFFIX);
4794         else
4795                 sprintf(rbd_dev->header_name, "%s%s",
4796                         RBD_HEADER_PREFIX, spec->image_id);
4797         return 0;
4798 }
4799
4800 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4801 {
4802         int ret;
4803
4804         rbd_remove_all_snaps(rbd_dev);
4805         rbd_dev_unprobe(rbd_dev);
4806         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4807         if (ret)
4808                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4809         kfree(rbd_dev->header_name);
4810         rbd_dev->header_name = NULL;
4811         rbd_dev->image_format = 0;
4812         kfree(rbd_dev->spec->image_id);
4813         rbd_dev->spec->image_id = NULL;
4814
4815         rbd_dev_destroy(rbd_dev);
4816 }
4817
4818 /*
4819  * Probe for the existence of the header object for the given rbd
4820  * device.  For format 2 images this includes determining the image
4821  * id.
4822  */
4823 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4824 {
4825         int ret;
4826         int tmp;
4827
4828         /*
4829          * Get the id from the image id object.  If it's not a
4830          * format 2 image, we'll get ENOENT back, and we'll assume
4831          * it's a format 1 image.
4832          */
4833         ret = rbd_dev_image_id(rbd_dev);
4834         if (ret)
4835                 return ret;
4836         rbd_assert(rbd_dev->spec->image_id);
4837         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4838
4839         ret = rbd_dev_header_name(rbd_dev);
4840         if (ret)
4841                 goto err_out_format;
4842
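             /*
              * Register a watch on the header object; notifications on
              * it are how header changes (a resize, a new snapshot,
              * and so on) reach this client.
              */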
4843         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4844         if (ret)
4845                 goto out_header_name;
4846
4847         if (rbd_dev->image_format == 1)
4848                 ret = rbd_dev_v1_probe(rbd_dev);
4849         else
4850                 ret = rbd_dev_v2_probe(rbd_dev);
4851         if (ret)
4852                 goto err_out_watch;
4853
4854         ret = rbd_dev_snaps_update(rbd_dev);
4855         if (ret)
4856                 goto err_out_probe;
4857
4858         ret = rbd_dev_spec_update(rbd_dev);
4859         if (ret)
4860                 goto err_out_snaps;
4861
4862         ret = rbd_dev_probe_parent(rbd_dev);
4863         if (!ret)
4864                 return 0;
4865
4866 err_out_snaps:
4867         rbd_remove_all_snaps(rbd_dev);
4868 err_out_probe:
4869         rbd_dev_unprobe(rbd_dev);
4870 err_out_watch:
4871         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4872         if (tmp)
4873                 rbd_warn(rbd_dev, "unable to tear down watch request");
4874 out_header_name:
4875         kfree(rbd_dev->header_name);
4876         rbd_dev->header_name = NULL;
4877 err_out_format:
4878         rbd_dev->image_format = 0;
4879         kfree(rbd_dev->spec->image_id);
4880         rbd_dev->spec->image_id = NULL;
4881
4882         dout("probe failed, returning %d\n", ret);
4883
4884         return ret;
4885 }
4886
4887 static ssize_t rbd_add(struct bus_type *bus,
4888                        const char *buf,
4889                        size_t count)
4890 {
4891         struct rbd_device *rbd_dev = NULL;
4892         struct ceph_options *ceph_opts = NULL;
4893         struct rbd_options *rbd_opts = NULL;
4894         struct rbd_spec *spec = NULL;
4895         struct rbd_client *rbdc;
4896         struct ceph_osd_client *osdc;
4897         int rc = -ENOMEM;
4898
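             /*
              * Hold a module reference for the lifetime of the mapping;
              * it is dropped in rbd_remove() or on any failure below.
              */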
4899         if (!try_module_get(THIS_MODULE))
4900                 return -ENODEV;
4901
4902         /* parse add command */
4903         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4904         if (rc < 0)
4905                 goto err_out_module;
4906
4907         rbdc = rbd_get_client(ceph_opts);
4908         if (IS_ERR(rbdc)) {
4909                 rc = PTR_ERR(rbdc);
4910                 goto err_out_args;
4911         }
4912         ceph_opts = NULL;       /* rbd_dev client now owns this */
4913
4914         /* pick the pool */
4915         osdc = &rbdc->client->osdc;
4916         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4917         if (rc < 0)
4918                 goto err_out_client;
4919         spec->pool_id = (u64)rc;
4920
4921         /* The ceph file layout needs to fit pool id in 32 bits */
4922
4923         if (spec->pool_id > (u64)U32_MAX) {
4924                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4925                                 (unsigned long long)spec->pool_id, U32_MAX);
4926                 rc = -EIO;
4927                 goto err_out_client;
4928         }
4929
4930         rbd_dev = rbd_dev_create(rbdc, spec);
4931         if (!rbd_dev)
4932                 goto err_out_client;
4933         rbdc = NULL;            /* rbd_dev now owns this */
4934         spec = NULL;            /* rbd_dev now owns this */
4935
4936         rbd_dev->mapping.read_only = rbd_opts->read_only;
4937         kfree(rbd_opts);
4938         rbd_opts = NULL;        /* done with this */
4939
4940         rc = rbd_dev_image_probe(rbd_dev);
4941         if (rc < 0)
4942                 goto err_out_rbd_dev;
4943
4944         rc = rbd_dev_device_setup(rbd_dev);
4945         if (!rc)
4946                 return count;
4947
4948         rbd_dev_image_release(rbd_dev);
4949 err_out_rbd_dev:
4950         rbd_dev_destroy(rbd_dev);
4951 err_out_client:
4952         rbd_put_client(rbdc);
4953 err_out_args:
4954         if (ceph_opts)
4955                 ceph_destroy_options(ceph_opts);
4956         kfree(rbd_opts);
4957         rbd_spec_put(spec);
4958 err_out_module:
4959         module_put(THIS_MODULE);
4960
4961         dout("Error adding device %s\n", buf);
4962
4963         return (ssize_t)rc;
4964 }
4965
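     /*
      * Look up a mapped device by id.  No reference is taken on the
      * result; the caller is assumed to hold ctl_mutex so the device
      * cannot go away while it is being used.
      */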
4966 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4967 {
4968         struct list_head *tmp;
4969         struct rbd_device *rbd_dev;
4970
4971         spin_lock(&rbd_dev_list_lock);
4972         list_for_each(tmp, &rbd_dev_list) {
4973                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4974                 if (rbd_dev->dev_id == dev_id) {
4975                         spin_unlock(&rbd_dev_list_lock);
4976                         return rbd_dev;
4977                 }
4978         }
4979         spin_unlock(&rbd_dev_list_lock);
4980         return NULL;
4981 }
4982
4983 static void rbd_dev_device_release(struct device *dev)
4984 {
4985         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4986
4987         rbd_free_disk(rbd_dev);
4988         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4989         rbd_dev_clear_mapping(rbd_dev);
4990         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4991         rbd_dev->major = 0;
4992         rbd_dev_id_put(rbd_dev);
4993         rbd_dev_mapping_clear(rbd_dev);
4994 }
4995
4996 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4997 {
4998         while (rbd_dev->parent) {
4999                 struct rbd_device *first = rbd_dev;
5000                 struct rbd_device *second = first->parent;
5001                 struct rbd_device *third;
5002
5003                 /*
5004                  * Walk down to the deepest parent, the one with no
5005                  * grandparent, and remove it.
5006                  */
5007                 while (second && (third = second->parent)) {
5008                         first = second;
5009                         second = third;
5010                 }
5011                 rbd_assert(second);
5012                 rbd_dev_image_release(second);
5013                 first->parent = NULL;
5014                 first->parent_overlap = 0;
5015
5016                 rbd_assert(first->parent_spec);
5017                 rbd_spec_put(first->parent_spec);
5018                 first->parent_spec = NULL;
5019         }
5020 }
5021
5022 static ssize_t rbd_remove(struct bus_type *bus,
5023                           const char *buf,
5024                           size_t count)
5025 {
5026         struct rbd_device *rbd_dev = NULL;
5027         int target_id;
5028         unsigned long ul;
5029         int ret;
5030
5031         ret = strict_strtoul(buf, 10, &ul);
5032         if (ret)
5033                 return ret;
5034
5035         /* convert to int; abort if we lost anything in the conversion */
5036         target_id = (int) ul;
5037         if (target_id != ul)
5038                 return -EINVAL;
5039
5040         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5041
5042         rbd_dev = __rbd_get_dev(target_id);
5043         if (!rbd_dev) {
5044                 ret = -ENOENT;
5045                 goto done;
5046         }
5047
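             /*
              * Refuse to remove a device that is still open; otherwise
              * mark it as being removed so later opens fail.
              */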
5048         spin_lock_irq(&rbd_dev->lock);
5049         if (rbd_dev->open_count)
5050                 ret = -EBUSY;
5051         else
5052                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5053         spin_unlock_irq(&rbd_dev->lock);
5054         if (ret < 0)
5055                 goto done;
5056         ret = count;
5057         rbd_bus_del_dev(rbd_dev);
5058         rbd_dev_image_release(rbd_dev);
5059         module_put(THIS_MODULE);
5060 done:
5061         mutex_unlock(&ctl_mutex);
5062
5063         return ret;
5064 }
5065
5066 /*
5067  * create control files in sysfs
5068  * /sys/bus/rbd/...
5069  */
5070 static int rbd_sysfs_init(void)
5071 {
5072         int ret;
5073
5074         ret = device_register(&rbd_root_dev);
5075         if (ret < 0)
5076                 return ret;
5077
5078         ret = bus_register(&rbd_bus_type);
5079         if (ret < 0)
5080                 device_unregister(&rbd_root_dev);
5081
5082         return ret;
5083 }
5084
5085 static void rbd_sysfs_cleanup(void)
5086 {
5087         bus_unregister(&rbd_bus_type);
5088         device_unregister(&rbd_root_dev);
5089 }
5090
5091 static int __init rbd_init(void)
5092 {
5093         int rc;
5094
5095         if (!libceph_compatible(NULL)) {
5096                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5097
5098                 return -EINVAL;
5099         }
5100         rc = rbd_sysfs_init();
5101         if (rc)
5102                 return rc;
5103         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5104         return 0;
5105 }
5106
5107 static void __exit rbd_exit(void)
5108 {
5109         rbd_sysfs_cleanup();
5110 }
5111
5112 module_init(rbd_init);
5113 module_exit(rbd_exit);
5114
5115 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5116 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5117 MODULE_DESCRIPTION("rados block device");
5118
5119 /* following authorship retained from original osdblk.c */
5120 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5121
5122 MODULE_LICENSE("GPL");