/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
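
/*
 * Editorial note on the arithmetic behind 510, assuming the encoded
 * snapshot context layout (an 8-byte seq, a 4-byte count, then one
 * u64 per snapshot id): 8 + 4 + 510 * 8 = 4092 bytes, just under a
 * 4 KB page; 511 ids would overflow it.
 */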

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
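
/*
 * Editorial note: (5 * sizeof (int)) / 2 + 1 is a classic upper bound
 * on the decimal width of an int: 2.5 characters per byte exceeds
 * log10(256) (about 2.41), plus one for a sign.  For a 4-byte int
 * this gives 11, enough for "-2147483648".
 */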

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * An instance of the client; multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* position in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock; /* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred; /* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
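
/*
 * Editorial note: for_each_obj_request_safe() deliberately walks the
 * list in reverse (list_for_each_entry_safe_reverse), and the extra
 * cursor n makes it safe to unlink the current entry mid-iteration,
 * e.g. when tearing down an image request's object requests.
 */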

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with a specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
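
/*
 * Editorial note, an illustrative trace assuming tokens arrive one at
 * a time (as ceph_parse_options() hands unrecognized tokens to this
 * callback): "ro" and "read_only" both match Opt_read_only and set
 * read_only = true; "rw" and "read_write" clear it; any token not in
 * rbd_opts_tokens fails the parse with -EINVAL.
 */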

/*
 * Get a ceph client with a specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to unlink
 * the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop a reference to a ceph client node; release it once no
 * references remain.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translating the header format from
 * the on-disk header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the object prefix on this error path */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees the
                 * ondisk buffer we're working with has snap_names_len
                 * bytes beyond the end of the snapshot id array, so
                 * this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}
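
/*
 * Editorial note, a worked example with assumed values: with
 * object_prefix "rb.0.1234" and obj_order 22 (4 MB objects), image
 * offset 9 MB (0x900000) falls in segment 0x900000 >> 22 = 2, so the
 * object name comes out as "rb.0.1234.000000000002".
 */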

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
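
/*
 * Editorial note, continuing the example above: with 4 MB segments,
 * a request at image offset 0x3ff000 (4 MB - 4 KB) for 0x2000 bytes
 * has an in-segment offset of 0x3ff000, so rbd_segment_length()
 * clamps it to 0x1000 bytes; the remaining 0x1000 bytes belong to
 * the next segment/object.
 */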

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return (u64) 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * Similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
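
/*
 * Editorial note: the memcpy() above copies bio_vec entries, i.e.
 * page pointers plus offsets and lengths, so the clone shares the
 * source's data pages rather than duplicating them; only the first
 * and last vector entries are trimmed to the requested byte range.
 */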

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
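
/*
 * Editorial note, a worked example with assumed sizes: given a chain
 * of two 8 KB bios, cloning 10 KB starting at offset 4 KB yields a
 * two-clone chain covering the last 4 KB of the first bio and the
 * first 6 KB of the second; on return *bio_src points at the second
 * bio and *offset is 6 KB (0x1800), ready for the next call.
 */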

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the later-arriving
 * "doesn't exist" response.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * the transferred count to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}
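
/*
 * Editorial note: the BUG_ON(r_num_ops > 2) above reflects the two
 * request shapes this driver builds: ordinary requests carry a
 * single op, while copyup requests (see rbd_osd_req_create_copyup()
 * below) carry two.  Dispatch keys off r_ops[0] and the first op's
 * reply length in either case.
 */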
1523
1524 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1525 {
1526         struct rbd_img_request *img_request = obj_request->img_request;
1527         struct ceph_osd_request *osd_req = obj_request->osd_req;
1528         u64 snap_id;
1529
1530         rbd_assert(osd_req != NULL);
1531
1532         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1533         ceph_osdc_build_request(osd_req, obj_request->offset,
1534                         NULL, snap_id, NULL);
1535 }
1536
1537 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1538 {
1539         struct rbd_img_request *img_request = obj_request->img_request;
1540         struct ceph_osd_request *osd_req = obj_request->osd_req;
1541         struct ceph_snap_context *snapc;
1542         struct timespec mtime = CURRENT_TIME;
1543
1544         rbd_assert(osd_req != NULL);
1545
1546         snapc = img_request ? img_request->snapc : NULL;
1547         ceph_osdc_build_request(osd_req, obj_request->offset,
1548                         snapc, CEPH_NOSNAP, &mtime);
1549 }
1550
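     /*
      * Build a single-op osd request for the given object request.
      * A write request is given the image's snapshot context; a read
      * is tagged with the mapped snapshot id when it is formatted.
      */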
1551 static struct ceph_osd_request *rbd_osd_req_create(
1552                                         struct rbd_device *rbd_dev,
1553                                         bool write_request,
1554                                         struct rbd_obj_request *obj_request)
1555 {
1556         struct ceph_snap_context *snapc = NULL;
1557         struct ceph_osd_client *osdc;
1558         struct ceph_osd_request *osd_req;
1559
1560         if (obj_request_img_data_test(obj_request)) {
1561                 struct rbd_img_request *img_request = obj_request->img_request;
1562
1563                 rbd_assert(write_request ==
1564                                 img_request_write_test(img_request));
1565                 if (write_request)
1566                         snapc = img_request->snapc;
1567         }
1568
1569         /* Allocate and initialize the request, for the single op */
1570
1571         osdc = &rbd_dev->rbd_client->client->osdc;
1572         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1573         if (!osd_req)
1574                 return NULL;    /* ENOMEM */
1575
1576         if (write_request)
1577                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1578         else
1579                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1580
1581         osd_req->r_callback = rbd_osd_req_callback;
1582         osd_req->r_priv = obj_request;
1583
1584         osd_req->r_oid_len = strlen(obj_request->object_name);
1585         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1586         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1587
1588         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1589
1590         return osd_req;
1591 }
1592
1593 /*
1594  * Create a copyup osd request based on the information in the
1595  * object request supplied.  A copyup request has two osd ops:
1596  * a copyup method call and a "normal" write request.
1597  */
1598 static struct ceph_osd_request *
1599 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1600 {
1601         struct rbd_img_request *img_request;
1602         struct ceph_snap_context *snapc;
1603         struct rbd_device *rbd_dev;
1604         struct ceph_osd_client *osdc;
1605         struct ceph_osd_request *osd_req;
1606
1607         rbd_assert(obj_request_img_data_test(obj_request));
1608         img_request = obj_request->img_request;
1609         rbd_assert(img_request);
1610         rbd_assert(img_request_write_test(img_request));
1611
1612         /* Allocate and initialize the request, for the two ops */
1613
1614         snapc = img_request->snapc;
1615         rbd_dev = img_request->rbd_dev;
1616         osdc = &rbd_dev->rbd_client->client->osdc;
1617         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1618         if (!osd_req)
1619                 return NULL;    /* ENOMEM */
1620
1621         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1622         osd_req->r_callback = rbd_osd_req_callback;
1623         osd_req->r_priv = obj_request;
1624
1625         osd_req->r_oid_len = strlen(obj_request->object_name);
1626         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1627         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1628
1629         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1630
1631         return osd_req;
1632 }
1633
1634
1635 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1636 {
1637         ceph_osdc_put_request(osd_req);
1638 }
1639
1640 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1641
1642 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1643                                                 u64 offset, u64 length,
1644                                                 enum obj_request_type type)
1645 {
1646         struct rbd_obj_request *obj_request;
1647         size_t size;
1648         char *name;
1649
1650         rbd_assert(obj_request_type_valid(type));
1651
1652         size = strlen(object_name) + 1;
1653         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1654         if (!obj_request)
1655                 return NULL;
1656
1657         name = (char *)(obj_request + 1);
1658         obj_request->object_name = memcpy(name, object_name, size);
1659         obj_request->offset = offset;
1660         obj_request->length = length;
1661         obj_request->flags = 0;
1662         obj_request->which = BAD_WHICH;
1663         obj_request->type = type;
1664         INIT_LIST_HEAD(&obj_request->links);
1665         init_completion(&obj_request->completion);
1666         kref_init(&obj_request->kref);
1667
1668         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1669                 offset, length, (int)type, obj_request);
1670
1671         return obj_request;
1672 }
1673
1674 static void rbd_obj_request_destroy(struct kref *kref)
1675 {
1676         struct rbd_obj_request *obj_request;
1677
1678         obj_request = container_of(kref, struct rbd_obj_request, kref);
1679
1680         dout("%s: obj %p\n", __func__, obj_request);
1681
1682         rbd_assert(obj_request->img_request == NULL);
1683         rbd_assert(obj_request->which == BAD_WHICH);
1684
1685         if (obj_request->osd_req)
1686                 rbd_osd_req_destroy(obj_request->osd_req);
1687
1688         rbd_assert(obj_request_type_valid(obj_request->type));
1689         switch (obj_request->type) {
1690         case OBJ_REQUEST_NODATA:
1691                 break;          /* Nothing to do */
1692         case OBJ_REQUEST_BIO:
1693                 if (obj_request->bio_list)
1694                         bio_chain_put(obj_request->bio_list);
1695                 break;
1696         case OBJ_REQUEST_PAGES:
1697                 if (obj_request->pages)
1698                         ceph_release_page_vector(obj_request->pages,
1699                                                 obj_request->page_count);
1700                 break;
1701         }
1702
1703         kfree(obj_request);
1704 }
1705
1706 /*
1707  * Caller is responsible for filling in the list of object requests
1708  * that comprises the image request, and the Linux request pointer
1709  * (if there is one).
1710  */
1711 static struct rbd_img_request *rbd_img_request_create(
1712                                         struct rbd_device *rbd_dev,
1713                                         u64 offset, u64 length,
1714                                         bool write_request,
1715                                         bool child_request)
1716 {
1717         struct rbd_img_request *img_request;
1718
1719         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1720         if (!img_request)
1721                 return NULL;
1722
1723         if (write_request) {
1724                 down_read(&rbd_dev->header_rwsem);
1725                 ceph_get_snap_context(rbd_dev->header.snapc);
1726                 up_read(&rbd_dev->header_rwsem);
1727         }
1728
1729         img_request->rq = NULL;
1730         img_request->rbd_dev = rbd_dev;
1731         img_request->offset = offset;
1732         img_request->length = length;
1733         img_request->flags = 0;
1734         if (write_request) {
1735                 img_request_write_set(img_request);
1736                 img_request->snapc = rbd_dev->header.snapc;
1737         } else {
1738                 img_request->snap_id = rbd_dev->spec->snap_id;
1739         }
1740         if (child_request)
1741                 img_request_child_set(img_request);
1742         if (rbd_dev->parent_spec)
1743                 img_request_layered_set(img_request);
1744         spin_lock_init(&img_request->completion_lock);
1745         img_request->next_completion = 0;
1746         img_request->callback = NULL;
1747         img_request->result = 0;
1748         img_request->obj_request_count = 0;
1749         INIT_LIST_HEAD(&img_request->obj_requests);
1750         kref_init(&img_request->kref);
1751
1752         rbd_img_request_get(img_request);       /* Avoid a warning */
1753         rbd_img_request_put(img_request);       /* TEMPORARY */
1754
1755         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1756                 write_request ? "write" : "read", offset, length,
1757                 img_request);
1758
1759         return img_request;
1760 }
1761
1762 static void rbd_img_request_destroy(struct kref *kref)
1763 {
1764         struct rbd_img_request *img_request;
1765         struct rbd_obj_request *obj_request;
1766         struct rbd_obj_request *next_obj_request;
1767
1768         img_request = container_of(kref, struct rbd_img_request, kref);
1769
1770         dout("%s: img %p\n", __func__, img_request);
1771
1772         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1773                 rbd_img_obj_request_del(img_request, obj_request);
1774         rbd_assert(img_request->obj_request_count == 0);
1775
1776         if (img_request_write_test(img_request))
1777                 ceph_put_snap_context(img_request->snapc);
1778
1779         if (img_request_child_test(img_request))
1780                 rbd_obj_request_put(img_request->obj_request);
1781
1782         kfree(img_request);
1783 }
1784
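     /*
      * Account for the completion of one object request within an
      * image request.  Returns true if more completions are expected
      * for the image request, false if it is now fully complete.
      */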
1785 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1786 {
1787         struct rbd_img_request *img_request;
1788         unsigned int xferred;
1789         int result;
1790         bool more;
1791
1792         rbd_assert(obj_request_img_data_test(obj_request));
1793         img_request = obj_request->img_request;
1794
1795         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1796         xferred = (unsigned int)obj_request->xferred;
1797         result = obj_request->result;
1798         if (result) {
1799                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1800
1801                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1802                         img_request_write_test(img_request) ? "write" : "read",
1803                         obj_request->length, obj_request->img_offset,
1804                         obj_request->offset);
1805                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1806                         result, xferred);
1807                 if (!img_request->result)
1808                         img_request->result = result;
1809         }
1810
1811         /* Image object requests don't own their page array */
1812
1813         if (obj_request->type == OBJ_REQUEST_PAGES) {
1814                 obj_request->pages = NULL;
1815                 obj_request->page_count = 0;
1816         }
1817
1818         if (img_request_child_test(img_request)) {
1819                 rbd_assert(img_request->obj_request != NULL);
1820                 more = obj_request->which < img_request->obj_request_count - 1;
1821         } else {
1822                 rbd_assert(img_request->rq != NULL);
1823                 more = blk_end_request(img_request->rq, result, xferred);
1824         }
1825
1826         return more;
1827 }
1828
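     /*
      * Object requests may complete in any order, but completions are
      * processed in submission order.  Under the completion lock,
      * advance next_completion past every consecutive object request
      * that is done, and end the image request once the last one has
      * been accounted for.
      */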
1829 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1830 {
1831         struct rbd_img_request *img_request;
1832         u32 which = obj_request->which;
1833         bool more = true;
1834
1835         rbd_assert(obj_request_img_data_test(obj_request));
1836         img_request = obj_request->img_request;
1837
1838         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1839         rbd_assert(img_request != NULL);
1840         rbd_assert(img_request->obj_request_count > 0);
1841         rbd_assert(which != BAD_WHICH);
1842         rbd_assert(which < img_request->obj_request_count);
1843         rbd_assert(which >= img_request->next_completion);
1844
1845         spin_lock_irq(&img_request->completion_lock);
1846         if (which != img_request->next_completion)
1847                 goto out;
1848
1849         for_each_obj_request_from(img_request, obj_request) {
1850                 rbd_assert(more);
1851                 rbd_assert(which < img_request->obj_request_count);
1852
1853                 if (!obj_request_done_test(obj_request))
1854                         break;
1855                 more = rbd_img_obj_end_request(obj_request);
1856                 which++;
1857         }
1858
1859         rbd_assert(more ^ (which == img_request->obj_request_count));
1860         img_request->next_completion = which;
1861 out:
1862         spin_unlock_irq(&img_request->completion_lock);
1863
1864         if (!more)
1865                 rbd_img_request_complete(img_request);
1866 }
1867
1868 /*
1869  * Split up an image request into one or more object requests, each
1870  * to a different object.  The "type" parameter indicates whether
1871  * "data_desc" is the pointer to the head of a list of bio
1872  * structures, or the base of a page array.  In either case this
1873  * function assumes data_desc describes memory sufficient to hold
1874  * all data described by the image request.
1875  */
1876 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1877                                         enum obj_request_type type,
1878                                         void *data_desc)
1879 {
1880         struct rbd_device *rbd_dev = img_request->rbd_dev;
1881         struct rbd_obj_request *obj_request = NULL;
1882         struct rbd_obj_request *next_obj_request;
1883         bool write_request = img_request_write_test(img_request);
1884         struct bio *bio_list;
1885         unsigned int bio_offset = 0;
1886         struct page **pages;
1887         u64 img_offset;
1888         u64 resid;
1889         u16 opcode;
1890
1891         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1892                 (int)type, data_desc);
1893
1894         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1895         img_offset = img_request->offset;
1896         resid = img_request->length;
1897         rbd_assert(resid > 0);
1898
1899         if (type == OBJ_REQUEST_BIO) {
1900                 bio_list = data_desc;
1901                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1902         } else {
1903                 rbd_assert(type == OBJ_REQUEST_PAGES);
1904                 pages = data_desc;
1905         }
1906
1907         while (resid) {
1908                 struct ceph_osd_request *osd_req;
1909                 const char *object_name;
1910                 u64 offset;
1911                 u64 length;
1912
1913                 object_name = rbd_segment_name(rbd_dev, img_offset);
1914                 if (!object_name)
1915                         goto out_unwind;
1916                 offset = rbd_segment_offset(rbd_dev, img_offset);
1917                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1918                 obj_request = rbd_obj_request_create(object_name,
1919                                                 offset, length, type);
1920                 kfree(object_name);     /* object request has its own copy */
1921                 if (!obj_request)
1922                         goto out_unwind;
1923
1924                 if (type == OBJ_REQUEST_BIO) {
1925                         unsigned int clone_size;
1926
1927                         rbd_assert(length <= (u64)UINT_MAX);
1928                         clone_size = (unsigned int)length;
1929                         obj_request->bio_list =
1930                                         bio_chain_clone_range(&bio_list,
1931                                                                 &bio_offset,
1932                                                                 clone_size,
1933                                                                 GFP_ATOMIC);
1934                         if (!obj_request->bio_list)
1935                                 goto out_partial;
1936                 } else {
1937                         unsigned int page_count;
1938
1939                         obj_request->pages = pages;
1940                         page_count = (u32)calc_pages_for(offset, length);
1941                         obj_request->page_count = page_count;
1942                         if ((offset + length) & ~PAGE_MASK)
1943                                 page_count--;   /* more on last page */
1944                         pages += page_count;
1945                 }
1946
1947                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1948                                                 obj_request);
1949                 if (!osd_req)
1950                         goto out_partial;
1951                 obj_request->osd_req = osd_req;
1952                 obj_request->callback = rbd_img_obj_callback;
1953
1954                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1955                                                 0, 0);
1956                 if (type == OBJ_REQUEST_BIO)
1957                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1958                                         obj_request->bio_list, length);
1959                 else
1960                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1961                                         obj_request->pages, length,
1962                                         offset & ~PAGE_MASK, false, false);
1963
1964                 if (write_request)
1965                         rbd_osd_req_format_write(obj_request);
1966                 else
1967                         rbd_osd_req_format_read(obj_request);
1968
1969                 obj_request->img_offset = img_offset;
1970                 rbd_img_obj_request_add(img_request, obj_request);
1971
1972                 img_offset += length;
1973                 resid -= length;
1974         }
1975
1976         return 0;
1977
1978 out_partial:
1979         rbd_obj_request_put(obj_request);
1980 out_unwind:
1981         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1982                 rbd_obj_request_put(obj_request);
1983
1984         return -ENOMEM;
1985 }
1986
1987 static void
1988 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1989 {
1990         struct rbd_img_request *img_request;
1991         struct rbd_device *rbd_dev;
1992         u64 length;
1993         u32 page_count;
1994
1995         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1996         rbd_assert(obj_request_img_data_test(obj_request));
1997         img_request = obj_request->img_request;
1998         rbd_assert(img_request);
1999
2000         rbd_dev = img_request->rbd_dev;
2001         rbd_assert(rbd_dev);
2002         length = (u64)1 << rbd_dev->header.obj_order;
2003         page_count = (u32)calc_pages_for(0, length);
2004
2005         rbd_assert(obj_request->copyup_pages);
2006         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2007         obj_request->copyup_pages = NULL;
2008
2009         /*
2010          * We want the transfer count to reflect the size of the
2011          * original write request.  There is no such thing as a
2012          * successful short write, so if the request was successful
2013          * we can just set it to the originally-requested length.
2014          */
2015         if (!obj_request->result)
2016                 obj_request->xferred = obj_request->length;
2017
2018         /* Finish up with the normal image object callback */
2019
2020         rbd_img_obj_callback(obj_request);
2021 }
2022
2023 static void
2024 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2025 {
2026         struct rbd_obj_request *orig_request;
2027         struct ceph_osd_request *osd_req;
2028         struct ceph_osd_client *osdc;
2029         struct rbd_device *rbd_dev;
2030         struct page **pages;
2031         int result;
2032         u64 obj_size;
2033         u64 xferred;
2034
2035         rbd_assert(img_request_child_test(img_request));
2036
2037         /* First get what we need from the image request */
2038
2039         pages = img_request->copyup_pages;
2040         rbd_assert(pages != NULL);
2041         img_request->copyup_pages = NULL;
2042
2043         orig_request = img_request->obj_request;
2044         rbd_assert(orig_request != NULL);
2045         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2046         result = img_request->result;
2047         obj_size = img_request->length;
2048         xferred = img_request->xferred;
2049
2050         rbd_dev = img_request->rbd_dev;
2051         rbd_assert(rbd_dev);
2052         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2053
2054         rbd_img_request_put(img_request);
2055
2056         if (result)
2057                 goto out_err;
2058
2059         /* Allocate the new copyup osd request for the original request */
2060
2061         result = -ENOMEM;
2062         rbd_assert(!orig_request->osd_req);
2063         osd_req = rbd_osd_req_create_copyup(orig_request);
2064         if (!osd_req)
2065                 goto out_err;
2066         orig_request->osd_req = osd_req;
2067         orig_request->copyup_pages = pages;
2068
2069         /* Initialize the copyup op */
2070
2071         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2072         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2073                                                 false, false);
2074
2075         /* Then the original write request op */
2076
2077         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2078                                         orig_request->offset,
2079                                         orig_request->length, 0, 0);
2080         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2081                                         orig_request->length);
2082
2083         rbd_osd_req_format_write(orig_request);
2084
2085         /* All set, send it off. */
2086
2087         orig_request->callback = rbd_img_obj_copyup_callback;
2088         osdc = &rbd_dev->rbd_client->client->osdc;
2089         result = rbd_obj_request_submit(osdc, orig_request);
2090         if (!result)
2091                 return;
2092 out_err:
2093         /* Record the error code and complete the request */
2094
2095         orig_request->result = result;
2096         orig_request->xferred = 0;
2097         obj_request_done_set(orig_request);
2098         rbd_obj_request_complete(orig_request);
2099 }
2100
2101 /*
2102  * Read from the parent image the range of data that covers the
2103  * entire target of the given object request.  This is used for
2104  * satisfying a layered image write request when the target of an
2105  * object request from the image request does not exist.
2106  *
2107  * A page array big enough to hold the returned data is allocated
2108  * and supplied to rbd_img_request_fill() as the "data descriptor."
2109  * When the read completes, this page array will be transferred to
2110  * the original object request for the copyup operation.
2111  *
2112  * If an error occurs, record it as the result of the original
2113  * object request and mark it done so it gets completed.
2114  */
2115 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2116 {
2117         struct rbd_img_request *img_request = NULL;
2118         struct rbd_img_request *parent_request = NULL;
2119         struct rbd_device *rbd_dev;
2120         u64 img_offset;
2121         u64 length;
2122         struct page **pages = NULL;
2123         u32 page_count;
2124         int result;
2125
2126         rbd_assert(obj_request_img_data_test(obj_request));
2127         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2128
2129         img_request = obj_request->img_request;
2130         rbd_assert(img_request != NULL);
2131         rbd_dev = img_request->rbd_dev;
2132         rbd_assert(rbd_dev->parent != NULL);
2133
2134         /*
2135          * First things first.  The original osd request is of no
2136          * use to us any more; we'll need a new one that can hold
2137          * the two ops in a copyup request.  We'll get that later,
2138          * but for now we can release the old one.
2139          */
2140         rbd_osd_req_destroy(obj_request->osd_req);
2141         obj_request->osd_req = NULL;
2142
2143         /*
2144          * Determine the byte range covered by the object in the
2145          * child image to which the original request was to be sent.
2146          */
2147         img_offset = obj_request->img_offset - obj_request->offset;
2148         length = (u64)1 << rbd_dev->header.obj_order;
2149
2150         /*
2151          * There is no defined parent data beyond the parent
2152          * overlap, so limit what we read at that boundary if
2153          * necessary.
2154          */
2155         if (img_offset + length > rbd_dev->parent_overlap) {
2156                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2157                 length = rbd_dev->parent_overlap - img_offset;
2158         }
2159
2160         /*
2161          * Allocate a page array big enough to receive the data read
2162          * from the parent.
2163          */
2164         page_count = (u32)calc_pages_for(0, length);
2165         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2166         if (IS_ERR(pages)) {
2167                 result = PTR_ERR(pages);
2168                 pages = NULL;
2169                 goto out_err;
2170         }
2171
2172         result = -ENOMEM;
2173         parent_request = rbd_img_request_create(rbd_dev->parent,
2174                                                 img_offset, length,
2175                                                 false, true);
2176         if (!parent_request)
2177                 goto out_err;
2178         rbd_obj_request_get(obj_request);
2179         parent_request->obj_request = obj_request;
2180
2181         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2182         if (result)
2183                 goto out_err;
2184         parent_request->copyup_pages = pages;
2185
2186         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2187         result = rbd_img_request_submit(parent_request);
2188         if (!result)
2189                 return 0;
2190
2191         parent_request->copyup_pages = NULL;
2192         parent_request->obj_request = NULL;
2193         rbd_obj_request_put(obj_request);
2194 out_err:
2195         if (pages)
2196                 ceph_release_page_vector(pages, page_count);
2197         if (parent_request)
2198                 rbd_img_request_put(parent_request);
2199         obj_request->result = result;
2200         obj_request->xferred = 0;
2201         obj_request_done_set(obj_request);
2202
2203         return result;
2204 }
2205
2206 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2207 {
2208         struct rbd_obj_request *orig_request;
2209         int result;
2210
2211         rbd_assert(!obj_request_img_data_test(obj_request));
2212
2213         /*
2214          * All we need from the object request is the original
2215          * request and the result of the STAT op.  Grab those, then
2216          * we're done with the request.
2217          */
2218         orig_request = obj_request->obj_request;
2219         obj_request->obj_request = NULL;
2220         rbd_assert(orig_request);
2221         rbd_assert(orig_request->img_request);
2222
2223         result = obj_request->result;
2224         obj_request->result = 0;
2225
2226         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2227                 obj_request, orig_request, result,
2228                 obj_request->xferred, obj_request->length);
2229         rbd_obj_request_put(obj_request);
2233
2234         /*
2235          * Our only purpose here is to determine whether the object
2236          * exists, and we don't want to treat the non-existence as
2237          * an error.  If something else comes back, transfer the
2238          * error to the original request and complete it now.
2239          */
2240         if (!result) {
2241                 obj_request_existence_set(orig_request, true);
2242         } else if (result == -ENOENT) {
2243                 obj_request_existence_set(orig_request, false);
2244         } else if (result) {
2245                 orig_request->result = result;
2246                 goto out;
2247         }
2248
2249         /*
2250          * Resubmit the original request now that we have recorded
2251          * whether the target object exists.
2252          */
2253         orig_request->result = rbd_img_obj_request_submit(orig_request);
2254 out:
2255         if (orig_request->result)
2256                 rbd_obj_request_complete(orig_request);
2257         rbd_obj_request_put(orig_request);
2258 }
2259
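     /*
      * Issue a STAT request for the target object of the given object
      * request to learn whether it exists.  The existence result is
      * recorded and the original request is resubmitted by the
      * callback above.
      */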
2260 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2261 {
2262         struct rbd_obj_request *stat_request;
2263         struct rbd_device *rbd_dev;
2264         struct ceph_osd_client *osdc;
2265         struct page **pages = NULL;
2266         u32 page_count;
2267         size_t size;
2268         int ret;
2269
2270         /*
2271          * The response data for a STAT call consists of:
2272          *     le64 length;
2273          *     struct {
2274          *         le32 tv_sec;
2275          *         le32 tv_nsec;
2276          *     } mtime;
2277          */
2278         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2279         page_count = (u32)calc_pages_for(0, size);
2280         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2281         if (IS_ERR(pages))
2282                 return PTR_ERR(pages);
2283
2284         ret = -ENOMEM;
2285         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2286                                                         OBJ_REQUEST_PAGES);
2287         if (!stat_request)
2288                 goto out;
2289
2290         rbd_obj_request_get(obj_request);
2291         stat_request->obj_request = obj_request;
2292         stat_request->pages = pages;
2293         stat_request->page_count = page_count;
2294
2295         rbd_assert(obj_request->img_request);
2296         rbd_dev = obj_request->img_request->rbd_dev;
2297         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2298                                                 stat_request);
2299         if (!stat_request->osd_req)
2300                 goto out;
2301         stat_request->callback = rbd_img_obj_exists_callback;
2302
2303         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2304         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2305                                         false, false);
2306         rbd_osd_req_format_read(stat_request);
2307
2308         osdc = &rbd_dev->rbd_client->client->osdc;
2309         ret = rbd_obj_request_submit(osdc, stat_request);
2310 out:
2311         if (ret)
2312                 rbd_obj_request_put(obj_request);
2313
2314         return ret;
2315 }
2316
2317 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2318 {
2319         struct rbd_img_request *img_request;
2320         struct rbd_device *rbd_dev;
2321         bool known;
2322
2323         rbd_assert(obj_request_img_data_test(obj_request));
2324
2325         img_request = obj_request->img_request;
2326         rbd_assert(img_request);
2327         rbd_dev = img_request->rbd_dev;
2328
2329         /*
2330          * Only writes to layered images need special handling.
2331          * Reads and non-layered writes are simple object requests.
2332          * Layered writes that start beyond the end of the overlap
2333          * with the parent have no parent data, so they too are
2334          * simple object requests.  Finally, if the target object is
2335          * known to already exist, its parent data has already been
2336          * copied, so a write to the object can also be handled as a
2337          * simple object request.
2338          */
2339         if (!img_request_write_test(img_request) ||
2340                 !img_request_layered_test(img_request) ||
2341                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2342                 ((known = obj_request_known_test(obj_request)) &&
2343                         obj_request_exists_test(obj_request))) {
2344
2345                 struct ceph_osd_client *osdc;
2346
2347                 osdc = &rbd_dev->rbd_client->client->osdc;
2350
2351                 return rbd_obj_request_submit(osdc, obj_request);
2352         }
2353
2354         /*
2355          * It's a layered write.  The target object might exist but
2356          * we may not know that yet.  If we know it doesn't exist,
2357          * start by reading the data for the full target object from
2358          * the parent so we can use it for a copyup to the target.
2359          */
2360         if (known)
2361                 return rbd_img_obj_parent_read_full(obj_request);
2362
2363         /* We don't know whether the target exists.  Go find out. */
2364
2365         return rbd_img_obj_exists_submit(obj_request);
2366 }
2367
2368 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2369 {
2370         struct rbd_obj_request *obj_request;
2371         struct rbd_obj_request *next_obj_request;
2372
2373         dout("%s: img %p\n", __func__, img_request);
2374         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2375                 int ret;
2376
2377                 ret = rbd_img_obj_request_submit(obj_request);
2378                 if (ret)
2379                         return ret;
2380         }
2381
2382         return 0;
2383 }
2384
2385 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2386 {
2387         struct rbd_obj_request *obj_request;
2388         struct rbd_device *rbd_dev;
2389         u64 obj_end;
2390
2391         rbd_assert(img_request_child_test(img_request));
2392
2393         obj_request = img_request->obj_request;
2394         rbd_assert(obj_request);
2395         rbd_assert(obj_request->img_request);
2396
2397         obj_request->result = img_request->result;
2398         if (obj_request->result)
2399                 goto out;
2400
2401         /*
2402          * We need to zero anything beyond the parent overlap
2403          * boundary.  Since rbd_img_obj_request_read_callback()
2404          * will zero anything beyond the end of a short read, an
2405          * easy way to do this is to pretend the data from the
2406          * parent came up short--ending at the overlap boundary.
2407          */
2408         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2409         obj_end = obj_request->img_offset + obj_request->length;
2410         rbd_dev = obj_request->img_request->rbd_dev;
2411         if (obj_end > rbd_dev->parent_overlap) {
2412                 u64 xferred = 0;
2413
2414                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2415                         xferred = rbd_dev->parent_overlap -
2416                                         obj_request->img_offset;
2417
2418                 obj_request->xferred = min(img_request->xferred, xferred);
2419         } else {
2420                 obj_request->xferred = img_request->xferred;
2421         }
2422 out:
2423         rbd_img_obj_request_read_callback(obj_request);
2424         rbd_obj_request_complete(obj_request);
2425 }
2426
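     /*
      * The target object of a read in a layered image was not there
      * (-ENOENT), so satisfy the read from the parent image instead,
      * using the original request's bio chain as the data descriptor.
      */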
2427 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2428 {
2429         struct rbd_device *rbd_dev;
2430         struct rbd_img_request *img_request;
2431         int result;
2432
2433         rbd_assert(obj_request_img_data_test(obj_request));
2434         rbd_assert(obj_request->img_request != NULL);
2435         rbd_assert(obj_request->result == (s32) -ENOENT);
2436         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2437
2438         rbd_dev = obj_request->img_request->rbd_dev;
2439         rbd_assert(rbd_dev->parent != NULL);
2440         /* rbd_read_finish(obj_request, obj_request->length); */
2441         img_request = rbd_img_request_create(rbd_dev->parent,
2442                                                 obj_request->img_offset,
2443                                                 obj_request->length,
2444                                                 false, true);
2445         result = -ENOMEM;
2446         if (!img_request)
2447                 goto out_err;
2448
2449         rbd_obj_request_get(obj_request);
2450         img_request->obj_request = obj_request;
2451
2452         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2453                                         obj_request->bio_list);
2454         if (result)
2455                 goto out_err;
2456
2457         img_request->callback = rbd_img_parent_read_callback;
2458         result = rbd_img_request_submit(img_request);
2459         if (result)
2460                 goto out_err;
2461
2462         return;
2463 out_err:
2464         if (img_request)
2465                 rbd_img_request_put(img_request);
2466         obj_request->result = result;
2467         obj_request->xferred = 0;
2468         obj_request_done_set(obj_request);
2469 }
2470
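     /*
      * Acknowledge a notification received on the header object so
      * the originator of the notify is not kept waiting for us.
      */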
2471 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2472 {
2473         struct rbd_obj_request *obj_request;
2474         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2475         int ret;
2476
2477         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2478                                                         OBJ_REQUEST_NODATA);
2479         if (!obj_request)
2480                 return -ENOMEM;
2481
2482         ret = -ENOMEM;
2483         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2484         if (!obj_request->osd_req)
2485                 goto out;
2486         obj_request->callback = rbd_obj_request_put;
2487
2488         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2489                                         notify_id, 0, 0);
2490         rbd_osd_req_format_read(obj_request);
2491
2492         ret = rbd_obj_request_submit(osdc, obj_request);
2493 out:
2494         if (ret)
2495                 rbd_obj_request_put(obj_request);
2496
2497         return ret;
2498 }
2499
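     /*
      * Callback invoked when a notification arrives on the header
      * object.  A header change triggers a refresh of the in-core
      * image data before the notification is acknowledged.
      */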
2500 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2501 {
2502         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2503
2504         if (!rbd_dev)
2505                 return;
2506
2507         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2508                 rbd_dev->header_name, (unsigned long long)notify_id,
2509                 (unsigned int)opcode);
2510         (void)rbd_dev_refresh(rbd_dev);
2511
2512         rbd_obj_notify_ack(rbd_dev, notify_id);
2513 }
2514
2515 /*
2516  * Request sync osd watch/unwatch.  The value of "start" determines
2517  * whether a watch request is being initiated or torn down.
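      *
      * Callers pass a nonzero "start" to register the watch, e.g.
      * rbd_dev_header_watch_sync(rbd_dev, 1), and zero to tear it
      * down again.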
2518  */
2519 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2520 {
2521         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2522         struct rbd_obj_request *obj_request;
2523         int ret;
2524
2525         rbd_assert(start ^ !!rbd_dev->watch_event);
2526         rbd_assert(start ^ !!rbd_dev->watch_request);
2527
2528         if (start) {
2529                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2530                                                 &rbd_dev->watch_event);
2531                 if (ret < 0)
2532                         return ret;
2533                 rbd_assert(rbd_dev->watch_event != NULL);
2534         }
2535
2536         ret = -ENOMEM;
2537         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2538                                                         OBJ_REQUEST_NODATA);
2539         if (!obj_request)
2540                 goto out_cancel;
2541
2542         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2543         if (!obj_request->osd_req)
2544                 goto out_cancel;
2545
2546         if (start)
2547                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2548         else
2549                 ceph_osdc_unregister_linger_request(osdc,
2550                                         rbd_dev->watch_request->osd_req);
2551
2552         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2553                                 rbd_dev->watch_event->cookie, 0, start);
2554         rbd_osd_req_format_write(obj_request);
2555
2556         ret = rbd_obj_request_submit(osdc, obj_request);
2557         if (ret)
2558                 goto out_cancel;
2559         ret = rbd_obj_request_wait(obj_request);
2560         if (ret)
2561                 goto out_cancel;
2562         ret = obj_request->result;
2563         if (ret)
2564                 goto out_cancel;
2565
2566         /*
2567          * A watch request is set to linger, so the underlying osd
2568          * request won't go away until we unregister it.  We retain
2569          * a pointer to the object request during that time (in
2570          * rbd_dev->watch_request), so we'll keep a reference to
2571          * it.  We'll drop that reference (below) after we've
2572          * unregistered it.
2573          */
2574         if (start) {
2575                 rbd_dev->watch_request = obj_request;
2576
2577                 return 0;
2578         }
2579
2580         /* We have successfully torn down the watch request */
2581
2582         rbd_obj_request_put(rbd_dev->watch_request);
2583         rbd_dev->watch_request = NULL;
2584 out_cancel:
2585         /* Cancel the event if we're tearing down, or on error */
2586         ceph_osdc_cancel_event(rbd_dev->watch_event);
2587         rbd_dev->watch_event = NULL;
2588         if (obj_request)
2589                 rbd_obj_request_put(obj_request);
2590
2591         return ret;
2592 }
2593
2594 /*
2595  * Synchronous osd object method call.  Returns the number of bytes
2596  * returned in the inbound buffer, or a negative error code.
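      *
      * A typical call mirrors the format-2 image probes made elsewhere
      * in this file, e.g. (illustrative only):
      *
      *    ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
      *                    "rbd", "get_object_prefix", NULL, 0,
      *                    reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);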
2597  */
2598 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2599                              const char *object_name,
2600                              const char *class_name,
2601                              const char *method_name,
2602                              const void *outbound,
2603                              size_t outbound_size,
2604                              void *inbound,
2605                              size_t inbound_size,
2606                              u64 *version)
2607 {
2608         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2609         struct rbd_obj_request *obj_request;
2610         struct page **pages;
2611         u32 page_count;
2612         int ret;
2613
2614         /*
2615          * Method calls are ultimately read operations.  The result
2616          * should be placed into the inbound buffer provided.  They
2617          * may also supply outbound data--parameters for the object
2618          * method.  Currently, if present, this will be a
2619          * snapshot id.
2620          */
2621         page_count = (u32)calc_pages_for(0, inbound_size);
2622         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2623         if (IS_ERR(pages))
2624                 return PTR_ERR(pages);
2625
2626         ret = -ENOMEM;
2627         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2628                                                         OBJ_REQUEST_PAGES);
2629         if (!obj_request)
2630                 goto out;
2631
2632         obj_request->pages = pages;
2633         obj_request->page_count = page_count;
2634
2635         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2636         if (!obj_request->osd_req)
2637                 goto out;
2638
2639         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2640                                         class_name, method_name);
2641         if (outbound_size) {
2642                 struct ceph_pagelist *pagelist;
2643
2644                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2645                 if (!pagelist)
2646                         goto out;
2647
2648                 ceph_pagelist_init(pagelist);
2649                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2650                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2651                                                 pagelist);
2652         }
2653         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2654                                         obj_request->pages, inbound_size,
2655                                         0, false, false);
2656         rbd_osd_req_format_read(obj_request);
2657
2658         ret = rbd_obj_request_submit(osdc, obj_request);
2659         if (ret)
2660                 goto out;
2661         ret = rbd_obj_request_wait(obj_request);
2662         if (ret)
2663                 goto out;
2664
2665         ret = obj_request->result;
2666         if (ret < 0)
2667                 goto out;
2668
2669         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2670         ret = (int)obj_request->xferred;
2671         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2672         if (version)
2673                 *version = obj_request->version;
2674 out:
2675         if (obj_request)
2676                 rbd_obj_request_put(obj_request);
2677         else
2678                 ceph_release_page_vector(pages, page_count);
2679
2680         return ret;
2681 }
2682
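     /*
      * The request_fn for the rbd block device.  Each block layer
      * request fetched from the queue is turned into an image request,
      * filled with the request's bio chain, and submitted to the osds.
      */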
2683 static void rbd_request_fn(struct request_queue *q)
2684                 __releases(q->queue_lock) __acquires(q->queue_lock)
2685 {
2686         struct rbd_device *rbd_dev = q->queuedata;
2687         bool read_only = rbd_dev->mapping.read_only;
2688         struct request *rq;
2689         int result;
2690
2691         while ((rq = blk_fetch_request(q))) {
2692                 bool write_request = rq_data_dir(rq) == WRITE;
2693                 struct rbd_img_request *img_request;
2694                 u64 offset;
2695                 u64 length;
2696
2697                 /* Ignore any non-FS requests that filter through. */
2698
2699                 if (rq->cmd_type != REQ_TYPE_FS) {
2700                         dout("%s: non-fs request type %d\n", __func__,
2701                                 (int) rq->cmd_type);
2702                         __blk_end_request_all(rq, 0);
2703                         continue;
2704                 }
2705
2706                 /* Ignore/skip any zero-length requests */
2707
2708                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2709                 length = (u64) blk_rq_bytes(rq);
2710
2711                 if (!length) {
2712                         dout("%s: zero-length request\n", __func__);
2713                         __blk_end_request_all(rq, 0);
2714                         continue;
2715                 }
2716
2717                 spin_unlock_irq(q->queue_lock);
2718
2719                 /* Disallow writes to a read-only device */
2720
2721                 if (write_request) {
2722                         result = -EROFS;
2723                         if (read_only)
2724                                 goto end_request;
2725                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2726                 }
2727
2728                 /*
2729                  * Quit early if the mapped snapshot no longer
2730                  * exists.  It's still possible the snapshot will
2731                  * have disappeared by the time our request arrives
2732                  * at the osd, but there's no sense in sending it if
2733                  * we already know.
2734                  */
2735                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2736                         dout("request for non-existent snapshot\n");
2737                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2738                         result = -ENXIO;
2739                         goto end_request;
2740                 }
2741
2742                 result = -EINVAL;
2743                 if (offset && length > U64_MAX - offset + 1) {
2744                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2745                                 offset, length);
2746                         goto end_request;       /* Shouldn't happen */
2747                 }
2748
2749                 result = -ENOMEM;
2750                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2751                                                         write_request, false);
2752                 if (!img_request)
2753                         goto end_request;
2754
2755                 img_request->rq = rq;
2756
2757                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2758                                                 rq->bio);
2759                 if (!result)
2760                         result = rbd_img_request_submit(img_request);
2761                 if (result)
2762                         rbd_img_request_put(img_request);
2763 end_request:
2764                 spin_lock_irq(q->queue_lock);
2765                 if (result < 0) {
2766                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2767                                 write_request ? "write" : "read",
2768                                 length, offset, result);
2769
2770                         __blk_end_request_all(rq, result);
2771                 }
2772         }
2773 }
2774
2775 /*
2776  * A queue callback.  Makes sure that we don't create a bio that spans
2777  * multiple osd objects.  One exception would be single-page bios,
2778  * which we handle later in bio_chain_clone_range().
2779  */
2780 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2781                           struct bio_vec *bvec)
2782 {
2783         struct rbd_device *rbd_dev = q->queuedata;
2784         sector_t sector_offset;
2785         sector_t sectors_per_obj;
2786         sector_t obj_sector_offset;
2787         int ret;
2788
2789         /*
2790          * The bio start sector is partition-relative; make it
2791          * relative to the enclosing device, then find how far
2792          * into its rbd object that sector falls.
2793          */
2794         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2795         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2796         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2797
2798         /*
2799          * Compute the number of bytes from that offset to the end
2800          * of the object.  Account for what's already used by the bio.
2801          */
2802         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2803         if (ret > bmd->bi_size)
2804                 ret -= bmd->bi_size;
2805         else
2806                 ret = 0;
2807
2808         /*
2809          * Don't send back more than was asked for.  And if the bio
2810          * was empty, let the whole thing through because:  "Note
2811          * that a block device *must* allow a single page to be
2812          * added to an empty bio."
2813          */
2814         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2815         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2816                 ret = (int) bvec->bv_len;
2817
2818         return ret;
2819 }
2820
2821 static void rbd_free_disk(struct rbd_device *rbd_dev)
2822 {
2823         struct gendisk *disk = rbd_dev->disk;
2824
2825         if (!disk)
2826                 return;
2827
2828         rbd_dev->disk = NULL;
2829         if (disk->flags & GENHD_FL_UP) {
2830                 del_gendisk(disk);
2831                 if (disk->queue)
2832                         blk_cleanup_queue(disk->queue);
2833         }
2834         put_disk(disk);
2835 }
2836
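     /*
      * Synchronously read up to "length" bytes at "offset" from the
      * named object into "buf".  Returns the number of bytes read on
      * success, or a negative errno.
      */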
2837 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2838                                 const char *object_name,
2839                                 u64 offset, u64 length, void *buf)
2840
2841 {
2842         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2843         struct rbd_obj_request *obj_request;
2844         struct page **pages = NULL;
2845         u32 page_count;
2846         size_t size;
2847         int ret;
2848
2849         page_count = (u32) calc_pages_for(offset, length);
2850         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2851         if (IS_ERR(pages))
2852                 ret = PTR_ERR(pages);
2853
2854         ret = -ENOMEM;
2855         obj_request = rbd_obj_request_create(object_name, offset, length,
2856                                                         OBJ_REQUEST_PAGES);
2857         if (!obj_request)
2858                 goto out;
2859
2860         obj_request->pages = pages;
2861         obj_request->page_count = page_count;
2862
2863         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2864         if (!obj_request->osd_req)
2865                 goto out;
2866
2867         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2868                                         offset, length, 0, 0);
2869         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2870                                         obj_request->pages,
2871                                         obj_request->length,
2872                                         obj_request->offset & ~PAGE_MASK,
2873                                         false, false);
2874         rbd_osd_req_format_read(obj_request);
2875
2876         ret = rbd_obj_request_submit(osdc, obj_request);
2877         if (ret)
2878                 goto out;
2879         ret = rbd_obj_request_wait(obj_request);
2880         if (ret)
2881                 goto out;
2882
2883         ret = obj_request->result;
2884         if (ret < 0)
2885                 goto out;
2886
2887         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2888         size = (size_t) obj_request->xferred;
2889         ceph_copy_from_page_vector(pages, buf, 0, size);
2890         rbd_assert(size <= (size_t)INT_MAX);
2891         ret = (int)size;
2892 out:
2893         if (obj_request)
2894                 rbd_obj_request_put(obj_request);
2895         else
2896                 ceph_release_page_vector(pages, page_count);
2897
2898         return ret;
2899 }
2900
2901 /*
2902  * Read the complete header for the given rbd device.
2903  *
2904  * Returns a pointer to a dynamically-allocated buffer containing
2905  * the complete and validated header.
2908  *
2909  * Returns a pointer-coded errno if a failure occurs.
2910  */
2911 static struct rbd_image_header_ondisk *
2912 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2913 {
2914         struct rbd_image_header_ondisk *ondisk = NULL;
2915         u32 snap_count = 0;
2916         u64 names_size = 0;
2917         u32 want_count;
2918         int ret;
2919
2920         /*
2921          * The complete header will include an array of its 64-bit
2922          * snapshot ids, followed by the names of those snapshots as
2923          * a contiguous block of NUL-terminated strings.  Note that
2924          * the number of snapshots could change by the time we read
2925          * it in, in which case we re-read it.
2926          */
2927         do {
2928                 size_t size;
2929
2930                 kfree(ondisk);
2931
2932                 size = sizeof (*ondisk);
2933                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2934                 size += names_size;
2935                 ondisk = kmalloc(size, GFP_KERNEL);
2936                 if (!ondisk)
2937                         return ERR_PTR(-ENOMEM);
2938
2939                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2940                                        0, size, ondisk);
2941                 if (ret < 0)
2942                         goto out_err;
2943                 if ((size_t)ret < size) {
2944                         ret = -ENXIO;
2945                 rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2946                                 size, ret);
2947                         goto out_err;
2948                 }
2949                 if (!rbd_dev_ondisk_valid(ondisk)) {
2950                         ret = -ENXIO;
2951                         rbd_warn(rbd_dev, "invalid header");
2952                         goto out_err;
2953                 }
2954
2955                 names_size = le64_to_cpu(ondisk->snap_names_len);
2956                 want_count = snap_count;
2957                 snap_count = le32_to_cpu(ondisk->snap_count);
2958         } while (snap_count != want_count);
2959
2960         return ondisk;
2961
2962 out_err:
2963         kfree(ondisk);
2964
2965         return ERR_PTR(ret);
2966 }
2967
2968 /*
2969  * re-read the on-disk image header
2970  */
2971 static int rbd_read_header(struct rbd_device *rbd_dev,
2972                            struct rbd_image_header *header)
2973 {
2974         struct rbd_image_header_ondisk *ondisk;
2975         int ret;
2976
2977         ondisk = rbd_dev_v1_header_read(rbd_dev);
2978         if (IS_ERR(ondisk))
2979                 return PTR_ERR(ondisk);
2980         ret = rbd_header_from_disk(header, ondisk);
2981         kfree(ondisk);
2982
2983         return ret;
2984 }
2985
2986 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2987 {
2988         struct rbd_snap *snap;
2989         struct rbd_snap *next;
2990
2991         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
2992                 list_del(&snap->node);
2993                 rbd_snap_destroy(snap);
2994         }
2995 }
2996
2997 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2998 {
2999         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3000                 return;
3001
3002         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3003                 sector_t size;
3004
3005                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3006                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3007                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3008                 set_capacity(rbd_dev->disk, size);
3009         }
3010 }
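/*
 * A worked example of the conversion in rbd_update_mapping_size():
 * for a hypothetical 1 GiB image, 1073741824 bytes / 512 bytes per
 * sector = 2097152 sectors passed to set_capacity().
 */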
3011
3012 /*
3013  * re-read the complete on-disk header and update the device to match
3014  */
3015 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3016 {
3017         int ret;
3018         struct rbd_image_header h;
3019
3020         ret = rbd_read_header(rbd_dev, &h);
3021         if (ret < 0)
3022                 return ret;
3023
3024         down_write(&rbd_dev->header_rwsem);
3025
3026         /* Update image size, and check for resize of mapped image */
3027         rbd_dev->header.image_size = h.image_size;
3028         rbd_update_mapping_size(rbd_dev);
3029
3030         /* rbd_dev->header.object_prefix shouldn't change */
3031         kfree(rbd_dev->header.snap_sizes);
3032         kfree(rbd_dev->header.snap_names);
3033         /* osd requests may still refer to snapc */
3034         ceph_put_snap_context(rbd_dev->header.snapc);
3035
3037         rbd_dev->header.snapc = h.snapc;
3038         rbd_dev->header.snap_names = h.snap_names;
3039         rbd_dev->header.snap_sizes = h.snap_sizes;
3040         /* Free the extra copy of the object prefix */
3041         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3042                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3043         kfree(h.object_prefix);
3044
3045         ret = rbd_dev_snaps_update(rbd_dev);
3046
3047         up_write(&rbd_dev->header_rwsem);
3048
3049         return ret;
3050 }
3051
3052 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3053 {
3054         u64 image_size;
3055         int ret;
3056
3057         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3058         image_size = rbd_dev->header.image_size;
3059         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3060         if (rbd_dev->image_format == 1)
3061                 ret = rbd_dev_v1_refresh(rbd_dev);
3062         else
3063                 ret = rbd_dev_v2_refresh(rbd_dev);
3064         mutex_unlock(&ctl_mutex);
3065         if (ret)
3066                 rbd_warn(rbd_dev, "got notification but failed to "
3067                            "update snaps: %d\n", ret);
3068         if (image_size != rbd_dev->header.image_size)
3069                 revalidate_disk(rbd_dev->disk);
3070
3071         return ret;
3072 }
3073
3074 static int rbd_init_disk(struct rbd_device *rbd_dev)
3075 {
3076         struct gendisk *disk;
3077         struct request_queue *q;
3078         u64 segment_size;
3079
3080         /* create gendisk info */
3081         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3082         if (!disk)
3083                 return -ENOMEM;
3084
3085         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3086                  rbd_dev->dev_id);
3087         disk->major = rbd_dev->major;
3088         disk->first_minor = 0;
3089         disk->fops = &rbd_bd_ops;
3090         disk->private_data = rbd_dev;
3091
3092         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3093         if (!q)
3094                 goto out_disk;
3095
3096         /* We use the default size, but let's be explicit about it. */
3097         blk_queue_physical_block_size(q, SECTOR_SIZE);
3098
3099         /* set io sizes to object size */
3100         segment_size = rbd_obj_bytes(&rbd_dev->header);
3101         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3102         blk_queue_max_segment_size(q, segment_size);
3103         blk_queue_io_min(q, segment_size);
3104         blk_queue_io_opt(q, segment_size);
3105
3106         blk_queue_merge_bvec(q, rbd_merge_bvec);
3107         disk->queue = q;
3108
3109         q->queuedata = rbd_dev;
3110
3111         rbd_dev->disk = disk;
3112
3113         return 0;
3114 out_disk:
3115         put_disk(disk);
3116
3117         return -ENOMEM;
3118 }
3119
3120 /*
3121   sysfs
3122 */
3123
3124 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3125 {
3126         return container_of(dev, struct rbd_device, dev);
3127 }
3128
3129 static ssize_t rbd_size_show(struct device *dev,
3130                              struct device_attribute *attr, char *buf)
3131 {
3132         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3133
3134         return sprintf(buf, "%llu\n",
3135                 (unsigned long long)rbd_dev->mapping.size);
3136 }
3137
3138 /*
3139  * Note this shows the features for whatever's mapped, which is not
3140  * necessarily the base image.
3141  */
3142 static ssize_t rbd_features_show(struct device *dev,
3143                              struct device_attribute *attr, char *buf)
3144 {
3145         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3146
3147         return sprintf(buf, "0x%016llx\n",
3148                         (unsigned long long)rbd_dev->mapping.features);
3149 }
3150
3151 static ssize_t rbd_major_show(struct device *dev,
3152                               struct device_attribute *attr, char *buf)
3153 {
3154         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3155
3156         if (rbd_dev->major)
3157                 return sprintf(buf, "%d\n", rbd_dev->major);
3158
3159         return sprintf(buf, "(none)\n");
3161 }
3162
3163 static ssize_t rbd_client_id_show(struct device *dev,
3164                                   struct device_attribute *attr, char *buf)
3165 {
3166         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3167
3168         return sprintf(buf, "client%lld\n",
3169                         ceph_client_id(rbd_dev->rbd_client->client));
3170 }
3171
3172 static ssize_t rbd_pool_show(struct device *dev,
3173                              struct device_attribute *attr, char *buf)
3174 {
3175         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3176
3177         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3178 }
3179
3180 static ssize_t rbd_pool_id_show(struct device *dev,
3181                              struct device_attribute *attr, char *buf)
3182 {
3183         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3184
3185         return sprintf(buf, "%llu\n",
3186                         (unsigned long long) rbd_dev->spec->pool_id);
3187 }
3188
3189 static ssize_t rbd_name_show(struct device *dev,
3190                              struct device_attribute *attr, char *buf)
3191 {
3192         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3193
3194         if (rbd_dev->spec->image_name)
3195                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3196
3197         return sprintf(buf, "(unknown)\n");
3198 }
3199
3200 static ssize_t rbd_image_id_show(struct device *dev,
3201                              struct device_attribute *attr, char *buf)
3202 {
3203         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3204
3205         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3206 }
3207
3208 /*
3209  * Shows the name of the currently-mapped snapshot (or
3210  * RBD_SNAP_HEAD_NAME for the base image).
3211  */
3212 static ssize_t rbd_snap_show(struct device *dev,
3213                              struct device_attribute *attr,
3214                              char *buf)
3215 {
3216         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3217
3218         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3219 }
3220
3221 /*
3222  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3223  * for the parent image.  If there is no parent, simply shows
3224  * "(no parent image)".
3225  */
3226 static ssize_t rbd_parent_show(struct device *dev,
3227                              struct device_attribute *attr,
3228                              char *buf)
3229 {
3230         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3231         struct rbd_spec *spec = rbd_dev->parent_spec;
3232         int count;
3233         char *bufp = buf;
3234
3235         if (!spec)
3236                 return sprintf(buf, "(no parent image)\n");
3237
3238         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3239                         (unsigned long long) spec->pool_id, spec->pool_name);
3240         if (count < 0)
3241                 return count;
3242         bufp += count;
3243
3244         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3245                         spec->image_name ? spec->image_name : "(unknown)");
3246         if (count < 0)
3247                 return count;
3248         bufp += count;
3249
3250         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3251                         (unsigned long long) spec->snap_id, spec->snap_name);
3252         if (count < 0)
3253                 return count;
3254         bufp += count;
3255
3256         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3257         if (count < 0)
3258                 return count;
3259         bufp += count;
3260
3261         return (ssize_t) (bufp - buf);
3262 }
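/*
 * Sample "parent" attribute output, with hypothetical values:
 *
 *   pool_id 2
 *   pool_name mypool
 *   image_id 1023d1ac2eb6
 *   image_name parent-image
 *   snap_id 4
 *   snap_name parent-snap
 *   overlap 10737418240
 */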
3263
3264 static ssize_t rbd_image_refresh(struct device *dev,
3265                                  struct device_attribute *attr,
3266                                  const char *buf,
3267                                  size_t size)
3268 {
3269         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270         int ret;
3271
3272         ret = rbd_dev_refresh(rbd_dev);
3273
3274         return ret < 0 ? ret : size;
3275 }
3276
3277 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3278 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3279 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3280 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3281 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3282 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3283 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3284 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3285 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3286 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3287 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3288
3289 static struct attribute *rbd_attrs[] = {
3290         &dev_attr_size.attr,
3291         &dev_attr_features.attr,
3292         &dev_attr_major.attr,
3293         &dev_attr_client_id.attr,
3294         &dev_attr_pool.attr,
3295         &dev_attr_pool_id.attr,
3296         &dev_attr_name.attr,
3297         &dev_attr_image_id.attr,
3298         &dev_attr_current_snap.attr,
3299         &dev_attr_parent.attr,
3300         &dev_attr_refresh.attr,
3301         NULL
3302 };
3303
3304 static struct attribute_group rbd_attr_group = {
3305         .attrs = rbd_attrs,
3306 };
3307
3308 static const struct attribute_group *rbd_attr_groups[] = {
3309         &rbd_attr_group,
3310         NULL
3311 };
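/*
 * Together with the device type below, these groups make the
 * attributes above appear under /sys/bus/rbd/devices/<dev-id>/,
 * e.g. /sys/bus/rbd/devices/0/size for a hypothetical first device.
 */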
3312
3313 static void rbd_sysfs_dev_release(struct device *dev)
3314 {
3315 }
3316
3317 static struct device_type rbd_device_type = {
3318         .name           = "rbd",
3319         .groups         = rbd_attr_groups,
3320         .release        = rbd_sysfs_dev_release,
3321 };
3322
3323 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3324 {
3325         kref_get(&spec->kref);
3326
3327         return spec;
3328 }
3329
3330 static void rbd_spec_free(struct kref *kref);
3331 static void rbd_spec_put(struct rbd_spec *spec)
3332 {
3333         if (spec)
3334                 kref_put(&spec->kref, rbd_spec_free);
3335 }
3336
3337 static struct rbd_spec *rbd_spec_alloc(void)
3338 {
3339         struct rbd_spec *spec;
3340
3341         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3342         if (!spec)
3343                 return NULL;
3344         kref_init(&spec->kref);
3345
3346         return spec;
3347 }
3348
3349 static void rbd_spec_free(struct kref *kref)
3350 {
3351         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3352
3353         kfree(spec->pool_name);
3354         kfree(spec->image_id);
3355         kfree(spec->image_name);
3356         kfree(spec->snap_name);
3357         kfree(spec);
3358 }
3359
3360 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3361                                 struct rbd_spec *spec)
3362 {
3363         struct rbd_device *rbd_dev;
3364
3365         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3366         if (!rbd_dev)
3367                 return NULL;
3368
3369         spin_lock_init(&rbd_dev->lock);
3370         rbd_dev->flags = 0;
3371         INIT_LIST_HEAD(&rbd_dev->node);
3372         INIT_LIST_HEAD(&rbd_dev->snaps);
3373         init_rwsem(&rbd_dev->header_rwsem);
3374
3375         rbd_dev->spec = spec;
3376         rbd_dev->rbd_client = rbdc;
3377
3378         /* Initialize the layout used for all rbd requests */
3379
3380         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3381         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3382         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3383         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3384
3385         return rbd_dev;
3386 }
3387
3388 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3389 {
3390         rbd_put_client(rbd_dev->rbd_client);
3391         rbd_spec_put(rbd_dev->spec);
3392         kfree(rbd_dev);
3393 }
3394
3395 static void rbd_snap_destroy(struct rbd_snap *snap)
3396 {
3397         kfree(snap->name);
3398         kfree(snap);
3399 }
3400
3401 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3402                                                 const char *snap_name,
3403                                                 u64 snap_id, u64 snap_size,
3404                                                 u64 snap_features)
3405 {
3406         struct rbd_snap *snap;
3407
3408         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3409         if (!snap)
3410                 return ERR_PTR(-ENOMEM);
3411
3412         snap->name = snap_name;
3413         snap->id = snap_id;
3414         snap->size = snap_size;
3415         snap->features = snap_features;
3416
3417         return snap;
3418 }
3419
3420 /*
3421  * Returns a dynamically-allocated snapshot name if successful, or a
3422  * pointer-coded error otherwise.
3423  */
3424 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3425                 u64 *snap_size, u64 *snap_features)
3426 {
3427         const char *snap_name;
3428         int i;
3429
3430         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3431
3432         /* Skip over names until we find the one we are looking for */
3433
3434         snap_name = rbd_dev->header.snap_names;
3435         for (i = 0; i < which; i++)
3436                 snap_name += strlen(snap_name) + 1;
3437
3438         snap_name = kstrdup(snap_name, GFP_KERNEL);
3439         if (!snap_name)
3440                 return ERR_PTR(-ENOMEM);
3441
3442         *snap_size = rbd_dev->header.snap_sizes[which];
3443         *snap_features = 0;     /* No features for v1 */
3444
3445         return snap_name;
3446 }
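/*
 * Example of the name packing walked above, using hypothetical
 * snapshots "alpha" and "beta": header.snap_names holds
 * "alpha\0beta\0", so for which == 1 the loop stops at "beta".
 */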
3447
3448 /*
3449  * Get the size and object order for an image snapshot, or if
3450  * snap_id is CEPH_NOSNAP, gets this information for the base
3451  * image.
3452  */
3453 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3454                                 u8 *order, u64 *snap_size)
3455 {
3456         __le64 snapid = cpu_to_le64(snap_id);
3457         int ret;
3458         struct {
3459                 u8 order;
3460                 __le64 size;
3461         } __attribute__ ((packed)) size_buf = { 0 };
3462
3463         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3464                                 "rbd", "get_size",
3465                                 &snapid, sizeof (snapid),
3466                                 &size_buf, sizeof (size_buf), NULL);
3467         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3468         if (ret < 0)
3469                 return ret;
3470         if (ret < sizeof (size_buf))
3471                 return -ERANGE;
3472
3473         if (order)
3474                 *order = size_buf.order;
3475         *snap_size = le64_to_cpu(size_buf.size);
3476
3477         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3478                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3479                 (unsigned long long)*snap_size);
3480
3481         return 0;
3482 }
3483
3484 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3485 {
3486         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3487                                         &rbd_dev->header.obj_order,
3488                                         &rbd_dev->header.image_size);
3489 }
3490
3491 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3492 {
3493         void *reply_buf;
3494         int ret;
3495         void *p;
3496
3497         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3498         if (!reply_buf)
3499                 return -ENOMEM;
3500
3501         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3502                                 "rbd", "get_object_prefix", NULL, 0,
3503                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3504         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3505         if (ret < 0)
3506                 goto out;
3507
3508         p = reply_buf;
3509         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3510                                                 p + ret, NULL, GFP_NOIO);
3511         ret = 0;
3512
3513         if (IS_ERR(rbd_dev->header.object_prefix)) {
3514                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3515                 rbd_dev->header.object_prefix = NULL;
3516         } else {
3517                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3518         }
3519 out:
3520         kfree(reply_buf);
3521
3522         return ret;
3523 }
3524
3525 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3526                 u64 *snap_features)
3527 {
3528         __le64 snapid = cpu_to_le64(snap_id);
3529         struct {
3530                 __le64 features;
3531                 __le64 incompat;
3532         } __attribute__ ((packed)) features_buf = { 0 };
3533         u64 incompat;
3534         int ret;
3535
3536         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3537                                 "rbd", "get_features",
3538                                 &snapid, sizeof (snapid),
3539                                 &features_buf, sizeof (features_buf), NULL);
3540         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3541         if (ret < 0)
3542                 return ret;
3543         if (ret < sizeof (features_buf))
3544                 return -ERANGE;
3545
3546         incompat = le64_to_cpu(features_buf.incompat);
3547         if (incompat & ~RBD_FEATURES_SUPPORTED)
3548                 return -ENXIO;
3549
3550         *snap_features = le64_to_cpu(features_buf.features);
3551
3552         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3553                 (unsigned long long)snap_id,
3554                 (unsigned long long)*snap_features,
3555                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3556
3557         return 0;
3558 }
3559
3560 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3561 {
3562         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3563                                                 &rbd_dev->header.features);
3564 }
3565
3566 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3567 {
3568         struct rbd_spec *parent_spec;
3569         size_t size;
3570         void *reply_buf = NULL;
3571         __le64 snapid;
3572         void *p;
3573         void *end;
3574         char *image_id;
3575         u64 overlap;
3576         int ret;
3577
3578         parent_spec = rbd_spec_alloc();
3579         if (!parent_spec)
3580                 return -ENOMEM;
3581
3582         size = sizeof (__le64) +                                /* pool_id */
3583                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3584                 sizeof (__le64) +                               /* snap_id */
3585                 sizeof (__le64);                                /* overlap */
3586         reply_buf = kmalloc(size, GFP_KERNEL);
3587         if (!reply_buf) {
3588                 ret = -ENOMEM;
3589                 goto out_err;
3590         }
3591
3592         snapid = cpu_to_le64(CEPH_NOSNAP);
3593         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3594                                 "rbd", "get_parent",
3595                                 &snapid, sizeof (snapid),
3596                                 reply_buf, size, NULL);
3597         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3598         if (ret < 0)
3599                 goto out_err;
3600
3601         p = reply_buf;
3602         end = reply_buf + ret;
3603         ret = -ERANGE;
3604         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3605         if (parent_spec->pool_id == CEPH_NOPOOL)
3606                 goto out;       /* No parent?  No problem. */
3607
3608         /* The ceph file layout needs to fit pool id in 32 bits */
3609
3610         ret = -EIO;
3611         if (parent_spec->pool_id > (u64)U32_MAX) {
3612                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3613                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3614                 goto out_err;
3615         }
3616
3617         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3618         if (IS_ERR(image_id)) {
3619                 ret = PTR_ERR(image_id);
3620                 goto out_err;
3621         }
3622         parent_spec->image_id = image_id;
3623         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3624         ceph_decode_64_safe(&p, end, overlap, out_err);
3625
3626         rbd_dev->parent_overlap = overlap;
3627         rbd_dev->parent_spec = parent_spec;
3628         parent_spec = NULL;     /* rbd_dev now owns this */
3629 out:
3630         ret = 0;
3631 out_err:
3632         kfree(reply_buf);
3633         rbd_spec_put(parent_spec);
3634
3635         return ret;
3636 }
3637
3638 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3639 {
3640         struct {
3641                 __le64 stripe_unit;
3642                 __le64 stripe_count;
3643         } __attribute__ ((packed)) striping_info_buf = { 0 };
3644         size_t size = sizeof (striping_info_buf);
3645         void *p;
3646         u64 obj_size;
3647         u64 stripe_unit;
3648         u64 stripe_count;
3649         int ret;
3650
3651         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3652                                 "rbd", "get_stripe_unit_count", NULL, 0,
3653                                 (char *)&striping_info_buf, size, NULL);
3654         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3655         if (ret < 0)
3656                 return ret;
3657         if (ret < size)
3658                 return -ERANGE;
3659
3660         /*
3661          * We don't actually support the "fancy striping" feature
3662          * (STRIPINGV2) yet, but if the striping sizes are the
3663          * defaults the behavior is the same as before.  So find
3664          * out, and only fail if the image has non-default values.
3665          */
3666         ret = -EINVAL;
3667         obj_size = (u64)1 << rbd_dev->header.obj_order;
3668         p = &striping_info_buf;
3669         stripe_unit = ceph_decode_64(&p);
3670         if (stripe_unit != obj_size) {
3671                 rbd_warn(rbd_dev, "unsupported stripe unit "
3672                                 "(got %llu want %llu)",
3673                                 stripe_unit, obj_size);
3674                 return -EINVAL;
3675         }
3676         stripe_count = ceph_decode_64(&p);
3677         if (stripe_count != 1) {
3678                 rbd_warn(rbd_dev, "unsupported stripe count "
3679                                 "(got %llu want 1)", stripe_count);
3680                 return -EINVAL;
3681         }
3682         rbd_dev->header.stripe_unit = stripe_unit;
3683         rbd_dev->header.stripe_count = stripe_count;
3684
3685         return 0;
3686 }
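/*
 * Concretely: for a hypothetical image with obj_order 22 (4 MiB
 * objects), the only values accepted above are stripe_unit ==
 * 4194304 and stripe_count == 1.
 */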
3687
3688 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3689 {
3690         size_t image_id_size;
3691         char *image_id;
3692         void *p;
3693         void *end;
3694         size_t size;
3695         void *reply_buf = NULL;
3696         size_t len = 0;
3697         char *image_name = NULL;
3698         int ret;
3699
3700         rbd_assert(!rbd_dev->spec->image_name);
3701
3702         len = strlen(rbd_dev->spec->image_id);
3703         image_id_size = sizeof (__le32) + len;
3704         image_id = kmalloc(image_id_size, GFP_KERNEL);
3705         if (!image_id)
3706                 return NULL;
3707
3708         p = image_id;
3709         end = image_id + image_id_size;
3710         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3711
3712         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3713         reply_buf = kmalloc(size, GFP_KERNEL);
3714         if (!reply_buf)
3715                 goto out;
3716
3717         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3718                                 "rbd", "dir_get_name",
3719                                 image_id, image_id_size,
3720                                 reply_buf, size, NULL);
3721         if (ret < 0)
3722                 goto out;
3723         p = reply_buf;
3724         end = reply_buf + ret;
3725
3726         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3727         if (IS_ERR(image_name))
3728                 image_name = NULL;
3729         else
3730                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3731 out:
3732         kfree(reply_buf);
3733         kfree(image_id);
3734
3735         return image_name;
3736 }
3737
3738 /*
3739  * When an rbd image has a parent image, it is identified by the
3740  * pool, image, and snapshot ids (not names).  This function fills
3741  * in the names for those ids.  (It's OK if we can't figure out the
3742  * name for an image id, but the pool and snapshot ids should always
3743  * exist and have names.)  All names in an rbd spec are dynamically
3744  * allocated.
3745  *
3746  * When an image being mapped (not a parent) is probed, we have the
3747  * pool name and pool id, image name and image id, and the snapshot
3748  * name.  The only thing we're missing is the snapshot id.
3749  *
3750  * The set of snapshots for an image is not known until they have
3751  * been read by rbd_dev_snaps_update(), so we can't completely fill
3752  * in this information until after that has been called.
3753  */
3754 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3755 {
3756         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3757         struct rbd_spec *spec = rbd_dev->spec;
3758         const char *pool_name;
3759         const char *image_name;
3760         const char *snap_name;
3761         int ret;
3762
3763         /*
3764          * An image being mapped will have the pool name (etc.), but
3765          * we need to look up the snapshot id.
3766          */
3767         if (spec->pool_name) {
3768                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3769                         struct rbd_snap *snap;
3770
3771                         snap = snap_by_name(rbd_dev, spec->snap_name);
3772                         if (!snap)
3773                                 return -ENOENT;
3774                         spec->snap_id = snap->id;
3775                 } else {
3776                         spec->snap_id = CEPH_NOSNAP;
3777                 }
3778
3779                 return 0;
3780         }
3781
3782         /* Get the pool name; we have to make our own copy of this */
3783
3784         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3785         if (!pool_name) {
3786                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3787                 return -EIO;
3788         }
3789         pool_name = kstrdup(pool_name, GFP_KERNEL);
3790         if (!pool_name)
3791                 return -ENOMEM;
3792
3793         /* Fetch the image name; tolerate failure here */
3794
3795         image_name = rbd_dev_image_name(rbd_dev);
3796         if (!image_name)
3797                 rbd_warn(rbd_dev, "unable to get image name");
3798
3799         /* Look up the snapshot name, and make a copy */
3800
3801         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3802         if (!snap_name) {
3803                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3804                 ret = -EIO;
3805                 goto out_err;
3806         }
3807         snap_name = kstrdup(snap_name, GFP_KERNEL);
3808         if (!snap_name) {
3809                 ret = -ENOMEM;
3810                 goto out_err;
3811         }
3812
3813         spec->pool_name = pool_name;
3814         spec->image_name = image_name;
3815         spec->snap_name = snap_name;
3816
3817         return 0;
3818 out_err:
3819         kfree(image_name);
3820         kfree(pool_name);
3821
3822         return ret;
3823 }
3824
3825 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3826 {
3827         size_t size;
3828         int ret;
3829         void *reply_buf;
3830         void *p;
3831         void *end;
3832         u64 seq;
3833         u32 snap_count;
3834         struct ceph_snap_context *snapc;
3835         u32 i;
3836
3837         /*
3838          * We'll need room for the seq value (maximum snapshot id),
3839          * snapshot count, and array of that many snapshot ids.
3840          * For now we have a fixed upper limit on the number we're
3841          * prepared to receive.
3842          */
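        /*
         * Reply layout we decode below (all little-endian):
         *
         *   __le64 seq;
         *   __le32 snap_count;
         *   __le64 snap_ids[snap_count];
         */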
3843         size = sizeof (__le64) + sizeof (__le32) +
3844                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3845         reply_buf = kzalloc(size, GFP_KERNEL);
3846         if (!reply_buf)
3847                 return -ENOMEM;
3848
3849         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3850                                 "rbd", "get_snapcontext", NULL, 0,
3851                                 reply_buf, size, NULL);
3852         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3853         if (ret < 0)
3854                 goto out;
3855
3856         p = reply_buf;
3857         end = reply_buf + ret;
3858         ret = -ERANGE;
3859         ceph_decode_64_safe(&p, end, seq, out);
3860         ceph_decode_32_safe(&p, end, snap_count, out);
3861
3862         /*
3863          * Make sure the reported number of snapshot ids wouldn't go
3864          * beyond the end of our buffer.  But before checking that,
3865          * make sure the computed size of the snapshot context we
3866          * allocate is representable in a size_t.
3867          */
3868         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3869                                  / sizeof (u64)) {
3870                 ret = -EINVAL;
3871                 goto out;
3872         }
3873         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3874                 goto out;
3875         ret = 0;
3876
3877         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3878         if (!snapc) {
3879                 ret = -ENOMEM;
3880                 goto out;
3881         }
3882         snapc->seq = seq;
3883         for (i = 0; i < snap_count; i++)
3884                 snapc->snaps[i] = ceph_decode_64(&p);
3885
3886         rbd_dev->header.snapc = snapc;
3887
3888         dout("  snap context seq = %llu, snap_count = %u\n",
3889                 (unsigned long long)seq, (unsigned int)snap_count);
3890 out:
3891         kfree(reply_buf);
3892
3893         return ret;
3894 }
3895
3896 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3897 {
3898         size_t size;
3899         void *reply_buf;
3900         __le64 snap_id;
3901         int ret;
3902         void *p;
3903         void *end;
3904         char *snap_name;
3905
3906         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3907         reply_buf = kmalloc(size, GFP_KERNEL);
3908         if (!reply_buf)
3909                 return ERR_PTR(-ENOMEM);
3910
3911         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3912         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3913         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3914                                 "rbd", "get_snapshot_name",
3915                                 &snap_id, sizeof (snap_id),
3916                                 reply_buf, size, NULL);
3917         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3918         if (ret < 0) {
3919                 snap_name = ERR_PTR(ret);
3920                 goto out;
3921         }
3922
3923         p = reply_buf;
3924         end = reply_buf + ret;
3925         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3926         if (IS_ERR(snap_name))
3927                 goto out;
3928
3929         dout("  snap_id 0x%016llx snap_name = %s\n",
3930                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3931 out:
3932         kfree(reply_buf);
3933
3934         return snap_name;
3935 }
3936
3937 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3938                 u64 *snap_size, u64 *snap_features)
3939 {
3940         u64 snap_id;
3941         u64 size;
3942         u64 features;
3943         const char *snap_name;
3944         int ret;
3945
3946         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3947         snap_id = rbd_dev->header.snapc->snaps[which];
3948         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3949         if (ret)
3950                 goto out_err;
3951
3952         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3953         if (ret)
3954                 goto out_err;
3955
3956         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3957         if (!IS_ERR(snap_name)) {
3958                 *snap_size = size;
3959                 *snap_features = features;
3960         }
3961
3962         return snap_name;
3963 out_err:
3964         return ERR_PTR(ret);
3965 }
3966
3967 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3968                 u64 *snap_size, u64 *snap_features)
3969 {
3970         if (rbd_dev->image_format == 1)
3971                 return rbd_dev_v1_snap_info(rbd_dev, which,
3972                                         snap_size, snap_features);
3973         if (rbd_dev->image_format == 2)
3974                 return rbd_dev_v2_snap_info(rbd_dev, which,
3975                                         snap_size, snap_features);
3976         return ERR_PTR(-EINVAL);
3977 }
3978
3979 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
3980 {
3981         int ret;
3982
3983         down_write(&rbd_dev->header_rwsem);
3984
3985         ret = rbd_dev_v2_image_size(rbd_dev);
3986         if (ret)
3987                 goto out;
3988         rbd_update_mapping_size(rbd_dev);
3989
3990         ret = rbd_dev_v2_snap_context(rbd_dev);
3991         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3992         if (ret)
3993                 goto out;
3994         ret = rbd_dev_snaps_update(rbd_dev);
3995         dout("rbd_dev_snaps_update returned %d\n", ret);
3998 out:
3999         up_write(&rbd_dev->header_rwsem);
4000
4001         return ret;
4002 }
4003
4004 /*
4005  * Scan the rbd device's current snapshot list and compare it to the
4006  * newly-received snapshot context.  Remove any existing snapshots
4007  * not present in the new snapshot context.  Add a new snapshot for
4008  * any snapshots in the snapshot context not in the current list.
4009  * And verify there are no changes to snapshots we already know
4010  * about.
4011  *
4012  * Assumes the snapshots in the snapshot context are sorted by
4013  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4014  * are also maintained in that order.)
4015  *
4016  * Note that an error that occurs while updating the snapshot list
4017  * aborts the update, and the entire list is cleared.  The snapshot
4018  * list becomes inconsistent at that point anyway, so it might as
4019  * well be empty.
4020  */
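/*
 * An example of the merge walk below, using hypothetical snapshot
 * ids: with an existing list {12, 7, 3} and a new context
 * {12, 9, 3}, we keep 12, insert 9 before 7, remove 7 (absent from
 * the new context), and keep 3.
 */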
4021 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4022 {
4023         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4024         const u32 snap_count = snapc->num_snaps;
4025         struct list_head *head = &rbd_dev->snaps;
4026         struct list_head *links = head->next;
4027         u32 index = 0;
4028         int ret = 0;
4029
4030         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4031         while (index < snap_count || links != head) {
4032                 u64 snap_id;
4033                 struct rbd_snap *snap;
4034                 const char *snap_name;
4035                 u64 snap_size = 0;
4036                 u64 snap_features = 0;
4037
4038                 snap_id = index < snap_count ? snapc->snaps[index]
4039                                              : CEPH_NOSNAP;
4040                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4041                                      : NULL;
4042                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4043
4044                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4045                         struct list_head *next = links->next;
4046
4047                         /*
4048                          * A previously-existing snapshot is not in
4049                          * the new snap context.
4050                          *
4051                          * If the now-missing snapshot is the one
4052                          * the image represents, clear its existence
4053                          * flag so we can avoid sending any more
4054                          * requests to it.
4055                          */
4056                         if (rbd_dev->spec->snap_id == snap->id)
4057                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4058                         dout("removing %ssnap id %llu\n",
4059                                 rbd_dev->spec->snap_id == snap->id ?
4060                                                         "mapped " : "",
4061                                 (unsigned long long)snap->id);
4062
4063                         list_del(&snap->node);
4064                         rbd_snap_destroy(snap);
4065
4066                         /* Done with this list entry; advance */
4067
4068                         links = next;
4069                         continue;
4070                 }
4071
4072                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4073                                         &snap_size, &snap_features);
4074                 if (IS_ERR(snap_name)) {
4075                         ret = PTR_ERR(snap_name);
4076                         dout("failed to get snap info, error %d\n", ret);
4077                         goto out_err;
4078                 }
4079
4080                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4081                         (unsigned long long)snap_id);
4082                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4083                         struct rbd_snap *new_snap;
4084
4085                         /* We haven't seen this snapshot before */
4086
4087                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4088                                         snap_id, snap_size, snap_features);
4089                         if (IS_ERR(new_snap)) {
4090                                 ret = PTR_ERR(new_snap);
4091                                 dout("  failed to add dev, error %d\n", ret);
4092                                 goto out_err;
4093                         }
4094
4095                         /* New goes before existing, or at end of list */
4096
4097                         dout("  added dev%s\n", snap ? "" : " at end");
4098                         if (snap)
4099                                 list_add_tail(&new_snap->node, &snap->node);
4100                         else
4101                                 list_add_tail(&new_snap->node, head);
4102                 } else {
4103                         /* Already have this one */
4104
4105                         dout("  already present\n");
4106
4107                         rbd_assert(snap->size == snap_size);
4108                         rbd_assert(!strcmp(snap->name, snap_name));
4109                         rbd_assert(snap->features == snap_features);
4110
4111                         /* Done with this list entry; advance */
4112
4113                         links = links->next;
4114                 }
4115
4116                 /* Advance to the next entry in the snapshot context */
4117
4118                 index++;
4119         }
4120         dout("%s: done\n", __func__);
4121
4122         return 0;
4123 out_err:
4124         rbd_remove_all_snaps(rbd_dev);
4125
4126         return ret;
4127 }
4128
4129 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4130 {
4131         struct device *dev;
4132         int ret;
4133
4134         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4135
4136         dev = &rbd_dev->dev;
4137         dev->bus = &rbd_bus_type;
4138         dev->type = &rbd_device_type;
4139         dev->parent = &rbd_root_dev;
4140         dev->release = rbd_dev_device_release;
4141         dev_set_name(dev, "%d", rbd_dev->dev_id);
4142         ret = device_register(dev);
4143
4144         mutex_unlock(&ctl_mutex);
4145
4146         return ret;
4147 }
4148
4149 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4150 {
4151         device_unregister(&rbd_dev->dev);
4152 }
4153
4154 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4155
4156 /*
4157  * Get a unique rbd identifier for the given new rbd_dev, and add
4158  * the rbd_dev to the global list.  The minimum rbd id is 1.
4159  */
4160 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4161 {
4162         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4163
4164         spin_lock(&rbd_dev_list_lock);
4165         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4166         spin_unlock(&rbd_dev_list_lock);
4167         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4168                 (unsigned long long) rbd_dev->dev_id);
4169 }
4170
4171 /*
4172  * Remove an rbd_dev from the global list, and record that its
4173  * identifier is no longer in use.
4174  */
4175 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4176 {
4177         struct list_head *tmp;
4178         int rbd_id = rbd_dev->dev_id;
4179         int max_id;
4180
4181         rbd_assert(rbd_id > 0);
4182
4183         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4184                 (unsigned long long) rbd_dev->dev_id);
4185         spin_lock(&rbd_dev_list_lock);
4186         list_del_init(&rbd_dev->node);
4187
4188         /*
4189          * If the id being "put" is not the current maximum, there
4190          * is nothing special we need to do.
4191          */
4192         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4193                 spin_unlock(&rbd_dev_list_lock);
4194                 return;
4195         }
4196
4197         /*
4198          * We need to update the current maximum id.  Search the
4199          * list to find out what it is.  We're more likely to find
4200          * the maximum at the end, so search the list backward.
4201          */
4202         max_id = 0;
4203         list_for_each_prev(tmp, &rbd_dev_list) {
4204                 struct rbd_device *rbd_dev;
4205
4206                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4207                 if (rbd_dev->dev_id > max_id)
4208                         max_id = rbd_dev->dev_id;
4209         }
4210         spin_unlock(&rbd_dev_list_lock);
4211
4212         /*
4213          * The max id could have been updated by rbd_dev_id_get(), in
4214          * which case it now accurately reflects the new maximum.
4215          * Be careful not to overwrite the maximum value in that
4216          * case.
4217          */
4218         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4219         dout("  max dev id has been reset\n");
4220 }
4221
4222 /*
4223  * Skips over white space at *buf, and updates *buf to point to the
4224  * first found non-space character (if any). Returns the length of
4225  * the token (string of non-white space characters) found.  Note
4226  * that *buf must be terminated with '\0'.
4227  */
4228 static inline size_t next_token(const char **buf)
4229 {
4230         /*
4231          * These are the characters that produce nonzero for
4232          * isspace() in the "C" and "POSIX" locales.
4233          */
4234         const char *spaces = " \f\n\r\t\v";
4235
4236         *buf += strspn(*buf, spaces);   /* Find start of token */
4237
4238         return strcspn(*buf, spaces);   /* Return token length */
4239 }
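/*
 * Example with hypothetical input: if *buf points at "  pool img",
 * next_token() advances *buf to "pool img" and returns 4.
 */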
4240
4241 /*
4242  * Finds the next token in *buf, and if the provided token buffer is
4243  * big enough, copies the found token into it.  The result, if
4244  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4245  * must be terminated with '\0' on entry.
4246  *
4247  * Returns the length of the token found (not including the '\0').
4248  * Return value will be 0 if no token is found, and it will be >=
4249  * token_size if the token would not fit.
4250  *
4251  * The *buf pointer will be updated to point beyond the end of the
4252  * found token.  Note that this occurs even if the token buffer is
4253  * too small to hold it.
4254  */
4255 static inline size_t copy_token(const char **buf,
4256                                 char *token,
4257                                 size_t token_size)
4258 {
4259         size_t len;
4260
4261         len = next_token(buf);
4262         if (len < token_size) {
4263                 memcpy(token, *buf, len);
4264                 *(token + len) = '\0';
4265         }
4266         *buf += len;
4267
4268         return len;
4269 }
4270
4271 /*
4272  * Finds the next token in *buf, dynamically allocates a buffer big
4273  * enough to hold a copy of it, and copies the token into the new
4274  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4275  * that a duplicate buffer is created even for a zero-length token.
4276  *
4277  * Returns a pointer to the newly-allocated duplicate, or a null
4278  * pointer if memory for the duplicate was not available.  If
4279  * the lenp argument is a non-null pointer, the length of the token
4280  * (not including the '\0') is returned in *lenp.
4281  *
4282  * If successful, the *buf pointer will be updated to point beyond
4283  * the end of the found token.
4284  *
4285  * Note: uses GFP_KERNEL for allocation.
4286  */
4287 static inline char *dup_token(const char **buf, size_t *lenp)
4288 {
4289         char *dup;
4290         size_t len;
4291
4292         len = next_token(buf);
4293         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4294         if (!dup)
4295                 return NULL;
4296         *(dup + len) = '\0';
4297         *buf += len;
4298
4299         if (lenp)
4300                 *lenp = len;
4301
4302         return dup;
4303 }
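/*
 * Example with hypothetical input: dup_token(&buf, &len) on
 * "rbd foo" returns a newly-allocated "rbd", sets len to 3, and
 * leaves *buf pointing at " foo".
 */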
4304
4305 /*
4306  * Parse the options provided for an "rbd add" (i.e., rbd image
4307  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4308  * and the data written is passed here via a NUL-terminated buffer.
4309  * Returns 0 if successful or an error code otherwise.
4310  *
4311  * The information extracted from these options is recorded in
4312  * the other parameters which return dynamically-allocated
4313  * structures:
4314  *  ceph_opts
4315  *      The address of a pointer that will refer to a ceph options
4316  *      structure.  Caller must release the returned pointer using
4317  *      ceph_destroy_options() when it is no longer needed.
4318  *  rbd_opts
4319  *      Address of an rbd options pointer.  Fully initialized by
4320  *      this function; caller must release with kfree().
4321  *  spec
4322  *      Address of an rbd image specification pointer.  Fully
4323  *      initialized by this function based on parsed options.
4324  *      Caller must release with rbd_spec_put().
4325  *
4326  * The options passed take this form:
4327  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4328  * where:
4329  *  <mon_addrs>
4330  *      A comma-separated list of one or more monitor addresses.
4331  *      A monitor address is an ip address, optionally followed
4332  *      by a port number (separated by a colon).
4333  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4334  *  <options>
4335  *      A comma-separated list of ceph and/or rbd options.
4336  *  <pool_name>
4337  *      The name of the rados pool containing the rbd image.
4338  *  <image_name>
4339  *      The name of the image in that pool to map.
4340  *  <snap_name>
4341  *      An optional snapshot name.  If provided, the mapping will
4342  *      present data from the image at the time that snapshot was
4343  *      created.  The image head is used if no snapshot name is
4344  *      provided.  Snapshot mappings are always read-only.
4345  */
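/*
 * A hypothetical mapping request matching the grammar above:
 *
 *   echo "1.2.3.4:6789 name=admin rbd myimage mysnap" \
 *              > /sys/bus/rbd/add
 */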
4346 static int rbd_add_parse_args(const char *buf,
4347                                 struct ceph_options **ceph_opts,
4348                                 struct rbd_options **opts,
4349                                 struct rbd_spec **rbd_spec)
4350 {
4351         size_t len;
4352         char *options;
4353         const char *mon_addrs;
4354         char *snap_name;
4355         size_t mon_addrs_size;
4356         struct rbd_spec *spec = NULL;
4357         struct rbd_options *rbd_opts = NULL;
4358         struct ceph_options *copts;
4359         int ret;
4360
4361         /* The first four tokens are required */
4362
4363         len = next_token(&buf);
4364         if (!len) {
4365                 rbd_warn(NULL, "no monitor address(es) provided");
4366                 return -EINVAL;
4367         }
4368         mon_addrs = buf;
4369         mon_addrs_size = len + 1;
4370         buf += len;
4371
4372         ret = -EINVAL;
4373         options = dup_token(&buf, NULL);
4374         if (!options)
4375                 return -ENOMEM;
4376         if (!*options) {
4377                 rbd_warn(NULL, "no options provided");
4378                 goto out_err;
4379         }
4380
4381         spec = rbd_spec_alloc();
4382         if (!spec)
4383                 goto out_mem;
4384
4385         spec->pool_name = dup_token(&buf, NULL);
4386         if (!spec->pool_name)
4387                 goto out_mem;
4388         if (!*spec->pool_name) {
4389                 rbd_warn(NULL, "no pool name provided");
4390                 goto out_err;
4391         }
4392
4393         spec->image_name = dup_token(&buf, NULL);
4394         if (!spec->image_name)
4395                 goto out_mem;
4396         if (!*spec->image_name) {
4397                 rbd_warn(NULL, "no image name provided");
4398                 goto out_err;
4399         }
4400
4401         /*
4402          * Snapshot name is optional; default is to use "-"
4403          * (indicating the head/no snapshot).
4404          */
4405         len = next_token(&buf);
4406         if (!len) {
4407                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4408                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4409         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4410                 ret = -ENAMETOOLONG;
4411                 goto out_err;
4412         }
4413         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4414         if (!snap_name)
4415                 goto out_mem;
4416         *(snap_name + len) = '\0';
4417         spec->snap_name = snap_name;
4418
4419         /* Initialize all rbd options to the defaults */
4420
4421         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4422         if (!rbd_opts)
4423                 goto out_mem;
4424
4425         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4426
4427         copts = ceph_parse_options(options, mon_addrs,
4428                                         mon_addrs + mon_addrs_size - 1,
4429                                         parse_rbd_opts_token, rbd_opts);
4430         if (IS_ERR(copts)) {
4431                 ret = PTR_ERR(copts);
4432                 goto out_err;
4433         }
4434         kfree(options);
4435
4436         *ceph_opts = copts;
4437         *opts = rbd_opts;
4438         *rbd_spec = spec;
4439
4440         return 0;
4441 out_mem:
4442         ret = -ENOMEM;
4443 out_err:
4444         kfree(rbd_opts);
4445         rbd_spec_put(spec);
4446         kfree(options);
4447
4448         return ret;
4449 }
4450
4451 /*
4452  * An rbd format 2 image has a unique identifier, distinct from the
4453  * name given to it by the user.  Internally, that identifier is
4454  * what's used to specify the names of objects related to the image.
4455  *
4456  * A special "rbd id" object is used to map an rbd image name to its
4457  * id.  If that object doesn't exist, then there is no v2 rbd image
4458  * with the supplied name.
4459  *
4460  * This function will record the given rbd_dev's image_id field if
4461  * it can be determined, and in that case will return 0.  If any
4462  * errors occur a negative errno will be returned and the rbd_dev's
4463  * image_id field will be unchanged (and should be NULL).
4464  */
4465 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4466 {
4467         int ret;
4468         size_t size;
4469         char *object_name;
4470         void *response;
4471         char *image_id;
4472
4473         /*
4474          * When probing a parent image, the image id is already
4475          * known (and the image name likely is not).  There's no
4476          * need to fetch the image id again in this case.  We
4477          * do still need to set the image format though.
4478          */
4479         if (rbd_dev->spec->image_id) {
4480                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4481
4482                 return 0;
4483         }
4484
4485         /*
4486          * First, see if the format 2 image id file exists, and if
4487          * so, get the image's persistent id from it.
4488          */
4489         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4490         object_name = kmalloc(size, GFP_NOIO);
4491         if (!object_name)
4492                 return -ENOMEM;
4493         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4494         dout("rbd id object name is %s\n", object_name);
4495
4496         /* Response will be an encoded string, which includes a length */
4497
4498         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4499         response = kzalloc(size, GFP_NOIO);
4500         if (!response) {
4501                 ret = -ENOMEM;
4502                 goto out;
4503         }
4504
4505         /* If it doesn't exist we'll assume it's a format 1 image */
4506
4507         ret = rbd_obj_method_sync(rbd_dev, object_name,
4508                                 "rbd", "get_id", NULL, 0,
4509                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4510         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4511         if (ret == -ENOENT) {
4512                 image_id = kstrdup("", GFP_KERNEL);
4513                 ret = image_id ? 0 : -ENOMEM;
4514                 if (!ret)
4515                         rbd_dev->image_format = 1;
4516         } else if (ret >= 0 && ret > sizeof (__le32)) {
4517                 void *p = response;
4518
4519                 image_id = ceph_extract_encoded_string(&p, p + ret,
4520                                                 NULL, GFP_NOIO);
4521                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4522                 if (!ret)
4523                         rbd_dev->image_format = 2;
4524         } else if (ret >= 0) {
4525                 ret = -EINVAL;  /* too short for an encoded string */
4526         }
4527
4528         if (!ret) {
4529                 rbd_dev->spec->image_id = image_id;
4530                 dout("image_id is %s\n", image_id);
4531         }
4532 out:
4533         kfree(response);
4534         kfree(object_name);
4535
4536         return ret;
4537 }
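/*
 * Note the convention established above: a format 1 image has no id,
 * so an empty image_id string serves as its sentinel, while a format
 * 2 image records the id read from its "rbd id" object.  The parent
 * shortcut at the top of this function relies on that distinction.
 */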
4538
4539 /* Undo whatever state changes were made by v1 or v2 image probe */
4540
4541 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4542 {
4543         struct rbd_image_header *header;
4544
4545         rbd_dev_remove_parent(rbd_dev);
4546         rbd_spec_put(rbd_dev->parent_spec);
4547         rbd_dev->parent_spec = NULL;
4548         rbd_dev->parent_overlap = 0;
4549
4550         /* Free dynamic fields from the header, then zero it out */
4551
4552         header = &rbd_dev->header;
4553         ceph_put_snap_context(header->snapc);
4554         kfree(header->snap_sizes);
4555         kfree(header->snap_names);
4556         kfree(header->object_prefix);
4557         memset(header, 0, sizeof (*header));
4558 }
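/*
 * Note that this leaves header_name, image_format and image_id
 * alone; those are torn down separately, by the unwind labels in
 * rbd_dev_image_probe() and by rbd_dev_image_release() below.
 */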
4559
4560 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4561 {
4562         int ret;
4563
4564         /* Populate rbd image metadata */
4565
4566         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4567         if (ret < 0)
4568                 goto out_err;
4569
4570         /* Version 1 images have no parent (no layering) */
4571
4572         rbd_dev->parent_spec = NULL;
4573         rbd_dev->parent_overlap = 0;
4574
4575         dout("discovered version 1 image, header name is %s\n",
4576                 rbd_dev->header_name);
4577
4578         return 0;
4579
4580 out_err:
4581         kfree(rbd_dev->header_name);
4582         rbd_dev->header_name = NULL;
4583         kfree(rbd_dev->spec->image_id);
4584         rbd_dev->spec->image_id = NULL;
4585
4586         return ret;
4587 }
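/*
 * Both probe variants reset the pointers they free to NULL on
 * failure, so the caller's unwind labels, which kfree() some of the
 * same fields, remain harmless no-ops.
 */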
4588
4589 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4590 {
4591         int ret;
4592
4593         ret = rbd_dev_v2_image_size(rbd_dev);
4594         if (ret)
4595                 goto out_err;
4596
4597         /* Get the object prefix (a.k.a. block_name) for the image */
4598
4599         ret = rbd_dev_v2_object_prefix(rbd_dev);
4600         if (ret)
4601                 goto out_err;
4602
4603         /* Get and check the features for the image */
4604
4605         ret = rbd_dev_v2_features(rbd_dev);
4606         if (ret)
4607                 goto out_err;
4608
4609         /* If the image supports layering, get the parent info */
4610
4611         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4612                 ret = rbd_dev_v2_parent_info(rbd_dev);
4613                 if (ret)
4614                         goto out_err;
4615
4616                 /*
4617                  * Don't print a warning for parent images.  We can
4618                  * tell we have a parent at this point because we
4619                  * won't know its pool name yet (just its pool id).
4620                  */
4621                 if (rbd_dev->spec->pool_name)
4622                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4623                                         "is EXPERIMENTAL!");
4624         }
4625
4626         /* If the image supports fancy striping, get its parameters */
4627
4628         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4629                 ret = rbd_dev_v2_striping_info(rbd_dev);
4630                 if (ret < 0)
4631                         goto out_err;
4632         }
4633
4634         /* crypto and compression type aren't (yet) supported for v2 images */
4635
4636         rbd_dev->header.crypt_type = 0;
4637         rbd_dev->header.comp_type = 0;
4638
4639         /* Get the snapshot context for the image */
4640
4641         ret = rbd_dev_v2_snap_context(rbd_dev);
4642         if (ret)
4643                 goto out_err;
4644
4645         dout("discovered version 2 image, header name is %s\n",
4646                 rbd_dev->header_name);
4647
4648         return 0;
4649 out_err:
4650         rbd_dev->parent_overlap = 0;
4651         rbd_spec_put(rbd_dev->parent_spec);
4652         rbd_dev->parent_spec = NULL;
4653         kfree(rbd_dev->header_name);
4654         rbd_dev->header_name = NULL;
4655         kfree(rbd_dev->header.object_prefix);
4656         rbd_dev->header.object_prefix = NULL;
4657
4658         return ret;
4659 }
4660
4661 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4662 {
4663         struct rbd_device *parent = NULL;
4664         struct rbd_spec *parent_spec;
4665         struct rbd_client *rbdc;
4666         int ret;
4667
4668         if (!rbd_dev->parent_spec)
4669                 return 0;
4670         /*
4671          * We need to pass a reference to the client and the parent
4672          * spec when creating the parent rbd_dev.  Images related by
4673          * parent/child relationships always share both.
4674          */
4675         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4676         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4677
4678         ret = -ENOMEM;
4679         parent = rbd_dev_create(rbdc, parent_spec);
4680         if (!parent)
4681                 goto out_err;
4682
4683         ret = rbd_dev_image_probe(parent);
4684         if (ret < 0)
4685                 goto out_err;
4686         rbd_dev->parent = parent;
4687
4688         return 0;
4689 out_err:
4690         if (parent) {
4691                 rbd_spec_put(rbd_dev->parent_spec);
4692                 kfree(rbd_dev->header_name);
4693                 rbd_dev_destroy(parent);
4694         } else {
4695                 rbd_put_client(rbdc);
4696                 rbd_spec_put(parent_spec);
4697         }
4698
4699         return ret;
4700 }
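/*
 * rbd_dev_image_probe() below calls back into this function, so a
 * chain of layered images is probed recursively, one ancestor per
 * call, until an image with no parent_spec ends the recursion.
 */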
4701
4702 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4703 {
4704         int ret;
4705
4706         ret = rbd_dev_mapping_set(rbd_dev);
4707         if (ret)
4708                 return ret;
4709
4710         /* generate unique id: find highest unique id, add one */
4711         rbd_dev_id_get(rbd_dev);
4712
4713         /* Fill in the device name, now that we have its id. */
4714         BUILD_BUG_ON(DEV_NAME_LEN
4715                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4716         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4717
4718         /* Get our block major device number. */
4719
4720         ret = register_blkdev(0, rbd_dev->name);
4721         if (ret < 0)
4722                 goto err_out_id;
4723         rbd_dev->major = ret;
4724
4725         /* Set up the blkdev mapping. */
4726
4727         ret = rbd_init_disk(rbd_dev);
4728         if (ret)
4729                 goto err_out_blkdev;
4730
4731         ret = rbd_bus_add_dev(rbd_dev);
4732         if (ret)
4733                 goto err_out_disk;
4734
4735         /* Everything's ready.  Announce the disk to the world. */
4736
4737         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4738         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4739         add_disk(rbd_dev->disk);
4740
4741         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4742                 (unsigned long long) rbd_dev->mapping.size);
4743
4744         return ret;
4745
4746 err_out_disk:
4747         rbd_free_disk(rbd_dev);
4748 err_out_blkdev:
4749         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4750 err_out_id:
4751         rbd_dev_id_put(rbd_dev);
4752         rbd_dev_mapping_clear(rbd_dev);
4753
4754         return ret;
4755 }
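/*
 * Note that register_blkdev() is called with major 0 above, which
 * asks the block layer to assign a dynamic major; the returned value
 * is saved in rbd_dev->major and is what the error path here (and
 * rbd_dev_device_release() below) must pass to unregister_blkdev().
 */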
4756
4757 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4758 {
4759         struct rbd_spec *spec = rbd_dev->spec;
4760         size_t size;
4761
4762         /* Record the header object name for this rbd image. */
4763
4764         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4765
4766         if (rbd_dev->image_format == 1)
4767                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4768         else
4769                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4770
4771         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4772         if (!rbd_dev->header_name)
4773                 return -ENOMEM;
4774
4775         if (rbd_dev->image_format == 1)
4776                 sprintf(rbd_dev->header_name, "%s%s",
4777                         spec->image_name, RBD_SUFFIX);
4778         else
4779                 sprintf(rbd_dev->header_name, "%s%s",
4780                         RBD_HEADER_PREFIX, spec->image_id);
4781         return 0;
4782 }
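/*
 * With RBD_SUFFIX and RBD_HEADER_PREFIX as defined in rbd_types.h,
 * this yields header object names of the form "<image_name>.rbd" for
 * format 1 images and "rbd_header.<image_id>" for format 2 images.
 */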
4783
4784 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4785 {
4786         int ret;
4787
4788         rbd_remove_all_snaps(rbd_dev);
4789         rbd_dev_unprobe(rbd_dev);
4790         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4791         if (ret)
4792                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4793         kfree(rbd_dev->header_name);
4794         rbd_dev->header_name = NULL;
4795         rbd_dev->image_format = 0;
4796         kfree(rbd_dev->spec->image_id);
4797         rbd_dev->spec->image_id = NULL;
4798
4799         rbd_dev_destroy(rbd_dev);
4800 }
4801
4802 /*
4803  * Probe for the existence of the header object for the given rbd
4804  * device.  For format 2 images this includes determining the image
4805  * id.
4806  */
4807 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4808 {
4809         int ret;
4810         int tmp;
4811
4812         /*
4813          * Get the id from the image id object.  If it's not a
4814          * format 2 image, we'll get ENOENT back, and we'll assume
4815          * it's a format 1 image.
4816          */
4817         ret = rbd_dev_image_id(rbd_dev);
4818         if (ret)
4819                 return ret;
4820         rbd_assert(rbd_dev->spec->image_id);
4821         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4822
4823         ret = rbd_dev_header_name(rbd_dev);
4824         if (ret)
4825                 goto err_out_format;
4826
4827         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4828         if (ret)
4829                 goto out_header_name;
4830
4831         if (rbd_dev->image_format == 1)
4832                 ret = rbd_dev_v1_probe(rbd_dev);
4833         else
4834                 ret = rbd_dev_v2_probe(rbd_dev);
4835         if (ret)
4836                 goto err_out_watch;
4837
4838         ret = rbd_dev_snaps_update(rbd_dev);
4839         if (ret)
4840                 goto err_out_probe;
4841
4842         ret = rbd_dev_spec_update(rbd_dev);
4843         if (ret)
4844                 goto err_out_snaps;
4845
4846         ret = rbd_dev_probe_parent(rbd_dev);
4847         if (!ret)
4848                 return 0;
4849
4850 err_out_snaps:
4851         rbd_remove_all_snaps(rbd_dev);
4852 err_out_probe:
4853         rbd_dev_unprobe(rbd_dev);
4854 err_out_watch:
4855         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4856         if (tmp)
4857                 rbd_warn(rbd_dev, "unable to tear down watch request");
4858 out_header_name:
4859         kfree(rbd_dev->header_name);
4860         rbd_dev->header_name = NULL;
4861 err_out_format:
4862         rbd_dev->image_format = 0;
4863         kfree(rbd_dev->spec->image_id);
4864         rbd_dev->spec->image_id = NULL;
4865
4866         dout("probe failed, returning %d\n", ret);
4867
4868         return ret;
4869 }
4870
4871 static ssize_t rbd_add(struct bus_type *bus,
4872                        const char *buf,
4873                        size_t count)
4874 {
4875         struct rbd_device *rbd_dev = NULL;
4876         struct ceph_options *ceph_opts = NULL;
4877         struct rbd_options *rbd_opts = NULL;
4878         struct rbd_spec *spec = NULL;
4879         struct rbd_client *rbdc;
4880         struct ceph_osd_client *osdc;
4881         int rc = -ENOMEM;
4882
4883         if (!try_module_get(THIS_MODULE))
4884                 return -ENODEV;
4885
4886         /* parse add command */
4887         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4888         if (rc < 0)
4889                 goto err_out_module;
4890
4891         rbdc = rbd_get_client(ceph_opts);
4892         if (IS_ERR(rbdc)) {
4893                 rc = PTR_ERR(rbdc);
4894                 goto err_out_args;
4895         }
4896         ceph_opts = NULL;       /* rbd_dev client now owns this */
4897
4898         /* pick the pool */
4899         osdc = &rbdc->client->osdc;
4900         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4901         if (rc < 0)
4902                 goto err_out_client;
4903         spec->pool_id = (u64)rc;
4904
4905         /* The ceph file layout needs the pool id to fit in 32 bits */
4906
4907         if (spec->pool_id > (u64)U32_MAX) {
4908                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4909                                 (unsigned long long)spec->pool_id, U32_MAX);
4910                 rc = -EIO;
4911                 goto err_out_client;
4912         }
4913
4914         rbd_dev = rbd_dev_create(rbdc, spec);
4915         if (!rbd_dev)
4916                 goto err_out_client;
4917         rbdc = NULL;            /* rbd_dev now owns this */
4918         spec = NULL;            /* rbd_dev now owns this */
4919
4920         rbd_dev->mapping.read_only = rbd_opts->read_only;
4921         kfree(rbd_opts);
4922         rbd_opts = NULL;        /* done with this */
4923
4924         rc = rbd_dev_image_probe(rbd_dev);
4925         if (rc < 0)
4926                 goto err_out_rbd_dev;
4927
4928         rc = rbd_dev_device_setup(rbd_dev);
4929         if (!rc)
4930                 return count;
4931
4932         rbd_dev_image_release(rbd_dev);
4933 err_out_rbd_dev:
4934         rbd_dev_destroy(rbd_dev);
4935 err_out_client:
4936         rbd_put_client(rbdc);
4937 err_out_args:
4938         if (ceph_opts)
4939                 ceph_destroy_options(ceph_opts);
4940         kfree(rbd_opts);
4941         rbd_spec_put(spec);
4942 err_out_module:
4943         module_put(THIS_MODULE);
4944
4945         dout("Error adding device %s\n", buf);
4946
4947         return (ssize_t)rc;
4948 }
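/*
 * The NULL assignments above (ceph_opts, rbdc, spec, rbd_opts) mark
 * ownership handoffs: once a resource belongs to the rbd_dev or its
 * client, the local pointer is cleared so that the shared error
 * labels release only what is still locally owned.
 */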
4949
4950 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4951 {
4952         struct list_head *tmp;
4953         struct rbd_device *rbd_dev;
4954
4955         spin_lock(&rbd_dev_list_lock);
4956         list_for_each(tmp, &rbd_dev_list) {
4957                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4958                 if (rbd_dev->dev_id == dev_id) {
4959                         spin_unlock(&rbd_dev_list_lock);
4960                         return rbd_dev;
4961                 }
4962         }
4963         spin_unlock(&rbd_dev_list_lock);
4964         return NULL;
4965 }
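/*
 * The list lock above protects only the walk itself; the returned
 * rbd_dev is not reference-counted here.  Its caller, rbd_remove(),
 * holds ctl_mutex across both the lookup and the subsequent use.
 */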
4966
4967 static void rbd_dev_device_release(struct device *dev)
4968 {
4969         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4970
4971         rbd_free_disk(rbd_dev);
4972         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4973         rbd_dev_clear_mapping(rbd_dev);
4974         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4975         rbd_dev->major = 0;
4976         rbd_dev_id_put(rbd_dev);
4977         rbd_dev_mapping_clear(rbd_dev);
4978 }
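/*
 * This is the inverse of rbd_dev_device_setup(): it runs as the
 * struct device release callback once the sysfs device goes away,
 * undoing the disk, blkdev, id and mapping setup in reverse order.
 */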
4979
4980 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4981 {
4982         while (rbd_dev->parent) {
4983                 struct rbd_device *first = rbd_dev;
4984                 struct rbd_device *second = first->parent;
4985                 struct rbd_device *third;
4986
4987                 /*
4988                  * Walk down to the parent that has no grandparent
4989                  * and remove it.
4990                  */
4991                 while (second && (third = second->parent)) {
4992                         first = second;
4993                         second = third;
4994                 }
4995                 rbd_assert(second);
4996                 rbd_dev_image_release(second);
4997                 first->parent = NULL;
4998                 first->parent_overlap = 0;
4999
5000                 rbd_assert(first->parent_spec);
5001                 rbd_spec_put(first->parent_spec);
5002                 first->parent_spec = NULL;
5003         }
5004 }
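/*
 * Teardown proceeds deepest ancestor first: each pass of the outer
 * loop walks to the end of the parent chain and releases that image,
 * so a child is never left referencing an already-released parent.
 */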
5005
5006 static ssize_t rbd_remove(struct bus_type *bus,
5007                           const char *buf,
5008                           size_t count)
5009 {
5010         struct rbd_device *rbd_dev = NULL;
5011         int target_id;
5012         unsigned long ul;
5013         int ret;
5014
5015         ret = strict_strtoul(buf, 10, &ul);
5016         if (ret)
5017                 return ret;
5018
5019         /* convert to int; abort if we lost anything in the conversion */
5020         target_id = (int) ul;
5021         if (target_id != ul)
5022                 return -EINVAL;
5023
5024         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5025
5026         rbd_dev = __rbd_get_dev(target_id);
5027         if (!rbd_dev) {
5028                 ret = -ENOENT;
5029                 goto done;
5030         }
5031
5032         spin_lock_irq(&rbd_dev->lock);
5033         if (rbd_dev->open_count)
5034                 ret = -EBUSY;
5035         else
5036                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5037         spin_unlock_irq(&rbd_dev->lock);
5038         if (ret < 0)
5039                 goto done;
5040         ret = count;
5041         rbd_bus_del_dev(rbd_dev);
5042         rbd_dev_image_release(rbd_dev);
5043         module_put(THIS_MODULE);
5044 done:
5045         mutex_unlock(&ctl_mutex);
5046
5047         return ret;
5048 }
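/*
 * For illustration, removal mirrors rbd_add(): the numeric id from
 * the device name is written to the bus's remove file, e.g.:
 *
 *     echo 2 > /sys/bus/rbd/remove
 *
 * which tears down /dev/rbd2 unless the device is still open, in
 * which case -EBUSY is returned per the open_count check above.
 */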
5049
5050 /*
5051  * create control files in sysfs
5052  * /sys/bus/rbd/...
5053  */
5054 static int rbd_sysfs_init(void)
5055 {
5056         int ret;
5057
5058         ret = device_register(&rbd_root_dev);
5059         if (ret < 0)
5060                 return ret;
5061
5062         ret = bus_register(&rbd_bus_type);
5063         if (ret < 0)
5064                 device_unregister(&rbd_root_dev);
5065
5066         return ret;
5067 }
5068
5069 static void rbd_sysfs_cleanup(void)
5070 {
5071         bus_unregister(&rbd_bus_type);
5072         device_unregister(&rbd_root_dev);
5073 }
5074
5075 static int __init rbd_init(void)
5076 {
5077         int rc;
5078
5079         if (!libceph_compatible(NULL)) {
5080                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5081
5082                 return -EINVAL;
5083         }
5084         rc = rbd_sysfs_init();
5085         if (rc)
5086                 return rc;
5087         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5088         return 0;
5089 }
5090
5091 static void __exit rbd_exit(void)
5092 {
5093         rbd_sysfs_cleanup();
5094 }
5095
5096 module_init(rbd_init);
5097 module_exit(rbd_exit);
5098
5099 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5100 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5101 MODULE_DESCRIPTION("rados block device");
5102
5103 /* following authorship retained from original osdblk.c */
5104 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5105
5106 MODULE_LICENSE("GPL");