/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
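/*
 * A sketch of the arithmetic behind 510: 510 eight-byte snapshot ids
 * (4080 bytes) plus a small fixed snapshot-context header come to
 * about 4096 bytes, so the largest snapshot context fits in one 4KB page.
 */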

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
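/*
 * A worked example, assuming a 32-bit int: MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 = 11 characters, exactly enough for "-2147483648",
 * so DEV_NAME_LEN (32) easily holds "rbd" plus any such identifier.
 */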

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These five fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};
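/*
 * For illustration (hypothetical values): mapping pool "rbd", image
 * "foo", snapshot "snap1" might produce pool_id 2, image_id
 * "10056b8b4567" and snap_id 4.  Mapping the image head instead would
 * leave snap_id == CEPH_NOSNAP with snap_name RBD_SNAP_HEAD_NAME ("-").
 */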

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
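/*
 * Typical traversal, as in rbd_img_request_complete() below:
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */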

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
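/*
 * These rbd-specific tokens are picked out of the option string
 * written to the sysfs "add" file.  An illustrative invocation (the
 * authoritative format is in Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *	echo "1.2.3.4:6789 name=admin,read_only rbd myimage" \
 *		> /sys/bus/rbd/add
 */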

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration; create
 * a new one if it doesn't already exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client.  Called when the last reference is dropped;
 * takes rbd_client_list_lock itself to unlink the client.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX) {
			/* Don't leak the object prefix copied above */
			kfree(header->object_prefix);
			header->object_prefix = NULL;
			return -EIO;
		}
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees the
		 * ondisk buffer we're working with has snap_names_len
		 * bytes beyond the end of the snapshot id array, so
		 * this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
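/*
 * A worked example, assuming obj_order is 22 (4 MiB objects) and an
 * object_prefix of "rb.0.1234.5678" (hypothetical): image offset
 * 0x1000100 falls in segment 4, so rbd_segment_name() yields
 * "rb.0.1234.5678.000000000004", rbd_segment_offset() yields 0x100,
 * and rbd_segment_length() clips any length so the request does not
 * cross the segment boundary at image offset 0x1400000.
 */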

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
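/*
 * A worked example: cloning 2048 bytes at offset 1024 from a bio
 * holding two 2048-byte segments spans both segments (vcnt == 2);
 * the first copied bio_vec is trimmed to its final 1024 bytes and
 * the last to its first 1024 (resid) bytes.
 */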

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
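/*
 * A caller carving an image request into per-object bios might use
 * this as follows (a sketch; the variable names are illustrative):
 *
 *	obj_request->bio_list = bio_chain_clone_range(&bio_list,
 *				&bio_offset, clone_size, GFP_ATOMIC);
 *
 * advancing bio_list and bio_offset across successive object extents.
 */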

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses to two existence
 * checks are separated by the creation of the target object, so the
 * first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the late-arriving response.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
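/*
 * The intended calling pattern (a sketch): consult KNOWN before
 * trusting EXISTS, e.g.
 *
 *	if (obj_request_known_test(obj_request) &&
 *	    !obj_request_exists_test(obj_request))
 *		... the target object is known not to exist ...
 */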

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better offhand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
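/*
 * Together with rbd_obj_request_create() and rbd_obj_request_submit(),
 * this supports a simple synchronous pattern (a sketch; error handling,
 * op setup, and cleanup omitted):
 *
 *	obj_request = rbd_obj_request_create(name, 0, 0, OBJ_REQUEST_NODATA);
 *	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 *	if (!ret)
 *		ret = rbd_obj_request_wait(obj_request);
 */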

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
	if (obj_request->result == -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
		obj_request->xferred = obj_request->length;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
	} else {
		img_request = NULL;
		layered = false;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	WARN_ON(osd_req->r_num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

1579         size = strlen(object_name) + 1;
1580         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1581         if (!obj_request)
1582                 return NULL;
1583
1584         name = (char *)(obj_request + 1);
1585         obj_request->object_name = memcpy(name, object_name, size);
1586         obj_request->offset = offset;
1587         obj_request->length = length;
1588         obj_request->flags = 0;
1589         obj_request->which = BAD_WHICH;
1590         obj_request->type = type;
1591         INIT_LIST_HEAD(&obj_request->links);
1592         init_completion(&obj_request->completion);
1593         kref_init(&obj_request->kref);
1594
1595         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1596                 offset, length, (int)type, obj_request);
1597
1598         return obj_request;
1599 }
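/*
 * Illustrative sketch (not driver code): the allocation idiom used
 * above stores the object name in the same kzalloc() block as the
 * request structure, so a single kfree() releases both.  A minimal
 * standalone equivalent, with hypothetical names, might look like:
 */
#if 0	/* example only */
struct example_req {
	const char *name;	/* points just past the struct */
};

static struct example_req *example_req_create(const char *name)
{
	size_t size = strlen(name) + 1;
	struct example_req *req;

	req = kzalloc(sizeof (*req) + size, GFP_KERNEL);
	if (!req)
		return NULL;
	/* The name's storage immediately follows the struct itself */
	req->name = memcpy((char *)(req + 1), name, size);

	return req;	/* kfree(req) releases the name as well */
}
#endif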
1600
1601 static void rbd_obj_request_destroy(struct kref *kref)
1602 {
1603         struct rbd_obj_request *obj_request;
1604
1605         obj_request = container_of(kref, struct rbd_obj_request, kref);
1606
1607         dout("%s: obj %p\n", __func__, obj_request);
1608
1609         rbd_assert(obj_request->img_request == NULL);
1610         rbd_assert(obj_request->which == BAD_WHICH);
1611
1612         if (obj_request->osd_req)
1613                 rbd_osd_req_destroy(obj_request->osd_req);
1614
1615         rbd_assert(obj_request_type_valid(obj_request->type));
1616         switch (obj_request->type) {
1617         case OBJ_REQUEST_NODATA:
1618                 break;          /* Nothing to do */
1619         case OBJ_REQUEST_BIO:
1620                 if (obj_request->bio_list)
1621                         bio_chain_put(obj_request->bio_list);
1622                 break;
1623         case OBJ_REQUEST_PAGES:
1624                 if (obj_request->pages)
1625                         ceph_release_page_vector(obj_request->pages,
1626                                                 obj_request->page_count);
1627                 break;
1628         }
1629
1630         kfree(obj_request);
1631 }
1632
1633 /*
1634  * Caller is responsible for filling in the list of object requests
1635  * that comprises the image request, and the Linux request pointer
1636  * (if there is one).
1637  */
1638 static struct rbd_img_request *rbd_img_request_create(
1639                                         struct rbd_device *rbd_dev,
1640                                         u64 offset, u64 length,
1641                                         bool write_request,
1642                                         bool child_request)
1643 {
1644         struct rbd_img_request *img_request;
1645         struct ceph_snap_context *snapc = NULL;
1646
1647         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1648         if (!img_request)
1649                 return NULL;
1650
1651         if (write_request) {
1652                 down_read(&rbd_dev->header_rwsem);
1653                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1654                 up_read(&rbd_dev->header_rwsem);
1655                 if (WARN_ON(!snapc)) {
1656                         kfree(img_request);
1657                         return NULL;    /* Shouldn't happen */
1658                 }
1659
1660         }
1661
1662         img_request->rq = NULL;
1663         img_request->rbd_dev = rbd_dev;
1664         img_request->offset = offset;
1665         img_request->length = length;
1666         img_request->flags = 0;
1667         if (write_request) {
1668                 img_request_write_set(img_request);
1669                 img_request->snapc = snapc;
1670         } else {
1671                 img_request->snap_id = rbd_dev->spec->snap_id;
1672         }
1673         if (child_request)
1674                 img_request_child_set(img_request);
1675         if (rbd_dev->parent_spec)
1676                 img_request_layered_set(img_request);
1677         spin_lock_init(&img_request->completion_lock);
1678         img_request->next_completion = 0;
1679         img_request->callback = NULL;
1680         img_request->result = 0;
1681         img_request->obj_request_count = 0;
1682         INIT_LIST_HEAD(&img_request->obj_requests);
1683         kref_init(&img_request->kref);
1684
1685         rbd_img_request_get(img_request);       /* Avoid a warning */
1686         rbd_img_request_put(img_request);       /* TEMPORARY */
1687
1688         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1689                 write_request ? "write" : "read", offset, length,
1690                 img_request);
1691
1692         return img_request;
1693 }
1694
1695 static void rbd_img_request_destroy(struct kref *kref)
1696 {
1697         struct rbd_img_request *img_request;
1698         struct rbd_obj_request *obj_request;
1699         struct rbd_obj_request *next_obj_request;
1700
1701         img_request = container_of(kref, struct rbd_img_request, kref);
1702
1703         dout("%s: img %p\n", __func__, img_request);
1704
1705         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1706                 rbd_img_obj_request_del(img_request, obj_request);
1707         rbd_assert(img_request->obj_request_count == 0);
1708
1709         if (img_request_write_test(img_request))
1710                 ceph_put_snap_context(img_request->snapc);
1711
1712         if (img_request_child_test(img_request))
1713                 rbd_obj_request_put(img_request->obj_request);
1714
1715         kfree(img_request);
1716 }
1717
1718 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1719 {
1720         struct rbd_img_request *img_request;
1721         unsigned int xferred;
1722         int result;
1723         bool more;
1724
1725         rbd_assert(obj_request_img_data_test(obj_request));
1726         img_request = obj_request->img_request;
1727
1728         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1729         xferred = (unsigned int)obj_request->xferred;
1730         result = obj_request->result;
1731         if (result) {
1732                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1733
1734                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1735                         img_request_write_test(img_request) ? "write" : "read",
1736                         obj_request->length, obj_request->img_offset,
1737                         obj_request->offset);
1738                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1739                         result, xferred);
1740                 if (!img_request->result)
1741                         img_request->result = result;
1742         }
1743
1744         if (img_request_child_test(img_request)) {
1745                 rbd_assert(img_request->obj_request != NULL);
1746                 more = obj_request->which < img_request->obj_request_count - 1;
1747         } else {
1748                 rbd_assert(img_request->rq != NULL);
1749                 more = blk_end_request(img_request->rq, result, xferred);
1750         }
1751
1752         return more;
1753 }
1754
1755 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1756 {
1757         struct rbd_img_request *img_request;
1758         u32 which = obj_request->which;
1759         bool more = true;
1760
1761         rbd_assert(obj_request_img_data_test(obj_request));
1762         img_request = obj_request->img_request;
1763
1764         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1765         rbd_assert(img_request != NULL);
1766         rbd_assert(img_request->obj_request_count > 0);
1767         rbd_assert(which != BAD_WHICH);
1768         rbd_assert(which < img_request->obj_request_count);
1769         rbd_assert(which >= img_request->next_completion);
1770
1771         spin_lock_irq(&img_request->completion_lock);
1772         if (which != img_request->next_completion)
1773                 goto out;
1774
1775         for_each_obj_request_from(img_request, obj_request) {
1776                 rbd_assert(more);
1777                 rbd_assert(which < img_request->obj_request_count);
1778
1779                 if (!obj_request_done_test(obj_request))
1780                         break;
1781                 more = rbd_img_obj_end_request(obj_request);
1782                 which++;
1783         }
1784
1785         rbd_assert(more ^ (which == img_request->obj_request_count));
1786         img_request->next_completion = which;
1787 out:
1788         spin_unlock_irq(&img_request->completion_lock);
1789
1790         if (!more)
1791                 rbd_img_request_complete(img_request);
1792 }
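/*
 * A worked example of the in-order completion logic above: suppose an
 * image request has four object requests (0-3) that complete in the
 * order 2, 0, 1, 3.  Request 2's callback returns early because
 * 2 != next_completion (still 0).  When request 0 completes, the loop
 * ends request 0, finds request 1 not yet done, and sets
 * next_completion to 1.  When request 1 completes, the loop ends
 * requests 1 and 2 and sets next_completion to 3.  When request 3
 * finally completes, "more" goes false and the whole image request
 * is completed.
 */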
1793
1794 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1795                                         struct bio *bio_list)
1796 {
1797         struct rbd_device *rbd_dev = img_request->rbd_dev;
1798         struct rbd_obj_request *obj_request = NULL;
1799         struct rbd_obj_request *next_obj_request;
1800         bool write_request = img_request_write_test(img_request);
1801         unsigned int bio_offset;
1802         u64 img_offset;
1803         u64 resid;
1804         u16 opcode;
1805
1806         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1807
1808         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1809         bio_offset = 0;
1810         img_offset = img_request->offset;
1811         rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1812         resid = img_request->length;
1813         rbd_assert(resid > 0);
1814         while (resid) {
1815                 struct ceph_osd_request *osd_req;
1816                 const char *object_name;
1817                 unsigned int clone_size;
1818                 u64 offset;
1819                 u64 length;
1820
1821                 object_name = rbd_segment_name(rbd_dev, img_offset);
1822                 if (!object_name)
1823                         goto out_unwind;
1824                 offset = rbd_segment_offset(rbd_dev, img_offset);
1825                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1826                 obj_request = rbd_obj_request_create(object_name,
1827                                                 offset, length,
1828                                                 OBJ_REQUEST_BIO);
1829                 kfree(object_name);     /* object request has its own copy */
1830                 if (!obj_request)
1831                         goto out_unwind;
1832
1833                 rbd_assert(length <= (u64) UINT_MAX);
1834                 clone_size = (unsigned int) length;
1835                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1836                                                 &bio_offset, clone_size,
1837                                                 GFP_ATOMIC);
1838                 if (!obj_request->bio_list)
1839                         goto out_partial;
1840
1841                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1842                                                 obj_request);
1843                 if (!osd_req)
1844                         goto out_partial;
1845                 obj_request->osd_req = osd_req;
1846                 obj_request->callback = rbd_img_obj_callback;
1847
1848                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1849                                                 0, 0);
1850                 osd_req_op_extent_osd_data_bio(osd_req, 0,
1851                                 obj_request->bio_list, obj_request->length);
1852
1853                 if (write_request)
1854                         rbd_osd_req_format_write(obj_request);
1855                 else
1856                         rbd_osd_req_format_read(obj_request);
1857
1858                 obj_request->img_offset = img_offset;
1859                 rbd_img_obj_request_add(img_request, obj_request);
1860
1861                 img_offset += length;
1862                 resid -= length;
1863         }
1864
1865         return 0;
1866
1867 out_partial:
1868         rbd_obj_request_put(obj_request);
1869 out_unwind:
1870         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1871                 rbd_obj_request_put(obj_request);
1872
1873         return -ENOMEM;
1874 }
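/*
 * A worked example of the segmenting above, assuming 4 MB (order 22)
 * objects: a 6 MB request at image offset 3 MB becomes three object
 * requests -- 1 MB at offset 3 MB within the first object, all 4 MB
 * of the second object, and 1 MB at offset 0 of the third.  Each
 * iteration advances img_offset and reduces resid by the segment
 * length until resid reaches zero.
 */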
1875
1876 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
1877 {
1878         struct rbd_obj_request *orig_request;
1879         int result;
1880
1881         rbd_assert(!obj_request_img_data_test(obj_request));
1882
1883         /*
1884          * All we need from the object request is the original
1885          * request and the result of the STAT op.  Grab those, then
1886          * we're done with the request.
1887          */
1888         orig_request = obj_request->obj_request;
1889         obj_request->obj_request = NULL;
1890         rbd_assert(orig_request);
1891         rbd_assert(orig_request->img_request);
1892
1893         result = obj_request->result;
1894         obj_request->result = 0;
1895
1896         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
1897                 obj_request, orig_request, result,
1898                 obj_request->xferred, obj_request->length);
1899         rbd_obj_request_put(obj_request);
1900
1901         rbd_assert(orig_request);
1902         rbd_assert(orig_request->img_request);
1903
1904         /*
1905          * Our only purpose here is to determine whether the object
1906          * exists, and we don't want to treat the non-existence as
1907          * an error.  If something else comes back, transfer the
1908          * error to the original request and complete it now.
1909          */
1910         if (!result) {
1911                 obj_request_existence_set(orig_request, true);
1912         } else if (result == -ENOENT) {
1913                 obj_request_existence_set(orig_request, false);
1914         } else {
1915                 orig_request->result = result;
1916                 goto out_err;
1917         }
1918
1919         /*
1920          * Resubmit the original request now that we have recorded
1921          * whether the target object exists.
1922          */
1923         orig_request->result = rbd_img_obj_request_submit(orig_request);
1924 out_err:
1925         if (orig_request->result)
1926                 rbd_obj_request_complete(orig_request);
1927         rbd_obj_request_put(orig_request);
1928 }
1929
1930 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
1931 {
1932         struct rbd_obj_request *stat_request;
1933         struct rbd_device *rbd_dev;
1934         struct ceph_osd_client *osdc;
1935         struct page **pages = NULL;
1936         u32 page_count;
1937         size_t size;
1938         int ret;
1939
1940         /*
1941          * The response data for a STAT call consists of:
1942          *     le64 length;
1943          *     struct {
1944          *         le32 tv_sec;
1945          *         le32 tv_nsec;
1946          *     } mtime;
1947          */
1948         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
1949         page_count = (u32)calc_pages_for(0, size);
1950         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1951         if (IS_ERR(pages))
1952                 return PTR_ERR(pages);
1953
1954         ret = -ENOMEM;
1955         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
1956                                                         OBJ_REQUEST_PAGES);
1957         if (!stat_request)
1958                 goto out;
1959
1960         rbd_obj_request_get(obj_request);
1961         stat_request->obj_request = obj_request;
1962         stat_request->pages = pages;
1963         stat_request->page_count = page_count;
1964
1965         rbd_assert(obj_request->img_request);
1966         rbd_dev = obj_request->img_request->rbd_dev;
1967         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1968                                                 stat_request);
1969         if (!stat_request->osd_req)
1970                 goto out;
1971         stat_request->callback = rbd_img_obj_exists_callback;
1972
1973         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
1974         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
1975                                         false, false);
1976         rbd_osd_req_format_read(stat_request);
1977
1978         osdc = &rbd_dev->rbd_client->client->osdc;
1979         ret = rbd_obj_request_submit(osdc, stat_request);
1980 out:
1981         if (ret)
1982                 rbd_obj_request_put(obj_request);
1983
1984         return ret;
1985 }
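/*
 * Illustrative sketch (not driver code): decoding the STAT reply
 * described in rbd_img_obj_exists_submit() from a contiguous buffer.
 * The function name is hypothetical; the helpers come from
 * <asm/unaligned.h>.
 */
#if 0	/* example only */
static void example_decode_stat_reply(const void *buf, u64 *length,
					u32 *tv_sec, u32 *tv_nsec)
{
	const u8 *p = buf;

	*length = get_unaligned_le64(p);	/* le64 length */
	*tv_sec = get_unaligned_le32(p + 8);	/* mtime.tv_sec */
	*tv_nsec = get_unaligned_le32(p + 12);	/* mtime.tv_nsec */
}
#endif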
1986
1987 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
1988 {
1989         struct rbd_img_request *img_request;
1990
1991         rbd_assert(obj_request_img_data_test(obj_request));
1992
1993         img_request = obj_request->img_request;
1994         rbd_assert(img_request);
1995
1996         /* (At the moment we don't care whether it exists or not...) */
1997         (void) obj_request_exists_test;
1998
1999         /*
2000          * Only layered writes need special handling.  If it's not a
2001          * layered write, or it is a layered write but we know the
2002          * target object exists, it's no different from any other
2003          * object request.
2004          */
2005         if (!img_request_write_test(img_request) ||
2006                 !img_request_layered_test(img_request) ||
2007                 obj_request_known_test(obj_request)) {
2008
2009                 struct rbd_device *rbd_dev;
2010                 struct ceph_osd_client *osdc;
2011
2012                 rbd_dev = obj_request->img_request->rbd_dev;
2013                 osdc = &rbd_dev->rbd_client->client->osdc;
2014
2015                 return rbd_obj_request_submit(osdc, obj_request);
2016         }
2017
2018         /*
2019          * It's a layered write and we don't know whether the target
2020          * exists.  Issue existence check; once that completes the
2021          * original request will be submitted again.
2022          */
2023
2024         return rbd_img_obj_exists_submit(obj_request);
2025 }
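/*
 * The submit decision above, summarized:
 *
 *	read request			-> submit to osd directly
 *	write, not layered		-> submit to osd directly
 *	layered write, existence known	-> submit to osd directly
 *	layered write, unknown		-> STAT first, then resubmit
 */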
2026
2027 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2028 {
2029         struct rbd_obj_request *obj_request;
2030         struct rbd_obj_request *next_obj_request;
2031
2032         dout("%s: img %p\n", __func__, img_request);
2033         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2034                 int ret;
2035
2036                 ret = rbd_img_obj_request_submit(obj_request);
2037                 if (ret)
2038                         return ret;
2039         }
2040
2041         return 0;
2042 }
2043
2044 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2045 {
2046         struct rbd_obj_request *obj_request;
2047
2048         rbd_assert(img_request_child_test(img_request));
2049
2050         obj_request = img_request->obj_request;
2051         rbd_assert(obj_request != NULL);
2052         obj_request->result = img_request->result;
2053         obj_request->xferred = img_request->xferred;
2054
2055         rbd_img_obj_request_read_callback(obj_request);
2056         rbd_obj_request_complete(obj_request);
2057 }
2058
2059 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2060 {
2061         struct rbd_device *rbd_dev;
2062         struct rbd_img_request *img_request;
2063         int result;
2064
2065         rbd_assert(obj_request_img_data_test(obj_request));
2066         rbd_assert(obj_request->img_request != NULL);
2067         rbd_assert(obj_request->result == (s32) -ENOENT);
2068         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2069
2070         rbd_dev = obj_request->img_request->rbd_dev;
2071         rbd_assert(rbd_dev->parent != NULL);
2072         /* rbd_read_finish(obj_request, obj_request->length); */
2073         img_request = rbd_img_request_create(rbd_dev->parent,
2074                                                 obj_request->img_offset,
2075                                                 obj_request->length,
2076                                                 false, true);
2077         result = -ENOMEM;
2078         if (!img_request)
2079                 goto out_err;
2080
2081         rbd_obj_request_get(obj_request);
2082         img_request->obj_request = obj_request;
2083
2084         result = rbd_img_request_fill_bio(img_request, obj_request->bio_list);
2085         if (result)
2086                 goto out_err;
2087
2088         img_request->callback = rbd_img_parent_read_callback;
2089         result = rbd_img_request_submit(img_request);
2090         if (result)
2091                 goto out_err;
2092
2093         return;
2094 out_err:
2095         if (img_request)
2096                 rbd_img_request_put(img_request);
2097         obj_request->result = result;
2098         obj_request->xferred = 0;
2099         obj_request_done_set(obj_request);
2100 }
2101
2102 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2103                                    u64 ver, u64 notify_id)
2104 {
2105         struct rbd_obj_request *obj_request;
2106         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2107         int ret;
2108
2109         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2110                                                         OBJ_REQUEST_NODATA);
2111         if (!obj_request)
2112                 return -ENOMEM;
2113
2114         ret = -ENOMEM;
2115         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2116         if (!obj_request->osd_req)
2117                 goto out;
2118         obj_request->callback = rbd_obj_request_put;
2119
2120         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2121                                         notify_id, ver, 0);
2122         rbd_osd_req_format_read(obj_request);
2123
2124         ret = rbd_obj_request_submit(osdc, obj_request);
2125 out:
2126         if (ret)
2127                 rbd_obj_request_put(obj_request);
2128
2129         return ret;
2130 }
2131
2132 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2133 {
2134         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2135         u64 hver;
2136         int rc;
2137
2138         if (!rbd_dev)
2139                 return;
2140
2141         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2142                 rbd_dev->header_name, (unsigned long long) notify_id,
2143                 (unsigned int) opcode);
2144         rc = rbd_dev_refresh(rbd_dev, &hver);
2145         if (rc)
2146                 rbd_warn(rbd_dev, "got notification but failed to "
2147                            "update snaps: %d\n", rc);
2148
2149         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2150 }
2151
2152 /*
2153  * Request sync osd watch/unwatch.  The value of "start" determines
2154  * whether a watch request is being initiated or torn down.
2155  */
2156 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2157 {
2158         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2159         struct rbd_obj_request *obj_request;
2160         int ret;
2161
2162         rbd_assert(start ^ !!rbd_dev->watch_event);
2163         rbd_assert(start ^ !!rbd_dev->watch_request);
2164
2165         if (start) {
2166                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2167                                                 &rbd_dev->watch_event);
2168                 if (ret < 0)
2169                         return ret;
2170                 rbd_assert(rbd_dev->watch_event != NULL);
2171         }
2172
2173         ret = -ENOMEM;
2174         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2175                                                         OBJ_REQUEST_NODATA);
2176         if (!obj_request)
2177                 goto out_cancel;
2178
2179         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2180         if (!obj_request->osd_req)
2181                 goto out_cancel;
2182
2183         if (start)
2184                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2185         else
2186                 ceph_osdc_unregister_linger_request(osdc,
2187                                         rbd_dev->watch_request->osd_req);
2188
2189         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2190                                 rbd_dev->watch_event->cookie,
2191                                 rbd_dev->header.obj_version, start);
2192         rbd_osd_req_format_write(obj_request);
2193
2194         ret = rbd_obj_request_submit(osdc, obj_request);
2195         if (ret)
2196                 goto out_cancel;
2197         ret = rbd_obj_request_wait(obj_request);
2198         if (ret)
2199                 goto out_cancel;
2200         ret = obj_request->result;
2201         if (ret)
2202                 goto out_cancel;
2203
2204         /*
2205          * A watch request is set to linger, so the underlying osd
2206          * request won't go away until we unregister it.  We retain
2207          * a pointer to the object request during that time (in
2208          * rbd_dev->watch_request), so we'll keep a reference to
2209          * it.  We'll drop that reference (below) after we've
2210          * unregistered it.
2211          */
2212         if (start) {
2213                 rbd_dev->watch_request = obj_request;
2214
2215                 return 0;
2216         }
2217
2218         /* We have successfully torn down the watch request */
2219
2220         rbd_obj_request_put(rbd_dev->watch_request);
2221         rbd_dev->watch_request = NULL;
2222 out_cancel:
2223         /* Cancel the event if we're tearing down, or on error */
2224         ceph_osdc_cancel_event(rbd_dev->watch_event);
2225         rbd_dev->watch_event = NULL;
2226         if (obj_request)
2227                 rbd_obj_request_put(obj_request);
2228
2229         return ret;
2230 }
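/*
 * Typical usage (a sketch based on the semantics described above):
 * rbd_dev_header_watch_sync(rbd_dev, 1) registers the watch when the
 * device is set up; a matching rbd_dev_header_watch_sync(rbd_dev, 0)
 * tears it down again.
 */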
2231
2232 /*
2233  * Synchronous osd object method call
2234  */
2235 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2236                              const char *object_name,
2237                              const char *class_name,
2238                              const char *method_name,
2239                              const char *outbound,
2240                              size_t outbound_size,
2241                              char *inbound,
2242                              size_t inbound_size,
2243                              u64 *version)
2244 {
2245         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2246         struct rbd_obj_request *obj_request;
2247         struct page **pages;
2248         u32 page_count;
2249         int ret;
2250
2251         /*
2252          * Method calls are ultimately read operations.  The result
2253          * should be placed into the inbound buffer provided.  They
2254          * also supply outbound data--parameters for the object
2255          * method.  Currently if this is present it will be a
2256          * snapshot id.
2257          */
2258         page_count = (u32) calc_pages_for(0, inbound_size);
2259         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2260         if (IS_ERR(pages))
2261                 return PTR_ERR(pages);
2262
2263         ret = -ENOMEM;
2264         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2265                                                         OBJ_REQUEST_PAGES);
2266         if (!obj_request)
2267                 goto out;
2268
2269         obj_request->pages = pages;
2270         obj_request->page_count = page_count;
2271
2272         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2273         if (!obj_request->osd_req)
2274                 goto out;
2275
2276         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2277                                         class_name, method_name);
2278         if (outbound_size) {
2279                 struct ceph_pagelist *pagelist;
2280
2281                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2282                 if (!pagelist)
2283                         goto out;
2284
2285                 ceph_pagelist_init(pagelist);
2286                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2287                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2288                                                 pagelist);
2289         }
2290         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2291                                         obj_request->pages, inbound_size,
2292                                         0, false, false);
2293         rbd_osd_req_format_read(obj_request);
2294
2295         ret = rbd_obj_request_submit(osdc, obj_request);
2296         if (ret)
2297                 goto out;
2298         ret = rbd_obj_request_wait(obj_request);
2299         if (ret)
2300                 goto out;
2301
2302         ret = obj_request->result;
2303         if (ret < 0)
2304                 goto out;
2305         ret = 0;
2306         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2307         if (version)
2308                 *version = obj_request->version;
2309 out:
2310         if (obj_request)
2311                 rbd_obj_request_put(obj_request);
2312         else
2313                 ceph_release_page_vector(pages, page_count);
2314
2315         return ret;
2316 }
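/*
 * Illustrative call (a sketch; the response buffer layout is an
 * assumption for the example): querying an image's size by invoking
 * the "get_size" method of the "rbd" object class, passing the
 * snapshot id as outbound data.
 */
#if 0	/* example only */
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf;
	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf),
				NULL);
#endif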
2317
2318 static void rbd_request_fn(struct request_queue *q)
2319                 __releases(q->queue_lock) __acquires(q->queue_lock)
2320 {
2321         struct rbd_device *rbd_dev = q->queuedata;
2322         bool read_only = rbd_dev->mapping.read_only;
2323         struct request *rq;
2324         int result;
2325
2326         while ((rq = blk_fetch_request(q))) {
2327                 bool write_request = rq_data_dir(rq) == WRITE;
2328                 struct rbd_img_request *img_request;
2329                 u64 offset;
2330                 u64 length;
2331
2332                 /* Ignore any non-FS requests that filter through. */
2333
2334                 if (rq->cmd_type != REQ_TYPE_FS) {
2335                         dout("%s: non-fs request type %d\n", __func__,
2336                                 (int) rq->cmd_type);
2337                         __blk_end_request_all(rq, 0);
2338                         continue;
2339                 }
2340
2341                 /* Ignore/skip any zero-length requests */
2342
2343                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2344                 length = (u64) blk_rq_bytes(rq);
2345
2346                 if (!length) {
2347                         dout("%s: zero-length request\n", __func__);
2348                         __blk_end_request_all(rq, 0);
2349                         continue;
2350                 }
2351
2352                 spin_unlock_irq(q->queue_lock);
2353
2354                 /* Disallow writes to a read-only device */
2355
2356                 if (write_request) {
2357                         result = -EROFS;
2358                         if (read_only)
2359                                 goto end_request;
2360                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2361                 }
2362
2363                 /*
2364                  * Quit early if the mapped snapshot no longer
2365                  * exists.  It's still possible the snapshot will
2366                  * have disappeared by the time our request arrives
2367                  * at the osd, but there's no sense in sending it if
2368                  * we already know.
2369                  */
2370                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2371                         dout("request for non-existent snapshot\n");
2372                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2373                         result = -ENXIO;
2374                         goto end_request;
2375                 }
2376
2377                 result = -EINVAL;
2378                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2379                         goto end_request;       /* Shouldn't happen */
2380
2381                 result = -ENOMEM;
2382                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2383                                                         write_request, false);
2384                 if (!img_request)
2385                         goto end_request;
2386
2387                 img_request->rq = rq;
2388
2389                 result = rbd_img_request_fill_bio(img_request, rq->bio);
2390                 if (!result)
2391                         result = rbd_img_request_submit(img_request);
2392                 if (result)
2393                         rbd_img_request_put(img_request);
2394 end_request:
2395                 spin_lock_irq(q->queue_lock);
2396                 if (result < 0) {
2397                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2398                                 write_request ? "write" : "read",
2399                                 length, offset, result);
2400
2401                         __blk_end_request_all(rq, result);
2402                 }
2403         }
2404 }
2405
2406 /*
2407  * A queue callback.  Makes sure that we don't create a bio that spans
2408  * multiple osd objects.  One exception would be a single-page bio,
2409  * which we handle later in bio_chain_clone_range().
2410  */
2411 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2412                           struct bio_vec *bvec)
2413 {
2414         struct rbd_device *rbd_dev = q->queuedata;
2415         sector_t sector_offset;
2416         sector_t sectors_per_obj;
2417         sector_t obj_sector_offset;
2418         int ret;
2419
2420         /*
2421          * Find how far into its rbd object the bio's start sector
2422          * falls.  The bio's sector is partition-relative, so it is
2423          * first made relative to the enclosing device.
2424          */
2425         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2426         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2427         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2428
2429         /*
2430          * Compute the number of bytes from that offset to the end
2431          * of the object.  Account for what's already used by the bio.
2432          */
2433         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2434         if (ret > bmd->bi_size)
2435                 ret -= bmd->bi_size;
2436         else
2437                 ret = 0;
2438
2439         /*
2440          * Don't send back more than was asked for.  And if the bio
2441          * was empty, let the whole thing through because:  "Note
2442          * that a block device *must* allow a single page to be
2443          * added to an empty bio."
2444          */
2445         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2446         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2447                 ret = (int) bvec->bv_len;
2448
2449         return ret;
2450 }
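/*
 * A worked example of the arithmetic above, assuming 4 MB (order 22)
 * objects: sectors_per_obj = 1 << (22 - 9) = 8192.  A bio starting at
 * device sector 8000 has obj_sector_offset 8000, leaving
 * (8192 - 8000) << 9 = 98304 bytes before the object boundary.  The
 * bvec is accepted in full only if it fits within that space once the
 * bio's current bi_size is accounted for.
 */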
2451
2452 static void rbd_free_disk(struct rbd_device *rbd_dev)
2453 {
2454         struct gendisk *disk = rbd_dev->disk;
2455
2456         if (!disk)
2457                 return;
2458
2459         if (disk->flags & GENHD_FL_UP)
2460                 del_gendisk(disk);
2461         if (disk->queue)
2462                 blk_cleanup_queue(disk->queue);
2463         put_disk(disk);
2464 }
2465
2466 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2467                                 const char *object_name,
2468                                 u64 offset, u64 length,
2469                                 char *buf, u64 *version)
2470
2471 {
2472         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2473         struct rbd_obj_request *obj_request;
2474         struct page **pages = NULL;
2475         u32 page_count;
2476         size_t size;
2477         int ret;
2478
2479         page_count = (u32) calc_pages_for(offset, length);
2480         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2481         if (IS_ERR(pages))
2482                 return PTR_ERR(pages);
2483
2484         ret = -ENOMEM;
2485         obj_request = rbd_obj_request_create(object_name, offset, length,
2486                                                         OBJ_REQUEST_PAGES);
2487         if (!obj_request)
2488                 goto out;
2489
2490         obj_request->pages = pages;
2491         obj_request->page_count = page_count;
2492
2493         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2494         if (!obj_request->osd_req)
2495                 goto out;
2496
2497         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2498                                         offset, length, 0, 0);
2499         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2500                                         obj_request->pages,
2501                                         obj_request->length,
2502                                         obj_request->offset & ~PAGE_MASK,
2503                                         false, false);
2504         rbd_osd_req_format_read(obj_request);
2505
2506         ret = rbd_obj_request_submit(osdc, obj_request);
2507         if (ret)
2508                 goto out;
2509         ret = rbd_obj_request_wait(obj_request);
2510         if (ret)
2511                 goto out;
2512
2513         ret = obj_request->result;
2514         if (ret < 0)
2515                 goto out;
2516
2517         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2518         size = (size_t) obj_request->xferred;
2519         ceph_copy_from_page_vector(pages, buf, 0, size);
2520         rbd_assert(size <= (size_t) INT_MAX);
2521         ret = (int) size;
2522         if (version)
2523                 *version = obj_request->version;
2524 out:
2525         if (obj_request)
2526                 rbd_obj_request_put(obj_request);
2527         else
2528                 ceph_release_page_vector(pages, page_count);
2529
2530         return ret;
2531 }
2532
2533 /*
2534  * Read the complete header for the given rbd device.
2535  *
2536  * Returns a pointer to a dynamically-allocated buffer containing
2537  * the complete and validated header.  Caller can pass the address
2538  * of a variable that will be filled in with the version of the
2539  * header object at the time it was read.
2540  *
2541  * Returns a pointer-coded errno if a failure occurs.
2542  */
2543 static struct rbd_image_header_ondisk *
2544 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2545 {
2546         struct rbd_image_header_ondisk *ondisk = NULL;
2547         u32 snap_count = 0;
2548         u64 names_size = 0;
2549         u32 want_count;
2550         int ret;
2551
2552         /*
2553          * The complete header will include an array of its 64-bit
2554          * snapshot ids, followed by the names of those snapshots as
2555          * a contiguous block of NUL-terminated strings.  Note that
2556          * the number of snapshots could change by the time we read
2557          * it in, in which case we re-read it.
2558          */
2559         do {
2560                 size_t size;
2561
2562                 kfree(ondisk);
2563
2564                 size = sizeof (*ondisk);
2565                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2566                 size += names_size;
2567                 ondisk = kmalloc(size, GFP_KERNEL);
2568                 if (!ondisk)
2569                         return ERR_PTR(-ENOMEM);
2570
2571                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2572                                        0, size,
2573                                        (char *) ondisk, version);
2574                 if (ret < 0)
2575                         goto out_err;
2576                 if (WARN_ON((size_t) ret < size)) {
2577                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2578                                 size, ret);
2579                         ret = -ENXIO;
2580                         goto out_err;
2581                 }
2582                 if (!rbd_dev_ondisk_valid(ondisk)) {
2583                         ret = -ENXIO;
2584                         rbd_warn(rbd_dev, "invalid header");
2585                         goto out_err;
2586                 }
2587
2588                 names_size = le64_to_cpu(ondisk->snap_names_len);
2589                 want_count = snap_count;
2590                 snap_count = le32_to_cpu(ondisk->snap_count);
2591         } while (snap_count != want_count);
2592
2593         return ondisk;
2594
2595 out_err:
2596         kfree(ondisk);
2597
2598         return ERR_PTR(ret);
2599 }
2600
2601 /*
2602  * reload the on-disk header
2603  */
2604 static int rbd_read_header(struct rbd_device *rbd_dev,
2605                            struct rbd_image_header *header)
2606 {
2607         struct rbd_image_header_ondisk *ondisk;
2608         u64 ver = 0;
2609         int ret;
2610
2611         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2612         if (IS_ERR(ondisk))
2613                 return PTR_ERR(ondisk);
2614         ret = rbd_header_from_disk(header, ondisk);
2615         if (ret >= 0)
2616                 header->obj_version = ver;
2617         kfree(ondisk);
2618
2619         return ret;
2620 }
2621
2622 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2623 {
2624         struct rbd_snap *snap;
2625         struct rbd_snap *next;
2626
2627         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2628                 rbd_remove_snap_dev(snap);
2629 }
2630
2631 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2632 {
2633         sector_t size;
2634
2635         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2636                 return;
2637
2638         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2639         dout("setting size to %llu sectors", (unsigned long long) size);
2640         rbd_dev->mapping.size = (u64) size;
2641         set_capacity(rbd_dev->disk, size);
2642 }
2643
2644 /*
2645  * Only read the first part of the on-disk header, without the snaps info.
2646  */
2647 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2648 {
2649         int ret;
2650         struct rbd_image_header h;
2651
2652         ret = rbd_read_header(rbd_dev, &h);
2653         if (ret < 0)
2654                 return ret;
2655
2656         down_write(&rbd_dev->header_rwsem);
2657
2658         /* Update image size, and check for resize of mapped image */
2659         rbd_dev->header.image_size = h.image_size;
2660         rbd_update_mapping_size(rbd_dev);
2661
2662         /* rbd_dev->header.object_prefix shouldn't change */
2663         kfree(rbd_dev->header.snap_sizes);
2664         kfree(rbd_dev->header.snap_names);
2665         /* osd requests may still refer to snapc */
2666         ceph_put_snap_context(rbd_dev->header.snapc);
2667
2668         if (hver)
2669                 *hver = h.obj_version;
2670         rbd_dev->header.obj_version = h.obj_version;
2671
2672         rbd_dev->header.snapc = h.snapc;
2673         rbd_dev->header.snap_names = h.snap_names;
2674         rbd_dev->header.snap_sizes = h.snap_sizes;
2675         /* Free the extra copy of the object prefix */
2676         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2677         kfree(h.object_prefix);
2678
2679         ret = rbd_dev_snaps_update(rbd_dev);
2680         if (!ret)
2681                 ret = rbd_dev_snaps_register(rbd_dev);
2682
2683         up_write(&rbd_dev->header_rwsem);
2684
2685         return ret;
2686 }
2687
2688 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2689 {
2690         int ret;
2691
2692         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2693         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2694         if (rbd_dev->image_format == 1)
2695                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2696         else
2697                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2698         mutex_unlock(&ctl_mutex);
2699
2700         return ret;
2701 }
2702
2703 static int rbd_init_disk(struct rbd_device *rbd_dev)
2704 {
2705         struct gendisk *disk;
2706         struct request_queue *q;
2707         u64 segment_size;
2708
2709         /* create gendisk info */
2710         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2711         if (!disk)
2712                 return -ENOMEM;
2713
2714         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2715                  rbd_dev->dev_id);
2716         disk->major = rbd_dev->major;
2717         disk->first_minor = 0;
2718         disk->fops = &rbd_bd_ops;
2719         disk->private_data = rbd_dev;
2720
2721         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2722         if (!q)
2723                 goto out_disk;
2724
2725         /* We use the default size, but let's be explicit about it. */
2726         blk_queue_physical_block_size(q, SECTOR_SIZE);
2727
2728         /* set io sizes to object size */
2729         segment_size = rbd_obj_bytes(&rbd_dev->header);
2730         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2731         blk_queue_max_segment_size(q, segment_size);
2732         blk_queue_io_min(q, segment_size);
2733         blk_queue_io_opt(q, segment_size);
2734
2735         blk_queue_merge_bvec(q, rbd_merge_bvec);
2736         disk->queue = q;
2737
2738         q->queuedata = rbd_dev;
2739
2740         rbd_dev->disk = disk;
2741
2742         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2743
2744         return 0;
2745 out_disk:
2746         put_disk(disk);
2747
2748         return -ENOMEM;
2749 }
2750
2751 /*
2752   sysfs
2753 */
2754
2755 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2756 {
2757         return container_of(dev, struct rbd_device, dev);
2758 }
2759
2760 static ssize_t rbd_size_show(struct device *dev,
2761                              struct device_attribute *attr, char *buf)
2762 {
2763         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2764         sector_t size;
2765
2766         down_read(&rbd_dev->header_rwsem);
2767         size = get_capacity(rbd_dev->disk);
2768         up_read(&rbd_dev->header_rwsem);
2769
2770         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2771 }
2772
2773 /*
2774  * Note this shows the features for whatever's mapped, which is not
2775  * necessarily the base image.
2776  */
2777 static ssize_t rbd_features_show(struct device *dev,
2778                              struct device_attribute *attr, char *buf)
2779 {
2780         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2781
2782         return sprintf(buf, "0x%016llx\n",
2783                         (unsigned long long) rbd_dev->mapping.features);
2784 }
2785
2786 static ssize_t rbd_major_show(struct device *dev,
2787                               struct device_attribute *attr, char *buf)
2788 {
2789         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2790
2791         return sprintf(buf, "%d\n", rbd_dev->major);
2792 }
2793
2794 static ssize_t rbd_client_id_show(struct device *dev,
2795                                   struct device_attribute *attr, char *buf)
2796 {
2797         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2798
2799         return sprintf(buf, "client%lld\n",
2800                         ceph_client_id(rbd_dev->rbd_client->client));
2801 }
2802
2803 static ssize_t rbd_pool_show(struct device *dev,
2804                              struct device_attribute *attr, char *buf)
2805 {
2806         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2807
2808         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2809 }
2810
2811 static ssize_t rbd_pool_id_show(struct device *dev,
2812                              struct device_attribute *attr, char *buf)
2813 {
2814         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2815
2816         return sprintf(buf, "%llu\n",
2817                 (unsigned long long) rbd_dev->spec->pool_id);
2818 }
2819
2820 static ssize_t rbd_name_show(struct device *dev,
2821                              struct device_attribute *attr, char *buf)
2822 {
2823         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2824
2825         if (rbd_dev->spec->image_name)
2826                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2827
2828         return sprintf(buf, "(unknown)\n");
2829 }
2830
2831 static ssize_t rbd_image_id_show(struct device *dev,
2832                              struct device_attribute *attr, char *buf)
2833 {
2834         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2835
2836         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2837 }
2838
2839 /*
2840  * Shows the name of the currently-mapped snapshot (or
2841  * RBD_SNAP_HEAD_NAME for the base image).
2842  */
2843 static ssize_t rbd_snap_show(struct device *dev,
2844                              struct device_attribute *attr,
2845                              char *buf)
2846 {
2847         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2848
2849         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2850 }
2851
2852 /*
2853  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2854  * for the parent image.  If there is no parent, simply shows
2855  * "(no parent image)".
2856  */
2857 static ssize_t rbd_parent_show(struct device *dev,
2858                              struct device_attribute *attr,
2859                              char *buf)
2860 {
2861         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2862         struct rbd_spec *spec = rbd_dev->parent_spec;
2863         int count;
2864         char *bufp = buf;
2865
2866         if (!spec)
2867                 return sprintf(buf, "(no parent image)\n");
2868
2869         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2870                         (unsigned long long) spec->pool_id, spec->pool_name);
2871         if (count < 0)
2872                 return count;
2873         bufp += count;
2874
2875         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2876                         spec->image_name ? spec->image_name : "(unknown)");
2877         if (count < 0)
2878                 return count;
2879         bufp += count;
2880
2881         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2882                         (unsigned long long) spec->snap_id, spec->snap_name);
2883         if (count < 0)
2884                 return count;
2885         bufp += count;
2886
2887         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2888         if (count < 0)
2889                 return count;
2890         bufp += count;
2891
2892         return (ssize_t) (bufp - buf);
2893 }
2894
2895 static ssize_t rbd_image_refresh(struct device *dev,
2896                                  struct device_attribute *attr,
2897                                  const char *buf,
2898                                  size_t size)
2899 {
2900         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2901         int ret;
2902
2903         ret = rbd_dev_refresh(rbd_dev, NULL);
2904
2905         return ret < 0 ? ret : size;
2906 }
2907
2908 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2909 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2910 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2911 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2912 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2913 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2914 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2915 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2916 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2917 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2918 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2919
2920 static struct attribute *rbd_attrs[] = {
2921         &dev_attr_size.attr,
2922         &dev_attr_features.attr,
2923         &dev_attr_major.attr,
2924         &dev_attr_client_id.attr,
2925         &dev_attr_pool.attr,
2926         &dev_attr_pool_id.attr,
2927         &dev_attr_name.attr,
2928         &dev_attr_image_id.attr,
2929         &dev_attr_current_snap.attr,
2930         &dev_attr_parent.attr,
2931         &dev_attr_refresh.attr,
2932         NULL
2933 };
2934
2935 static struct attribute_group rbd_attr_group = {
2936         .attrs = rbd_attrs,
2937 };
2938
2939 static const struct attribute_group *rbd_attr_groups[] = {
2940         &rbd_attr_group,
2941         NULL
2942 };
2943
2944 static void rbd_sysfs_dev_release(struct device *dev)
2945 {
2946 }
2947
2948 static struct device_type rbd_device_type = {
2949         .name           = "rbd",
2950         .groups         = rbd_attr_groups,
2951         .release        = rbd_sysfs_dev_release,
2952 };
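/*
 * These attributes appear under /sys/bus/rbd/devices/<id>/ ("size",
 * "major", "pool", "current_snap", and so on; "refresh" is the one
 * write-only entry).  See Documentation/ABI/testing/sysfs-bus-rbd.
 */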
2953
2954
2955 /*
2956   sysfs - snapshots
2957 */
2958
2959 static ssize_t rbd_snap_size_show(struct device *dev,
2960                                   struct device_attribute *attr,
2961                                   char *buf)
2962 {
2963         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2964
2965         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2966 }
2967
2968 static ssize_t rbd_snap_id_show(struct device *dev,
2969                                 struct device_attribute *attr,
2970                                 char *buf)
2971 {
2972         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2973
2974         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2975 }
2976
2977 static ssize_t rbd_snap_features_show(struct device *dev,
2978                                 struct device_attribute *attr,
2979                                 char *buf)
2980 {
2981         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2982
2983         return sprintf(buf, "0x%016llx\n",
2984                         (unsigned long long) snap->features);
2985 }
2986
2987 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2988 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2989 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2990
2991 static struct attribute *rbd_snap_attrs[] = {
2992         &dev_attr_snap_size.attr,
2993         &dev_attr_snap_id.attr,
2994         &dev_attr_snap_features.attr,
2995         NULL,
2996 };
2997
2998 static struct attribute_group rbd_snap_attr_group = {
2999         .attrs = rbd_snap_attrs,
3000 };
3001
3002 static void rbd_snap_dev_release(struct device *dev)
3003 {
3004         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3005         kfree(snap->name);
3006         kfree(snap);
3007 }
3008
3009 static const struct attribute_group *rbd_snap_attr_groups[] = {
3010         &rbd_snap_attr_group,
3011         NULL
3012 };
3013
3014 static struct device_type rbd_snap_device_type = {
3015         .groups         = rbd_snap_attr_groups,
3016         .release        = rbd_snap_dev_release,
3017 };
3018
3019 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3020 {
3021         kref_get(&spec->kref);
3022
3023         return spec;
3024 }
3025
3026 static void rbd_spec_free(struct kref *kref);
3027 static void rbd_spec_put(struct rbd_spec *spec)
3028 {
3029         if (spec)
3030                 kref_put(&spec->kref, rbd_spec_free);
3031 }
3032
3033 static struct rbd_spec *rbd_spec_alloc(void)
3034 {
3035         struct rbd_spec *spec;
3036
3037         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3038         if (!spec)
3039                 return NULL;
3040         kref_init(&spec->kref);
3041
3042         return spec;
3043 }
3044
3045 static void rbd_spec_free(struct kref *kref)
3046 {
3047         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3048
3049         kfree(spec->pool_name);
3050         kfree(spec->image_id);
3051         kfree(spec->image_name);
3052         kfree(spec->snap_name);
3053         kfree(spec);
3054 }
3055
3056 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3057                                 struct rbd_spec *spec)
3058 {
3059         struct rbd_device *rbd_dev;
3060
3061         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3062         if (!rbd_dev)
3063                 return NULL;
3064
3065         spin_lock_init(&rbd_dev->lock);
3066         rbd_dev->flags = 0;
3067         INIT_LIST_HEAD(&rbd_dev->node);
3068         INIT_LIST_HEAD(&rbd_dev->snaps);
3069         init_rwsem(&rbd_dev->header_rwsem);
3070
3071         rbd_dev->spec = spec;
3072         rbd_dev->rbd_client = rbdc;
3073
3074         /* Initialize the layout used for all rbd requests */
3075
3076         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3077         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3078         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3079         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3080
3081         return rbd_dev;
3082 }
3083
3084 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3085 {
3086         rbd_spec_put(rbd_dev->parent_spec);
3087         kfree(rbd_dev->header_name);
3088         rbd_put_client(rbd_dev->rbd_client);
3089         rbd_spec_put(rbd_dev->spec);
3090         kfree(rbd_dev);
3091 }
3092
3093 static bool rbd_snap_registered(struct rbd_snap *snap)
3094 {
3095         bool ret = snap->dev.type == &rbd_snap_device_type;
3096         bool reg = device_is_registered(&snap->dev);
3097
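        /*
         * The device type is assigned just before registration in
         * rbd_register_snap_dev(), so the two indications must agree.
         */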
3098         rbd_assert(!ret ^ reg);
3099
3100         return ret;
3101 }
3102
3103 static void rbd_remove_snap_dev(struct rbd_snap *snap)
3104 {
3105         list_del(&snap->node);
3106         if (device_is_registered(&snap->dev))
3107                 device_unregister(&snap->dev);
3108 }
3109
3110 static int rbd_register_snap_dev(struct rbd_snap *snap,
3111                                   struct device *parent)
3112 {
3113         struct device *dev = &snap->dev;
3114         int ret;
3115
3116         dev->type = &rbd_snap_device_type;
3117         dev->parent = parent;
3118         dev->release = rbd_snap_dev_release;
3119         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
3120         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
3121
3122         ret = device_register(dev);
3123
3124         return ret;
3125 }
3126
3127 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
3128                                                 const char *snap_name,
3129                                                 u64 snap_id, u64 snap_size,
3130                                                 u64 snap_features)
3131 {
3132         struct rbd_snap *snap;
3133         int ret;
3134
3135         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3136         if (!snap)
3137                 return ERR_PTR(-ENOMEM);
3138
3139         ret = -ENOMEM;
3140         snap->name = kstrdup(snap_name, GFP_KERNEL);
3141         if (!snap->name)
3142                 goto err;
3143
3144         snap->id = snap_id;
3145         snap->size = snap_size;
3146         snap->features = snap_features;
3147
3148         return snap;
3149
3150 err:
3151         kfree(snap->name);
3152         kfree(snap);
3153
3154         return ERR_PTR(ret);
3155 }
3156
3157 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3158                 u64 *snap_size, u64 *snap_features)
3159 {
3160         char *snap_name;
3161
3162         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3163
3164         *snap_size = rbd_dev->header.snap_sizes[which];
3165         *snap_features = 0;     /* No features for v1 */
3166
3167         /* Skip over names until we find the one we are looking for */
3168
3169         snap_name = rbd_dev->header.snap_names;
3170         while (which--)
3171                 snap_name += strlen(snap_name) + 1;
3172
3173         return snap_name;
3174 }
3175
3176 /*
 * Get the size and object order for an image snapshot, or, if
 * snap_id is CEPH_NOSNAP, get this information for the base
 * image.
3180  */
3181 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3182                                 u8 *order, u64 *snap_size)
3183 {
3184         __le64 snapid = cpu_to_le64(snap_id);
3185         int ret;
3186         struct {
3187                 u8 order;
3188                 __le64 size;
3189         } __attribute__ ((packed)) size_buf = { 0 };
3190
3191         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3192                                 "rbd", "get_size",
3193                                 (char *) &snapid, sizeof (snapid),
3194                                 (char *) &size_buf, sizeof (size_buf), NULL);
3195         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3196         if (ret < 0)
3197                 return ret;
3198
3199         *order = size_buf.order;
3200         *snap_size = le64_to_cpu(size_buf.size);
3201
3202         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3203                 (unsigned long long) snap_id, (unsigned int) *order,
3204                 (unsigned long long) *snap_size);
3205
3206         return 0;
3207 }
3208
3209 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3210 {
3211         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3212                                         &rbd_dev->header.obj_order,
3213                                         &rbd_dev->header.image_size);
3214 }
3215
3216 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3217 {
3218         void *reply_buf;
3219         int ret;
3220         void *p;
3221
3222         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3223         if (!reply_buf)
3224                 return -ENOMEM;
3225
3226         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3227                                 "rbd", "get_object_prefix",
3228                                 NULL, 0,
3229                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3230         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3231         if (ret < 0)
3232                 goto out;
3233
3234         p = reply_buf;
3235         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3236                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
3237                                                 NULL, GFP_NOIO);
3238
3239         if (IS_ERR(rbd_dev->header.object_prefix)) {
3240                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3241                 rbd_dev->header.object_prefix = NULL;
3242         } else {
3243                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3244         }
3245
3246 out:
3247         kfree(reply_buf);
3248
3249         return ret;
3250 }
3251
3252 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3253                 u64 *snap_features)
3254 {
3255         __le64 snapid = cpu_to_le64(snap_id);
3256         struct {
3257                 __le64 features;
3258                 __le64 incompat;
3259         } features_buf = { 0 };
3260         u64 incompat;
3261         int ret;
3262
3263         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3264                                 "rbd", "get_features",
3265                                 (char *) &snapid, sizeof (snapid),
3266                                 (char *) &features_buf, sizeof (features_buf),
3267                                 NULL);
3268         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3269         if (ret < 0)
3270                 return ret;
3271
3272         incompat = le64_to_cpu(features_buf.incompat);
3273         if (incompat & ~RBD_FEATURES_SUPPORTED)
3274                 return -ENXIO;
3275
3276         *snap_features = le64_to_cpu(features_buf.features);
3277
3278         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3279                 (unsigned long long) snap_id,
3280                 (unsigned long long) *snap_features,
3281                 (unsigned long long) le64_to_cpu(features_buf.incompat));
3282
3283         return 0;
3284 }
3285
3286 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3287 {
3288         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3289                                                 &rbd_dev->header.features);
3290 }
3291
3292 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3293 {
3294         struct rbd_spec *parent_spec;
3295         size_t size;
3296         void *reply_buf = NULL;
3297         __le64 snapid;
3298         void *p;
3299         void *end;
3300         char *image_id;
3301         u64 overlap;
3302         int ret;
3303
3304         parent_spec = rbd_spec_alloc();
3305         if (!parent_spec)
3306                 return -ENOMEM;
3307
3308         size = sizeof (__le64) +                                /* pool_id */
3309                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3310                 sizeof (__le64) +                               /* snap_id */
3311                 sizeof (__le64);                                /* overlap */
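        /*
         * Worked size (illustrative): 8 + (4 + 64) + 8 + 8 = 92 bytes
         * at most for the "get_parent" reply.
         */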
3312         reply_buf = kmalloc(size, GFP_KERNEL);
3313         if (!reply_buf) {
3314                 ret = -ENOMEM;
3315                 goto out_err;
3316         }
3317
3318         snapid = cpu_to_le64(CEPH_NOSNAP);
3319         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3320                                 "rbd", "get_parent",
3321                                 (char *) &snapid, sizeof (snapid),
3322                                 (char *) reply_buf, size, NULL);
3323         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3324         if (ret < 0)
3325                 goto out_err;
3326
3327         ret = -ERANGE;
3328         p = reply_buf;
3329         end = (char *) reply_buf + size;
3330         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3331         if (parent_spec->pool_id == CEPH_NOPOOL)
3332                 goto out;       /* No parent?  No problem. */
3333
3334         /* The ceph file layout needs to fit pool id in 32 bits */
3335
3336         ret = -EIO;
3337         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3338                 goto out;
3339
3340         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3341         if (IS_ERR(image_id)) {
3342                 ret = PTR_ERR(image_id);
3343                 goto out_err;
3344         }
3345         parent_spec->image_id = image_id;
3346         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3347         ceph_decode_64_safe(&p, end, overlap, out_err);
3348
3349         rbd_dev->parent_overlap = overlap;
3350         rbd_dev->parent_spec = parent_spec;
3351         parent_spec = NULL;     /* rbd_dev now owns this */
3352 out:
3353         ret = 0;
3354 out_err:
3355         kfree(reply_buf);
3356         rbd_spec_put(parent_spec);
3357
3358         return ret;
3359 }
3360
3361 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3362 {
3363         size_t image_id_size;
3364         char *image_id;
3365         void *p;
3366         void *end;
3367         size_t size;
3368         void *reply_buf = NULL;
3369         size_t len = 0;
3370         char *image_name = NULL;
3371         int ret;
3372
3373         rbd_assert(!rbd_dev->spec->image_name);
3374
3375         len = strlen(rbd_dev->spec->image_id);
3376         image_id_size = sizeof (__le32) + len;
3377         image_id = kmalloc(image_id_size, GFP_KERNEL);
3378         if (!image_id)
3379                 return NULL;
3380
3381         p = image_id;
3382         end = (char *) image_id + image_id_size;
3383         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3384
3385         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3386         reply_buf = kmalloc(size, GFP_KERNEL);
3387         if (!reply_buf)
3388                 goto out;
3389
3390         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3391                                 "rbd", "dir_get_name",
3392                                 image_id, image_id_size,
3393                                 (char *) reply_buf, size, NULL);
3394         if (ret < 0)
3395                 goto out;
3396         p = reply_buf;
3397         end = (char *) reply_buf + size;
3398         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3399         if (IS_ERR(image_name))
3400                 image_name = NULL;
3401         else
3402                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3403 out:
3404         kfree(reply_buf);
3405         kfree(image_id);
3406
3407         return image_name;
3408 }
3409
3410 /*
3411  * When a parent image gets probed, we only have the pool, image,
3412  * and snapshot ids but not the names of any of them.  This call
3413  * is made later to fill in those names.  It has to be done after
3414  * rbd_dev_snaps_update() has completed because some of the
3415  * information (in particular, snapshot name) is not available
3416  * until then.
3417  */
3418 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3419 {
3420         struct ceph_osd_client *osdc;
3421         const char *name;
3423         int ret;
3424
3425         if (rbd_dev->spec->pool_name)
3426                 return 0;       /* Already have the names */
3427
3428         /* Look up the pool name */
3429
3430         osdc = &rbd_dev->rbd_client->client->osdc;
3431         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3432         if (!name) {
3433                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3434                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3435                 return -EIO;
3436         }
3437
3438         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3439         if (!rbd_dev->spec->pool_name)
3440                 return -ENOMEM;
3441
3442         /* Fetch the image name; tolerate failure here */
3443
3444         name = rbd_dev_image_name(rbd_dev);
3445         if (name)
3446                 rbd_dev->spec->image_name = (char *) name;
3447         else
3448                 rbd_warn(rbd_dev, "unable to get image name");
3449
3450         /* Look up the snapshot name. */
3451
3452         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3453         if (!name) {
3454                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3455                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3456                 ret = -EIO;
3457                 goto out_err;
3458         }
3459         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
        if (!rbd_dev->spec->snap_name) {
                ret = -ENOMEM;
                goto out_err;
        }
3462
3463         return 0;
3464 out_err:
3466         kfree(rbd_dev->spec->pool_name);
3467         rbd_dev->spec->pool_name = NULL;
3468
3469         return ret;
3470 }
3471
3472 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3473 {
3474         size_t size;
3475         int ret;
3476         void *reply_buf;
3477         void *p;
3478         void *end;
3479         u64 seq;
3480         u32 snap_count;
3481         struct ceph_snap_context *snapc;
3482         u32 i;
3483
3484         /*
3485          * We'll need room for the seq value (maximum snapshot id),
3486          * snapshot count, and array of that many snapshot ids.
3487          * For now we have a fixed upper limit on the number we're
3488          * prepared to receive.
3489          */
3490         size = sizeof (__le64) + sizeof (__le32) +
3491                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
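        /*
         * Worked example (illustrative): with RBD_MAX_SNAP_COUNT = 510
         * this is 8 + 4 + 510 * 8 = 4092 bytes, so the largest
         * acceptable reply still fits in a single 4KB page.
         */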
3492         reply_buf = kzalloc(size, GFP_KERNEL);
3493         if (!reply_buf)
3494                 return -ENOMEM;
3495
3496         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3497                                 "rbd", "get_snapcontext",
3498                                 NULL, 0,
3499                                 reply_buf, size, ver);
3500         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3501         if (ret < 0)
3502                 goto out;
3503
3504         ret = -ERANGE;
3505         p = reply_buf;
3506         end = (char *) reply_buf + size;
3507         ceph_decode_64_safe(&p, end, seq, out);
3508         ceph_decode_32_safe(&p, end, snap_count, out);
3509
3510         /*
3511          * Make sure the reported number of snapshot ids wouldn't go
3512          * beyond the end of our buffer.  But before checking that,
3513          * make sure the computed size of the snapshot context we
3514          * allocate is representable in a size_t.
3515          */
3516         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3517                                  / sizeof (u64)) {
3518                 ret = -EINVAL;
3519                 goto out;
3520         }
3521         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3522                 goto out;
3523
3524         size = sizeof (struct ceph_snap_context) +
3525                                 snap_count * sizeof (snapc->snaps[0]);
3526         snapc = kmalloc(size, GFP_KERNEL);
3527         if (!snapc) {
3528                 ret = -ENOMEM;
3529                 goto out;
3530         }
3531
3532         atomic_set(&snapc->nref, 1);
3533         snapc->seq = seq;
3534         snapc->num_snaps = snap_count;
3535         for (i = 0; i < snap_count; i++)
3536                 snapc->snaps[i] = ceph_decode_64(&p);
3537
3538         rbd_dev->header.snapc = snapc;
3539
3540         dout("  snap context seq = %llu, snap_count = %u\n",
3541                 (unsigned long long) seq, (unsigned int) snap_count);
3542
        ret = 0;
out:
        kfree(reply_buf);

        return ret;
3547 }
3548
3549 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3550 {
3551         size_t size;
3552         void *reply_buf;
3553         __le64 snap_id;
3554         int ret;
3555         void *p;
3556         void *end;
3557         char *snap_name;
3558
3559         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3560         reply_buf = kmalloc(size, GFP_KERNEL);
3561         if (!reply_buf)
3562                 return ERR_PTR(-ENOMEM);
3563
3564         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3565         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3566                                 "rbd", "get_snapshot_name",
3567                                 (char *) &snap_id, sizeof (snap_id),
3568                                 reply_buf, size, NULL);
3569         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3570         if (ret < 0)
3571                 goto out;
3572
3573         p = reply_buf;
3574         end = (char *) reply_buf + size;
3575         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3576         if (IS_ERR(snap_name)) {
3577                 ret = PTR_ERR(snap_name);
3578                 goto out;
3579         } else {
3580                 dout("  snap_id 0x%016llx snap_name = %s\n",
3581                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3582         }
3583         kfree(reply_buf);
3584
3585         return snap_name;
3586 out:
3587         kfree(reply_buf);
3588
3589         return ERR_PTR(ret);
3590 }
3591
3592 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3593                 u64 *snap_size, u64 *snap_features)
3594 {
3595         u64 snap_id;
3596         u8 order;
3597         int ret;
3598
3599         snap_id = rbd_dev->header.snapc->snaps[which];
3600         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3601         if (ret)
3602                 return ERR_PTR(ret);
3603         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3604         if (ret)
3605                 return ERR_PTR(ret);
3606
3607         return rbd_dev_v2_snap_name(rbd_dev, which);
3608 }
3609
3610 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3611                 u64 *snap_size, u64 *snap_features)
3612 {
3613         if (rbd_dev->image_format == 1)
3614                 return rbd_dev_v1_snap_info(rbd_dev, which,
3615                                         snap_size, snap_features);
3616         if (rbd_dev->image_format == 2)
3617                 return rbd_dev_v2_snap_info(rbd_dev, which,
3618                                         snap_size, snap_features);
3619         return ERR_PTR(-EINVAL);
3620 }
3621
3622 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3623 {
3624         int ret;
3625         __u8 obj_order;
3626
3627         down_write(&rbd_dev->header_rwsem);
3628
3629         /* Grab old order first, to see if it changes */
3630
        obj_order = rbd_dev->header.obj_order;
3632         ret = rbd_dev_v2_image_size(rbd_dev);
3633         if (ret)
3634                 goto out;
3635         if (rbd_dev->header.obj_order != obj_order) {
3636                 ret = -EIO;
3637                 goto out;
3638         }
3639         rbd_update_mapping_size(rbd_dev);
3640
3641         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3642         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3643         if (ret)
3644                 goto out;
3645         ret = rbd_dev_snaps_update(rbd_dev);
3646         dout("rbd_dev_snaps_update returned %d\n", ret);
3647         if (ret)
3648                 goto out;
3649         ret = rbd_dev_snaps_register(rbd_dev);
3650         dout("rbd_dev_snaps_register returned %d\n", ret);
3651 out:
3652         up_write(&rbd_dev->header_rwsem);
3653
3654         return ret;
3655 }
3656
3657 /*
3658  * Scan the rbd device's current snapshot list and compare it to the
3659  * newly-received snapshot context.  Remove any existing snapshots
3660  * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
3662  * And verify there are no changes to snapshots we already know
3663  * about.
3664  *
3665  * Assumes the snapshots in the snapshot context are sorted by
3666  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3667  * are also maintained in that order.)
3668  */
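/*
 * Example (illustrative ids): if the current list holds { 12, 7, 3 }
 * and the new snapshot context holds { 12, 9, 3 }, a single pass
 * removes snapshot 7, adds snapshot 9, and verifies that snapshots
 * 12 and 3 are unchanged.
 */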
3669 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3670 {
3671         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3672         const u32 snap_count = snapc->num_snaps;
3673         struct list_head *head = &rbd_dev->snaps;
3674         struct list_head *links = head->next;
3675         u32 index = 0;
3676
3677         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3678         while (index < snap_count || links != head) {
3679                 u64 snap_id;
3680                 struct rbd_snap *snap;
3681                 char *snap_name;
3682                 u64 snap_size = 0;
3683                 u64 snap_features = 0;
3684
3685                 snap_id = index < snap_count ? snapc->snaps[index]
3686                                              : CEPH_NOSNAP;
3687                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3688                                      : NULL;
3689                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3690
3691                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3692                         struct list_head *next = links->next;
3693
3694                         /*
3695                          * A previously-existing snapshot is not in
3696                          * the new snap context.
3697                          *
3698                          * If the now missing snapshot is the one the
3699                          * image is mapped to, clear its exists flag
3700                          * so we can avoid sending any more requests
3701                          * to it.
3702                          */
3703                         if (rbd_dev->spec->snap_id == snap->id)
3704                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3705                         rbd_remove_snap_dev(snap);
3706                         dout("%ssnap id %llu has been removed\n",
3707                                 rbd_dev->spec->snap_id == snap->id ?
3708                                                         "mapped " : "",
3709                                 (unsigned long long) snap->id);
3710
3711                         /* Done with this list entry; advance */
3712
3713                         links = next;
3714                         continue;
3715                 }
3716
3717                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3718                                         &snap_size, &snap_features);
3719                 if (IS_ERR(snap_name))
3720                         return PTR_ERR(snap_name);
3721
                dout("entry %u: snap_id = %llu\n", (unsigned int) index,
                        (unsigned long long) snap_id);
3724                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3725                         struct rbd_snap *new_snap;
3726
3727                         /* We haven't seen this snapshot before */
3728
3729                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3730                                         snap_id, snap_size, snap_features);
3731                         if (IS_ERR(new_snap)) {
3732                                 int err = PTR_ERR(new_snap);
3733
3734                                 dout("  failed to add dev, error %d\n", err);
3735
3736                                 return err;
3737                         }
3738
3739                         /* New goes before existing, or at end of list */
3740
                        dout("  added dev%s\n", snap ? "" : " at end");
3742                         if (snap)
3743                                 list_add_tail(&new_snap->node, &snap->node);
3744                         else
3745                                 list_add_tail(&new_snap->node, head);
3746                 } else {
3747                         /* Already have this one */
3748
3749                         dout("  already present\n");
3750
3751                         rbd_assert(snap->size == snap_size);
3752                         rbd_assert(!strcmp(snap->name, snap_name));
3753                         rbd_assert(snap->features == snap_features);
3754
3755                         /* Done with this list entry; advance */
3756
3757                         links = links->next;
3758                 }
3759
3760                 /* Advance to the next entry in the snapshot context */
3761
3762                 index++;
3763         }
3764         dout("%s: done\n", __func__);
3765
3766         return 0;
3767 }
3768
3769 /*
3770  * Scan the list of snapshots and register the devices for any that
3771  * have not already been registered.
3772  */
3773 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3774 {
3775         struct rbd_snap *snap;
3776         int ret = 0;
3777
3778         dout("%s:\n", __func__);
3779         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3780                 return -EIO;
3781
3782         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3783                 if (!rbd_snap_registered(snap)) {
3784                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3785                         if (ret < 0)
3786                                 break;
3787                 }
3788         }
3789         dout("%s: returning %d\n", __func__, ret);
3790
3791         return ret;
3792 }
3793
3794 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3795 {
3796         struct device *dev;
3797         int ret;
3798
3799         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3800
3801         dev = &rbd_dev->dev;
3802         dev->bus = &rbd_bus_type;
3803         dev->type = &rbd_device_type;
3804         dev->parent = &rbd_root_dev;
3805         dev->release = rbd_dev_release;
3806         dev_set_name(dev, "%d", rbd_dev->dev_id);
3807         ret = device_register(dev);
3808
3809         mutex_unlock(&ctl_mutex);
3810
3811         return ret;
3812 }
3813
3814 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3815 {
3816         device_unregister(&rbd_dev->dev);
3817 }
3818
3819 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3820
3821 /*
3822  * Get a unique rbd identifier for the given new rbd_dev, and add
3823  * the rbd_dev to the global list.  The minimum rbd id is 1.
3824  */
3825 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3826 {
3827         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3828
3829         spin_lock(&rbd_dev_list_lock);
3830         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3831         spin_unlock(&rbd_dev_list_lock);
3832         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3833                 (unsigned long long) rbd_dev->dev_id);
3834 }
3835
3836 /*
3837  * Remove an rbd_dev from the global list, and record that its
3838  * identifier is no longer in use.
3839  */
3840 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3841 {
3842         struct list_head *tmp;
3843         int rbd_id = rbd_dev->dev_id;
3844         int max_id;
3845
3846         rbd_assert(rbd_id > 0);
3847
3848         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3849                 (unsigned long long) rbd_dev->dev_id);
3850         spin_lock(&rbd_dev_list_lock);
3851         list_del_init(&rbd_dev->node);
3852
3853         /*
3854          * If the id being "put" is not the current maximum, there
3855          * is nothing special we need to do.
3856          */
3857         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3858                 spin_unlock(&rbd_dev_list_lock);
3859                 return;
3860         }
3861
3862         /*
3863          * We need to update the current maximum id.  Search the
3864          * list to find out what it is.  We're more likely to find
3865          * the maximum at the end, so search the list backward.
3866          */
3867         max_id = 0;
3868         list_for_each_prev(tmp, &rbd_dev_list) {
3869                 struct rbd_device *rbd_dev;
3870
3871                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3872                 if (rbd_dev->dev_id > max_id)
3873                         max_id = rbd_dev->dev_id;
3874         }
3875         spin_unlock(&rbd_dev_list_lock);
3876
3877         /*
3878          * The max id could have been updated by rbd_dev_id_get(), in
3879          * which case it now accurately reflects the new maximum.
3880          * Be careful not to overwrite the maximum value in that
3881          * case.
3882          */
3883         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3884         dout("  max dev id has been reset\n");
3885 }
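/*
 * Illustration of the race guarded against above (ids made up): with
 * ids { 1, 2, 3 } in use, putting id 3 computes max_id = 2.  If a
 * concurrent rbd_dev_id_get() already bumped the maximum from 3 to 4,
 * the atomic64_cmpxchg() (which expects to find 3) fails, preserving
 * the newer value.
 */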
3886
3887 /*
3888  * Skips over white space at *buf, and updates *buf to point to the
3889  * first found non-space character (if any). Returns the length of
3890  * the token (string of non-white space characters) found.  Note
3891  * that *buf must be terminated with '\0'.
3892  */
3893 static inline size_t next_token(const char **buf)
3894 {
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
3899         const char *spaces = " \f\n\r\t\v";
3900
3901         *buf += strspn(*buf, spaces);   /* Find start of token */
3902
3903         return strcspn(*buf, spaces);   /* Return token length */
3904 }
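/*
 * Example (illustrative): with *buf pointing at "  pool image",
 * next_token() advances *buf past the two spaces and returns 4,
 * the length of "pool".
 */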
3905
3906 /*
3907  * Finds the next token in *buf, and if the provided token buffer is
3908  * big enough, copies the found token into it.  The result, if
3909  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3910  * must be terminated with '\0' on entry.
3911  *
3912  * Returns the length of the token found (not including the '\0').
3913  * Return value will be 0 if no token is found, and it will be >=
3914  * token_size if the token would not fit.
3915  *
3916  * The *buf pointer will be updated to point beyond the end of the
3917  * found token.  Note that this occurs even if the token buffer is
3918  * too small to hold it.
3919  */
3920 static inline size_t copy_token(const char **buf,
3921                                 char *token,
3922                                 size_t token_size)
3923 {
3924         size_t len;
3925
3926         len = next_token(buf);
3927         if (len < token_size) {
3928                 memcpy(token, *buf, len);
3929                 *(token + len) = '\0';
3930         }
3931         *buf += len;
3932
3933         return len;
3934 }
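/*
 * Example (illustrative): given *buf = "image snap" and a token
 * buffer of size 6, copy_token() stores "image" (NUL-terminated)
 * and returns 5.  A second call with token_size 4 returns 4, which
 * is >= token_size, so nothing was copied; *buf is still advanced
 * past "snap".
 */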
3935
3936 /*
3937  * Finds the next token in *buf, dynamically allocates a buffer big
3938  * enough to hold a copy of it, and copies the token into the new
3939  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3940  * that a duplicate buffer is created even for a zero-length token.
3941  *
3942  * Returns a pointer to the newly-allocated duplicate, or a null
3943  * pointer if memory for the duplicate was not available.  If
3944  * the lenp argument is a non-null pointer, the length of the token
3945  * (not including the '\0') is returned in *lenp.
3946  *
3947  * If successful, the *buf pointer will be updated to point beyond
3948  * the end of the found token.
3949  *
3950  * Note: uses GFP_KERNEL for allocation.
3951  */
3952 static inline char *dup_token(const char **buf, size_t *lenp)
3953 {
3954         char *dup;
3955         size_t len;
3956
3957         len = next_token(buf);
3958         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3959         if (!dup)
3960                 return NULL;
3961         *(dup + len) = '\0';
3962         *buf += len;
3963
3964         if (lenp)
3965                 *lenp = len;
3966
3967         return dup;
3968 }
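/*
 * Example (illustrative): with *buf = "rbd foo", dup_token(&buf, &len)
 * returns a newly-allocated copy of "rbd", sets len to 3, and leaves
 * *buf pointing at " foo".  The caller releases the copy with kfree().
 */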
3969
3970 /*
3971  * Parse the options provided for an "rbd add" (i.e., rbd image
3972  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3973  * and the data written is passed here via a NUL-terminated buffer.
3974  * Returns 0 if successful or an error code otherwise.
3975  *
3976  * The information extracted from these options is recorded in
3977  * the other parameters which return dynamically-allocated
3978  * structures:
3979  *  ceph_opts
3980  *      The address of a pointer that will refer to a ceph options
3981  *      structure.  Caller must release the returned pointer using
3982  *      ceph_destroy_options() when it is no longer needed.
3983  *  rbd_opts
3984  *      Address of an rbd options pointer.  Fully initialized by
3985  *      this function; caller must release with kfree().
3986  *  spec
3987  *      Address of an rbd image specification pointer.  Fully
3988  *      initialized by this function based on parsed options.
3989  *      Caller must release with rbd_spec_put().
3990  *
3991  * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3993  * where:
3994  *  <mon_addrs>
3995  *      A comma-separated list of one or more monitor addresses.
3996  *      A monitor address is an ip address, optionally followed
3997  *      by a port number (separated by a colon).
3998  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3999  *  <options>
4000  *      A comma-separated list of ceph and/or rbd options.
4001  *  <pool_name>
4002  *      The name of the rados pool containing the rbd image.
4003  *  <image_name>
4004  *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
4010  */
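/*
 * For illustration only (addresses, names, and key are made up), a
 * mapping request might look like:
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage" \
 *         > /sys/bus/rbd/add
 *
 * which maps the head of image "myimage" in pool "mypool" via the
 * monitor at 1.2.3.4:6789.
 */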
4011 static int rbd_add_parse_args(const char *buf,
4012                                 struct ceph_options **ceph_opts,
4013                                 struct rbd_options **opts,
4014                                 struct rbd_spec **rbd_spec)
4015 {
4016         size_t len;
4017         char *options;
4018         const char *mon_addrs;
4019         size_t mon_addrs_size;
4020         struct rbd_spec *spec = NULL;
4021         struct rbd_options *rbd_opts = NULL;
4022         struct ceph_options *copts;
4023         int ret;
4024
4025         /* The first four tokens are required */
4026
4027         len = next_token(&buf);
4028         if (!len) {
4029                 rbd_warn(NULL, "no monitor address(es) provided");
4030                 return -EINVAL;
4031         }
4032         mon_addrs = buf;
4033         mon_addrs_size = len + 1;
4034         buf += len;
4035
4036         ret = -EINVAL;
4037         options = dup_token(&buf, NULL);
4038         if (!options)
4039                 return -ENOMEM;
4040         if (!*options) {
4041                 rbd_warn(NULL, "no options provided");
4042                 goto out_err;
4043         }
4044
4045         spec = rbd_spec_alloc();
4046         if (!spec)
4047                 goto out_mem;
4048
4049         spec->pool_name = dup_token(&buf, NULL);
4050         if (!spec->pool_name)
4051                 goto out_mem;
4052         if (!*spec->pool_name) {
4053                 rbd_warn(NULL, "no pool name provided");
4054                 goto out_err;
4055         }
4056
4057         spec->image_name = dup_token(&buf, NULL);
4058         if (!spec->image_name)
4059                 goto out_mem;
4060         if (!*spec->image_name) {
4061                 rbd_warn(NULL, "no image name provided");
4062                 goto out_err;
4063         }
4064
4065         /*
4066          * Snapshot name is optional; default is to use "-"
4067          * (indicating the head/no snapshot).
4068          */
4069         len = next_token(&buf);
4070         if (!len) {
4071                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4072                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4073         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4074                 ret = -ENAMETOOLONG;
4075                 goto out_err;
4076         }
4077         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4078         if (!spec->snap_name)
4079                 goto out_mem;
4080         *(spec->snap_name + len) = '\0';
4081
4082         /* Initialize all rbd options to the defaults */
4083
4084         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4085         if (!rbd_opts)
4086                 goto out_mem;
4087
4088         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4089
4090         copts = ceph_parse_options(options, mon_addrs,
4091                                         mon_addrs + mon_addrs_size - 1,
4092                                         parse_rbd_opts_token, rbd_opts);
4093         if (IS_ERR(copts)) {
4094                 ret = PTR_ERR(copts);
4095                 goto out_err;
4096         }
4097         kfree(options);
4098
4099         *ceph_opts = copts;
4100         *opts = rbd_opts;
4101         *rbd_spec = spec;
4102
4103         return 0;
4104 out_mem:
4105         ret = -ENOMEM;
4106 out_err:
4107         kfree(rbd_opts);
4108         rbd_spec_put(spec);
4109         kfree(options);
4110
4111         return ret;
4112 }
4113
4114 /*
4115  * An rbd format 2 image has a unique identifier, distinct from the
4116  * name given to it by the user.  Internally, that identifier is
4117  * what's used to specify the names of objects related to the image.
4118  *
4119  * A special "rbd id" object is used to map an rbd image name to its
4120  * id.  If that object doesn't exist, then there is no v2 rbd image
4121  * with the supplied name.
4122  *
4123  * This function will record the given rbd_dev's image_id field if
4124  * it can be determined, and in that case will return 0.  If any
4125  * errors occur a negative errno will be returned and the rbd_dev's
4126  * image_id field will be unchanged (and should be NULL).
4127  */
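/*
 * For example (illustrative, assuming RBD_ID_PREFIX is "rbd_id." as
 * defined in rbd_types.h): a format 2 image named "foo" stores its
 * id in an object named "rbd_id.foo".
 */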
4128 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4129 {
4130         int ret;
4131         size_t size;
4132         char *object_name;
4133         void *response;
4134         void *p;
4135
        /*
         * If we already have it we don't need to look it up again.
         * In particular, when probing a parent image the image id
         * is already known (and the image name likely is not), so
         * there's no need to fetch the image id in that case.
         */
        if (rbd_dev->spec->image_id)
                return 0;
4148
4149         /*
4150          * First, see if the format 2 image id file exists, and if
4151          * so, get the image's persistent id from it.
4152          */
4153         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4154         object_name = kmalloc(size, GFP_NOIO);
4155         if (!object_name)
4156                 return -ENOMEM;
4157         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4158         dout("rbd id object name is %s\n", object_name);
4159
4160         /* Response will be an encoded string, which includes a length */
4161
4162         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4163         response = kzalloc(size, GFP_NOIO);
4164         if (!response) {
4165                 ret = -ENOMEM;
4166                 goto out;
4167         }
4168
4169         ret = rbd_obj_method_sync(rbd_dev, object_name,
4170                                 "rbd", "get_id",
4171                                 NULL, 0,
4172                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4173         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4174         if (ret < 0)
4175                 goto out;
4176
4177         p = response;
4178         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4179                                                 p + RBD_IMAGE_ID_LEN_MAX,
4180                                                 NULL, GFP_NOIO);
4181         if (IS_ERR(rbd_dev->spec->image_id)) {
4182                 ret = PTR_ERR(rbd_dev->spec->image_id);
4183                 rbd_dev->spec->image_id = NULL;
4184         } else {
4185                 dout("image_id is %s\n", rbd_dev->spec->image_id);
4186         }
4187 out:
4188         kfree(response);
4189         kfree(object_name);
4190
4191         return ret;
4192 }
4193
4194 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4195 {
4196         int ret;
4197         size_t size;
4198
4199         /* Version 1 images have no id; empty string is used */
4200
4201         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4202         if (!rbd_dev->spec->image_id)
4203                 return -ENOMEM;
4204
4205         /* Record the header object name for this rbd image. */
4206
4207         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4208         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4209         if (!rbd_dev->header_name) {
4210                 ret = -ENOMEM;
4211                 goto out_err;
4212         }
4213         sprintf(rbd_dev->header_name, "%s%s",
4214                 rbd_dev->spec->image_name, RBD_SUFFIX);
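        /*
         * E.g. (illustrative, assuming RBD_SUFFIX is ".rbd" as defined
         * in rbd_types.h): image "foo" gets header object "foo.rbd".
         */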
4215
4216         /* Populate rbd image metadata */
4217
4218         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4219         if (ret < 0)
4220                 goto out_err;
4221
4222         /* Version 1 images have no parent (no layering) */
4223
4224         rbd_dev->parent_spec = NULL;
4225         rbd_dev->parent_overlap = 0;
4226
4227         rbd_dev->image_format = 1;
4228
4229         dout("discovered version 1 image, header name is %s\n",
4230                 rbd_dev->header_name);
4231
4232         return 0;
4233
4234 out_err:
4235         kfree(rbd_dev->header_name);
4236         rbd_dev->header_name = NULL;
4237         kfree(rbd_dev->spec->image_id);
4238         rbd_dev->spec->image_id = NULL;
4239
4240         return ret;
4241 }
4242
4243 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4244 {
4245         size_t size;
4246         int ret;
4247         u64 ver = 0;
4248
4249         /*
4250          * Image id was filled in by the caller.  Record the header
4251          * object name for this rbd image.
4252          */
4253         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4254         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4255         if (!rbd_dev->header_name)
4256                 return -ENOMEM;
4257         sprintf(rbd_dev->header_name, "%s%s",
4258                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
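        /*
         * E.g. (illustrative, assuming RBD_HEADER_PREFIX is
         * "rbd_header." as defined in rbd_types.h): an image with id
         * "1025b6b8b4567" gets header object "rbd_header.1025b6b8b4567".
         */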
4259
4260         /* Get the size and object order for the image */
4261
4262         ret = rbd_dev_v2_image_size(rbd_dev);
4263         if (ret < 0)
4264                 goto out_err;
4265
4266         /* Get the object prefix (a.k.a. block_name) for the image */
4267
4268         ret = rbd_dev_v2_object_prefix(rbd_dev);
4269         if (ret < 0)
4270                 goto out_err;
4271
        /* Get and check features for the image */
4273
4274         ret = rbd_dev_v2_features(rbd_dev);
4275         if (ret < 0)
4276                 goto out_err;
4277
4278         /* If the image supports layering, get the parent info */
4279
4280         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4281                 ret = rbd_dev_v2_parent_info(rbd_dev);
4282                 if (ret < 0)
4283                         goto out_err;
4284         }
4285
4286         /* crypto and compression type aren't (yet) supported for v2 images */
4287
4288         rbd_dev->header.crypt_type = 0;
4289         rbd_dev->header.comp_type = 0;
4290
4291         /* Get the snapshot context, plus the header version */
4292
4293         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4294         if (ret)
4295                 goto out_err;
4296         rbd_dev->header.obj_version = ver;
4297
4298         rbd_dev->image_format = 2;
4299
4300         dout("discovered version 2 image, header name is %s\n",
4301                 rbd_dev->header_name);
4302
4303         return 0;
4304 out_err:
4305         rbd_dev->parent_overlap = 0;
4306         rbd_spec_put(rbd_dev->parent_spec);
4307         rbd_dev->parent_spec = NULL;
4308         kfree(rbd_dev->header_name);
4309         rbd_dev->header_name = NULL;
4310         kfree(rbd_dev->header.object_prefix);
4311         rbd_dev->header.object_prefix = NULL;
4312
4313         return ret;
4314 }
4315
4316 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4317 {
4318         struct rbd_device *parent = NULL;
4319         struct rbd_spec *parent_spec = NULL;
4320         struct rbd_client *rbdc = NULL;
4321         int ret;
4322
4323         /* no need to lock here, as rbd_dev is not registered yet */
4324         ret = rbd_dev_snaps_update(rbd_dev);
4325         if (ret)
4326                 return ret;
4327
4328         ret = rbd_dev_probe_update_spec(rbd_dev);
4329         if (ret)
4330                 goto err_out_snaps;
4331
4332         ret = rbd_dev_set_mapping(rbd_dev);
4333         if (ret)
4334                 goto err_out_snaps;
4335
4336         /* generate unique id: find highest unique id, add one */
4337         rbd_dev_id_get(rbd_dev);
4338
4339         /* Fill in the device name, now that we have its id. */
4340         BUILD_BUG_ON(DEV_NAME_LEN
4341                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4342         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4343
4344         /* Get our block major device number. */
4345
4346         ret = register_blkdev(0, rbd_dev->name);
4347         if (ret < 0)
4348                 goto err_out_id;
4349         rbd_dev->major = ret;
4350
4351         /* Set up the blkdev mapping. */
4352
4353         ret = rbd_init_disk(rbd_dev);
4354         if (ret)
4355                 goto err_out_blkdev;
4356
4357         ret = rbd_bus_add_dev(rbd_dev);
4358         if (ret)
4359                 goto err_out_disk;
4360
4361         /*
4362          * At this point cleanup in the event of an error is the job
4363          * of the sysfs code (initiated by rbd_bus_del_dev()).
4364          */
4365         /* Probe the parent if there is one */
4366
4367         if (rbd_dev->parent_spec) {
4368                 /*
4369                  * We need to pass a reference to the client and the
4370                  * parent spec when creating the parent rbd_dev.
4371                  * Images related by parent/child relationships
4372                  * always share both.
4373                  */
4374                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4375                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4376
4377                 parent = rbd_dev_create(rbdc, parent_spec);
4378                 if (!parent) {
4379                         ret = -ENOMEM;
4380                         goto err_out_spec;
4381                 }
4382                 rbdc = NULL;            /* parent now owns reference */
4383                 parent_spec = NULL;     /* parent now owns reference */
4384                 ret = rbd_dev_probe(parent);
4385                 if (ret < 0)
4386                         goto err_out_parent;
4387                 rbd_dev->parent = parent;
4388         }
4389
4390         down_write(&rbd_dev->header_rwsem);
4391         ret = rbd_dev_snaps_register(rbd_dev);
4392         up_write(&rbd_dev->header_rwsem);
4393         if (ret)
4394                 goto err_out_bus;
4395
4396         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4397         if (ret)
4398                 goto err_out_bus;
4399
4400         /* Everything's ready.  Announce the disk to the world. */
4401
4402         add_disk(rbd_dev->disk);
4403
4404         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4405                 (unsigned long long) rbd_dev->mapping.size);
4406
4407         return ret;
4408
4409 err_out_parent:
4410         rbd_dev_destroy(parent);
4411 err_out_spec:
4412         rbd_spec_put(parent_spec);
4413         rbd_put_client(rbdc);
4414 err_out_bus:
4415         /* this will also clean up rest of rbd_dev stuff */
4416
4417         rbd_bus_del_dev(rbd_dev);
4418
4419         return ret;
4420 err_out_disk:
4421         rbd_free_disk(rbd_dev);
4422 err_out_blkdev:
4423         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4424 err_out_id:
4425         rbd_dev_id_put(rbd_dev);
4426 err_out_snaps:
4427         rbd_remove_all_snaps(rbd_dev);
4428
4429         return ret;
4430 }
4431
4432 /*
4433  * Probe for the existence of the header object for the given rbd
4434  * device.  For format 2 images this includes determining the image
4435  * id.
4436  */
4437 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4438 {
4439         int ret;
4440
4441         /*
4442          * Get the id from the image id object.  If it's not a
4443          * format 2 image, we'll get ENOENT back, and we'll assume
4444          * it's a format 1 image.
4445          */
4446         ret = rbd_dev_image_id(rbd_dev);
4447         if (ret)
4448                 ret = rbd_dev_v1_probe(rbd_dev);
4449         else
4450                 ret = rbd_dev_v2_probe(rbd_dev);
4451         if (ret) {
4452                 dout("probe failed, returning %d\n", ret);
4453
4454                 return ret;
4455         }
4456
4457         ret = rbd_dev_probe_finish(rbd_dev);
4458         if (ret)
4459                 rbd_header_free(&rbd_dev->header);
4460
4461         return ret;
4462 }
4463
4464 static ssize_t rbd_add(struct bus_type *bus,
4465                        const char *buf,
4466                        size_t count)
4467 {
4468         struct rbd_device *rbd_dev = NULL;
4469         struct ceph_options *ceph_opts = NULL;
4470         struct rbd_options *rbd_opts = NULL;
4471         struct rbd_spec *spec = NULL;
4472         struct rbd_client *rbdc;
4473         struct ceph_osd_client *osdc;
4474         int rc = -ENOMEM;
4475
4476         if (!try_module_get(THIS_MODULE))
4477                 return -ENODEV;
4478
4479         /* parse add command */
4480         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4481         if (rc < 0)
4482                 goto err_out_module;
4483
4484         rbdc = rbd_get_client(ceph_opts);
4485         if (IS_ERR(rbdc)) {
4486                 rc = PTR_ERR(rbdc);
4487                 goto err_out_args;
4488         }
4489         ceph_opts = NULL;       /* rbd_dev client now owns this */
4490
4491         /* pick the pool */
4492         osdc = &rbdc->client->osdc;
4493         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4494         if (rc < 0)
4495                 goto err_out_client;
4496         spec->pool_id = (u64) rc;
4497
4498         /* The ceph file layout needs to fit pool id in 32 bits */
4499
4500         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4501                 rc = -EIO;
4502                 goto err_out_client;
4503         }
4504
4505         rbd_dev = rbd_dev_create(rbdc, spec);
4506         if (!rbd_dev)
4507                 goto err_out_client;
4508         rbdc = NULL;            /* rbd_dev now owns this */
4509         spec = NULL;            /* rbd_dev now owns this */
4510
4511         rbd_dev->mapping.read_only = rbd_opts->read_only;
4512         kfree(rbd_opts);
4513         rbd_opts = NULL;        /* done with this */
4514
4515         rc = rbd_dev_probe(rbd_dev);
4516         if (rc < 0)
4517                 goto err_out_rbd_dev;
4518
4519         return count;
4520 err_out_rbd_dev:
4521         rbd_dev_destroy(rbd_dev);
4522 err_out_client:
4523         rbd_put_client(rbdc);
4524 err_out_args:
4525         if (ceph_opts)
4526                 ceph_destroy_options(ceph_opts);
4527         kfree(rbd_opts);
4528         rbd_spec_put(spec);
4529 err_out_module:
4530         module_put(THIS_MODULE);
4531
4532         dout("Error adding device %s\n", buf);
4533
4534         return (ssize_t) rc;
4535 }
4536
4537 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4538 {
4539         struct list_head *tmp;
4540         struct rbd_device *rbd_dev;
4541
4542         spin_lock(&rbd_dev_list_lock);
4543         list_for_each(tmp, &rbd_dev_list) {
4544                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4545                 if (rbd_dev->dev_id == dev_id) {
4546                         spin_unlock(&rbd_dev_list_lock);
4547                         return rbd_dev;
4548                 }
4549         }
4550         spin_unlock(&rbd_dev_list_lock);
4551         return NULL;
4552 }
4553
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        if (rbd_dev->watch_event)
                rbd_dev_header_watch_sync(rbd_dev, 0);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
        rbd_dev_destroy(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}

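/*
 * Remove a single rbd device: drop all of its snapshot devices, then
 * delete it from the bus, which ultimately ends up in
 * rbd_dev_release() above.
 */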
static void __rbd_remove(struct rbd_device *rbd_dev)
{
        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
}

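/*
 * As with mapping, unmapping is driven through sysfs; roughly (a
 * sketch, with a made-up device id):
 *
 *   # echo 0 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the device is still held open,
 * and with -ENOENT if no mapped device has the given id.
 */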
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id, rc;
        unsigned long ul;
        int ret = count;

        rc = strict_strtoul(buf, 10, &ul);
        if (rc)
                return rc;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;

        while (rbd_dev->parent_spec) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Walk down to the ancestor that has no parent of its
                 * own (the most distant one in the chain) and remove
                 * it; each pass through the outer loop shortens the
                 * chain by one.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                __rbd_remove(second);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
                first->parent_overlap = 0;
                first->parent = NULL;
        }
        __rbd_remove(rbd_dev);

done:
        mutex_unlock(&ctl_mutex);

        return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");
                return -EINVAL;
        }

        rc = rbd_sysfs_init();
        if (rc)
                return rc;

        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
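
/*
 * Note on module lifetime: rbd_add() takes a reference on this module
 * for every device it maps and rbd_dev_release() drops it, so the
 * module cannot be unloaded while any image remains mapped.  rbd_exit()
 * therefore only needs to undo the sysfs registration done in
 * rbd_init().
 */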

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");