Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: give rbd_obj_read_sync() buffer void type
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Short name (also used as the prefix of warning messages) and long name */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/*
 * Snapshot names are limited to NAME_MAX less the length of the
 * "snap_" prefix (sizeof counts the terminating NUL, hence the - 1).
 */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

/* Snapshot name that stands for the image head (snap id CEPH_NOSNAP) */
#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/*
 * Features supported by this (client software) implementation.
 * The mask is currently empty: none of the feature bits above is set.
 */

#define RBD_FEATURES_SUPPORTED  (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 *
 * MAX_INT_FORMAT_WIDTH evaluates to 11 when sizeof (int) is 4.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
93
/*
 * block device image metadata (in-memory version)
 *
 * For format 1 images this is filled in from the on-disk header by
 * rbd_header_from_disk() and torn down by rbd_header_free().
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;    /* kmalloc'd, NUL-terminated */
        u64 features;
        __u8 obj_order;         /* object size is (1 << obj_order) bytes */
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* copy of the snapshot names, or NULL */
        u64 *snap_sizes;        /* one size per snapshot, or NULL */

        u64 obj_version;
};
113
114 /*
115  * An rbd image specification.
116  *
117  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118  * identify an image.  Each rbd_dev structure includes a pointer to
119  * an rbd_spec structure that encapsulates this identity.
120  *
121  * Each of the id's in an rbd_spec has an associated name.  For a
122  * user-mapped image, the names are supplied and the id's associated
123  * with them are looked up.  For a layered image, a parent image is
124  * defined by the tuple, and the names are looked up.
125  *
126  * An rbd_dev structure contains a parent_spec pointer which is
127  * non-null if the image it represents is a child in a layered
128  * image.  This pointer will refer to the rbd_spec structure used
129  * by the parent rbd_dev for its own identity (i.e., the structure
130  * is shared between the parent and child).
131  *
132  * Since these structures are populated once, during the discovery
133  * phase of image construction, they are effectively immutable so
134  * we make no effort to synchronize access to them.
135  *
136  * Note that code herein does not assume the image name is known (it
137  * could be a null pointer).
138  */
struct rbd_spec {
        /* Each id below is paired with the name associated with it */
        u64             pool_id;
        char            *pool_name;

        char            *image_id;
        char            *image_name;    /* may be a null pointer */

        u64             snap_id;        /* CEPH_NOSNAP for the image head */
        char            *snap_name;

        struct kref     kref;           /* a spec may be shared (see above) */
};
151
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;        /* underlying ceph client */
        struct kref             kref;           /* dropped via rbd_put_client() */
        struct list_head        node;           /* entry in rbd_client_list */
};
160
/* Callback type stored in rbd_img_request->callback */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

/*
 * Value of rbd_obj_request->which for an object request that does not
 * occupy a position in an image request's list.
 */
#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

/* Callback type stored in rbd_obj_request->callback */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
168
/*
 * Kind of data attached to an object request (rbd_obj_request->type).
 * NOTE(review): presumably BIO selects the bio_list member and PAGES the
 * pages/page_count members of the request's data union -- confirm.
 */
enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
172
/*
 * Object request flags.
 * NOTE(review): presumably bit numbers within rbd_obj_request->flags --
 * confirm against the flag accessors.
 */
enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};
179
/*
 * A request against a single named object: a byte range within the
 * object, the data attached to it, and its completion state.
 */
struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;          /* see enum obj_req_flags */

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        /* Data carried by the request; the layout depends on type */
        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};
234
/*
 * Image request flags.
 * NOTE(review): presumably bit numbers within rbd_img_request->flags --
 * confirm against the flag accessors.
 */
enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};
240
/*
 * A request against a byte range of an rbd image.  It owns a list of
 * object requests (obj_requests), linked through rbd_obj_request->links.
 */
struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;  /* see enum img_req_flags */
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};
266
/*
 * Iterators over the object requests of image request ireq.
 *
 * for_each_obj_request()       walks the whole list from the front.
 * for_each_obj_request_from()  continues forward from the current oreq.
 * for_each_obj_request_safe()  walks the list in REVERSE order, using n
 *                              as the scratch cursor of the "safe" list
 *                              iterator it is built on.
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
273
/* One snapshot of an image; kept on the owning rbd_device's snaps list */
struct rbd_snap {
        struct  device          dev;
        const char              *name;
        u64                     size;
        struct list_head        node;           /* entry in rbd_dev->snaps */
        u64                     id;
        u64                     features;
};
282
/*
 * Properties of what is currently mapped: either the image head or one
 * of its snapshots (filled in by rbd_dev_set_mapping()).
 */
struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;      /* set when a snapshot is mapped */
};
288
/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;          /* identity of this image */

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        /* Layering: parent_spec is non-null if this image is a child */
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        /* NOTE(review): presumably the entry in rbd_dev_list -- confirm */
        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};
334
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 *
 * The values are bit numbers, used with set_bit()/test_bit().
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
346
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* rbd_client_list is modified only with rbd_client_list_lock held */
static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations of functions defined later in this file */

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

/* Store handlers for the bus "add" and "remove" attributes */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);
368
/*
 * Bus-level sysfs attributes.  Both are write-only for the owner
 * (S_IWUSR, no show method): writes to "add" are handled by rbd_add()
 * and writes to "remove" by rbd_remove().
 */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};
374
/* The "rbd" bus, carrying the add/remove attributes defined above */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
379
/*
 * Release method for rbd_root_dev.  Intentionally empty: rbd_root_dev is
 * a statically allocated object, so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
383
/* Statically allocated "rbd" device with a no-op release method */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
388
389 static __printf(2, 3)
390 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
391 {
392         struct va_format vaf;
393         va_list args;
394
395         va_start(args, fmt);
396         vaf.fmt = fmt;
397         vaf.va = &args;
398
399         if (!rbd_dev)
400                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
401         else if (rbd_dev->disk)
402                 printk(KERN_WARNING "%s: %s: %pV\n",
403                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
404         else if (rbd_dev->spec && rbd_dev->spec->image_name)
405                 printk(KERN_WARNING "%s: image %s: %pV\n",
406                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
407         else if (rbd_dev->spec && rbd_dev->spec->image_id)
408                 printk(KERN_WARNING "%s: id %s: %pV\n",
409                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
410         else    /* punt */
411                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
412                         RBD_DRV_NAME, rbd_dev, &vaf);
413         va_end(args);
414 }
415
/*
 * rbd_assert(expr) - verify an invariant when RBD_DEBUG is defined.
 *
 * If expr evaluates false, the function name, line number and the text
 * of the expression are logged at KERN_ERR level and BUG() is invoked.
 * Without RBD_DEBUG the macro expands to a no-op and expr is not
 * evaluated.
 *
 * The debug version is wrapped in do { } while (0) so that it behaves
 * as a single statement: it can be used as the unbraced body of an
 * if/else without capturing a following "else" or causing a syntax
 * error.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
428
/* Forward declarations of functions defined later in this file */
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
434
435 static int rbd_open(struct block_device *bdev, fmode_t mode)
436 {
437         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
438         bool removing = false;
439
440         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
441                 return -EROFS;
442
443         spin_lock_irq(&rbd_dev->lock);
444         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
445                 removing = true;
446         else
447                 rbd_dev->open_count++;
448         spin_unlock_irq(&rbd_dev->lock);
449         if (removing)
450                 return -ENOENT;
451
452         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
453         (void) get_device(&rbd_dev->dev);
454         set_device_ro(bdev, rbd_dev->mapping.read_only);
455         mutex_unlock(&ctl_mutex);
456
457         return 0;
458 }
459
460 static int rbd_release(struct gendisk *disk, fmode_t mode)
461 {
462         struct rbd_device *rbd_dev = disk->private_data;
463         unsigned long open_count_before;
464
465         spin_lock_irq(&rbd_dev->lock);
466         open_count_before = rbd_dev->open_count--;
467         spin_unlock_irq(&rbd_dev->lock);
468         rbd_assert(open_count_before > 0);
469
470         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
471         put_device(&rbd_dev->dev);
472         mutex_unlock(&ctl_mutex);
473
474         return 0;
475 }
476
/* Block device operations: only open and release are implemented */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
482
483 /*
484  * Initialize an rbd client instance.
485  * We own *ceph_opts.
486  */
487 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
488 {
489         struct rbd_client *rbdc;
490         int ret = -ENOMEM;
491
492         dout("%s:\n", __func__);
493         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
494         if (!rbdc)
495                 goto out_opt;
496
497         kref_init(&rbdc->kref);
498         INIT_LIST_HEAD(&rbdc->node);
499
500         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
501
502         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
503         if (IS_ERR(rbdc->client))
504                 goto out_mutex;
505         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
506
507         ret = ceph_open_session(rbdc->client);
508         if (ret < 0)
509                 goto out_err;
510
511         spin_lock(&rbd_client_list_lock);
512         list_add_tail(&rbdc->node, &rbd_client_list);
513         spin_unlock(&rbd_client_list_lock);
514
515         mutex_unlock(&ctl_mutex);
516         dout("%s: rbdc %p\n", __func__, rbdc);
517
518         return rbdc;
519
520 out_err:
521         ceph_destroy_client(rbdc->client);
522 out_mutex:
523         mutex_unlock(&ctl_mutex);
524         kfree(rbdc);
525 out_opt:
526         if (ceph_opts)
527                 ceph_destroy_options(ceph_opts);
528         dout("%s: error %d\n", __func__, ret);
529
530         return ERR_PTR(ret);
531 }
532
533 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
534 {
535         kref_get(&rbdc->kref);
536
537         return rbdc;
538 }
539
540 /*
541  * Find a ceph client with specific addr and configuration.  If
542  * found, bump its reference count.
543  */
544 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
545 {
546         struct rbd_client *client_node;
547         bool found = false;
548
549         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
550                 return NULL;
551
552         spin_lock(&rbd_client_list_lock);
553         list_for_each_entry(client_node, &rbd_client_list, node) {
554                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
555                         __rbd_get_client(client_node);
556
557                         found = true;
558                         break;
559                 }
560         }
561         spin_unlock(&rbd_client_list_lock);
562
563         return found ? client_node : NULL;
564 }
565
/*
 * mount options
 *
 * The Opt_last_* entries are range markers: parse_rbd_opts_token()
 * compares a token against them to classify it as an int, string or
 * Boolean option.  At present only Boolean options are defined.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
579
/*
 * Option text to token mapping, passed to match_token() by
 * parse_rbd_opts_token().  The {-1, NULL} entry terminates the table.
 */
static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
590
/* rbd-specific options, filled in by parse_rbd_opts_token() */
struct rbd_options {
        bool    read_only;
};

/* Mappings are writable unless an option says otherwise */
#define RBD_READ_ONLY_DEFAULT   false
596
597 static int parse_rbd_opts_token(char *c, void *private)
598 {
599         struct rbd_options *rbd_opts = private;
600         substring_t argstr[MAX_OPT_ARGS];
601         int token, intval, ret;
602
603         token = match_token(c, rbd_opts_tokens, argstr);
604         if (token < 0)
605                 return -EINVAL;
606
607         if (token < Opt_last_int) {
608                 ret = match_int(&argstr[0], &intval);
609                 if (ret < 0) {
610                         pr_err("bad mount option arg (not int) "
611                                "at '%s'\n", c);
612                         return ret;
613                 }
614                 dout("got int token %d val %d\n", token, intval);
615         } else if (token > Opt_last_int && token < Opt_last_string) {
616                 dout("got string token %d val %s\n", token,
617                      argstr[0].from);
618         } else if (token > Opt_last_string && token < Opt_last_bool) {
619                 dout("got Boolean token %d\n", token);
620         } else {
621                 dout("got token %d\n", token);
622         }
623
624         switch (token) {
625         case Opt_read_only:
626                 rbd_opts->read_only = true;
627                 break;
628         case Opt_read_write:
629                 rbd_opts->read_only = false;
630                 break;
631         default:
632                 rbd_assert(false);
633                 break;
634         }
635         return 0;
636 }
637
/*
 * Return an rbd client matching ceph_opts: an existing one if
 * rbd_client_find() locates it, otherwise a newly created one.
 *
 * ceph_opts is consumed either way: it is destroyed here when an
 * existing client is reused, and handed to rbd_client_create()
 * otherwise.  The result of rbd_client_create() is returned unchanged.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Reusing an existing client; these options are not needed */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
654
655 /*
656  * Destroy ceph client
657  *
658  * Caller must hold rbd_client_list_lock.
659  */
660 static void rbd_client_release(struct kref *kref)
661 {
662         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
663
664         dout("%s: rbdc %p\n", __func__, rbdc);
665         spin_lock(&rbd_client_list_lock);
666         list_del(&rbdc->node);
667         spin_unlock(&rbd_client_list_lock);
668
669         ceph_destroy_client(rbdc->client);
670         kfree(rbdc);
671 }
672
673 /*
674  * Drop reference to ceph client node. If it's not referenced anymore, release
675  * it.
676  */
677 static void rbd_put_client(struct rbd_client *rbdc)
678 {
679         if (rbdc)
680                 kref_put(&rbdc->kref, rbd_client_release);
681 }
682
683 static bool rbd_image_format_valid(u32 image_format)
684 {
685         return image_format == 1 || image_format == 2;
686 }
687
688 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
689 {
690         size_t size;
691         u32 snap_count;
692
693         /* The header has to start with the magic rbd header text */
694         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
695                 return false;
696
697         /* The bio layer requires at least sector-sized I/O */
698
699         if (ondisk->options.order < SECTOR_SHIFT)
700                 return false;
701
702         /* If we use u64 in a few spots we may be able to loosen this */
703
704         if (ondisk->options.order > 8 * sizeof (int) - 1)
705                 return false;
706
707         /*
708          * The size of a snapshot header has to fit in a size_t, and
709          * that limits the number of snapshots.
710          */
711         snap_count = le32_to_cpu(ondisk->snap_count);
712         size = SIZE_MAX - sizeof (struct ceph_snap_context);
713         if (snap_count > size / sizeof (__le64))
714                 return false;
715
716         /*
717          * Not only that, but the size of the entire the snapshot
718          * header must also be representable in a size_t.
719          */
720         size -= snap_count * sizeof (__le64);
721         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
722                 return false;
723
724         return true;
725 }
726
727 /*
728  * Create a new header structure, translate header format from the on-disk
729  * header.
730  */
731 static int rbd_header_from_disk(struct rbd_image_header *header,
732                                  struct rbd_image_header_ondisk *ondisk)
733 {
734         u32 snap_count;
735         size_t len;
736         size_t size;
737         u32 i;
738
739         memset(header, 0, sizeof (*header));
740
741         snap_count = le32_to_cpu(ondisk->snap_count);
742
743         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
745         if (!header->object_prefix)
746                 return -ENOMEM;
747         memcpy(header->object_prefix, ondisk->object_prefix, len);
748         header->object_prefix[len] = '\0';
749
750         if (snap_count) {
751                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
752
753                 /* Save a copy of the snapshot names */
754
755                 if (snap_names_len > (u64) SIZE_MAX)
756                         return -EIO;
757                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
758                 if (!header->snap_names)
759                         goto out_err;
760                 /*
761                  * Note that rbd_dev_v1_header_read() guarantees
762                  * the ondisk buffer we're working with has
763                  * snap_names_len bytes beyond the end of the
764                  * snapshot id array, this memcpy() is safe.
765                  */
766                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
767                         snap_names_len);
768
769                 /* Record each snapshot's size */
770
771                 size = snap_count * sizeof (*header->snap_sizes);
772                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
773                 if (!header->snap_sizes)
774                         goto out_err;
775                 for (i = 0; i < snap_count; i++)
776                         header->snap_sizes[i] =
777                                 le64_to_cpu(ondisk->snaps[i].image_size);
778         } else {
779                 WARN_ON(ondisk->snap_names_len);
780                 header->snap_names = NULL;
781                 header->snap_sizes = NULL;
782         }
783
784         header->features = 0;   /* No features support in v1 images */
785         header->obj_order = ondisk->options.order;
786         header->crypt_type = ondisk->options.crypt_type;
787         header->comp_type = ondisk->options.comp_type;
788
789         /* Allocate and fill in the snapshot context */
790
791         header->image_size = le64_to_cpu(ondisk->image_size);
792         size = sizeof (struct ceph_snap_context);
793         size += snap_count * sizeof (header->snapc->snaps[0]);
794         header->snapc = kzalloc(size, GFP_KERNEL);
795         if (!header->snapc)
796                 goto out_err;
797
798         atomic_set(&header->snapc->nref, 1);
799         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
800         header->snapc->num_snaps = snap_count;
801         for (i = 0; i < snap_count; i++)
802                 header->snapc->snaps[i] =
803                         le64_to_cpu(ondisk->snaps[i].id);
804
805         return 0;
806
807 out_err:
808         kfree(header->snap_sizes);
809         header->snap_sizes = NULL;
810         kfree(header->snap_names);
811         header->snap_names = NULL;
812         kfree(header->object_prefix);
813         header->object_prefix = NULL;
814
815         return -ENOMEM;
816 }
817
818 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
819 {
820         struct rbd_snap *snap;
821
822         if (snap_id == CEPH_NOSNAP)
823                 return RBD_SNAP_HEAD_NAME;
824
825         list_for_each_entry(snap, &rbd_dev->snaps, node)
826                 if (snap_id == snap->id)
827                         return snap->name;
828
829         return NULL;
830 }
831
832 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
833 {
834
835         struct rbd_snap *snap;
836
837         list_for_each_entry(snap, &rbd_dev->snaps, node) {
838                 if (!strcmp(snap_name, snap->name)) {
839                         rbd_dev->spec->snap_id = snap->id;
840                         rbd_dev->mapping.size = snap->size;
841                         rbd_dev->mapping.features = snap->features;
842
843                         return 0;
844                 }
845         }
846
847         return -ENOENT;
848 }
849
850 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
851 {
852         int ret;
853
854         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
855                     sizeof (RBD_SNAP_HEAD_NAME))) {
856                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
857                 rbd_dev->mapping.size = rbd_dev->header.image_size;
858                 rbd_dev->mapping.features = rbd_dev->header.features;
859                 ret = 0;
860         } else {
861                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
862                 if (ret < 0)
863                         goto done;
864                 rbd_dev->mapping.read_only = true;
865         }
866         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
867
868 done:
869         return ret;
870 }
871
872 static void rbd_header_free(struct rbd_image_header *header)
873 {
874         kfree(header->object_prefix);
875         header->object_prefix = NULL;
876         kfree(header->snap_sizes);
877         header->snap_sizes = NULL;
878         kfree(header->snap_names);
879         header->snap_names = NULL;
880         ceph_put_snap_context(header->snapc);
881         header->snapc = NULL;
882 }
883
884 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
885 {
886         char *name;
887         u64 segment;
888         int ret;
889
890         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
891         if (!name)
892                 return NULL;
893         segment = offset >> rbd_dev->header.obj_order;
894         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
895                         rbd_dev->header.object_prefix, segment);
896         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
897                 pr_err("error formatting segment name for #%llu (%d)\n",
898                         segment, ret);
899                 kfree(name);
900                 name = NULL;
901         }
902
903         return name;
904 }
905
906 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
907 {
908         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
909
910         return offset & (segment_size - 1);
911 }
912
913 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
914                                 u64 offset, u64 length)
915 {
916         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
917
918         offset &= segment_size - 1;
919
920         rbd_assert(length <= U64_MAX - offset);
921         if (offset + length > segment_size)
922                 length = segment_size - offset;
923
924         return length;
925 }
926
927 /*
928  * returns the size of an object in the image
929  */
930 static u64 rbd_obj_bytes(struct rbd_image_header *header)
931 {
932         return 1 << header->obj_order;
933 }
934
935 /*
936  * bio helpers
937  */
938
939 static void bio_chain_put(struct bio *chain)
940 {
941         struct bio *tmp;
942
943         while (chain) {
944                 tmp = chain;
945                 chain = chain->bi_next;
946                 bio_put(tmp);
947         }
948 }
949
950 /*
951  * zeros a bio chain, starting at specific offset
952  */
953 static void zero_bio_chain(struct bio *chain, int start_ofs)
954 {
955         struct bio_vec *bv;
956         unsigned long flags;
957         void *buf;
958         int i;
959         int pos = 0;
960
961         while (chain) {
962                 bio_for_each_segment(bv, chain, i) {
963                         if (pos + bv->bv_len > start_ofs) {
964                                 int remainder = max(start_ofs - pos, 0);
965                                 buf = bvec_kmap_irq(bv, &flags);
966                                 memset(buf + remainder, 0,
967                                        bv->bv_len - remainder);
968                                 bvec_kunmap_irq(buf, &flags);
969                         }
970                         pos += bv->bv_len;
971                 }
972
973                 chain = chain->bi_next;
974         }
975 }
976
977 /*
978  * similar to zero_bio_chain(), zeros data defined by a page array,
979  * starting at the given byte offset from the start of the array and
980  * continuing up to the given end offset.  The pages array is
981  * assumed to be big enough to hold all bytes up to the end.
982  */
983 static void zero_pages(struct page **pages, u64 offset, u64 end)
984 {
985         struct page **page = &pages[offset >> PAGE_SHIFT];
986
987         rbd_assert(end > offset);
988         rbd_assert(end - offset <= (u64)SIZE_MAX);
989         while (offset < end) {
990                 size_t page_offset;
991                 size_t length;
992                 unsigned long flags;
993                 void *kaddr;
994
995                 page_offset = (size_t)(offset & ~PAGE_MASK);
996                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
997                 local_irq_save(flags);
998                 kaddr = kmap_atomic(*page);
999                 memset(kaddr + page_offset, 0, length);
1000                 kunmap_atomic(kaddr);
1001                 local_irq_restore(flags);
1002
1003                 offset += length;
1004                 page++;
1005         }
1006 }
1007
1008 /*
1009  * Clone a portion of a bio, starting at the given byte offset
1010  * and continuing for the number of bytes indicated.
1011  */
1012 static struct bio *bio_clone_range(struct bio *bio_src,
1013                                         unsigned int offset,
1014                                         unsigned int len,
1015                                         gfp_t gfpmask)
1016 {
1017         struct bio_vec *bv;
1018         unsigned int resid;
1019         unsigned short idx;
1020         unsigned int voff;
1021         unsigned short end_idx;
1022         unsigned short vcnt;
1023         struct bio *bio;
1024
1025         /* Handle the easy case for the caller */
1026
1027         if (!offset && len == bio_src->bi_size)
1028                 return bio_clone(bio_src, gfpmask);
1029
1030         if (WARN_ON_ONCE(!len))
1031                 return NULL;
1032         if (WARN_ON_ONCE(len > bio_src->bi_size))
1033                 return NULL;
1034         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1035                 return NULL;
1036
1037         /* Find first affected segment... */
1038
1039         resid = offset;
1040         __bio_for_each_segment(bv, bio_src, idx, 0) {
1041                 if (resid < bv->bv_len)
1042                         break;
1043                 resid -= bv->bv_len;
1044         }
1045         voff = resid;
1046
1047         /* ...and the last affected segment */
1048
1049         resid += len;
1050         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1051                 if (resid <= bv->bv_len)
1052                         break;
1053                 resid -= bv->bv_len;
1054         }
1055         vcnt = end_idx - idx + 1;
1056
1057         /* Build the clone */
1058
1059         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1060         if (!bio)
1061                 return NULL;    /* ENOMEM */
1062
1063         bio->bi_bdev = bio_src->bi_bdev;
1064         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1065         bio->bi_rw = bio_src->bi_rw;
1066         bio->bi_flags |= 1 << BIO_CLONED;
1067
1068         /*
1069          * Copy over our part of the bio_vec, then update the first
1070          * and last (or only) entries.
1071          */
1072         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1073                         vcnt * sizeof (struct bio_vec));
1074         bio->bi_io_vec[0].bv_offset += voff;
1075         if (vcnt > 1) {
1076                 bio->bi_io_vec[0].bv_len -= voff;
1077                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1078         } else {
1079                 bio->bi_io_vec[0].bv_len = len;
1080         }
1081
1082         bio->bi_vcnt = vcnt;
1083         bio->bi_size = len;
1084         bio->bi_idx = 0;
1085
1086         return bio;
1087 }
1088
1089 /*
1090  * Clone a portion of a bio chain, starting at the given byte offset
1091  * into the first bio in the source chain and continuing for the
1092  * number of bytes indicated.  The result is another bio chain of
1093  * exactly the given length, or a null pointer on error.
1094  *
1095  * The bio_src and offset parameters are both in-out.  On entry they
1096  * refer to the first source bio and the offset into that bio where
1097  * the start of data to be cloned is located.
1098  *
1099  * On return, bio_src is updated to refer to the bio in the source
1100  * chain that contains first un-cloned byte, and *offset will
1101  * contain the offset of that byte within that bio.
1102  */
1103 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1104                                         unsigned int *offset,
1105                                         unsigned int len,
1106                                         gfp_t gfpmask)
1107 {
1108         struct bio *bi = *bio_src;
1109         unsigned int off = *offset;
1110         struct bio *chain = NULL;
1111         struct bio **end;
1112
1113         /* Build up a chain of clone bios up to the limit */
1114
1115         if (!bi || off >= bi->bi_size || !len)
1116                 return NULL;            /* Nothing to clone */
1117
1118         end = &chain;
1119         while (len) {
1120                 unsigned int bi_size;
1121                 struct bio *bio;
1122
1123                 if (!bi) {
1124                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1125                         goto out_err;   /* EINVAL; ran out of bio's */
1126                 }
1127                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1128                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1129                 if (!bio)
1130                         goto out_err;   /* ENOMEM */
1131
1132                 *end = bio;
1133                 end = &bio->bi_next;
1134
1135                 off += bi_size;
1136                 if (off == bi->bi_size) {
1137                         bi = bi->bi_next;
1138                         off = 0;
1139                 }
1140                 len -= bi_size;
1141         }
1142         *bio_src = bi;
1143         *offset = off;
1144
1145         return chain;
1146 out_err:
1147         bio_chain_put(chain);
1148
1149         return NULL;
1150 }
1151
1152 /*
1153  * The default/initial value for all object request flags is 0.  For
1154  * each flag, once its value is set to 1 it is never reset to 0
1155  * again.
1156  */
1157 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1158 {
1159         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1160                 struct rbd_device *rbd_dev;
1161
1162                 rbd_dev = obj_request->img_request->rbd_dev;
1163                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1164                         obj_request);
1165         }
1166 }
1167
1168 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1169 {
1170         smp_mb();
1171         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1172 }
1173
1174 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1175 {
1176         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1177                 struct rbd_device *rbd_dev = NULL;
1178
1179                 if (obj_request_img_data_test(obj_request))
1180                         rbd_dev = obj_request->img_request->rbd_dev;
1181                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1182                         obj_request);
1183         }
1184 }
1185
1186 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1187 {
1188         smp_mb();
1189         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1190 }
1191
1192 /*
1193  * This sets the KNOWN flag after (possibly) setting the EXISTS
1194  * flag.  The latter is set based on the "exists" value provided.
1195  *
1196  * Note that for our purposes once an object exists it never goes
1197  * away again.  It's possible that the response from two existence
1198  * checks are separated by the creation of the target object, and
1199  * the first ("doesn't exist") response arrives *after* the second
1200  * ("does exist").  In that case we ignore the second one.
1201  */
1202 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1203                                 bool exists)
1204 {
1205         if (exists)
1206                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1207         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1208         smp_mb();
1209 }
1210
1211 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1212 {
1213         smp_mb();
1214         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1215 }
1216
1217 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1218 {
1219         smp_mb();
1220         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1221 }
1222
1223 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1224 {
1225         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1226                 atomic_read(&obj_request->kref.refcount));
1227         kref_get(&obj_request->kref);
1228 }
1229
1230 static void rbd_obj_request_destroy(struct kref *kref);
1231 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1232 {
1233         rbd_assert(obj_request != NULL);
1234         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1235                 atomic_read(&obj_request->kref.refcount));
1236         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1237 }
1238
1239 static void rbd_img_request_get(struct rbd_img_request *img_request)
1240 {
1241         dout("%s: img %p (was %d)\n", __func__, img_request,
1242                 atomic_read(&img_request->kref.refcount));
1243         kref_get(&img_request->kref);
1244 }
1245
1246 static void rbd_img_request_destroy(struct kref *kref);
1247 static void rbd_img_request_put(struct rbd_img_request *img_request)
1248 {
1249         rbd_assert(img_request != NULL);
1250         dout("%s: img %p (was %d)\n", __func__, img_request,
1251                 atomic_read(&img_request->kref.refcount));
1252         kref_put(&img_request->kref, rbd_img_request_destroy);
1253 }
1254
1255 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1256                                         struct rbd_obj_request *obj_request)
1257 {
1258         rbd_assert(obj_request->img_request == NULL);
1259
1260         /* Image request now owns object's original reference */
1261         obj_request->img_request = img_request;
1262         obj_request->which = img_request->obj_request_count;
1263         rbd_assert(!obj_request_img_data_test(obj_request));
1264         obj_request_img_data_set(obj_request);
1265         rbd_assert(obj_request->which != BAD_WHICH);
1266         img_request->obj_request_count++;
1267         list_add_tail(&obj_request->links, &img_request->obj_requests);
1268         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1269                 obj_request->which);
1270 }
1271
1272 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1273                                         struct rbd_obj_request *obj_request)
1274 {
1275         rbd_assert(obj_request->which != BAD_WHICH);
1276
1277         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1278                 obj_request->which);
1279         list_del(&obj_request->links);
1280         rbd_assert(img_request->obj_request_count > 0);
1281         img_request->obj_request_count--;
1282         rbd_assert(obj_request->which == img_request->obj_request_count);
1283         obj_request->which = BAD_WHICH;
1284         rbd_assert(obj_request_img_data_test(obj_request));
1285         rbd_assert(obj_request->img_request == img_request);
1286         obj_request->img_request = NULL;
1287         obj_request->callback = NULL;
1288         rbd_obj_request_put(obj_request);
1289 }
1290
1291 static bool obj_request_type_valid(enum obj_request_type type)
1292 {
1293         switch (type) {
1294         case OBJ_REQUEST_NODATA:
1295         case OBJ_REQUEST_BIO:
1296         case OBJ_REQUEST_PAGES:
1297                 return true;
1298         default:
1299                 return false;
1300         }
1301 }
1302
1303 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1304                                 struct rbd_obj_request *obj_request)
1305 {
1306         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1307
1308         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1309 }
1310
1311 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1312 {
1313
1314         dout("%s: img %p\n", __func__, img_request);
1315
1316         /*
1317          * If no error occurred, compute the aggregate transfer
1318          * count for the image request.  We could instead use
1319          * atomic64_cmpxchg() to update it as each object request
1320          * completes; not clear which way is better off hand.
1321          */
1322         if (!img_request->result) {
1323                 struct rbd_obj_request *obj_request;
1324                 u64 xferred = 0;
1325
1326                 for_each_obj_request(img_request, obj_request)
1327                         xferred += obj_request->xferred;
1328                 img_request->xferred = xferred;
1329         }
1330
1331         if (img_request->callback)
1332                 img_request->callback(img_request);
1333         else
1334                 rbd_img_request_put(img_request);
1335 }
1336
1337 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1338
1339 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1340 {
1341         dout("%s: obj %p\n", __func__, obj_request);
1342
1343         return wait_for_completion_interruptible(&obj_request->completion);
1344 }
1345
1346 /*
1347  * The default/initial value for all image request flags is 0.  Each
1348  * is conditionally set to 1 at image request initialization time
1349  * and currently never change thereafter.
1350  */
1351 static void img_request_write_set(struct rbd_img_request *img_request)
1352 {
1353         set_bit(IMG_REQ_WRITE, &img_request->flags);
1354         smp_mb();
1355 }
1356
1357 static bool img_request_write_test(struct rbd_img_request *img_request)
1358 {
1359         smp_mb();
1360         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1361 }
1362
1363 static void img_request_child_set(struct rbd_img_request *img_request)
1364 {
1365         set_bit(IMG_REQ_CHILD, &img_request->flags);
1366         smp_mb();
1367 }
1368
1369 static bool img_request_child_test(struct rbd_img_request *img_request)
1370 {
1371         smp_mb();
1372         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1373 }
1374
1375 static void img_request_layered_set(struct rbd_img_request *img_request)
1376 {
1377         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1378         smp_mb();
1379 }
1380
1381 static bool img_request_layered_test(struct rbd_img_request *img_request)
1382 {
1383         smp_mb();
1384         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1385 }
1386
1387 static void
1388 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1389 {
1390         u64 xferred = obj_request->xferred;
1391         u64 length = obj_request->length;
1392
1393         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1394                 obj_request, obj_request->img_request, obj_request->result,
1395                 xferred, length);
1396         /*
1397          * ENOENT means a hole in the image.  We zero-fill the
1398          * entire length of the request.  A short read also implies
1399          * zero-fill to the end of the request.  Either way we
1400          * update the xferred count to indicate the whole request
1401          * was satisfied.
1402          */
1403         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1404         if (obj_request->result == -ENOENT) {
1405                 if (obj_request->type == OBJ_REQUEST_BIO)
1406                         zero_bio_chain(obj_request->bio_list, 0);
1407                 else
1408                         zero_pages(obj_request->pages, 0, length);
1409                 obj_request->result = 0;
1410                 obj_request->xferred = length;
1411         } else if (xferred < length && !obj_request->result) {
1412                 if (obj_request->type == OBJ_REQUEST_BIO)
1413                         zero_bio_chain(obj_request->bio_list, xferred);
1414                 else
1415                         zero_pages(obj_request->pages, xferred, length);
1416                 obj_request->xferred = length;
1417         }
1418         obj_request_done_set(obj_request);
1419 }
1420
1421 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1422 {
1423         dout("%s: obj %p cb %p\n", __func__, obj_request,
1424                 obj_request->callback);
1425         if (obj_request->callback)
1426                 obj_request->callback(obj_request);
1427         else
1428                 complete_all(&obj_request->completion);
1429 }
1430
/* Completion handling for ops that need none: just mark the request done */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1436
1437 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1438 {
1439         struct rbd_img_request *img_request = NULL;
1440         struct rbd_device *rbd_dev = NULL;
1441         bool layered = false;
1442
1443         if (obj_request_img_data_test(obj_request)) {
1444                 img_request = obj_request->img_request;
1445                 layered = img_request && img_request_layered_test(img_request);
1446                 rbd_dev = img_request->rbd_dev;
1447         }
1448
1449         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1450                 obj_request, img_request, obj_request->result,
1451                 obj_request->xferred, obj_request->length);
1452         if (layered && obj_request->result == -ENOENT &&
1453                         obj_request->img_offset < rbd_dev->parent_overlap)
1454                 rbd_img_parent_read(obj_request);
1455         else if (img_request)
1456                 rbd_img_obj_request_read_callback(obj_request);
1457         else
1458                 obj_request_done_set(obj_request);
1459 }
1460
1461 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1462 {
1463         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1464                 obj_request->result, obj_request->length);
1465         /*
1466          * There is no such thing as a successful short write.  Set
1467          * it to our originally-requested length.
1468          */
1469         obj_request->xferred = obj_request->length;
1470         obj_request_done_set(obj_request);
1471 }
1472
/*
 * Handle completion of an osd stat op.  A simple stat call needs
 * nothing beyond marking the request done.  We'll do more if this
 * is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1482
1483 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1484                                 struct ceph_msg *msg)
1485 {
1486         struct rbd_obj_request *obj_request = osd_req->r_priv;
1487         u16 opcode;
1488
1489         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1490         rbd_assert(osd_req == obj_request->osd_req);
1491         if (obj_request_img_data_test(obj_request)) {
1492                 rbd_assert(obj_request->img_request);
1493                 rbd_assert(obj_request->which != BAD_WHICH);
1494         } else {
1495                 rbd_assert(obj_request->which == BAD_WHICH);
1496         }
1497
1498         if (osd_req->r_result < 0)
1499                 obj_request->result = osd_req->r_result;
1500         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1501
1502         BUG_ON(osd_req->r_num_ops > 2);
1503
1504         /*
1505          * We support a 64-bit length, but ultimately it has to be
1506          * passed to blk_end_request(), which takes an unsigned int.
1507          */
1508         obj_request->xferred = osd_req->r_reply_op_len[0];
1509         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1510         opcode = osd_req->r_ops[0].op;
1511         switch (opcode) {
1512         case CEPH_OSD_OP_READ:
1513                 rbd_osd_read_callback(obj_request);
1514                 break;
1515         case CEPH_OSD_OP_WRITE:
1516                 rbd_osd_write_callback(obj_request);
1517                 break;
1518         case CEPH_OSD_OP_STAT:
1519                 rbd_osd_stat_callback(obj_request);
1520                 break;
1521         case CEPH_OSD_OP_CALL:
1522         case CEPH_OSD_OP_NOTIFY_ACK:
1523         case CEPH_OSD_OP_WATCH:
1524                 rbd_osd_trivial_callback(obj_request);
1525                 break;
1526         default:
1527                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1528                         obj_request->object_name, (unsigned short) opcode);
1529                 break;
1530         }
1531
1532         if (obj_request_done_test(obj_request))
1533                 rbd_obj_request_complete(obj_request);
1534 }
1535
1536 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1537 {
1538         struct rbd_img_request *img_request = obj_request->img_request;
1539         struct ceph_osd_request *osd_req = obj_request->osd_req;
1540         u64 snap_id;
1541
1542         rbd_assert(osd_req != NULL);
1543
1544         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1545         ceph_osdc_build_request(osd_req, obj_request->offset,
1546                         NULL, snap_id, NULL);
1547 }
1548
1549 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1550 {
1551         struct rbd_img_request *img_request = obj_request->img_request;
1552         struct ceph_osd_request *osd_req = obj_request->osd_req;
1553         struct ceph_snap_context *snapc;
1554         struct timespec mtime = CURRENT_TIME;
1555
1556         rbd_assert(osd_req != NULL);
1557
1558         snapc = img_request ? img_request->snapc : NULL;
1559         ceph_osdc_build_request(osd_req, obj_request->offset,
1560                         snapc, CEPH_NOSNAP, &mtime);
1561 }
1562
1563 static struct ceph_osd_request *rbd_osd_req_create(
1564                                         struct rbd_device *rbd_dev,
1565                                         bool write_request,
1566                                         struct rbd_obj_request *obj_request)
1567 {
1568         struct ceph_snap_context *snapc = NULL;
1569         struct ceph_osd_client *osdc;
1570         struct ceph_osd_request *osd_req;
1571
1572         if (obj_request_img_data_test(obj_request)) {
1573                 struct rbd_img_request *img_request = obj_request->img_request;
1574
1575                 rbd_assert(write_request ==
1576                                 img_request_write_test(img_request));
1577                 if (write_request)
1578                         snapc = img_request->snapc;
1579         }
1580
1581         /* Allocate and initialize the request, for the single op */
1582
1583         osdc = &rbd_dev->rbd_client->client->osdc;
1584         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1585         if (!osd_req)
1586                 return NULL;    /* ENOMEM */
1587
1588         if (write_request)
1589                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1590         else
1591                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1592
1593         osd_req->r_callback = rbd_osd_req_callback;
1594         osd_req->r_priv = obj_request;
1595
1596         osd_req->r_oid_len = strlen(obj_request->object_name);
1597         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1598         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1599
1600         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1601
1602         return osd_req;
1603 }
1604
1605 /*
1606  * Create a copyup osd request based on the information in the
1607  * object request supplied.  A copyup request has two osd ops,
1608  * a copyup method call, and a "normal" write request.
1609  */
1610 static struct ceph_osd_request *
1611 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1612 {
1613         struct rbd_img_request *img_request;
1614         struct ceph_snap_context *snapc;
1615         struct rbd_device *rbd_dev;
1616         struct ceph_osd_client *osdc;
1617         struct ceph_osd_request *osd_req;
1618
1619         rbd_assert(obj_request_img_data_test(obj_request));
1620         img_request = obj_request->img_request;
1621         rbd_assert(img_request);
1622         rbd_assert(img_request_write_test(img_request));
1623
1624         /* Allocate and initialize the request, for the two ops */
1625
1626         snapc = img_request->snapc;
1627         rbd_dev = img_request->rbd_dev;
1628         osdc = &rbd_dev->rbd_client->client->osdc;
1629         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1630         if (!osd_req)
1631                 return NULL;    /* ENOMEM */
1632
1633         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1634         osd_req->r_callback = rbd_osd_req_callback;
1635         osd_req->r_priv = obj_request;
1636
1637         osd_req->r_oid_len = strlen(obj_request->object_name);
1638         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1639         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1640
1641         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1642
1643         return osd_req;
1644 }
1645
1646
/* Drop rbd's reference on an osd request */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);

        return;
}
1651
1652 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1653
1654 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1655                                                 u64 offset, u64 length,
1656                                                 enum obj_request_type type)
1657 {
1658         struct rbd_obj_request *obj_request;
1659         size_t size;
1660         char *name;
1661
1662         rbd_assert(obj_request_type_valid(type));
1663
1664         size = strlen(object_name) + 1;
1665         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1666         if (!obj_request)
1667                 return NULL;
1668
1669         name = (char *)(obj_request + 1);
1670         obj_request->object_name = memcpy(name, object_name, size);
1671         obj_request->offset = offset;
1672         obj_request->length = length;
1673         obj_request->flags = 0;
1674         obj_request->which = BAD_WHICH;
1675         obj_request->type = type;
1676         INIT_LIST_HEAD(&obj_request->links);
1677         init_completion(&obj_request->completion);
1678         kref_init(&obj_request->kref);
1679
1680         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1681                 offset, length, (int)type, obj_request);
1682
1683         return obj_request;
1684 }
1685
1686 static void rbd_obj_request_destroy(struct kref *kref)
1687 {
1688         struct rbd_obj_request *obj_request;
1689
1690         obj_request = container_of(kref, struct rbd_obj_request, kref);
1691
1692         dout("%s: obj %p\n", __func__, obj_request);
1693
1694         rbd_assert(obj_request->img_request == NULL);
1695         rbd_assert(obj_request->which == BAD_WHICH);
1696
1697         if (obj_request->osd_req)
1698                 rbd_osd_req_destroy(obj_request->osd_req);
1699
1700         rbd_assert(obj_request_type_valid(obj_request->type));
1701         switch (obj_request->type) {
1702         case OBJ_REQUEST_NODATA:
1703                 break;          /* Nothing to do */
1704         case OBJ_REQUEST_BIO:
1705                 if (obj_request->bio_list)
1706                         bio_chain_put(obj_request->bio_list);
1707                 break;
1708         case OBJ_REQUEST_PAGES:
1709                 if (obj_request->pages)
1710                         ceph_release_page_vector(obj_request->pages,
1711                                                 obj_request->page_count);
1712                 break;
1713         }
1714
1715         kfree(obj_request);
1716 }
1717
1718 /*
1719  * Caller is responsible for filling in the list of object requests
1720  * that comprises the image request, and the Linux request pointer
1721  * (if there is one).
1722  */
1723 static struct rbd_img_request *rbd_img_request_create(
1724                                         struct rbd_device *rbd_dev,
1725                                         u64 offset, u64 length,
1726                                         bool write_request,
1727                                         bool child_request)
1728 {
1729         struct rbd_img_request *img_request;
1730         struct ceph_snap_context *snapc = NULL;
1731
1732         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1733         if (!img_request)
1734                 return NULL;
1735
1736         if (write_request) {
1737                 down_read(&rbd_dev->header_rwsem);
1738                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1739                 up_read(&rbd_dev->header_rwsem);
1740                 if (WARN_ON(!snapc)) {
1741                         kfree(img_request);
1742                         return NULL;    /* Shouldn't happen */
1743                 }
1744
1745         }
1746
1747         img_request->rq = NULL;
1748         img_request->rbd_dev = rbd_dev;
1749         img_request->offset = offset;
1750         img_request->length = length;
1751         img_request->flags = 0;
1752         if (write_request) {
1753                 img_request_write_set(img_request);
1754                 img_request->snapc = snapc;
1755         } else {
1756                 img_request->snap_id = rbd_dev->spec->snap_id;
1757         }
1758         if (child_request)
1759                 img_request_child_set(img_request);
1760         if (rbd_dev->parent_spec)
1761                 img_request_layered_set(img_request);
1762         spin_lock_init(&img_request->completion_lock);
1763         img_request->next_completion = 0;
1764         img_request->callback = NULL;
1765         img_request->result = 0;
1766         img_request->obj_request_count = 0;
1767         INIT_LIST_HEAD(&img_request->obj_requests);
1768         kref_init(&img_request->kref);
1769
1770         rbd_img_request_get(img_request);       /* Avoid a warning */
1771         rbd_img_request_put(img_request);       /* TEMPORARY */
1772
1773         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1774                 write_request ? "write" : "read", offset, length,
1775                 img_request);
1776
1777         return img_request;
1778 }
1779
1780 static void rbd_img_request_destroy(struct kref *kref)
1781 {
1782         struct rbd_img_request *img_request;
1783         struct rbd_obj_request *obj_request;
1784         struct rbd_obj_request *next_obj_request;
1785
1786         img_request = container_of(kref, struct rbd_img_request, kref);
1787
1788         dout("%s: img %p\n", __func__, img_request);
1789
1790         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1791                 rbd_img_obj_request_del(img_request, obj_request);
1792         rbd_assert(img_request->obj_request_count == 0);
1793
1794         if (img_request_write_test(img_request))
1795                 ceph_put_snap_context(img_request->snapc);
1796
1797         if (img_request_child_test(img_request))
1798                 rbd_obj_request_put(img_request->obj_request);
1799
1800         kfree(img_request);
1801 }
1802
1803 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1804 {
1805         struct rbd_img_request *img_request;
1806         unsigned int xferred;
1807         int result;
1808         bool more;
1809
1810         rbd_assert(obj_request_img_data_test(obj_request));
1811         img_request = obj_request->img_request;
1812
1813         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1814         xferred = (unsigned int)obj_request->xferred;
1815         result = obj_request->result;
1816         if (result) {
1817                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1818
1819                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1820                         img_request_write_test(img_request) ? "write" : "read",
1821                         obj_request->length, obj_request->img_offset,
1822                         obj_request->offset);
1823                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1824                         result, xferred);
1825                 if (!img_request->result)
1826                         img_request->result = result;
1827         }
1828
1829         /* Image object requests don't own their page array */
1830
1831         if (obj_request->type == OBJ_REQUEST_PAGES) {
1832                 obj_request->pages = NULL;
1833                 obj_request->page_count = 0;
1834         }
1835
1836         if (img_request_child_test(img_request)) {
1837                 rbd_assert(img_request->obj_request != NULL);
1838                 more = obj_request->which < img_request->obj_request_count - 1;
1839         } else {
1840                 rbd_assert(img_request->rq != NULL);
1841                 more = blk_end_request(img_request->rq, result, xferred);
1842         }
1843
1844         return more;
1845 }
1846
1847 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1848 {
1849         struct rbd_img_request *img_request;
1850         u32 which = obj_request->which;
1851         bool more = true;
1852
1853         rbd_assert(obj_request_img_data_test(obj_request));
1854         img_request = obj_request->img_request;
1855
1856         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1857         rbd_assert(img_request != NULL);
1858         rbd_assert(img_request->obj_request_count > 0);
1859         rbd_assert(which != BAD_WHICH);
1860         rbd_assert(which < img_request->obj_request_count);
1861         rbd_assert(which >= img_request->next_completion);
1862
1863         spin_lock_irq(&img_request->completion_lock);
1864         if (which != img_request->next_completion)
1865                 goto out;
1866
1867         for_each_obj_request_from(img_request, obj_request) {
1868                 rbd_assert(more);
1869                 rbd_assert(which < img_request->obj_request_count);
1870
1871                 if (!obj_request_done_test(obj_request))
1872                         break;
1873                 more = rbd_img_obj_end_request(obj_request);
1874                 which++;
1875         }
1876
1877         rbd_assert(more ^ (which == img_request->obj_request_count));
1878         img_request->next_completion = which;
1879 out:
1880         spin_unlock_irq(&img_request->completion_lock);
1881
1882         if (!more)
1883                 rbd_img_request_complete(img_request);
1884 }
1885
1886 /*
1887  * Split up an image request into one or more object requests, each
1888  * to a different object.  The "type" parameter indicates whether
1889  * "data_desc" is the pointer to the head of a list of bio
1890  * structures, or the base of a page array.  In either case this
1891  * function assumes data_desc describes memory sufficient to hold
1892  * all data described by the image request.
1893  */
1894 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1895                                         enum obj_request_type type,
1896                                         void *data_desc)
1897 {
1898         struct rbd_device *rbd_dev = img_request->rbd_dev;
1899         struct rbd_obj_request *obj_request = NULL;
1900         struct rbd_obj_request *next_obj_request;
1901         bool write_request = img_request_write_test(img_request);
1902         struct bio *bio_list;
1903         unsigned int bio_offset = 0;
1904         struct page **pages;
1905         u64 img_offset;
1906         u64 resid;
1907         u16 opcode;
1908
1909         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1910                 (int)type, data_desc);
1911
1912         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1913         img_offset = img_request->offset;
1914         resid = img_request->length;
1915         rbd_assert(resid > 0);
1916
1917         if (type == OBJ_REQUEST_BIO) {
1918                 bio_list = data_desc;
1919                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1920         } else {
1921                 rbd_assert(type == OBJ_REQUEST_PAGES);
1922                 pages = data_desc;
1923         }
1924
1925         while (resid) {
1926                 struct ceph_osd_request *osd_req;
1927                 const char *object_name;
1928                 u64 offset;
1929                 u64 length;
1930
1931                 object_name = rbd_segment_name(rbd_dev, img_offset);
1932                 if (!object_name)
1933                         goto out_unwind;
1934                 offset = rbd_segment_offset(rbd_dev, img_offset);
1935                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1936                 obj_request = rbd_obj_request_create(object_name,
1937                                                 offset, length, type);
1938                 kfree(object_name);     /* object request has its own copy */
1939                 if (!obj_request)
1940                         goto out_unwind;
1941
1942                 if (type == OBJ_REQUEST_BIO) {
1943                         unsigned int clone_size;
1944
1945                         rbd_assert(length <= (u64)UINT_MAX);
1946                         clone_size = (unsigned int)length;
1947                         obj_request->bio_list =
1948                                         bio_chain_clone_range(&bio_list,
1949                                                                 &bio_offset,
1950                                                                 clone_size,
1951                                                                 GFP_ATOMIC);
1952                         if (!obj_request->bio_list)
1953                                 goto out_partial;
1954                 } else {
1955                         unsigned int page_count;
1956
1957                         obj_request->pages = pages;
1958                         page_count = (u32)calc_pages_for(offset, length);
1959                         obj_request->page_count = page_count;
1960                         if ((offset + length) & ~PAGE_MASK)
1961                                 page_count--;   /* more on last page */
1962                         pages += page_count;
1963                 }
1964
1965                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1966                                                 obj_request);
1967                 if (!osd_req)
1968                         goto out_partial;
1969                 obj_request->osd_req = osd_req;
1970                 obj_request->callback = rbd_img_obj_callback;
1971
1972                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1973                                                 0, 0);
1974                 if (type == OBJ_REQUEST_BIO)
1975                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1976                                         obj_request->bio_list, length);
1977                 else
1978                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1979                                         obj_request->pages, length,
1980                                         offset & ~PAGE_MASK, false, false);
1981
1982                 if (write_request)
1983                         rbd_osd_req_format_write(obj_request);
1984                 else
1985                         rbd_osd_req_format_read(obj_request);
1986
1987                 obj_request->img_offset = img_offset;
1988                 rbd_img_obj_request_add(img_request, obj_request);
1989
1990                 img_offset += length;
1991                 resid -= length;
1992         }
1993
1994         return 0;
1995
1996 out_partial:
1997         rbd_obj_request_put(obj_request);
1998 out_unwind:
1999         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2000                 rbd_obj_request_put(obj_request);
2001
2002         return -ENOMEM;
2003 }
2004
2005 static void
2006 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2007 {
2008         struct rbd_img_request *img_request;
2009         struct rbd_device *rbd_dev;
2010         u64 length;
2011         u32 page_count;
2012
2013         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2014         rbd_assert(obj_request_img_data_test(obj_request));
2015         img_request = obj_request->img_request;
2016         rbd_assert(img_request);
2017
2018         rbd_dev = img_request->rbd_dev;
2019         rbd_assert(rbd_dev);
2020         length = (u64)1 << rbd_dev->header.obj_order;
2021         page_count = (u32)calc_pages_for(0, length);
2022
2023         rbd_assert(obj_request->copyup_pages);
2024         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2025         obj_request->copyup_pages = NULL;
2026
2027         /*
2028          * We want the transfer count to reflect the size of the
2029          * original write request.  There is no such thing as a
2030          * successful short write, so if the request was successful
2031          * we can just set it to the originally-requested length.
2032          */
2033         if (!obj_request->result)
2034                 obj_request->xferred = obj_request->length;
2035
2036         /* Finish up with the normal image object callback */
2037
2038         rbd_img_obj_callback(obj_request);
2039 }
2040
2041 static void
2042 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2043 {
2044         struct rbd_obj_request *orig_request;
2045         struct ceph_osd_request *osd_req;
2046         struct ceph_osd_client *osdc;
2047         struct rbd_device *rbd_dev;
2048         struct page **pages;
2049         int result;
2050         u64 obj_size;
2051         u64 xferred;
2052
2053         rbd_assert(img_request_child_test(img_request));
2054
2055         /* First get what we need from the image request */
2056
2057         pages = img_request->copyup_pages;
2058         rbd_assert(pages != NULL);
2059         img_request->copyup_pages = NULL;
2060
2061         orig_request = img_request->obj_request;
2062         rbd_assert(orig_request != NULL);
2063         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2064         result = img_request->result;
2065         obj_size = img_request->length;
2066         xferred = img_request->xferred;
2067
2068         rbd_dev = img_request->rbd_dev;
2069         rbd_assert(rbd_dev);
2070         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2071
2072         rbd_img_request_put(img_request);
2073
2074         if (result)
2075                 goto out_err;
2076
2077         /* Allocate the new copyup osd request for the original request */
2078
2079         result = -ENOMEM;
2080         rbd_assert(!orig_request->osd_req);
2081         osd_req = rbd_osd_req_create_copyup(orig_request);
2082         if (!osd_req)
2083                 goto out_err;
2084         orig_request->osd_req = osd_req;
2085         orig_request->copyup_pages = pages;
2086
2087         /* Initialize the copyup op */
2088
2089         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2090         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2091                                                 false, false);
2092
2093         /* Then the original write request op */
2094
2095         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2096                                         orig_request->offset,
2097                                         orig_request->length, 0, 0);
2098         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2099                                         orig_request->length);
2100
2101         rbd_osd_req_format_write(orig_request);
2102
2103         /* All set, send it off. */
2104
2105         orig_request->callback = rbd_img_obj_copyup_callback;
2106         osdc = &rbd_dev->rbd_client->client->osdc;
2107         result = rbd_obj_request_submit(osdc, orig_request);
2108         if (!result)
2109                 return;
2110 out_err:
2111         /* Record the error code and complete the request */
2112
2113         orig_request->result = result;
2114         orig_request->xferred = 0;
2115         obj_request_done_set(orig_request);
2116         rbd_obj_request_complete(orig_request);
2117 }
2118
2119 /*
2120  * Read from the parent image the range of data that covers the
2121  * entire target of the given object request.  This is used for
2122  * satisfying a layered image write request when the target of an
2123  * object request from the image request does not exist.
2124  *
2125  * A page array big enough to hold the returned data is allocated
2126  * and supplied to rbd_img_request_fill() as the "data descriptor."
2127  * When the read completes, this page array will be transferred to
2128  * the original object request for the copyup operation.
2129  *
2130  * If an error occurs, record it as the result of the original
2131  * object request and mark it done so it gets completed.
2132  */
2133 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2134 {
2135         struct rbd_img_request *img_request = NULL;
2136         struct rbd_img_request *parent_request = NULL;
2137         struct rbd_device *rbd_dev;
2138         u64 img_offset;
2139         u64 length;
2140         struct page **pages = NULL;
2141         u32 page_count;
2142         int result;
2143
2144         rbd_assert(obj_request_img_data_test(obj_request));
2145         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2146
2147         img_request = obj_request->img_request;
2148         rbd_assert(img_request != NULL);
2149         rbd_dev = img_request->rbd_dev;
2150         rbd_assert(rbd_dev->parent != NULL);
2151
2152         /*
2153          * First things first.  The original osd request is of no
2154          * use to use any more, we'll need a new one that can hold
2155          * the two ops in a copyup request.  We'll get that later,
2156          * but for now we can release the old one.
2157          */
2158         rbd_osd_req_destroy(obj_request->osd_req);
2159         obj_request->osd_req = NULL;
2160
2161         /*
2162          * Determine the byte range covered by the object in the
2163          * child image to which the original request was to be sent.
2164          */
2165         img_offset = obj_request->img_offset - obj_request->offset;
2166         length = (u64)1 << rbd_dev->header.obj_order;
2167
2168         /*
2169          * There is no defined parent data beyond the parent
2170          * overlap, so limit what we read at that boundary if
2171          * necessary.
2172          */
2173         if (img_offset + length > rbd_dev->parent_overlap) {
2174                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2175                 length = rbd_dev->parent_overlap - img_offset;
2176         }
2177
2178         /*
2179          * Allocate a page array big enough to receive the data read
2180          * from the parent.
2181          */
2182         page_count = (u32)calc_pages_for(0, length);
2183         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2184         if (IS_ERR(pages)) {
2185                 result = PTR_ERR(pages);
2186                 pages = NULL;
2187                 goto out_err;
2188         }
2189
2190         result = -ENOMEM;
2191         parent_request = rbd_img_request_create(rbd_dev->parent,
2192                                                 img_offset, length,
2193                                                 false, true);
2194         if (!parent_request)
2195                 goto out_err;
2196         rbd_obj_request_get(obj_request);
2197         parent_request->obj_request = obj_request;
2198
2199         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2200         if (result)
2201                 goto out_err;
2202         parent_request->copyup_pages = pages;
2203
2204         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2205         result = rbd_img_request_submit(parent_request);
2206         if (!result)
2207                 return 0;
2208
2209         parent_request->copyup_pages = NULL;
2210         parent_request->obj_request = NULL;
2211         rbd_obj_request_put(obj_request);
2212 out_err:
2213         if (pages)
2214                 ceph_release_page_vector(pages, page_count);
2215         if (parent_request)
2216                 rbd_img_request_put(parent_request);
2217         obj_request->result = result;
2218         obj_request->xferred = 0;
2219         obj_request_done_set(obj_request);
2220
2221         return result;
2222 }
2223
2224 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2225 {
2226         struct rbd_obj_request *orig_request;
2227         int result;
2228
2229         rbd_assert(!obj_request_img_data_test(obj_request));
2230
2231         /*
2232          * All we need from the object request is the original
2233          * request and the result of the STAT op.  Grab those, then
2234          * we're done with the request.
2235          */
2236         orig_request = obj_request->obj_request;
2237         obj_request->obj_request = NULL;
2238         rbd_assert(orig_request);
2239         rbd_assert(orig_request->img_request);
2240
2241         result = obj_request->result;
2242         obj_request->result = 0;
2243
2244         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2245                 obj_request, orig_request, result,
2246                 obj_request->xferred, obj_request->length);
2247         rbd_obj_request_put(obj_request);
2248
2249         rbd_assert(orig_request);
2250         rbd_assert(orig_request->img_request);
2251
2252         /*
2253          * Our only purpose here is to determine whether the object
2254          * exists, and we don't want to treat the non-existence as
2255          * an error.  If something else comes back, transfer the
2256          * error to the original request and complete it now.
2257          */
2258         if (!result) {
2259                 obj_request_existence_set(orig_request, true);
2260         } else if (result == -ENOENT) {
2261                 obj_request_existence_set(orig_request, false);
2262         } else if (result) {
2263                 orig_request->result = result;
2264                 goto out;
2265         }
2266
2267         /*
2268          * Resubmit the original request now that we have recorded
2269          * whether the target object exists.
2270          */
2271         orig_request->result = rbd_img_obj_request_submit(orig_request);
2272 out:
2273         if (orig_request->result)
2274                 rbd_obj_request_complete(orig_request);
2275         rbd_obj_request_put(orig_request);
2276 }
2277
2278 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2279 {
2280         struct rbd_obj_request *stat_request;
2281         struct rbd_device *rbd_dev;
2282         struct ceph_osd_client *osdc;
2283         struct page **pages = NULL;
2284         u32 page_count;
2285         size_t size;
2286         int ret;
2287
2288         /*
2289          * The response data for a STAT call consists of:
2290          *     le64 length;
2291          *     struct {
2292          *         le32 tv_sec;
2293          *         le32 tv_nsec;
2294          *     } mtime;
2295          */
2296         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2297         page_count = (u32)calc_pages_for(0, size);
2298         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2299         if (IS_ERR(pages))
2300                 return PTR_ERR(pages);
2301
2302         ret = -ENOMEM;
2303         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2304                                                         OBJ_REQUEST_PAGES);
2305         if (!stat_request)
2306                 goto out;
2307
2308         rbd_obj_request_get(obj_request);
2309         stat_request->obj_request = obj_request;
2310         stat_request->pages = pages;
2311         stat_request->page_count = page_count;
2312
2313         rbd_assert(obj_request->img_request);
2314         rbd_dev = obj_request->img_request->rbd_dev;
2315         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2316                                                 stat_request);
2317         if (!stat_request->osd_req)
2318                 goto out;
2319         stat_request->callback = rbd_img_obj_exists_callback;
2320
2321         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2322         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2323                                         false, false);
2324         rbd_osd_req_format_read(stat_request);
2325
2326         osdc = &rbd_dev->rbd_client->client->osdc;
2327         ret = rbd_obj_request_submit(osdc, stat_request);
2328 out:
2329         if (ret)
2330                 rbd_obj_request_put(obj_request);
2331
2332         return ret;
2333 }
2334
2335 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2336 {
2337         struct rbd_img_request *img_request;
2338         struct rbd_device *rbd_dev;
2339         bool known;
2340
2341         rbd_assert(obj_request_img_data_test(obj_request));
2342
2343         img_request = obj_request->img_request;
2344         rbd_assert(img_request);
2345         rbd_dev = img_request->rbd_dev;
2346
2347         /*
2348          * Only writes to layered images need special handling.
2349          * Reads and non-layered writes are simple object requests.
2350          * Layered writes that start beyond the end of the overlap
2351          * with the parent have no parent data, so they too are
2352          * simple object requests.  Finally, if the target object is
2353          * known to already exist, its parent data has already been
2354          * copied, so a write to the object can also be handled as a
2355          * simple object request.
2356          */
2357         if (!img_request_write_test(img_request) ||
2358                 !img_request_layered_test(img_request) ||
2359                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2360                 ((known = obj_request_known_test(obj_request)) &&
2361                         obj_request_exists_test(obj_request))) {
2362
2363                 struct rbd_device *rbd_dev;
2364                 struct ceph_osd_client *osdc;
2365
2366                 rbd_dev = obj_request->img_request->rbd_dev;
2367                 osdc = &rbd_dev->rbd_client->client->osdc;
2368
2369                 return rbd_obj_request_submit(osdc, obj_request);
2370         }
2371
2372         /*
2373          * It's a layered write.  The target object might exist but
2374          * we may not know that yet.  If we know it doesn't exist,
2375          * start by reading the data for the full target object from
2376          * the parent so we can use it for a copyup to the target.
2377          */
2378         if (known)
2379                 return rbd_img_obj_parent_read_full(obj_request);
2380
2381         /* We don't know whether the target exists.  Go find out. */
2382
2383         return rbd_img_obj_exists_submit(obj_request);
2384 }
2385
2386 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2387 {
2388         struct rbd_obj_request *obj_request;
2389         struct rbd_obj_request *next_obj_request;
2390
2391         dout("%s: img %p\n", __func__, img_request);
2392         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2393                 int ret;
2394
2395                 ret = rbd_img_obj_request_submit(obj_request);
2396                 if (ret)
2397                         return ret;
2398         }
2399
2400         return 0;
2401 }
2402
2403 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2404 {
2405         struct rbd_obj_request *obj_request;
2406         struct rbd_device *rbd_dev;
2407         u64 obj_end;
2408
2409         rbd_assert(img_request_child_test(img_request));
2410
2411         obj_request = img_request->obj_request;
2412         rbd_assert(obj_request);
2413         rbd_assert(obj_request->img_request);
2414
2415         obj_request->result = img_request->result;
2416         if (obj_request->result)
2417                 goto out;
2418
2419         /*
2420          * We need to zero anything beyond the parent overlap
2421          * boundary.  Since rbd_img_obj_request_read_callback()
2422          * will zero anything beyond the end of a short read, an
2423          * easy way to do this is to pretend the data from the
2424          * parent came up short--ending at the overlap boundary.
2425          */
2426         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2427         obj_end = obj_request->img_offset + obj_request->length;
2428         rbd_dev = obj_request->img_request->rbd_dev;
2429         if (obj_end > rbd_dev->parent_overlap) {
2430                 u64 xferred = 0;
2431
2432                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2433                         xferred = rbd_dev->parent_overlap -
2434                                         obj_request->img_offset;
2435
2436                 obj_request->xferred = min(img_request->xferred, xferred);
2437         } else {
2438                 obj_request->xferred = img_request->xferred;
2439         }
2440 out:
2441         rbd_img_obj_request_read_callback(obj_request);
2442         rbd_obj_request_complete(obj_request);
2443 }
2444
2445 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2446 {
2447         struct rbd_device *rbd_dev;
2448         struct rbd_img_request *img_request;
2449         int result;
2450
2451         rbd_assert(obj_request_img_data_test(obj_request));
2452         rbd_assert(obj_request->img_request != NULL);
2453         rbd_assert(obj_request->result == (s32) -ENOENT);
2454         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2455
2456         rbd_dev = obj_request->img_request->rbd_dev;
2457         rbd_assert(rbd_dev->parent != NULL);
2458         /* rbd_read_finish(obj_request, obj_request->length); */
2459         img_request = rbd_img_request_create(rbd_dev->parent,
2460                                                 obj_request->img_offset,
2461                                                 obj_request->length,
2462                                                 false, true);
2463         result = -ENOMEM;
2464         if (!img_request)
2465                 goto out_err;
2466
2467         rbd_obj_request_get(obj_request);
2468         img_request->obj_request = obj_request;
2469
2470         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2471                                         obj_request->bio_list);
2472         if (result)
2473                 goto out_err;
2474
2475         img_request->callback = rbd_img_parent_read_callback;
2476         result = rbd_img_request_submit(img_request);
2477         if (result)
2478                 goto out_err;
2479
2480         return;
2481 out_err:
2482         if (img_request)
2483                 rbd_img_request_put(img_request);
2484         obj_request->result = result;
2485         obj_request->xferred = 0;
2486         obj_request_done_set(obj_request);
2487 }
2488
2489 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2490                                    u64 ver, u64 notify_id)
2491 {
2492         struct rbd_obj_request *obj_request;
2493         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2494         int ret;
2495
2496         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2497                                                         OBJ_REQUEST_NODATA);
2498         if (!obj_request)
2499                 return -ENOMEM;
2500
2501         ret = -ENOMEM;
2502         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2503         if (!obj_request->osd_req)
2504                 goto out;
2505         obj_request->callback = rbd_obj_request_put;
2506
2507         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2508                                         notify_id, ver, 0);
2509         rbd_osd_req_format_read(obj_request);
2510
2511         ret = rbd_obj_request_submit(osdc, obj_request);
2512 out:
2513         if (ret)
2514                 rbd_obj_request_put(obj_request);
2515
2516         return ret;
2517 }
2518
2519 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2520 {
2521         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2522         u64 hver;
2523         int rc;
2524
2525         if (!rbd_dev)
2526                 return;
2527
2528         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2529                 rbd_dev->header_name, (unsigned long long) notify_id,
2530                 (unsigned int) opcode);
2531         rc = rbd_dev_refresh(rbd_dev, &hver);
2532         if (rc)
2533                 rbd_warn(rbd_dev, "got notification but failed to "
2534                            " update snaps: %d\n", rc);
2535
2536         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2537 }
2538
2539 /*
2540  * Request sync osd watch/unwatch.  The value of "start" determines
2541  * whether a watch request is being initiated or torn down.
2542  */
2543 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2544 {
2545         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2546         struct rbd_obj_request *obj_request;
2547         int ret;
2548
2549         rbd_assert(start ^ !!rbd_dev->watch_event);
2550         rbd_assert(start ^ !!rbd_dev->watch_request);
2551
2552         if (start) {
2553                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2554                                                 &rbd_dev->watch_event);
2555                 if (ret < 0)
2556                         return ret;
2557                 rbd_assert(rbd_dev->watch_event != NULL);
2558         }
2559
2560         ret = -ENOMEM;
2561         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2562                                                         OBJ_REQUEST_NODATA);
2563         if (!obj_request)
2564                 goto out_cancel;
2565
2566         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2567         if (!obj_request->osd_req)
2568                 goto out_cancel;
2569
2570         if (start)
2571                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2572         else
2573                 ceph_osdc_unregister_linger_request(osdc,
2574                                         rbd_dev->watch_request->osd_req);
2575
2576         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2577                                 rbd_dev->watch_event->cookie,
2578                                 rbd_dev->header.obj_version, start);
2579         rbd_osd_req_format_write(obj_request);
2580
2581         ret = rbd_obj_request_submit(osdc, obj_request);
2582         if (ret)
2583                 goto out_cancel;
2584         ret = rbd_obj_request_wait(obj_request);
2585         if (ret)
2586                 goto out_cancel;
2587         ret = obj_request->result;
2588         if (ret)
2589                 goto out_cancel;
2590
2591         /*
2592          * A watch request is set to linger, so the underlying osd
2593          * request won't go away until we unregister it.  We retain
2594          * a pointer to the object request during that time (in
2595          * rbd_dev->watch_request), so we'll keep a reference to
2596          * it.  We'll drop that reference (below) after we've
2597          * unregistered it.
2598          */
2599         if (start) {
2600                 rbd_dev->watch_request = obj_request;
2601
2602                 return 0;
2603         }
2604
2605         /* We have successfully torn down the watch request */
2606
2607         rbd_obj_request_put(rbd_dev->watch_request);
2608         rbd_dev->watch_request = NULL;
2609 out_cancel:
2610         /* Cancel the event if we're tearing down, or on error */
2611         ceph_osdc_cancel_event(rbd_dev->watch_event);
2612         rbd_dev->watch_event = NULL;
2613         if (obj_request)
2614                 rbd_obj_request_put(obj_request);
2615
2616         return ret;
2617 }
2618
2619 /*
2620  * Synchronous osd object method call
2621  */
2622 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2623                              const char *object_name,
2624                              const char *class_name,
2625                              const char *method_name,
2626                              const char *outbound,
2627                              size_t outbound_size,
2628                              char *inbound,
2629                              size_t inbound_size,
2630                              u64 *version)
2631 {
2632         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2633         struct rbd_obj_request *obj_request;
2634         struct page **pages;
2635         u32 page_count;
2636         int ret;
2637
2638         /*
2639          * Method calls are ultimately read operations.  The result
2640          * should placed into the inbound buffer provided.  They
2641          * also supply outbound data--parameters for the object
2642          * method.  Currently if this is present it will be a
2643          * snapshot id.
2644          */
2645         page_count = (u32) calc_pages_for(0, inbound_size);
2646         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2647         if (IS_ERR(pages))
2648                 return PTR_ERR(pages);
2649
2650         ret = -ENOMEM;
2651         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2652                                                         OBJ_REQUEST_PAGES);
2653         if (!obj_request)
2654                 goto out;
2655
2656         obj_request->pages = pages;
2657         obj_request->page_count = page_count;
2658
2659         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2660         if (!obj_request->osd_req)
2661                 goto out;
2662
2663         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2664                                         class_name, method_name);
2665         if (outbound_size) {
2666                 struct ceph_pagelist *pagelist;
2667
2668                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2669                 if (!pagelist)
2670                         goto out;
2671
2672                 ceph_pagelist_init(pagelist);
2673                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2674                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2675                                                 pagelist);
2676         }
2677         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2678                                         obj_request->pages, inbound_size,
2679                                         0, false, false);
2680         rbd_osd_req_format_read(obj_request);
2681
2682         ret = rbd_obj_request_submit(osdc, obj_request);
2683         if (ret)
2684                 goto out;
2685         ret = rbd_obj_request_wait(obj_request);
2686         if (ret)
2687                 goto out;
2688
2689         ret = obj_request->result;
2690         if (ret < 0)
2691                 goto out;
2692         ret = 0;
2693         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2694         if (version)
2695                 *version = obj_request->version;
2696 out:
2697         if (obj_request)
2698                 rbd_obj_request_put(obj_request);
2699         else
2700                 ceph_release_page_vector(pages, page_count);
2701
2702         return ret;
2703 }
2704
2705 static void rbd_request_fn(struct request_queue *q)
2706                 __releases(q->queue_lock) __acquires(q->queue_lock)
2707 {
2708         struct rbd_device *rbd_dev = q->queuedata;
2709         bool read_only = rbd_dev->mapping.read_only;
2710         struct request *rq;
2711         int result;
2712
2713         while ((rq = blk_fetch_request(q))) {
2714                 bool write_request = rq_data_dir(rq) == WRITE;
2715                 struct rbd_img_request *img_request;
2716                 u64 offset;
2717                 u64 length;
2718
2719                 /* Ignore any non-FS requests that filter through. */
2720
2721                 if (rq->cmd_type != REQ_TYPE_FS) {
2722                         dout("%s: non-fs request type %d\n", __func__,
2723                                 (int) rq->cmd_type);
2724                         __blk_end_request_all(rq, 0);
2725                         continue;
2726                 }
2727
2728                 /* Ignore/skip any zero-length requests */
2729
2730                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2731                 length = (u64) blk_rq_bytes(rq);
2732
2733                 if (!length) {
2734                         dout("%s: zero-length request\n", __func__);
2735                         __blk_end_request_all(rq, 0);
2736                         continue;
2737                 }
2738
2739                 spin_unlock_irq(q->queue_lock);
2740
2741                 /* Disallow writes to a read-only device */
2742
2743                 if (write_request) {
2744                         result = -EROFS;
2745                         if (read_only)
2746                                 goto end_request;
2747                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2748                 }
2749
2750                 /*
2751                  * Quit early if the mapped snapshot no longer
2752                  * exists.  It's still possible the snapshot will
2753                  * have disappeared by the time our request arrives
2754                  * at the osd, but there's no sense in sending it if
2755                  * we already know.
2756                  */
2757                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2758                         dout("request for non-existent snapshot");
2759                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2760                         result = -ENXIO;
2761                         goto end_request;
2762                 }
2763
2764                 result = -EINVAL;
2765                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2766                         goto end_request;       /* Shouldn't happen */
2767
2768                 result = -ENOMEM;
2769                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2770                                                         write_request, false);
2771                 if (!img_request)
2772                         goto end_request;
2773
2774                 img_request->rq = rq;
2775
2776                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2777                                                 rq->bio);
2778                 if (!result)
2779                         result = rbd_img_request_submit(img_request);
2780                 if (result)
2781                         rbd_img_request_put(img_request);
2782 end_request:
2783                 spin_lock_irq(q->queue_lock);
2784                 if (result < 0) {
2785                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2786                                 write_request ? "write" : "read",
2787                                 length, offset, result);
2788
2789                         __blk_end_request_all(rq, result);
2790                 }
2791         }
2792 }
2793
2794 /*
2795  * a queue callback. Makes sure that we don't create a bio that spans across
2796  * multiple osd objects. One exception would be with a single page bios,
2797  * which we handle later at bio_chain_clone_range()
2798  */
2799 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2800                           struct bio_vec *bvec)
2801 {
2802         struct rbd_device *rbd_dev = q->queuedata;
2803         sector_t sector_offset;
2804         sector_t sectors_per_obj;
2805         sector_t obj_sector_offset;
2806         int ret;
2807
2808         /*
2809          * Find how far into its rbd object the partition-relative
2810          * bio start sector is to offset relative to the enclosing
2811          * device.
2812          */
2813         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2814         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2815         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2816
2817         /*
2818          * Compute the number of bytes from that offset to the end
2819          * of the object.  Account for what's already used by the bio.
2820          */
2821         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2822         if (ret > bmd->bi_size)
2823                 ret -= bmd->bi_size;
2824         else
2825                 ret = 0;
2826
2827         /*
2828          * Don't send back more than was asked for.  And if the bio
2829          * was empty, let the whole thing through because:  "Note
2830          * that a block device *must* allow a single page to be
2831          * added to an empty bio."
2832          */
2833         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2834         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2835                 ret = (int) bvec->bv_len;
2836
2837         return ret;
2838 }
2839
2840 static void rbd_free_disk(struct rbd_device *rbd_dev)
2841 {
2842         struct gendisk *disk = rbd_dev->disk;
2843
2844         if (!disk)
2845                 return;
2846
2847         if (disk->flags & GENHD_FL_UP)
2848                 del_gendisk(disk);
2849         if (disk->queue)
2850                 blk_cleanup_queue(disk->queue);
2851         put_disk(disk);
2852 }
2853
2854 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2855                                 const char *object_name,
2856                                 u64 offset, u64 length,
2857                                 void *buf, u64 *version)
2858
2859 {
2860         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2861         struct rbd_obj_request *obj_request;
2862         struct page **pages = NULL;
2863         u32 page_count;
2864         size_t size;
2865         int ret;
2866
2867         page_count = (u32) calc_pages_for(offset, length);
2868         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2869         if (IS_ERR(pages))
2870                 ret = PTR_ERR(pages);
2871
2872         ret = -ENOMEM;
2873         obj_request = rbd_obj_request_create(object_name, offset, length,
2874                                                         OBJ_REQUEST_PAGES);
2875         if (!obj_request)
2876                 goto out;
2877
2878         obj_request->pages = pages;
2879         obj_request->page_count = page_count;
2880
2881         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2882         if (!obj_request->osd_req)
2883                 goto out;
2884
2885         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2886                                         offset, length, 0, 0);
2887         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2888                                         obj_request->pages,
2889                                         obj_request->length,
2890                                         obj_request->offset & ~PAGE_MASK,
2891                                         false, false);
2892         rbd_osd_req_format_read(obj_request);
2893
2894         ret = rbd_obj_request_submit(osdc, obj_request);
2895         if (ret)
2896                 goto out;
2897         ret = rbd_obj_request_wait(obj_request);
2898         if (ret)
2899                 goto out;
2900
2901         ret = obj_request->result;
2902         if (ret < 0)
2903                 goto out;
2904
2905         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2906         size = (size_t) obj_request->xferred;
2907         ceph_copy_from_page_vector(pages, buf, 0, size);
2908         rbd_assert(size <= (size_t) INT_MAX);
2909         ret = (int) size;
2910         if (version)
2911                 *version = obj_request->version;
2912 out:
2913         if (obj_request)
2914                 rbd_obj_request_put(obj_request);
2915         else
2916                 ceph_release_page_vector(pages, page_count);
2917
2918         return ret;
2919 }
2920
2921 /*
2922  * Read the complete header for the given rbd device.
2923  *
2924  * Returns a pointer to a dynamically-allocated buffer containing
2925  * the complete and validated header.  Caller can pass the address
2926  * of a variable that will be filled in with the version of the
2927  * header object at the time it was read.
2928  *
2929  * Returns a pointer-coded errno if a failure occurs.
2930  */
2931 static struct rbd_image_header_ondisk *
2932 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2933 {
2934         struct rbd_image_header_ondisk *ondisk = NULL;
2935         u32 snap_count = 0;
2936         u64 names_size = 0;
2937         u32 want_count;
2938         int ret;
2939
2940         /*
2941          * The complete header will include an array of its 64-bit
2942          * snapshot ids, followed by the names of those snapshots as
2943          * a contiguous block of NUL-terminated strings.  Note that
2944          * the number of snapshots could change by the time we read
2945          * it in, in which case we re-read it.
2946          */
2947         do {
2948                 size_t size;
2949
2950                 kfree(ondisk);
2951
2952                 size = sizeof (*ondisk);
2953                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2954                 size += names_size;
2955                 ondisk = kmalloc(size, GFP_KERNEL);
2956                 if (!ondisk)
2957                         return ERR_PTR(-ENOMEM);
2958
2959                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2960                                        0, size, ondisk, version);
2961                 if (ret < 0)
2962                         goto out_err;
2963                 if (WARN_ON((size_t) ret < size)) {
2964                         ret = -ENXIO;
2965                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2966                                 size, ret);
2967                         goto out_err;
2968                 }
2969                 if (!rbd_dev_ondisk_valid(ondisk)) {
2970                         ret = -ENXIO;
2971                         rbd_warn(rbd_dev, "invalid header");
2972                         goto out_err;
2973                 }
2974
2975                 names_size = le64_to_cpu(ondisk->snap_names_len);
2976                 want_count = snap_count;
2977                 snap_count = le32_to_cpu(ondisk->snap_count);
2978         } while (snap_count != want_count);
2979
2980         return ondisk;
2981
2982 out_err:
2983         kfree(ondisk);
2984
2985         return ERR_PTR(ret);
2986 }
2987
2988 /*
2989  * reload the ondisk the header
2990  */
2991 static int rbd_read_header(struct rbd_device *rbd_dev,
2992                            struct rbd_image_header *header)
2993 {
2994         struct rbd_image_header_ondisk *ondisk;
2995         u64 ver = 0;
2996         int ret;
2997
2998         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2999         if (IS_ERR(ondisk))
3000                 return PTR_ERR(ondisk);
3001         ret = rbd_header_from_disk(header, ondisk);
3002         if (ret >= 0)
3003                 header->obj_version = ver;
3004         kfree(ondisk);
3005
3006         return ret;
3007 }
3008
3009 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3010 {
3011         struct rbd_snap *snap;
3012         struct rbd_snap *next;
3013
3014         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
3015                 rbd_remove_snap_dev(snap);
3016 }
3017
3018 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3019 {
3020         sector_t size;
3021
3022         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3023                 return;
3024
3025         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
3026         dout("setting size to %llu sectors", (unsigned long long) size);
3027         rbd_dev->mapping.size = (u64) size;
3028         set_capacity(rbd_dev->disk, size);
3029 }
3030
3031 /*
3032  * only read the first part of the ondisk header, without the snaps info
3033  */
3034 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3035 {
3036         int ret;
3037         struct rbd_image_header h;
3038
3039         ret = rbd_read_header(rbd_dev, &h);
3040         if (ret < 0)
3041                 return ret;
3042
3043         down_write(&rbd_dev->header_rwsem);
3044
3045         /* Update image size, and check for resize of mapped image */
3046         rbd_dev->header.image_size = h.image_size;
3047         rbd_update_mapping_size(rbd_dev);
3048
3049         /* rbd_dev->header.object_prefix shouldn't change */
3050         kfree(rbd_dev->header.snap_sizes);
3051         kfree(rbd_dev->header.snap_names);
3052         /* osd requests may still refer to snapc */
3053         ceph_put_snap_context(rbd_dev->header.snapc);
3054
3055         if (hver)
3056                 *hver = h.obj_version;
3057         rbd_dev->header.obj_version = h.obj_version;
3058         rbd_dev->header.image_size = h.image_size;
3059         rbd_dev->header.snapc = h.snapc;
3060         rbd_dev->header.snap_names = h.snap_names;
3061         rbd_dev->header.snap_sizes = h.snap_sizes;
3062         /* Free the extra copy of the object prefix */
3063         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
3064         kfree(h.object_prefix);
3065
3066         ret = rbd_dev_snaps_update(rbd_dev);
3067         if (!ret)
3068                 ret = rbd_dev_snaps_register(rbd_dev);
3069
3070         up_write(&rbd_dev->header_rwsem);
3071
3072         return ret;
3073 }
3074
3075 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3076 {
3077         int ret;
3078
3079         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3080         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3081         if (rbd_dev->image_format == 1)
3082                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3083         else
3084                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3085         mutex_unlock(&ctl_mutex);
3086         revalidate_disk(rbd_dev->disk);
3087
3088         return ret;
3089 }
3090
3091 static int rbd_init_disk(struct rbd_device *rbd_dev)
3092 {
3093         struct gendisk *disk;
3094         struct request_queue *q;
3095         u64 segment_size;
3096
3097         /* create gendisk info */
3098         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3099         if (!disk)
3100                 return -ENOMEM;
3101
3102         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3103                  rbd_dev->dev_id);
3104         disk->major = rbd_dev->major;
3105         disk->first_minor = 0;
3106         disk->fops = &rbd_bd_ops;
3107         disk->private_data = rbd_dev;
3108
3109         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3110         if (!q)
3111                 goto out_disk;
3112
3113         /* We use the default size, but let's be explicit about it. */
3114         blk_queue_physical_block_size(q, SECTOR_SIZE);
3115
3116         /* set io sizes to object size */
3117         segment_size = rbd_obj_bytes(&rbd_dev->header);
3118         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3119         blk_queue_max_segment_size(q, segment_size);
3120         blk_queue_io_min(q, segment_size);
3121         blk_queue_io_opt(q, segment_size);
3122
3123         blk_queue_merge_bvec(q, rbd_merge_bvec);
3124         disk->queue = q;
3125
3126         q->queuedata = rbd_dev;
3127
3128         rbd_dev->disk = disk;
3129
3130         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3131
3132         return 0;
3133 out_disk:
3134         put_disk(disk);
3135
3136         return -ENOMEM;
3137 }
3138
3139 /*
3140   sysfs
3141 */
3142
3143 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3144 {
3145         return container_of(dev, struct rbd_device, dev);
3146 }
3147
3148 static ssize_t rbd_size_show(struct device *dev,
3149                              struct device_attribute *attr, char *buf)
3150 {
3151         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3152         sector_t size;
3153
3154         down_read(&rbd_dev->header_rwsem);
3155         size = get_capacity(rbd_dev->disk);
3156         up_read(&rbd_dev->header_rwsem);
3157
3158         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
3159 }
3160
3161 /*
3162  * Note this shows the features for whatever's mapped, which is not
3163  * necessarily the base image.
3164  */
3165 static ssize_t rbd_features_show(struct device *dev,
3166                              struct device_attribute *attr, char *buf)
3167 {
3168         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3169
3170         return sprintf(buf, "0x%016llx\n",
3171                         (unsigned long long) rbd_dev->mapping.features);
3172 }
3173
3174 static ssize_t rbd_major_show(struct device *dev,
3175                               struct device_attribute *attr, char *buf)
3176 {
3177         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3178
3179         return sprintf(buf, "%d\n", rbd_dev->major);
3180 }
3181
3182 static ssize_t rbd_client_id_show(struct device *dev,
3183                                   struct device_attribute *attr, char *buf)
3184 {
3185         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3186
3187         return sprintf(buf, "client%lld\n",
3188                         ceph_client_id(rbd_dev->rbd_client->client));
3189 }
3190
3191 static ssize_t rbd_pool_show(struct device *dev,
3192                              struct device_attribute *attr, char *buf)
3193 {
3194         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3195
3196         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3197 }
3198
3199 static ssize_t rbd_pool_id_show(struct device *dev,
3200                              struct device_attribute *attr, char *buf)
3201 {
3202         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3203
3204         return sprintf(buf, "%llu\n",
3205                 (unsigned long long) rbd_dev->spec->pool_id);
3206 }
3207
3208 static ssize_t rbd_name_show(struct device *dev,
3209                              struct device_attribute *attr, char *buf)
3210 {
3211         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3212
3213         if (rbd_dev->spec->image_name)
3214                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3215
3216         return sprintf(buf, "(unknown)\n");
3217 }
3218
3219 static ssize_t rbd_image_id_show(struct device *dev,
3220                              struct device_attribute *attr, char *buf)
3221 {
3222         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3223
3224         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3225 }
3226
3227 /*
3228  * Shows the name of the currently-mapped snapshot (or
3229  * RBD_SNAP_HEAD_NAME for the base image).
3230  */
3231 static ssize_t rbd_snap_show(struct device *dev,
3232                              struct device_attribute *attr,
3233                              char *buf)
3234 {
3235         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3236
3237         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3238 }
3239
3240 /*
3241  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3242  * for the parent image.  If there is no parent, simply shows
3243  * "(no parent image)".
3244  */
3245 static ssize_t rbd_parent_show(struct device *dev,
3246                              struct device_attribute *attr,
3247                              char *buf)
3248 {
3249         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3250         struct rbd_spec *spec = rbd_dev->parent_spec;
3251         int count;
3252         char *bufp = buf;
3253
3254         if (!spec)
3255                 return sprintf(buf, "(no parent image)\n");
3256
3257         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3258                         (unsigned long long) spec->pool_id, spec->pool_name);
3259         if (count < 0)
3260                 return count;
3261         bufp += count;
3262
3263         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3264                         spec->image_name ? spec->image_name : "(unknown)");
3265         if (count < 0)
3266                 return count;
3267         bufp += count;
3268
3269         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3270                         (unsigned long long) spec->snap_id, spec->snap_name);
3271         if (count < 0)
3272                 return count;
3273         bufp += count;
3274
3275         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3276         if (count < 0)
3277                 return count;
3278         bufp += count;
3279
3280         return (ssize_t) (bufp - buf);
3281 }
3282
3283 static ssize_t rbd_image_refresh(struct device *dev,
3284                                  struct device_attribute *attr,
3285                                  const char *buf,
3286                                  size_t size)
3287 {
3288         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3289         int ret;
3290
3291         ret = rbd_dev_refresh(rbd_dev, NULL);
3292
3293         return ret < 0 ? ret : size;
3294 }
3295
3296 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3297 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3298 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3299 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3300 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3301 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3302 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3303 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3304 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3305 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3306 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3307
3308 static struct attribute *rbd_attrs[] = {
3309         &dev_attr_size.attr,
3310         &dev_attr_features.attr,
3311         &dev_attr_major.attr,
3312         &dev_attr_client_id.attr,
3313         &dev_attr_pool.attr,
3314         &dev_attr_pool_id.attr,
3315         &dev_attr_name.attr,
3316         &dev_attr_image_id.attr,
3317         &dev_attr_current_snap.attr,
3318         &dev_attr_parent.attr,
3319         &dev_attr_refresh.attr,
3320         NULL
3321 };
3322
3323 static struct attribute_group rbd_attr_group = {
3324         .attrs = rbd_attrs,
3325 };
3326
3327 static const struct attribute_group *rbd_attr_groups[] = {
3328         &rbd_attr_group,
3329         NULL
3330 };
3331
/* Release callback for the rbd device type; nothing is done here */
static void rbd_sysfs_dev_release(struct device *dev)
{
        /* intentionally empty */
}
3335
3336 static struct device_type rbd_device_type = {
3337         .name           = "rbd",
3338         .groups         = rbd_attr_groups,
3339         .release        = rbd_sysfs_dev_release,
3340 };
3341
3342
3343 /*
3344   sysfs - snapshots
3345 */
3346
3347 static ssize_t rbd_snap_size_show(struct device *dev,
3348                                   struct device_attribute *attr,
3349                                   char *buf)
3350 {
3351         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3352
3353         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
3354 }
3355
3356 static ssize_t rbd_snap_id_show(struct device *dev,
3357                                 struct device_attribute *attr,
3358                                 char *buf)
3359 {
3360         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3361
3362         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
3363 }
3364
3365 static ssize_t rbd_snap_features_show(struct device *dev,
3366                                 struct device_attribute *attr,
3367                                 char *buf)
3368 {
3369         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3370
3371         return sprintf(buf, "0x%016llx\n",
3372                         (unsigned long long) snap->features);
3373 }
3374
3375 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
3376 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
3377 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
3378
3379 static struct attribute *rbd_snap_attrs[] = {
3380         &dev_attr_snap_size.attr,
3381         &dev_attr_snap_id.attr,
3382         &dev_attr_snap_features.attr,
3383         NULL,
3384 };
3385
3386 static struct attribute_group rbd_snap_attr_group = {
3387         .attrs = rbd_snap_attrs,
3388 };
3389
3390 static void rbd_snap_dev_release(struct device *dev)
3391 {
3392         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3393         kfree(snap->name);
3394         kfree(snap);
3395 }
3396
3397 static const struct attribute_group *rbd_snap_attr_groups[] = {
3398         &rbd_snap_attr_group,
3399         NULL
3400 };
3401
3402 static struct device_type rbd_snap_device_type = {
3403         .groups         = rbd_snap_attr_groups,
3404         .release        = rbd_snap_dev_release,
3405 };
3406
3407 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3408 {
3409         kref_get(&spec->kref);
3410
3411         return spec;
3412 }
3413
3414 static void rbd_spec_free(struct kref *kref);
3415 static void rbd_spec_put(struct rbd_spec *spec)
3416 {
3417         if (spec)
3418                 kref_put(&spec->kref, rbd_spec_free);
3419 }
3420
3421 static struct rbd_spec *rbd_spec_alloc(void)
3422 {
3423         struct rbd_spec *spec;
3424
3425         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3426         if (!spec)
3427                 return NULL;
3428         kref_init(&spec->kref);
3429
3430         return spec;
3431 }
3432
3433 static void rbd_spec_free(struct kref *kref)
3434 {
3435         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3436
3437         kfree(spec->pool_name);
3438         kfree(spec->image_id);
3439         kfree(spec->image_name);
3440         kfree(spec->snap_name);
3441         kfree(spec);
3442 }
3443
3444 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3445                                 struct rbd_spec *spec)
3446 {
3447         struct rbd_device *rbd_dev;
3448
3449         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3450         if (!rbd_dev)
3451                 return NULL;
3452
3453         spin_lock_init(&rbd_dev->lock);
3454         rbd_dev->flags = 0;
3455         INIT_LIST_HEAD(&rbd_dev->node);
3456         INIT_LIST_HEAD(&rbd_dev->snaps);
3457         init_rwsem(&rbd_dev->header_rwsem);
3458
3459         rbd_dev->spec = spec;
3460         rbd_dev->rbd_client = rbdc;
3461
3462         /* Initialize the layout used for all rbd requests */
3463
3464         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3465         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3466         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3467         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3468
3469         return rbd_dev;
3470 }
3471
3472 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3473 {
3474         rbd_spec_put(rbd_dev->parent_spec);
3475         kfree(rbd_dev->header_name);
3476         rbd_put_client(rbd_dev->rbd_client);
3477         rbd_spec_put(rbd_dev->spec);
3478         kfree(rbd_dev);
3479 }
3480
3481 static bool rbd_snap_registered(struct rbd_snap *snap)
3482 {
3483         bool ret = snap->dev.type == &rbd_snap_device_type;
3484         bool reg = device_is_registered(&snap->dev);
3485
3486         rbd_assert(!ret ^ reg);
3487
3488         return ret;
3489 }
3490
3491 static void rbd_remove_snap_dev(struct rbd_snap *snap)
3492 {
3493         list_del(&snap->node);
3494         if (device_is_registered(&snap->dev))
3495                 device_unregister(&snap->dev);
3496 }
3497
3498 static int rbd_register_snap_dev(struct rbd_snap *snap,
3499                                   struct device *parent)
3500 {
3501         struct device *dev = &snap->dev;
3502         int ret;
3503
3504         dev->type = &rbd_snap_device_type;
3505         dev->parent = parent;
3506         dev->release = rbd_snap_dev_release;
3507         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
3508         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
3509
3510         ret = device_register(dev);
3511
3512         return ret;
3513 }
3514
3515 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
3516                                                 const char *snap_name,
3517                                                 u64 snap_id, u64 snap_size,
3518                                                 u64 snap_features)
3519 {
3520         struct rbd_snap *snap;
3521         int ret;
3522
3523         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3524         if (!snap)
3525                 return ERR_PTR(-ENOMEM);
3526
3527         ret = -ENOMEM;
3528         snap->name = kstrdup(snap_name, GFP_KERNEL);
3529         if (!snap->name)
3530                 goto err;
3531
3532         snap->id = snap_id;
3533         snap->size = snap_size;
3534         snap->features = snap_features;
3535
3536         return snap;
3537
3538 err:
3539         kfree(snap->name);
3540         kfree(snap);
3541
3542         return ERR_PTR(ret);
3543 }
3544
3545 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3546                 u64 *snap_size, u64 *snap_features)
3547 {
3548         char *snap_name;
3549
3550         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3551
3552         *snap_size = rbd_dev->header.snap_sizes[which];
3553         *snap_features = 0;     /* No features for v1 */
3554
3555         /* Skip over names until we find the one we are looking for */
3556
3557         snap_name = rbd_dev->header.snap_names;
3558         while (which--)
3559                 snap_name += strlen(snap_name) + 1;
3560
3561         return snap_name;
3562 }
3563
3564 /*
3565  * Get the size and object order for an image snapshot, or if
3566  * snap_id is CEPH_NOSNAP, gets this information for the base
3567  * image.
3568  */
3569 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3570                                 u8 *order, u64 *snap_size)
3571 {
3572         __le64 snapid = cpu_to_le64(snap_id);
3573         int ret;
3574         struct {
3575                 u8 order;
3576                 __le64 size;
3577         } __attribute__ ((packed)) size_buf = { 0 };
3578
3579         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3580                                 "rbd", "get_size",
3581                                 (char *) &snapid, sizeof (snapid),
3582                                 (char *) &size_buf, sizeof (size_buf), NULL);
3583         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3584         if (ret < 0)
3585                 return ret;
3586
3587         *order = size_buf.order;
3588         *snap_size = le64_to_cpu(size_buf.size);
3589
3590         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3591                 (unsigned long long) snap_id, (unsigned int) *order,
3592                 (unsigned long long) *snap_size);
3593
3594         return 0;
3595 }
3596
3597 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3598 {
3599         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3600                                         &rbd_dev->header.obj_order,
3601                                         &rbd_dev->header.image_size);
3602 }
3603
3604 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3605 {
3606         void *reply_buf;
3607         int ret;
3608         void *p;
3609
3610         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3611         if (!reply_buf)
3612                 return -ENOMEM;
3613
3614         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3615                                 "rbd", "get_object_prefix",
3616                                 NULL, 0,
3617                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3618         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3619         if (ret < 0)
3620                 goto out;
3621
3622         p = reply_buf;
3623         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3624                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
3625                                                 NULL, GFP_NOIO);
3626
3627         if (IS_ERR(rbd_dev->header.object_prefix)) {
3628                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3629                 rbd_dev->header.object_prefix = NULL;
3630         } else {
3631                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3632         }
3633
3634 out:
3635         kfree(reply_buf);
3636
3637         return ret;
3638 }
3639
3640 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3641                 u64 *snap_features)
3642 {
3643         __le64 snapid = cpu_to_le64(snap_id);
3644         struct {
3645                 __le64 features;
3646                 __le64 incompat;
3647         } features_buf = { 0 };
3648         u64 incompat;
3649         int ret;
3650
3651         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3652                                 "rbd", "get_features",
3653                                 (char *) &snapid, sizeof (snapid),
3654                                 (char *) &features_buf, sizeof (features_buf),
3655                                 NULL);
3656         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3657         if (ret < 0)
3658                 return ret;
3659
3660         incompat = le64_to_cpu(features_buf.incompat);
3661         if (incompat & ~RBD_FEATURES_SUPPORTED)
3662                 return -ENXIO;
3663
3664         *snap_features = le64_to_cpu(features_buf.features);
3665
3666         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3667                 (unsigned long long) snap_id,
3668                 (unsigned long long) *snap_features,
3669                 (unsigned long long) le64_to_cpu(features_buf.incompat));
3670
3671         return 0;
3672 }
3673
3674 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3675 {
3676         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3677                                                 &rbd_dev->header.features);
3678 }
3679
3680 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3681 {
3682         struct rbd_spec *parent_spec;
3683         size_t size;
3684         void *reply_buf = NULL;
3685         __le64 snapid;
3686         void *p;
3687         void *end;
3688         char *image_id;
3689         u64 overlap;
3690         int ret;
3691
3692         parent_spec = rbd_spec_alloc();
3693         if (!parent_spec)
3694                 return -ENOMEM;
3695
3696         size = sizeof (__le64) +                                /* pool_id */
3697                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3698                 sizeof (__le64) +                               /* snap_id */
3699                 sizeof (__le64);                                /* overlap */
3700         reply_buf = kmalloc(size, GFP_KERNEL);
3701         if (!reply_buf) {
3702                 ret = -ENOMEM;
3703                 goto out_err;
3704         }
3705
3706         snapid = cpu_to_le64(CEPH_NOSNAP);
3707         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3708                                 "rbd", "get_parent",
3709                                 (char *) &snapid, sizeof (snapid),
3710                                 (char *) reply_buf, size, NULL);
3711         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3712         if (ret < 0)
3713                 goto out_err;
3714
3715         ret = -ERANGE;
3716         p = reply_buf;
3717         end = (char *) reply_buf + size;
3718         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3719         if (parent_spec->pool_id == CEPH_NOPOOL)
3720                 goto out;       /* No parent?  No problem. */
3721
3722         /* The ceph file layout needs to fit pool id in 32 bits */
3723
3724         ret = -EIO;
3725         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3726                 goto out;
3727
3728         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3729         if (IS_ERR(image_id)) {
3730                 ret = PTR_ERR(image_id);
3731                 goto out_err;
3732         }
3733         parent_spec->image_id = image_id;
3734         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3735         ceph_decode_64_safe(&p, end, overlap, out_err);
3736
3737         rbd_dev->parent_overlap = overlap;
3738         rbd_dev->parent_spec = parent_spec;
3739         parent_spec = NULL;     /* rbd_dev now owns this */
3740 out:
3741         ret = 0;
3742 out_err:
3743         kfree(reply_buf);
3744         rbd_spec_put(parent_spec);
3745
3746         return ret;
3747 }
3748
3749 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3750 {
3751         size_t image_id_size;
3752         char *image_id;
3753         void *p;
3754         void *end;
3755         size_t size;
3756         void *reply_buf = NULL;
3757         size_t len = 0;
3758         char *image_name = NULL;
3759         int ret;
3760
3761         rbd_assert(!rbd_dev->spec->image_name);
3762
3763         len = strlen(rbd_dev->spec->image_id);
3764         image_id_size = sizeof (__le32) + len;
3765         image_id = kmalloc(image_id_size, GFP_KERNEL);
3766         if (!image_id)
3767                 return NULL;
3768
3769         p = image_id;
3770         end = (char *) image_id + image_id_size;
3771         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3772
3773         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3774         reply_buf = kmalloc(size, GFP_KERNEL);
3775         if (!reply_buf)
3776                 goto out;
3777
3778         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3779                                 "rbd", "dir_get_name",
3780                                 image_id, image_id_size,
3781                                 (char *) reply_buf, size, NULL);
3782         if (ret < 0)
3783                 goto out;
3784         p = reply_buf;
3785         end = (char *) reply_buf + size;
3786         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3787         if (IS_ERR(image_name))
3788                 image_name = NULL;
3789         else
3790                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3791 out:
3792         kfree(reply_buf);
3793         kfree(image_id);
3794
3795         return image_name;
3796 }
3797
3798 /*
3799  * When a parent image gets probed, we only have the pool, image,
3800  * and snapshot ids but not the names of any of them.  This call
3801  * is made later to fill in those names.  It has to be done after
3802  * rbd_dev_snaps_update() has completed because some of the
3803  * information (in particular, snapshot name) is not available
3804  * until then.
3805  */
3806 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3807 {
3808         struct ceph_osd_client *osdc;
3809         const char *name;
3810         void *reply_buf = NULL;
3811         int ret;
3812
3813         if (rbd_dev->spec->pool_name)
3814                 return 0;       /* Already have the names */
3815
3816         /* Look up the pool name */
3817
3818         osdc = &rbd_dev->rbd_client->client->osdc;
3819         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3820         if (!name) {
3821                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3822                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3823                 return -EIO;
3824         }
3825
3826         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3827         if (!rbd_dev->spec->pool_name)
3828                 return -ENOMEM;
3829
3830         /* Fetch the image name; tolerate failure here */
3831
3832         name = rbd_dev_image_name(rbd_dev);
3833         if (name)
3834                 rbd_dev->spec->image_name = (char *) name;
3835         else
3836                 rbd_warn(rbd_dev, "unable to get image name");
3837
3838         /* Look up the snapshot name. */
3839
3840         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3841         if (!name) {
3842                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3843                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3844                 ret = -EIO;
3845                 goto out_err;
3846         }
3847         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3848         if(!rbd_dev->spec->snap_name)
3849                 goto out_err;
3850
3851         return 0;
3852 out_err:
3853         kfree(reply_buf);
3854         kfree(rbd_dev->spec->pool_name);
3855         rbd_dev->spec->pool_name = NULL;
3856
3857         return ret;
3858 }
3859
3860 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3861 {
3862         size_t size;
3863         int ret;
3864         void *reply_buf;
3865         void *p;
3866         void *end;
3867         u64 seq;
3868         u32 snap_count;
3869         struct ceph_snap_context *snapc;
3870         u32 i;
3871
3872         /*
3873          * We'll need room for the seq value (maximum snapshot id),
3874          * snapshot count, and array of that many snapshot ids.
3875          * For now we have a fixed upper limit on the number we're
3876          * prepared to receive.
3877          */
3878         size = sizeof (__le64) + sizeof (__le32) +
3879                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3880         reply_buf = kzalloc(size, GFP_KERNEL);
3881         if (!reply_buf)
3882                 return -ENOMEM;
3883
3884         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3885                                 "rbd", "get_snapcontext",
3886                                 NULL, 0,
3887                                 reply_buf, size, ver);
3888         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3889         if (ret < 0)
3890                 goto out;
3891
3892         ret = -ERANGE;
3893         p = reply_buf;
3894         end = (char *) reply_buf + size;
3895         ceph_decode_64_safe(&p, end, seq, out);
3896         ceph_decode_32_safe(&p, end, snap_count, out);
3897
3898         /*
3899          * Make sure the reported number of snapshot ids wouldn't go
3900          * beyond the end of our buffer.  But before checking that,
3901          * make sure the computed size of the snapshot context we
3902          * allocate is representable in a size_t.
3903          */
3904         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3905                                  / sizeof (u64)) {
3906                 ret = -EINVAL;
3907                 goto out;
3908         }
3909         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3910                 goto out;
3911
3912         size = sizeof (struct ceph_snap_context) +
3913                                 snap_count * sizeof (snapc->snaps[0]);
3914         snapc = kmalloc(size, GFP_KERNEL);
3915         if (!snapc) {
3916                 ret = -ENOMEM;
3917                 goto out;
3918         }
3919
3920         atomic_set(&snapc->nref, 1);
3921         snapc->seq = seq;
3922         snapc->num_snaps = snap_count;
3923         for (i = 0; i < snap_count; i++)
3924                 snapc->snaps[i] = ceph_decode_64(&p);
3925
3926         rbd_dev->header.snapc = snapc;
3927
3928         dout("  snap context seq = %llu, snap_count = %u\n",
3929                 (unsigned long long) seq, (unsigned int) snap_count);
3930
3931 out:
3932         kfree(reply_buf);
3933
3934         return 0;
3935 }
3936
3937 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3938 {
3939         size_t size;
3940         void *reply_buf;
3941         __le64 snap_id;
3942         int ret;
3943         void *p;
3944         void *end;
3945         char *snap_name;
3946
3947         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3948         reply_buf = kmalloc(size, GFP_KERNEL);
3949         if (!reply_buf)
3950                 return ERR_PTR(-ENOMEM);
3951
3952         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3953         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3954                                 "rbd", "get_snapshot_name",
3955                                 (char *) &snap_id, sizeof (snap_id),
3956                                 reply_buf, size, NULL);
3957         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3958         if (ret < 0)
3959                 goto out;
3960
3961         p = reply_buf;
3962         end = (char *) reply_buf + size;
3963         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3964         if (IS_ERR(snap_name)) {
3965                 ret = PTR_ERR(snap_name);
3966                 goto out;
3967         } else {
3968                 dout("  snap_id 0x%016llx snap_name = %s\n",
3969                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3970         }
3971         kfree(reply_buf);
3972
3973         return snap_name;
3974 out:
3975         kfree(reply_buf);
3976
3977         return ERR_PTR(ret);
3978 }
3979
3980 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3981                 u64 *snap_size, u64 *snap_features)
3982 {
3983         u64 snap_id;
3984         u8 order;
3985         int ret;
3986
3987         snap_id = rbd_dev->header.snapc->snaps[which];
3988         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3989         if (ret)
3990                 return ERR_PTR(ret);
3991         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3992         if (ret)
3993                 return ERR_PTR(ret);
3994
3995         return rbd_dev_v2_snap_name(rbd_dev, which);
3996 }
3997
3998 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3999                 u64 *snap_size, u64 *snap_features)
4000 {
4001         if (rbd_dev->image_format == 1)
4002                 return rbd_dev_v1_snap_info(rbd_dev, which,
4003                                         snap_size, snap_features);
4004         if (rbd_dev->image_format == 2)
4005                 return rbd_dev_v2_snap_info(rbd_dev, which,
4006                                         snap_size, snap_features);
4007         return ERR_PTR(-EINVAL);
4008 }
4009
4010 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4011 {
4012         int ret;
4013         __u8 obj_order;
4014
4015         down_write(&rbd_dev->header_rwsem);
4016
4017         /* Grab old order first, to see if it changes */
4018
4019         obj_order = rbd_dev->header.obj_order,
4020         ret = rbd_dev_v2_image_size(rbd_dev);
4021         if (ret)
4022                 goto out;
4023         if (rbd_dev->header.obj_order != obj_order) {
4024                 ret = -EIO;
4025                 goto out;
4026         }
4027         rbd_update_mapping_size(rbd_dev);
4028
4029         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4030         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4031         if (ret)
4032                 goto out;
4033         ret = rbd_dev_snaps_update(rbd_dev);
4034         dout("rbd_dev_snaps_update returned %d\n", ret);
4035         if (ret)
4036                 goto out;
4037         ret = rbd_dev_snaps_register(rbd_dev);
4038         dout("rbd_dev_snaps_register returned %d\n", ret);
4039 out:
4040         up_write(&rbd_dev->header_rwsem);
4041
4042         return ret;
4043 }
4044
4045 /*
4046  * Scan the rbd device's current snapshot list and compare it to the
4047  * newly-received snapshot context.  Remove any existing snapshots
4048  * not present in the new snapshot context.  Add a new snapshot for
4049  * any snaphots in the snapshot context not in the current list.
4050  * And verify there are no changes to snapshots we already know
4051  * about.
4052  *
4053  * Assumes the snapshots in the snapshot context are sorted by
4054  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4055  * are also maintained in that order.)
4056  */
4057 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4058 {
4059         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4060         const u32 snap_count = snapc->num_snaps;
4061         struct list_head *head = &rbd_dev->snaps;
4062         struct list_head *links = head->next;
4063         u32 index = 0;
4064
4065         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
4066         while (index < snap_count || links != head) {
4067                 u64 snap_id;
4068                 struct rbd_snap *snap;
4069                 char *snap_name;
4070                 u64 snap_size = 0;
4071                 u64 snap_features = 0;
4072
4073                 snap_id = index < snap_count ? snapc->snaps[index]
4074                                              : CEPH_NOSNAP;
4075                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4076                                      : NULL;
4077                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4078
4079                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4080                         struct list_head *next = links->next;
4081
4082                         /*
4083                          * A previously-existing snapshot is not in
4084                          * the new snap context.
4085                          *
4086                          * If the now missing snapshot is the one the
4087                          * image is mapped to, clear its exists flag
4088                          * so we can avoid sending any more requests
4089                          * to it.
4090                          */
4091                         if (rbd_dev->spec->snap_id == snap->id)
4092                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4093                         rbd_remove_snap_dev(snap);
4094                         dout("%ssnap id %llu has been removed\n",
4095                                 rbd_dev->spec->snap_id == snap->id ?
4096                                                         "mapped " : "",
4097                                 (unsigned long long) snap->id);
4098
4099                         /* Done with this list entry; advance */
4100
4101                         links = next;
4102                         continue;
4103                 }
4104
4105                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4106                                         &snap_size, &snap_features);
4107                 if (IS_ERR(snap_name))
4108                         return PTR_ERR(snap_name);
4109
4110                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
4111                         (unsigned long long) snap_id);
4112                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4113                         struct rbd_snap *new_snap;
4114
4115                         /* We haven't seen this snapshot before */
4116
4117                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
4118                                         snap_id, snap_size, snap_features);
4119                         if (IS_ERR(new_snap)) {
4120                                 int err = PTR_ERR(new_snap);
4121
4122                                 dout("  failed to add dev, error %d\n", err);
4123
4124                                 return err;
4125                         }
4126
4127                         /* New goes before existing, or at end of list */
4128
4129                         dout("  added dev%s\n", snap ? "" : " at end\n");
4130                         if (snap)
4131                                 list_add_tail(&new_snap->node, &snap->node);
4132                         else
4133                                 list_add_tail(&new_snap->node, head);
4134                 } else {
4135                         /* Already have this one */
4136
4137                         dout("  already present\n");
4138
4139                         rbd_assert(snap->size == snap_size);
4140                         rbd_assert(!strcmp(snap->name, snap_name));
4141                         rbd_assert(snap->features == snap_features);
4142
4143                         /* Done with this list entry; advance */
4144
4145                         links = links->next;
4146                 }
4147
4148                 /* Advance to the next entry in the snapshot context */
4149
4150                 index++;
4151         }
4152         dout("%s: done\n", __func__);
4153
4154         return 0;
4155 }
4156
4157 /*
4158  * Scan the list of snapshots and register the devices for any that
4159  * have not already been registered.
4160  */
4161 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
4162 {
4163         struct rbd_snap *snap;
4164         int ret = 0;
4165
4166         dout("%s:\n", __func__);
4167         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
4168                 return -EIO;
4169
4170         list_for_each_entry(snap, &rbd_dev->snaps, node) {
4171                 if (!rbd_snap_registered(snap)) {
4172                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
4173                         if (ret < 0)
4174                                 break;
4175                 }
4176         }
4177         dout("%s: returning %d\n", __func__, ret);
4178
4179         return ret;
4180 }
4181
4182 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4183 {
4184         struct device *dev;
4185         int ret;
4186
4187         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4188
4189         dev = &rbd_dev->dev;
4190         dev->bus = &rbd_bus_type;
4191         dev->type = &rbd_device_type;
4192         dev->parent = &rbd_root_dev;
4193         dev->release = rbd_dev_release;
4194         dev_set_name(dev, "%d", rbd_dev->dev_id);
4195         ret = device_register(dev);
4196
4197         mutex_unlock(&ctl_mutex);
4198
4199         return ret;
4200 }
4201
4202 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4203 {
4204         device_unregister(&rbd_dev->dev);
4205 }
4206
4207 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4208
4209 /*
4210  * Get a unique rbd identifier for the given new rbd_dev, and add
4211  * the rbd_dev to the global list.  The minimum rbd id is 1.
4212  */
4213 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4214 {
4215         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4216
4217         spin_lock(&rbd_dev_list_lock);
4218         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4219         spin_unlock(&rbd_dev_list_lock);
4220         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4221                 (unsigned long long) rbd_dev->dev_id);
4222 }
4223
4224 /*
4225  * Remove an rbd_dev from the global list, and record that its
4226  * identifier is no longer in use.
4227  */
4228 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4229 {
4230         struct list_head *tmp;
4231         int rbd_id = rbd_dev->dev_id;
4232         int max_id;
4233
4234         rbd_assert(rbd_id > 0);
4235
4236         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4237                 (unsigned long long) rbd_dev->dev_id);
4238         spin_lock(&rbd_dev_list_lock);
4239         list_del_init(&rbd_dev->node);
4240
4241         /*
4242          * If the id being "put" is not the current maximum, there
4243          * is nothing special we need to do.
4244          */
4245         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4246                 spin_unlock(&rbd_dev_list_lock);
4247                 return;
4248         }
4249
4250         /*
4251          * We need to update the current maximum id.  Search the
4252          * list to find out what it is.  We're more likely to find
4253          * the maximum at the end, so search the list backward.
4254          */
4255         max_id = 0;
4256         list_for_each_prev(tmp, &rbd_dev_list) {
4257                 struct rbd_device *rbd_dev;
4258
4259                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4260                 if (rbd_dev->dev_id > max_id)
4261                         max_id = rbd_dev->dev_id;
4262         }
4263         spin_unlock(&rbd_dev_list_lock);
4264
4265         /*
4266          * The max id could have been updated by rbd_dev_id_get(), in
4267          * which case it now accurately reflects the new maximum.
4268          * Be careful not to overwrite the maximum value in that
4269          * case.
4270          */
4271         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4272         dout("  max dev id has been reset\n");
4273 }
4274
/*
 * Advance *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if
 * there is none), and return the length of the token that starts
 * there, i.e. the run of non-white-space characters.  A return of
 * 0 means no token was found.
 *
 * *buf must be terminated with '\0'.  *buf is NOT advanced past the
 * token itself; the caller does that if it wants to.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * The characters for which isspace() is nonzero in the "C"
         * and "POSIX" locales.
         */
        static const char whitespace[] = " \f\n\r\t\v";
        const char *start = *buf + strspn(*buf, whitespace);

        *buf = start;                           /* Start of token */

        return strcspn(start, whitespace);      /* Token length */
}
4293
/*
 * Locate the next token in *buf and, when it fits, copy it into the
 * caller's buffer with a terminating '\0'.  "Fits" means the token
 * length is strictly less than token_size, leaving room for the
 * terminator; otherwise the token buffer is left untouched.
 * *buf must be terminated with '\0' on entry.
 *
 * Returns the token length (not counting the '\0'): 0 when there is
 * no token, and a value >= token_size when the token did not fit.
 *
 * *buf is always advanced past the token, whether or not it was
 * copied.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t token_len = next_token(buf);
        const char *start = *buf;

        if (token_len < token_size) {
                memcpy(token, start, token_len);
                token[token_len] = '\0';
        }
        *buf = start + token_len;

        return token_len;
}
4323
4324 /*
4325  * Finds the next token in *buf, dynamically allocates a buffer big
4326  * enough to hold a copy of it, and copies the token into the new
4327  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4328  * that a duplicate buffer is created even for a zero-length token.
4329  *
4330  * Returns a pointer to the newly-allocated duplicate, or a null
4331  * pointer if memory for the duplicate was not available.  If
4332  * the lenp argument is a non-null pointer, the length of the token
4333  * (not including the '\0') is returned in *lenp.
4334  *
4335  * If successful, the *buf pointer will be updated to point beyond
4336  * the end of the found token.
4337  *
4338  * Note: uses GFP_KERNEL for allocation.
4339  */
4340 static inline char *dup_token(const char **buf, size_t *lenp)
4341 {
4342         char *dup;
4343         size_t len;
4344
4345         len = next_token(buf);
4346         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4347         if (!dup)
4348                 return NULL;
4349         *(dup + len) = '\0';
4350         *buf += len;
4351
4352         if (lenp)
4353                 *lenp = len;
4354
4355         return dup;
4356 }
4357
4358 /*
4359  * Parse the options provided for an "rbd add" (i.e., rbd image
4360  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4361  * and the data written is passed here via a NUL-terminated buffer.
4362  * Returns 0 if successful or an error code otherwise.
4363  *
4364  * The information extracted from these options is recorded in
4365  * the other parameters which return dynamically-allocated
4366  * structures:
4367  *  ceph_opts
4368  *      The address of a pointer that will refer to a ceph options
4369  *      structure.  Caller must release the returned pointer using
4370  *      ceph_destroy_options() when it is no longer needed.
4371  *  rbd_opts
4372  *      Address of an rbd options pointer.  Fully initialized by
4373  *      this function; caller must release with kfree().
4374  *  spec
4375  *      Address of an rbd image specification pointer.  Fully
4376  *      initialized by this function based on parsed options.
4377  *      Caller must release with rbd_spec_put().
4378  *
4379  * The options passed take this form:
4380  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4381  * where:
4382  *  <mon_addrs>
4383  *      A comma-separated list of one or more monitor addresses.
4384  *      A monitor address is an ip address, optionally followed
4385  *      by a port number (separated by a colon).
4386  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4387  *  <options>
4388  *      A comma-separated list of ceph and/or rbd options.
4389  *  <pool_name>
4390  *      The name of the rados pool containing the rbd image.
4391  *  <image_name>
4392  *      The name of the image in that pool to map.
4393  *  <snap_id>
4394  *      An optional snapshot id.  If provided, the mapping will
4395  *      present data from the image at the time that snapshot was
4396  *      created.  The image head is used if no snapshot id is
4397  *      provided.  Snapshot mappings are always read-only.
4398  */
4399 static int rbd_add_parse_args(const char *buf,
4400                                 struct ceph_options **ceph_opts,
4401                                 struct rbd_options **opts,
4402                                 struct rbd_spec **rbd_spec)
4403 {
4404         size_t len;
4405         char *options;
4406         const char *mon_addrs;
4407         size_t mon_addrs_size;
4408         struct rbd_spec *spec = NULL;
4409         struct rbd_options *rbd_opts = NULL;
4410         struct ceph_options *copts;
4411         int ret;
4412
4413         /* The first four tokens are required */
4414
4415         len = next_token(&buf);
4416         if (!len) {
4417                 rbd_warn(NULL, "no monitor address(es) provided");
4418                 return -EINVAL;
4419         }
4420         mon_addrs = buf;
4421         mon_addrs_size = len + 1;
4422         buf += len;
4423
4424         ret = -EINVAL;
4425         options = dup_token(&buf, NULL);
4426         if (!options)
4427                 return -ENOMEM;
4428         if (!*options) {
4429                 rbd_warn(NULL, "no options provided");
4430                 goto out_err;
4431         }
4432
4433         spec = rbd_spec_alloc();
4434         if (!spec)
4435                 goto out_mem;
4436
4437         spec->pool_name = dup_token(&buf, NULL);
4438         if (!spec->pool_name)
4439                 goto out_mem;
4440         if (!*spec->pool_name) {
4441                 rbd_warn(NULL, "no pool name provided");
4442                 goto out_err;
4443         }
4444
4445         spec->image_name = dup_token(&buf, NULL);
4446         if (!spec->image_name)
4447                 goto out_mem;
4448         if (!*spec->image_name) {
4449                 rbd_warn(NULL, "no image name provided");
4450                 goto out_err;
4451         }
4452
4453         /*
4454          * Snapshot name is optional; default is to use "-"
4455          * (indicating the head/no snapshot).
4456          */
4457         len = next_token(&buf);
4458         if (!len) {
4459                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4460                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4461         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4462                 ret = -ENAMETOOLONG;
4463                 goto out_err;
4464         }
4465         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4466         if (!spec->snap_name)
4467                 goto out_mem;
4468         *(spec->snap_name + len) = '\0';
4469
4470         /* Initialize all rbd options to the defaults */
4471
4472         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4473         if (!rbd_opts)
4474                 goto out_mem;
4475
4476         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4477
4478         copts = ceph_parse_options(options, mon_addrs,
4479                                         mon_addrs + mon_addrs_size - 1,
4480                                         parse_rbd_opts_token, rbd_opts);
4481         if (IS_ERR(copts)) {
4482                 ret = PTR_ERR(copts);
4483                 goto out_err;
4484         }
4485         kfree(options);
4486
4487         *ceph_opts = copts;
4488         *opts = rbd_opts;
4489         *rbd_spec = spec;
4490
4491         return 0;
4492 out_mem:
4493         ret = -ENOMEM;
4494 out_err:
4495         kfree(rbd_opts);
4496         rbd_spec_put(spec);
4497         kfree(options);
4498
4499         return ret;
4500 }
4501
4502 /*
4503  * An rbd format 2 image has a unique identifier, distinct from the
4504  * name given to it by the user.  Internally, that identifier is
4505  * what's used to specify the names of objects related to the image.
4506  *
4507  * A special "rbd id" object is used to map an rbd image name to its
4508  * id.  If that object doesn't exist, then there is no v2 rbd image
4509  * with the supplied name.
4510  *
4511  * This function will record the given rbd_dev's image_id field if
4512  * it can be determined, and in that case will return 0.  If any
4513  * errors occur a negative errno will be returned and the rbd_dev's
4514  * image_id field will be unchanged (and should be NULL).
4515  */
4516 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4517 {
4518         int ret;
4519         size_t size;
4520         char *object_name;
4521         void *response;
4522         void *p;
4523
4524         /* If we already have it we don't need to look it up */
4525
4526         if (rbd_dev->spec->image_id)
4527                 return 0;
4528
4529         /*
4530          * When probing a parent image, the image id is already
4531          * known (and the image name likely is not).  There's no
4532          * need to fetch the image id again in this case.
4533          */
4534         if (rbd_dev->spec->image_id)
4535                 return 0;
4536
4537         /*
4538          * First, see if the format 2 image id file exists, and if
4539          * so, get the image's persistent id from it.
4540          */
4541         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4542         object_name = kmalloc(size, GFP_NOIO);
4543         if (!object_name)
4544                 return -ENOMEM;
4545         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4546         dout("rbd id object name is %s\n", object_name);
4547
4548         /* Response will be an encoded string, which includes a length */
4549
4550         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4551         response = kzalloc(size, GFP_NOIO);
4552         if (!response) {
4553                 ret = -ENOMEM;
4554                 goto out;
4555         }
4556
4557         ret = rbd_obj_method_sync(rbd_dev, object_name,
4558                                 "rbd", "get_id",
4559                                 NULL, 0,
4560                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4561         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4562         if (ret < 0)
4563                 goto out;
4564
4565         p = response;
4566         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4567                                                 p + RBD_IMAGE_ID_LEN_MAX,
4568                                                 NULL, GFP_NOIO);
4569         if (IS_ERR(rbd_dev->spec->image_id)) {
4570                 ret = PTR_ERR(rbd_dev->spec->image_id);
4571                 rbd_dev->spec->image_id = NULL;
4572         } else {
4573                 dout("image_id is %s\n", rbd_dev->spec->image_id);
4574         }
4575 out:
4576         kfree(response);
4577         kfree(object_name);
4578
4579         return ret;
4580 }
4581
4582 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4583 {
4584         int ret;
4585         size_t size;
4586
4587         /* Version 1 images have no id; empty string is used */
4588
4589         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4590         if (!rbd_dev->spec->image_id)
4591                 return -ENOMEM;
4592
4593         /* Record the header object name for this rbd image. */
4594
4595         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4596         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4597         if (!rbd_dev->header_name) {
4598                 ret = -ENOMEM;
4599                 goto out_err;
4600         }
4601         sprintf(rbd_dev->header_name, "%s%s",
4602                 rbd_dev->spec->image_name, RBD_SUFFIX);
4603
4604         /* Populate rbd image metadata */
4605
4606         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4607         if (ret < 0)
4608                 goto out_err;
4609
4610         /* Version 1 images have no parent (no layering) */
4611
4612         rbd_dev->parent_spec = NULL;
4613         rbd_dev->parent_overlap = 0;
4614
4615         rbd_dev->image_format = 1;
4616
4617         dout("discovered version 1 image, header name is %s\n",
4618                 rbd_dev->header_name);
4619
4620         return 0;
4621
4622 out_err:
4623         kfree(rbd_dev->header_name);
4624         rbd_dev->header_name = NULL;
4625         kfree(rbd_dev->spec->image_id);
4626         rbd_dev->spec->image_id = NULL;
4627
4628         return ret;
4629 }
4630
4631 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4632 {
4633         size_t size;
4634         int ret;
4635         u64 ver = 0;
4636
4637         /*
4638          * Image id was filled in by the caller.  Record the header
4639          * object name for this rbd image.
4640          */
4641         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4642         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4643         if (!rbd_dev->header_name)
4644                 return -ENOMEM;
4645         sprintf(rbd_dev->header_name, "%s%s",
4646                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4647
4648         /* Get the size and object order for the image */
4649
4650         ret = rbd_dev_v2_image_size(rbd_dev);
4651         if (ret < 0)
4652                 goto out_err;
4653
4654         /* Get the object prefix (a.k.a. block_name) for the image */
4655
4656         ret = rbd_dev_v2_object_prefix(rbd_dev);
4657         if (ret < 0)
4658                 goto out_err;
4659
4660         /* Get the and check features for the image */
4661
4662         ret = rbd_dev_v2_features(rbd_dev);
4663         if (ret < 0)
4664                 goto out_err;
4665
4666         /* If the image supports layering, get the parent info */
4667
4668         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4669                 ret = rbd_dev_v2_parent_info(rbd_dev);
4670                 if (ret < 0)
4671                         goto out_err;
4672         }
4673
4674         /* crypto and compression type aren't (yet) supported for v2 images */
4675
4676         rbd_dev->header.crypt_type = 0;
4677         rbd_dev->header.comp_type = 0;
4678
4679         /* Get the snapshot context, plus the header version */
4680
4681         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4682         if (ret)
4683                 goto out_err;
4684         rbd_dev->header.obj_version = ver;
4685
4686         rbd_dev->image_format = 2;
4687
4688         dout("discovered version 2 image, header name is %s\n",
4689                 rbd_dev->header_name);
4690
4691         return 0;
4692 out_err:
4693         rbd_dev->parent_overlap = 0;
4694         rbd_spec_put(rbd_dev->parent_spec);
4695         rbd_dev->parent_spec = NULL;
4696         kfree(rbd_dev->header_name);
4697         rbd_dev->header_name = NULL;
4698         kfree(rbd_dev->header.object_prefix);
4699         rbd_dev->header.object_prefix = NULL;
4700
4701         return ret;
4702 }
4703
/*
 * Second half of probing, run once the image header has been read:
 * update the snapshot list and spec, set up the mapping, allocate a
 * device id and name, register a block major, set up the disk,
 * register with the rbd bus, probe the parent image (if any),
 * register snapshot devices, start the header watch, and finally
 * announce the disk.
 *
 * Error handling has two stages.  Before rbd_bus_add_dev() succeeds,
 * failures are unwound step by step here (err_out_disk and below).
 * After it succeeds, failures go through rbd_bus_del_dev() instead
 * (err_out_bus and above).
 *
 * Returns 0 on success or a negative errno.
 */
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        struct rbd_spec *parent_spec = NULL;
        struct rbd_client *rbdc = NULL;
        int ret;

        /* no need to lock here, as rbd_dev is not registered yet */
        ret = rbd_dev_snaps_update(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_probe_update_spec(rbd_dev);
        if (ret)
                goto err_out_snaps;

        ret = rbd_dev_set_mapping(rbd_dev);
        if (ret)
                goto err_out_snaps;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /*
         * Fill in the device name, now that we have its id.  The
         * compile-time check guarantees the name buffer can hold
         * the driver name plus the widest formatted int.
         */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;   /* non-negative result is the major */

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_disk;

        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */
        /* Probe the parent if there is one */

        if (rbd_dev->parent_spec) {
                /*
                 * We need to pass a reference to the client and the
                 * parent spec when creating the parent rbd_dev.
                 * Images related by parent/child relationships
                 * always share both.
                 */
                parent_spec = rbd_spec_get(rbd_dev->parent_spec);
                rbdc = __rbd_get_client(rbd_dev->rbd_client);

                parent = rbd_dev_create(rbdc, parent_spec);
                if (!parent) {
                        ret = -ENOMEM;
                        goto err_out_spec;
                }
                /*
                 * Clear the locals so the err_out_spec code below
                 * does not drop references the parent now holds.
                 */
                rbdc = NULL;            /* parent now owns reference */
                parent_spec = NULL;     /* parent now owns reference */
                ret = rbd_dev_probe(parent);
                if (ret < 0)
                        goto err_out_parent;
                rbd_dev->parent = parent;
        }

        down_write(&rbd_dev->header_rwsem);
        ret = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
        if (ret)
                goto err_out_bus;

        /* presumably 1 starts the header watch (0 is used on release) */
        ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto err_out_bus;

        /* Everything's ready.  Announce the disk to the world. */

        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

        /* Failures after the device was registered with the bus */
err_out_parent:
        rbd_dev_destroy(parent);
err_out_spec:
        rbd_spec_put(parent_spec);
        rbd_put_client(rbdc);
err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */

        /*
         * NOTE(review): rbd_dev_release() ends in rbd_dev_destroy(),
         * so if this drops the last device reference rbd_dev may be
         * gone on return -- confirm callers do not touch it again.
         */
        rbd_bus_del_dev(rbd_dev);

        return ret;

        /* Failures before bus registration: undo in reverse order */
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
err_out_snaps:
        rbd_remove_all_snaps(rbd_dev);

        return ret;
}
4819
4820 /*
4821  * Probe for the existence of the header object for the given rbd
4822  * device.  For format 2 images this includes determining the image
4823  * id.
4824  */
4825 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4826 {
4827         int ret;
4828
4829         /*
4830          * Get the id from the image id object.  If it's not a
4831          * format 2 image, we'll get ENOENT back, and we'll assume
4832          * it's a format 1 image.
4833          */
4834         ret = rbd_dev_image_id(rbd_dev);
4835         if (ret)
4836                 ret = rbd_dev_v1_probe(rbd_dev);
4837         else
4838                 ret = rbd_dev_v2_probe(rbd_dev);
4839         if (ret) {
4840                 dout("probe failed, returning %d\n", ret);
4841
4842                 return ret;
4843         }
4844
4845         ret = rbd_dev_probe_finish(rbd_dev);
4846         if (ret)
4847                 rbd_header_free(&rbd_dev->header);
4848
4849         return ret;
4850 }
4851
4852 static ssize_t rbd_add(struct bus_type *bus,
4853                        const char *buf,
4854                        size_t count)
4855 {
4856         struct rbd_device *rbd_dev = NULL;
4857         struct ceph_options *ceph_opts = NULL;
4858         struct rbd_options *rbd_opts = NULL;
4859         struct rbd_spec *spec = NULL;
4860         struct rbd_client *rbdc;
4861         struct ceph_osd_client *osdc;
4862         int rc = -ENOMEM;
4863
4864         if (!try_module_get(THIS_MODULE))
4865                 return -ENODEV;
4866
4867         /* parse add command */
4868         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4869         if (rc < 0)
4870                 goto err_out_module;
4871
4872         rbdc = rbd_get_client(ceph_opts);
4873         if (IS_ERR(rbdc)) {
4874                 rc = PTR_ERR(rbdc);
4875                 goto err_out_args;
4876         }
4877         ceph_opts = NULL;       /* rbd_dev client now owns this */
4878
4879         /* pick the pool */
4880         osdc = &rbdc->client->osdc;
4881         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4882         if (rc < 0)
4883                 goto err_out_client;
4884         spec->pool_id = (u64) rc;
4885
4886         /* The ceph file layout needs to fit pool id in 32 bits */
4887
4888         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4889                 rc = -EIO;
4890                 goto err_out_client;
4891         }
4892
4893         rbd_dev = rbd_dev_create(rbdc, spec);
4894         if (!rbd_dev)
4895                 goto err_out_client;
4896         rbdc = NULL;            /* rbd_dev now owns this */
4897         spec = NULL;            /* rbd_dev now owns this */
4898
4899         rbd_dev->mapping.read_only = rbd_opts->read_only;
4900         kfree(rbd_opts);
4901         rbd_opts = NULL;        /* done with this */
4902
4903         rc = rbd_dev_probe(rbd_dev);
4904         if (rc < 0)
4905                 goto err_out_rbd_dev;
4906
4907         return count;
4908 err_out_rbd_dev:
4909         rbd_dev_destroy(rbd_dev);
4910 err_out_client:
4911         rbd_put_client(rbdc);
4912 err_out_args:
4913         if (ceph_opts)
4914                 ceph_destroy_options(ceph_opts);
4915         kfree(rbd_opts);
4916         rbd_spec_put(spec);
4917 err_out_module:
4918         module_put(THIS_MODULE);
4919
4920         dout("Error adding device %s\n", buf);
4921
4922         return (ssize_t) rc;
4923 }
4924
4925 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4926 {
4927         struct list_head *tmp;
4928         struct rbd_device *rbd_dev;
4929
4930         spin_lock(&rbd_dev_list_lock);
4931         list_for_each(tmp, &rbd_dev_list) {
4932                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4933                 if (rbd_dev->dev_id == dev_id) {
4934                         spin_unlock(&rbd_dev_list_lock);
4935                         return rbd_dev;
4936                 }
4937         }
4938         spin_unlock(&rbd_dev_list_lock);
4939         return NULL;
4940 }
4941
/*
 * Release callback for an rbd device; rbd_bus_add_dev() installs it
 * as dev->release.  Tears down what probing set up (header watch,
 * disk, block major, header fields, device id), destroys the
 * rbd_dev, and drops a module reference.
 */
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        /* presumably 0 cancels the watch that 1 requested at probe time */
        if (rbd_dev->watch_event)
                rbd_dev_header_watch_sync(rbd_dev, 0);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
        rbd_dev_destroy(rbd_dev);

        /* release module ref (presumably the one taken in rbd_add()) */
        module_put(THIS_MODULE);
}
4964
/*
 * Remove one rbd device: drop all of its snapshots, then unregister
 * it from the bus.  Does not touch the device's parent, if any.
 */
static void __rbd_remove(struct rbd_device *rbd_dev)
{
        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
}
4970
4971 static ssize_t rbd_remove(struct bus_type *bus,
4972                           const char *buf,
4973                           size_t count)
4974 {
4975         struct rbd_device *rbd_dev = NULL;
4976         int target_id, rc;
4977         unsigned long ul;
4978         int ret = count;
4979
4980         rc = strict_strtoul(buf, 10, &ul);
4981         if (rc)
4982                 return rc;
4983
4984         /* convert to int; abort if we lost anything in the conversion */
4985         target_id = (int) ul;
4986         if (target_id != ul)
4987                 return -EINVAL;
4988
4989         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4990
4991         rbd_dev = __rbd_get_dev(target_id);
4992         if (!rbd_dev) {
4993                 ret = -ENOENT;
4994                 goto done;
4995         }
4996
4997         spin_lock_irq(&rbd_dev->lock);
4998         if (rbd_dev->open_count)
4999                 ret = -EBUSY;
5000         else
5001                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5002         spin_unlock_irq(&rbd_dev->lock);
5003         if (ret < 0)
5004                 goto done;
5005
5006         while (rbd_dev->parent_spec) {
5007                 struct rbd_device *first = rbd_dev;
5008                 struct rbd_device *second = first->parent;
5009                 struct rbd_device *third;
5010
5011                 /*
5012                  * Follow to the parent with no grandparent and
5013                  * remove it.
5014                  */
5015                 while (second && (third = second->parent)) {
5016                         first = second;
5017                         second = third;
5018                 }
5019                 __rbd_remove(second);
5020                 rbd_spec_put(first->parent_spec);
5021                 first->parent_spec = NULL;
5022                 first->parent_overlap = 0;
5023                 first->parent = NULL;
5024         }
5025         __rbd_remove(rbd_dev);
5026
5027 done:
5028         mutex_unlock(&ctl_mutex);
5029
5030         return ret;
5031 }
5032
5033 /*
5034  * create control files in sysfs
5035  * /sys/bus/rbd/...
5036  */
5037 static int rbd_sysfs_init(void)
5038 {
5039         int ret;
5040
5041         ret = device_register(&rbd_root_dev);
5042         if (ret < 0)
5043                 return ret;
5044
5045         ret = bus_register(&rbd_bus_type);
5046         if (ret < 0)
5047                 device_unregister(&rbd_root_dev);
5048
5049         return ret;
5050 }
5051
/*
 * Undo rbd_sysfs_init(): unregister the rbd bus type and then the
 * rbd root device (the reverse of the order they were registered).
 */
static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
5057
5058 static int __init rbd_init(void)
5059 {
5060         int rc;
5061
5062         if (!libceph_compatible(NULL)) {
5063                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5064
5065                 return -EINVAL;
5066         }
5067         rc = rbd_sysfs_init();
5068         if (rc)
5069                 return rc;
5070         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5071         return 0;
5072 }
5073
/* Module exit point: remove the sysfs entries created by rbd_init(). */
static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
5078
5079 module_init(rbd_init);
5080 module_exit(rbd_exit);
5081
5082 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5083 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5084 MODULE_DESCRIPTION("rados block device");
5085
5086 /* following authorship retained from original osdblk.c */
5087 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5088
5089 MODULE_LICENSE("GPL");