]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: don't create sysfs entries for non-mapped snapshots
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
88  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89  * enough to hold all possible device names.
90  */
91 #define DEV_NAME_LEN            32
92 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 obj_version;
112 };
113
114 /*
115  * An rbd image specification.
116  *
117  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118  * identify an image.  Each rbd_dev structure includes a pointer to
119  * an rbd_spec structure that encapsulates this identity.
120  *
121  * Each of the id's in an rbd_spec has an associated name.  For a
122  * user-mapped image, the names are supplied and the id's associated
123  * with them are looked up.  For a layered image, a parent image is
124  * defined by the tuple, and the names are looked up.
125  *
126  * An rbd_dev structure contains a parent_spec pointer which is
127  * non-null if the image it represents is a child in a layered
128  * image.  This pointer will refer to the rbd_spec structure used
129  * by the parent rbd_dev for its own identity (i.e., the structure
130  * is shared between the parent and child).
131  *
132  * Since these structures are populated once, during the discovery
133  * phase of image construction, they are effectively immutable so
134  * we make no effort to synchronize access to them.
135  *
136  * Note that code herein does not assume the image name is known (it
137  * could be a null pointer).
138  */
139 struct rbd_spec {
140         u64             pool_id;
141         char            *pool_name;
142
143         char            *image_id;
144         char            *image_name;
145
146         u64             snap_id;
147         char            *snap_name;
148
149         struct kref     kref;
150 };
151
152 /*
153  * an instance of the client.  multiple devices may share an rbd client.
154  */
155 struct rbd_client {
156         struct ceph_client      *client;
157         struct kref             kref;
158         struct list_head        node;
159 };
160
161 struct rbd_img_request;
162 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
163
164 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
165
166 struct rbd_obj_request;
167 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
168
169 enum obj_request_type {
170         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
171 };
172
173 enum obj_req_flags {
174         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
175         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
176         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
177         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
178 };
179
180 struct rbd_obj_request {
181         const char              *object_name;
182         u64                     offset;         /* object start byte */
183         u64                     length;         /* bytes from offset */
184         unsigned long           flags;
185
186         /*
187          * An object request associated with an image will have its
188          * img_data flag set; a standalone object request will not.
189          *
190          * A standalone object request will have which == BAD_WHICH
191          * and a null obj_request pointer.
192          *
193          * An object request initiated in support of a layered image
194          * object (to check for its existence before a write) will
195          * have which == BAD_WHICH and a non-null obj_request pointer.
196          *
197          * Finally, an object request for rbd image data will have
198          * which != BAD_WHICH, and will have a non-null img_request
199          * pointer.  The value of which will be in the range
200          * 0..(img_request->obj_request_count-1).
201          */
202         union {
203                 struct rbd_obj_request  *obj_request;   /* STAT op */
204                 struct {
205                         struct rbd_img_request  *img_request;
206                         u64                     img_offset;
207                         /* links for img_request->obj_requests list */
208                         struct list_head        links;
209                 };
210         };
211         u32                     which;          /* posn image request list */
212
213         enum obj_request_type   type;
214         union {
215                 struct bio      *bio_list;
216                 struct {
217                         struct page     **pages;
218                         u32             page_count;
219                 };
220         };
221         struct page             **copyup_pages;
222
223         struct ceph_osd_request *osd_req;
224
225         u64                     xferred;        /* bytes transferred */
226         u64                     version;
227         int                     result;
228
229         rbd_obj_callback_t      callback;
230         struct completion       completion;
231
232         struct kref             kref;
233 };
234
235 enum img_req_flags {
236         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
237         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
238         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
239 };
240
241 struct rbd_img_request {
242         struct rbd_device       *rbd_dev;
243         u64                     offset; /* starting image byte offset */
244         u64                     length; /* byte count from offset */
245         unsigned long           flags;
246         union {
247                 u64                     snap_id;        /* for reads */
248                 struct ceph_snap_context *snapc;        /* for writes */
249         };
250         union {
251                 struct request          *rq;            /* block request */
252                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
253         };
254         struct page             **copyup_pages;
255         spinlock_t              completion_lock;/* protects next_completion */
256         u32                     next_completion;
257         rbd_img_callback_t      callback;
258         u64                     xferred;/* aggregate bytes transferred */
259         int                     result; /* first nonzero obj_request result */
260
261         u32                     obj_request_count;
262         struct list_head        obj_requests;   /* rbd_obj_request structs */
263
264         struct kref             kref;
265 };
266
267 #define for_each_obj_request(ireq, oreq) \
268         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
269 #define for_each_obj_request_from(ireq, oreq) \
270         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
271 #define for_each_obj_request_safe(ireq, oreq, n) \
272         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
273
274 struct rbd_snap {
275         const char              *name;
276         u64                     size;
277         struct list_head        node;
278         u64                     id;
279         u64                     features;
280 };
281
282 struct rbd_mapping {
283         u64                     size;
284         u64                     features;
285         bool                    read_only;
286 };
287
288 /*
289  * a single device
290  */
291 struct rbd_device {
292         int                     dev_id;         /* blkdev unique id */
293
294         int                     major;          /* blkdev assigned major */
295         struct gendisk          *disk;          /* blkdev's gendisk and rq */
296
297         u32                     image_format;   /* Either 1 or 2 */
298         struct rbd_client       *rbd_client;
299
300         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
301
302         spinlock_t              lock;           /* queue, flags, open_count */
303
304         struct rbd_image_header header;
305         unsigned long           flags;          /* possibly lock protected */
306         struct rbd_spec         *spec;
307
308         char                    *header_name;
309
310         struct ceph_file_layout layout;
311
312         struct ceph_osd_event   *watch_event;
313         struct rbd_obj_request  *watch_request;
314
315         struct rbd_spec         *parent_spec;
316         u64                     parent_overlap;
317         struct rbd_device       *parent;
318
319         u64                     stripe_unit;
320         u64                     stripe_count;
321
322         /* protects updating the header */
323         struct rw_semaphore     header_rwsem;
324
325         struct rbd_mapping      mapping;
326
327         struct list_head        node;
328
329         /* list of snapshots */
330         struct list_head        snaps;
331
332         /* sysfs related */
333         struct device           dev;
334         unsigned long           open_count;     /* protected by lock */
335 };
336
337 /*
338  * Flag bits for rbd_dev->flags.  If atomicity is required,
339  * rbd_dev->lock is used to protect access.
340  *
341  * Currently, only the "removing" flag (which is coupled with the
342  * "open_count" field) requires atomic access.
343  */
344 enum rbd_dev_flags {
345         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
346         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
347 };
348
349 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
350
351 static LIST_HEAD(rbd_dev_list);    /* devices */
352 static DEFINE_SPINLOCK(rbd_dev_list_lock);
353
354 static LIST_HEAD(rbd_client_list);              /* clients */
355 static DEFINE_SPINLOCK(rbd_client_list_lock);
356
357 static int rbd_img_request_submit(struct rbd_img_request *img_request);
358
359 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
360
361 static void rbd_dev_release(struct device *dev);
362 static void rbd_remove_snap_dev(struct rbd_snap *snap);
363
364 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
365                        size_t count);
366 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
367                           size_t count);
368 static int rbd_dev_probe(struct rbd_device *rbd_dev);
369
370 static struct bus_attribute rbd_bus_attrs[] = {
371         __ATTR(add, S_IWUSR, NULL, rbd_add),
372         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
373         __ATTR_NULL
374 };
375
376 static struct bus_type rbd_bus_type = {
377         .name           = "rbd",
378         .bus_attrs      = rbd_bus_attrs,
379 };
380
381 static void rbd_root_dev_release(struct device *dev)
382 {
383 }
384
385 static struct device rbd_root_dev = {
386         .init_name =    "rbd",
387         .release =      rbd_root_dev_release,
388 };
389
390 static __printf(2, 3)
391 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
392 {
393         struct va_format vaf;
394         va_list args;
395
396         va_start(args, fmt);
397         vaf.fmt = fmt;
398         vaf.va = &args;
399
400         if (!rbd_dev)
401                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
402         else if (rbd_dev->disk)
403                 printk(KERN_WARNING "%s: %s: %pV\n",
404                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
405         else if (rbd_dev->spec && rbd_dev->spec->image_name)
406                 printk(KERN_WARNING "%s: image %s: %pV\n",
407                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
408         else if (rbd_dev->spec && rbd_dev->spec->image_id)
409                 printk(KERN_WARNING "%s: id %s: %pV\n",
410                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
411         else    /* punt */
412                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
413                         RBD_DRV_NAME, rbd_dev, &vaf);
414         va_end(args);
415 }
416
417 #ifdef RBD_DEBUG
418 #define rbd_assert(expr)                                                \
419                 if (unlikely(!(expr))) {                                \
420                         printk(KERN_ERR "\nAssertion failure in %s() "  \
421                                                 "at line %d:\n\n"       \
422                                         "\trbd_assert(%s);\n\n",        \
423                                         __func__, __LINE__, #expr);     \
424                         BUG();                                          \
425                 }
426 #else /* !RBD_DEBUG */
427 #  define rbd_assert(expr)      ((void) 0)
428 #endif /* !RBD_DEBUG */
429
430 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
431 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
432
433 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
434 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
435
436 static int rbd_open(struct block_device *bdev, fmode_t mode)
437 {
438         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
439         bool removing = false;
440
441         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
442                 return -EROFS;
443
444         spin_lock_irq(&rbd_dev->lock);
445         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
446                 removing = true;
447         else
448                 rbd_dev->open_count++;
449         spin_unlock_irq(&rbd_dev->lock);
450         if (removing)
451                 return -ENOENT;
452
453         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
454         (void) get_device(&rbd_dev->dev);
455         set_device_ro(bdev, rbd_dev->mapping.read_only);
456         mutex_unlock(&ctl_mutex);
457
458         return 0;
459 }
460
461 static int rbd_release(struct gendisk *disk, fmode_t mode)
462 {
463         struct rbd_device *rbd_dev = disk->private_data;
464         unsigned long open_count_before;
465
466         spin_lock_irq(&rbd_dev->lock);
467         open_count_before = rbd_dev->open_count--;
468         spin_unlock_irq(&rbd_dev->lock);
469         rbd_assert(open_count_before > 0);
470
471         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
472         put_device(&rbd_dev->dev);
473         mutex_unlock(&ctl_mutex);
474
475         return 0;
476 }
477
478 static const struct block_device_operations rbd_bd_ops = {
479         .owner                  = THIS_MODULE,
480         .open                   = rbd_open,
481         .release                = rbd_release,
482 };
483
484 /*
485  * Initialize an rbd client instance.
486  * We own *ceph_opts.
487  */
488 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
489 {
490         struct rbd_client *rbdc;
491         int ret = -ENOMEM;
492
493         dout("%s:\n", __func__);
494         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
495         if (!rbdc)
496                 goto out_opt;
497
498         kref_init(&rbdc->kref);
499         INIT_LIST_HEAD(&rbdc->node);
500
501         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
502
503         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
504         if (IS_ERR(rbdc->client))
505                 goto out_mutex;
506         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
507
508         ret = ceph_open_session(rbdc->client);
509         if (ret < 0)
510                 goto out_err;
511
512         spin_lock(&rbd_client_list_lock);
513         list_add_tail(&rbdc->node, &rbd_client_list);
514         spin_unlock(&rbd_client_list_lock);
515
516         mutex_unlock(&ctl_mutex);
517         dout("%s: rbdc %p\n", __func__, rbdc);
518
519         return rbdc;
520
521 out_err:
522         ceph_destroy_client(rbdc->client);
523 out_mutex:
524         mutex_unlock(&ctl_mutex);
525         kfree(rbdc);
526 out_opt:
527         if (ceph_opts)
528                 ceph_destroy_options(ceph_opts);
529         dout("%s: error %d\n", __func__, ret);
530
531         return ERR_PTR(ret);
532 }
533
534 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
535 {
536         kref_get(&rbdc->kref);
537
538         return rbdc;
539 }
540
541 /*
542  * Find a ceph client with specific addr and configuration.  If
543  * found, bump its reference count.
544  */
545 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
546 {
547         struct rbd_client *client_node;
548         bool found = false;
549
550         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
551                 return NULL;
552
553         spin_lock(&rbd_client_list_lock);
554         list_for_each_entry(client_node, &rbd_client_list, node) {
555                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
556                         __rbd_get_client(client_node);
557
558                         found = true;
559                         break;
560                 }
561         }
562         spin_unlock(&rbd_client_list_lock);
563
564         return found ? client_node : NULL;
565 }
566
567 /*
568  * mount options
569  */
570 enum {
571         Opt_last_int,
572         /* int args above */
573         Opt_last_string,
574         /* string args above */
575         Opt_read_only,
576         Opt_read_write,
577         /* Boolean args above */
578         Opt_last_bool,
579 };
580
581 static match_table_t rbd_opts_tokens = {
582         /* int args above */
583         /* string args above */
584         {Opt_read_only, "read_only"},
585         {Opt_read_only, "ro"},          /* Alternate spelling */
586         {Opt_read_write, "read_write"},
587         {Opt_read_write, "rw"},         /* Alternate spelling */
588         /* Boolean args above */
589         {-1, NULL}
590 };
591
592 struct rbd_options {
593         bool    read_only;
594 };
595
596 #define RBD_READ_ONLY_DEFAULT   false
597
598 static int parse_rbd_opts_token(char *c, void *private)
599 {
600         struct rbd_options *rbd_opts = private;
601         substring_t argstr[MAX_OPT_ARGS];
602         int token, intval, ret;
603
604         token = match_token(c, rbd_opts_tokens, argstr);
605         if (token < 0)
606                 return -EINVAL;
607
608         if (token < Opt_last_int) {
609                 ret = match_int(&argstr[0], &intval);
610                 if (ret < 0) {
611                         pr_err("bad mount option arg (not int) "
612                                "at '%s'\n", c);
613                         return ret;
614                 }
615                 dout("got int token %d val %d\n", token, intval);
616         } else if (token > Opt_last_int && token < Opt_last_string) {
617                 dout("got string token %d val %s\n", token,
618                      argstr[0].from);
619         } else if (token > Opt_last_string && token < Opt_last_bool) {
620                 dout("got Boolean token %d\n", token);
621         } else {
622                 dout("got token %d\n", token);
623         }
624
625         switch (token) {
626         case Opt_read_only:
627                 rbd_opts->read_only = true;
628                 break;
629         case Opt_read_write:
630                 rbd_opts->read_only = false;
631                 break;
632         default:
633                 rbd_assert(false);
634                 break;
635         }
636         return 0;
637 }
638
639 /*
640  * Get a ceph client with specific addr and configuration, if one does
641  * not exist create it.
642  */
643 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
644 {
645         struct rbd_client *rbdc;
646
647         rbdc = rbd_client_find(ceph_opts);
648         if (rbdc)       /* using an existing client */
649                 ceph_destroy_options(ceph_opts);
650         else
651                 rbdc = rbd_client_create(ceph_opts);
652
653         return rbdc;
654 }
655
656 /*
657  * Destroy ceph client
658  *
659  * Caller must hold rbd_client_list_lock.
660  */
661 static void rbd_client_release(struct kref *kref)
662 {
663         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
664
665         dout("%s: rbdc %p\n", __func__, rbdc);
666         spin_lock(&rbd_client_list_lock);
667         list_del(&rbdc->node);
668         spin_unlock(&rbd_client_list_lock);
669
670         ceph_destroy_client(rbdc->client);
671         kfree(rbdc);
672 }
673
674 /*
675  * Drop reference to ceph client node. If it's not referenced anymore, release
676  * it.
677  */
678 static void rbd_put_client(struct rbd_client *rbdc)
679 {
680         if (rbdc)
681                 kref_put(&rbdc->kref, rbd_client_release);
682 }
683
684 static bool rbd_image_format_valid(u32 image_format)
685 {
686         return image_format == 1 || image_format == 2;
687 }
688
689 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
690 {
691         size_t size;
692         u32 snap_count;
693
694         /* The header has to start with the magic rbd header text */
695         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
696                 return false;
697
698         /* The bio layer requires at least sector-sized I/O */
699
700         if (ondisk->options.order < SECTOR_SHIFT)
701                 return false;
702
703         /* If we use u64 in a few spots we may be able to loosen this */
704
705         if (ondisk->options.order > 8 * sizeof (int) - 1)
706                 return false;
707
708         /*
709          * The size of a snapshot header has to fit in a size_t, and
710          * that limits the number of snapshots.
711          */
712         snap_count = le32_to_cpu(ondisk->snap_count);
713         size = SIZE_MAX - sizeof (struct ceph_snap_context);
714         if (snap_count > size / sizeof (__le64))
715                 return false;
716
717         /*
718          * Not only that, but the size of the entire the snapshot
719          * header must also be representable in a size_t.
720          */
721         size -= snap_count * sizeof (__le64);
722         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
723                 return false;
724
725         return true;
726 }
727
728 /*
729  * Create a new header structure, translate header format from the on-disk
730  * header.
731  */
732 static int rbd_header_from_disk(struct rbd_image_header *header,
733                                  struct rbd_image_header_ondisk *ondisk)
734 {
735         u32 snap_count;
736         size_t len;
737         size_t size;
738         u32 i;
739
740         memset(header, 0, sizeof (*header));
741
742         snap_count = le32_to_cpu(ondisk->snap_count);
743
744         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
745         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
746         if (!header->object_prefix)
747                 return -ENOMEM;
748         memcpy(header->object_prefix, ondisk->object_prefix, len);
749         header->object_prefix[len] = '\0';
750
751         if (snap_count) {
752                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
753
754                 /* Save a copy of the snapshot names */
755
756                 if (snap_names_len > (u64) SIZE_MAX)
757                         return -EIO;
758                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
759                 if (!header->snap_names)
760                         goto out_err;
761                 /*
762                  * Note that rbd_dev_v1_header_read() guarantees
763                  * the ondisk buffer we're working with has
764                  * snap_names_len bytes beyond the end of the
765                  * snapshot id array, this memcpy() is safe.
766                  */
767                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
768                         snap_names_len);
769
770                 /* Record each snapshot's size */
771
772                 size = snap_count * sizeof (*header->snap_sizes);
773                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
774                 if (!header->snap_sizes)
775                         goto out_err;
776                 for (i = 0; i < snap_count; i++)
777                         header->snap_sizes[i] =
778                                 le64_to_cpu(ondisk->snaps[i].image_size);
779         } else {
780                 WARN_ON(ondisk->snap_names_len);
781                 header->snap_names = NULL;
782                 header->snap_sizes = NULL;
783         }
784
785         header->features = 0;   /* No features support in v1 images */
786         header->obj_order = ondisk->options.order;
787         header->crypt_type = ondisk->options.crypt_type;
788         header->comp_type = ondisk->options.comp_type;
789
790         /* Allocate and fill in the snapshot context */
791
792         header->image_size = le64_to_cpu(ondisk->image_size);
793         size = sizeof (struct ceph_snap_context);
794         size += snap_count * sizeof (header->snapc->snaps[0]);
795         header->snapc = kzalloc(size, GFP_KERNEL);
796         if (!header->snapc)
797                 goto out_err;
798
799         atomic_set(&header->snapc->nref, 1);
800         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
801         header->snapc->num_snaps = snap_count;
802         for (i = 0; i < snap_count; i++)
803                 header->snapc->snaps[i] =
804                         le64_to_cpu(ondisk->snaps[i].id);
805
806         return 0;
807
808 out_err:
809         kfree(header->snap_sizes);
810         header->snap_sizes = NULL;
811         kfree(header->snap_names);
812         header->snap_names = NULL;
813         kfree(header->object_prefix);
814         header->object_prefix = NULL;
815
816         return -ENOMEM;
817 }
818
819 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
820 {
821         struct rbd_snap *snap;
822
823         if (snap_id == CEPH_NOSNAP)
824                 return RBD_SNAP_HEAD_NAME;
825
826         list_for_each_entry(snap, &rbd_dev->snaps, node)
827                 if (snap_id == snap->id)
828                         return snap->name;
829
830         return NULL;
831 }
832
833 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
834 {
835
836         struct rbd_snap *snap;
837
838         list_for_each_entry(snap, &rbd_dev->snaps, node) {
839                 if (!strcmp(snap_name, snap->name)) {
840                         rbd_dev->spec->snap_id = snap->id;
841                         rbd_dev->mapping.size = snap->size;
842                         rbd_dev->mapping.features = snap->features;
843
844                         return 0;
845                 }
846         }
847
848         return -ENOENT;
849 }
850
851 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
852 {
853         int ret;
854
855         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
856                     sizeof (RBD_SNAP_HEAD_NAME))) {
857                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
858                 rbd_dev->mapping.size = rbd_dev->header.image_size;
859                 rbd_dev->mapping.features = rbd_dev->header.features;
860                 ret = 0;
861         } else {
862                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
863                 if (ret < 0)
864                         goto done;
865                 rbd_dev->mapping.read_only = true;
866         }
867         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
868
869 done:
870         return ret;
871 }
872
873 static void rbd_header_free(struct rbd_image_header *header)
874 {
875         kfree(header->object_prefix);
876         header->object_prefix = NULL;
877         kfree(header->snap_sizes);
878         header->snap_sizes = NULL;
879         kfree(header->snap_names);
880         header->snap_names = NULL;
881         ceph_put_snap_context(header->snapc);
882         header->snapc = NULL;
883 }
884
885 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
886 {
887         char *name;
888         u64 segment;
889         int ret;
890
891         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
892         if (!name)
893                 return NULL;
894         segment = offset >> rbd_dev->header.obj_order;
895         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
896                         rbd_dev->header.object_prefix, segment);
897         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
898                 pr_err("error formatting segment name for #%llu (%d)\n",
899                         segment, ret);
900                 kfree(name);
901                 name = NULL;
902         }
903
904         return name;
905 }
906
907 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
908 {
909         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
910
911         return offset & (segment_size - 1);
912 }
913
914 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
915                                 u64 offset, u64 length)
916 {
917         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
918
919         offset &= segment_size - 1;
920
921         rbd_assert(length <= U64_MAX - offset);
922         if (offset + length > segment_size)
923                 length = segment_size - offset;
924
925         return length;
926 }
927
928 /*
929  * returns the size of an object in the image
930  */
931 static u64 rbd_obj_bytes(struct rbd_image_header *header)
932 {
933         return 1 << header->obj_order;
934 }
935
936 /*
937  * bio helpers
938  */
939
940 static void bio_chain_put(struct bio *chain)
941 {
942         struct bio *tmp;
943
944         while (chain) {
945                 tmp = chain;
946                 chain = chain->bi_next;
947                 bio_put(tmp);
948         }
949 }
950
951 /*
952  * zeros a bio chain, starting at specific offset
953  */
954 static void zero_bio_chain(struct bio *chain, int start_ofs)
955 {
956         struct bio_vec *bv;
957         unsigned long flags;
958         void *buf;
959         int i;
960         int pos = 0;
961
962         while (chain) {
963                 bio_for_each_segment(bv, chain, i) {
964                         if (pos + bv->bv_len > start_ofs) {
965                                 int remainder = max(start_ofs - pos, 0);
966                                 buf = bvec_kmap_irq(bv, &flags);
967                                 memset(buf + remainder, 0,
968                                        bv->bv_len - remainder);
969                                 bvec_kunmap_irq(buf, &flags);
970                         }
971                         pos += bv->bv_len;
972                 }
973
974                 chain = chain->bi_next;
975         }
976 }
977
978 /*
979  * similar to zero_bio_chain(), zeros data defined by a page array,
980  * starting at the given byte offset from the start of the array and
981  * continuing up to the given end offset.  The pages array is
982  * assumed to be big enough to hold all bytes up to the end.
983  */
984 static void zero_pages(struct page **pages, u64 offset, u64 end)
985 {
986         struct page **page = &pages[offset >> PAGE_SHIFT];
987
988         rbd_assert(end > offset);
989         rbd_assert(end - offset <= (u64)SIZE_MAX);
990         while (offset < end) {
991                 size_t page_offset;
992                 size_t length;
993                 unsigned long flags;
994                 void *kaddr;
995
996                 page_offset = (size_t)(offset & ~PAGE_MASK);
997                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
998                 local_irq_save(flags);
999                 kaddr = kmap_atomic(*page);
1000                 memset(kaddr + page_offset, 0, length);
1001                 kunmap_atomic(kaddr);
1002                 local_irq_restore(flags);
1003
1004                 offset += length;
1005                 page++;
1006         }
1007 }
1008
1009 /*
1010  * Clone a portion of a bio, starting at the given byte offset
1011  * and continuing for the number of bytes indicated.
1012  */
1013 static struct bio *bio_clone_range(struct bio *bio_src,
1014                                         unsigned int offset,
1015                                         unsigned int len,
1016                                         gfp_t gfpmask)
1017 {
1018         struct bio_vec *bv;
1019         unsigned int resid;
1020         unsigned short idx;
1021         unsigned int voff;
1022         unsigned short end_idx;
1023         unsigned short vcnt;
1024         struct bio *bio;
1025
1026         /* Handle the easy case for the caller */
1027
1028         if (!offset && len == bio_src->bi_size)
1029                 return bio_clone(bio_src, gfpmask);
1030
1031         if (WARN_ON_ONCE(!len))
1032                 return NULL;
1033         if (WARN_ON_ONCE(len > bio_src->bi_size))
1034                 return NULL;
1035         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1036                 return NULL;
1037
1038         /* Find first affected segment... */
1039
1040         resid = offset;
1041         __bio_for_each_segment(bv, bio_src, idx, 0) {
1042                 if (resid < bv->bv_len)
1043                         break;
1044                 resid -= bv->bv_len;
1045         }
1046         voff = resid;
1047
1048         /* ...and the last affected segment */
1049
1050         resid += len;
1051         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1052                 if (resid <= bv->bv_len)
1053                         break;
1054                 resid -= bv->bv_len;
1055         }
1056         vcnt = end_idx - idx + 1;
1057
1058         /* Build the clone */
1059
1060         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1061         if (!bio)
1062                 return NULL;    /* ENOMEM */
1063
1064         bio->bi_bdev = bio_src->bi_bdev;
1065         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1066         bio->bi_rw = bio_src->bi_rw;
1067         bio->bi_flags |= 1 << BIO_CLONED;
1068
1069         /*
1070          * Copy over our part of the bio_vec, then update the first
1071          * and last (or only) entries.
1072          */
1073         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1074                         vcnt * sizeof (struct bio_vec));
1075         bio->bi_io_vec[0].bv_offset += voff;
1076         if (vcnt > 1) {
1077                 bio->bi_io_vec[0].bv_len -= voff;
1078                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1079         } else {
1080                 bio->bi_io_vec[0].bv_len = len;
1081         }
1082
1083         bio->bi_vcnt = vcnt;
1084         bio->bi_size = len;
1085         bio->bi_idx = 0;
1086
1087         return bio;
1088 }
1089
1090 /*
1091  * Clone a portion of a bio chain, starting at the given byte offset
1092  * into the first bio in the source chain and continuing for the
1093  * number of bytes indicated.  The result is another bio chain of
1094  * exactly the given length, or a null pointer on error.
1095  *
1096  * The bio_src and offset parameters are both in-out.  On entry they
1097  * refer to the first source bio and the offset into that bio where
1098  * the start of data to be cloned is located.
1099  *
1100  * On return, bio_src is updated to refer to the bio in the source
1101  * chain that contains first un-cloned byte, and *offset will
1102  * contain the offset of that byte within that bio.
1103  */
1104 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1105                                         unsigned int *offset,
1106                                         unsigned int len,
1107                                         gfp_t gfpmask)
1108 {
1109         struct bio *bi = *bio_src;
1110         unsigned int off = *offset;
1111         struct bio *chain = NULL;
1112         struct bio **end;
1113
1114         /* Build up a chain of clone bios up to the limit */
1115
1116         if (!bi || off >= bi->bi_size || !len)
1117                 return NULL;            /* Nothing to clone */
1118
1119         end = &chain;
1120         while (len) {
1121                 unsigned int bi_size;
1122                 struct bio *bio;
1123
1124                 if (!bi) {
1125                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1126                         goto out_err;   /* EINVAL; ran out of bio's */
1127                 }
1128                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1129                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1130                 if (!bio)
1131                         goto out_err;   /* ENOMEM */
1132
1133                 *end = bio;
1134                 end = &bio->bi_next;
1135
1136                 off += bi_size;
1137                 if (off == bi->bi_size) {
1138                         bi = bi->bi_next;
1139                         off = 0;
1140                 }
1141                 len -= bi_size;
1142         }
1143         *bio_src = bi;
1144         *offset = off;
1145
1146         return chain;
1147 out_err:
1148         bio_chain_put(chain);
1149
1150         return NULL;
1151 }
1152
1153 /*
1154  * The default/initial value for all object request flags is 0.  For
1155  * each flag, once its value is set to 1 it is never reset to 0
1156  * again.
1157  */
1158 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1159 {
1160         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1161                 struct rbd_device *rbd_dev;
1162
1163                 rbd_dev = obj_request->img_request->rbd_dev;
1164                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1165                         obj_request);
1166         }
1167 }
1168
1169 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1170 {
1171         smp_mb();
1172         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1173 }
1174
1175 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1176 {
1177         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1178                 struct rbd_device *rbd_dev = NULL;
1179
1180                 if (obj_request_img_data_test(obj_request))
1181                         rbd_dev = obj_request->img_request->rbd_dev;
1182                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1183                         obj_request);
1184         }
1185 }
1186
1187 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1188 {
1189         smp_mb();
1190         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1191 }
1192
1193 /*
1194  * This sets the KNOWN flag after (possibly) setting the EXISTS
1195  * flag.  The latter is set based on the "exists" value provided.
1196  *
1197  * Note that for our purposes once an object exists it never goes
1198  * away again.  It's possible that the response from two existence
1199  * checks are separated by the creation of the target object, and
1200  * the first ("doesn't exist") response arrives *after* the second
1201  * ("does exist").  In that case we ignore the second one.
1202  */
1203 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1204                                 bool exists)
1205 {
1206         if (exists)
1207                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1208         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1209         smp_mb();
1210 }
1211
1212 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1213 {
1214         smp_mb();
1215         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1216 }
1217
1218 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1219 {
1220         smp_mb();
1221         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1222 }
1223
1224 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1225 {
1226         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1227                 atomic_read(&obj_request->kref.refcount));
1228         kref_get(&obj_request->kref);
1229 }
1230
1231 static void rbd_obj_request_destroy(struct kref *kref);
1232 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1233 {
1234         rbd_assert(obj_request != NULL);
1235         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1236                 atomic_read(&obj_request->kref.refcount));
1237         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1238 }
1239
1240 static void rbd_img_request_get(struct rbd_img_request *img_request)
1241 {
1242         dout("%s: img %p (was %d)\n", __func__, img_request,
1243                 atomic_read(&img_request->kref.refcount));
1244         kref_get(&img_request->kref);
1245 }
1246
1247 static void rbd_img_request_destroy(struct kref *kref);
1248 static void rbd_img_request_put(struct rbd_img_request *img_request)
1249 {
1250         rbd_assert(img_request != NULL);
1251         dout("%s: img %p (was %d)\n", __func__, img_request,
1252                 atomic_read(&img_request->kref.refcount));
1253         kref_put(&img_request->kref, rbd_img_request_destroy);
1254 }
1255
1256 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1257                                         struct rbd_obj_request *obj_request)
1258 {
1259         rbd_assert(obj_request->img_request == NULL);
1260
1261         /* Image request now owns object's original reference */
1262         obj_request->img_request = img_request;
1263         obj_request->which = img_request->obj_request_count;
1264         rbd_assert(!obj_request_img_data_test(obj_request));
1265         obj_request_img_data_set(obj_request);
1266         rbd_assert(obj_request->which != BAD_WHICH);
1267         img_request->obj_request_count++;
1268         list_add_tail(&obj_request->links, &img_request->obj_requests);
1269         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1270                 obj_request->which);
1271 }
1272
1273 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1274                                         struct rbd_obj_request *obj_request)
1275 {
1276         rbd_assert(obj_request->which != BAD_WHICH);
1277
1278         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1279                 obj_request->which);
1280         list_del(&obj_request->links);
1281         rbd_assert(img_request->obj_request_count > 0);
1282         img_request->obj_request_count--;
1283         rbd_assert(obj_request->which == img_request->obj_request_count);
1284         obj_request->which = BAD_WHICH;
1285         rbd_assert(obj_request_img_data_test(obj_request));
1286         rbd_assert(obj_request->img_request == img_request);
1287         obj_request->img_request = NULL;
1288         obj_request->callback = NULL;
1289         rbd_obj_request_put(obj_request);
1290 }
1291
1292 static bool obj_request_type_valid(enum obj_request_type type)
1293 {
1294         switch (type) {
1295         case OBJ_REQUEST_NODATA:
1296         case OBJ_REQUEST_BIO:
1297         case OBJ_REQUEST_PAGES:
1298                 return true;
1299         default:
1300                 return false;
1301         }
1302 }
1303
1304 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1305                                 struct rbd_obj_request *obj_request)
1306 {
1307         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1308
1309         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1310 }
1311
1312 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1313 {
1314
1315         dout("%s: img %p\n", __func__, img_request);
1316
1317         /*
1318          * If no error occurred, compute the aggregate transfer
1319          * count for the image request.  We could instead use
1320          * atomic64_cmpxchg() to update it as each object request
1321          * completes; not clear which way is better off hand.
1322          */
1323         if (!img_request->result) {
1324                 struct rbd_obj_request *obj_request;
1325                 u64 xferred = 0;
1326
1327                 for_each_obj_request(img_request, obj_request)
1328                         xferred += obj_request->xferred;
1329                 img_request->xferred = xferred;
1330         }
1331
1332         if (img_request->callback)
1333                 img_request->callback(img_request);
1334         else
1335                 rbd_img_request_put(img_request);
1336 }
1337
1338 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1339
1340 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1341 {
1342         dout("%s: obj %p\n", __func__, obj_request);
1343
1344         return wait_for_completion_interruptible(&obj_request->completion);
1345 }
1346
1347 /*
1348  * The default/initial value for all image request flags is 0.  Each
1349  * is conditionally set to 1 at image request initialization time
1350  * and currently never change thereafter.
1351  */
1352 static void img_request_write_set(struct rbd_img_request *img_request)
1353 {
1354         set_bit(IMG_REQ_WRITE, &img_request->flags);
1355         smp_mb();
1356 }
1357
1358 static bool img_request_write_test(struct rbd_img_request *img_request)
1359 {
1360         smp_mb();
1361         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1362 }
1363
1364 static void img_request_child_set(struct rbd_img_request *img_request)
1365 {
1366         set_bit(IMG_REQ_CHILD, &img_request->flags);
1367         smp_mb();
1368 }
1369
1370 static bool img_request_child_test(struct rbd_img_request *img_request)
1371 {
1372         smp_mb();
1373         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1374 }
1375
1376 static void img_request_layered_set(struct rbd_img_request *img_request)
1377 {
1378         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1379         smp_mb();
1380 }
1381
1382 static bool img_request_layered_test(struct rbd_img_request *img_request)
1383 {
1384         smp_mb();
1385         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1386 }
1387
1388 static void
1389 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1390 {
1391         u64 xferred = obj_request->xferred;
1392         u64 length = obj_request->length;
1393
1394         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1395                 obj_request, obj_request->img_request, obj_request->result,
1396                 xferred, length);
1397         /*
1398          * ENOENT means a hole in the image.  We zero-fill the
1399          * entire length of the request.  A short read also implies
1400          * zero-fill to the end of the request.  Either way we
1401          * update the xferred count to indicate the whole request
1402          * was satisfied.
1403          */
1404         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1405         if (obj_request->result == -ENOENT) {
1406                 if (obj_request->type == OBJ_REQUEST_BIO)
1407                         zero_bio_chain(obj_request->bio_list, 0);
1408                 else
1409                         zero_pages(obj_request->pages, 0, length);
1410                 obj_request->result = 0;
1411                 obj_request->xferred = length;
1412         } else if (xferred < length && !obj_request->result) {
1413                 if (obj_request->type == OBJ_REQUEST_BIO)
1414                         zero_bio_chain(obj_request->bio_list, xferred);
1415                 else
1416                         zero_pages(obj_request->pages, xferred, length);
1417                 obj_request->xferred = length;
1418         }
1419         obj_request_done_set(obj_request);
1420 }
1421
1422 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1423 {
1424         dout("%s: obj %p cb %p\n", __func__, obj_request,
1425                 obj_request->callback);
1426         if (obj_request->callback)
1427                 obj_request->callback(obj_request);
1428         else
1429                 complete_all(&obj_request->completion);
1430 }
1431
1432 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1433 {
1434         dout("%s: obj %p\n", __func__, obj_request);
1435         obj_request_done_set(obj_request);
1436 }
1437
1438 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1439 {
1440         struct rbd_img_request *img_request = NULL;
1441         struct rbd_device *rbd_dev = NULL;
1442         bool layered = false;
1443
1444         if (obj_request_img_data_test(obj_request)) {
1445                 img_request = obj_request->img_request;
1446                 layered = img_request && img_request_layered_test(img_request);
1447                 rbd_dev = img_request->rbd_dev;
1448         }
1449
1450         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1451                 obj_request, img_request, obj_request->result,
1452                 obj_request->xferred, obj_request->length);
1453         if (layered && obj_request->result == -ENOENT &&
1454                         obj_request->img_offset < rbd_dev->parent_overlap)
1455                 rbd_img_parent_read(obj_request);
1456         else if (img_request)
1457                 rbd_img_obj_request_read_callback(obj_request);
1458         else
1459                 obj_request_done_set(obj_request);
1460 }
1461
1462 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1463 {
1464         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1465                 obj_request->result, obj_request->length);
1466         /*
1467          * There is no such thing as a successful short write.  Set
1468          * it to our originally-requested length.
1469          */
1470         obj_request->xferred = obj_request->length;
1471         obj_request_done_set(obj_request);
1472 }
1473
1474 /*
1475  * For a simple stat call there's nothing to do.  We'll do more if
1476  * this is part of a write sequence for a layered image.
1477  */
1478 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1479 {
1480         dout("%s: obj %p\n", __func__, obj_request);
1481         obj_request_done_set(obj_request);
1482 }
1483
1484 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1485                                 struct ceph_msg *msg)
1486 {
1487         struct rbd_obj_request *obj_request = osd_req->r_priv;
1488         u16 opcode;
1489
1490         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1491         rbd_assert(osd_req == obj_request->osd_req);
1492         if (obj_request_img_data_test(obj_request)) {
1493                 rbd_assert(obj_request->img_request);
1494                 rbd_assert(obj_request->which != BAD_WHICH);
1495         } else {
1496                 rbd_assert(obj_request->which == BAD_WHICH);
1497         }
1498
1499         if (osd_req->r_result < 0)
1500                 obj_request->result = osd_req->r_result;
1501         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1502
1503         BUG_ON(osd_req->r_num_ops > 2);
1504
1505         /*
1506          * We support a 64-bit length, but ultimately it has to be
1507          * passed to blk_end_request(), which takes an unsigned int.
1508          */
1509         obj_request->xferred = osd_req->r_reply_op_len[0];
1510         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1511         opcode = osd_req->r_ops[0].op;
1512         switch (opcode) {
1513         case CEPH_OSD_OP_READ:
1514                 rbd_osd_read_callback(obj_request);
1515                 break;
1516         case CEPH_OSD_OP_WRITE:
1517                 rbd_osd_write_callback(obj_request);
1518                 break;
1519         case CEPH_OSD_OP_STAT:
1520                 rbd_osd_stat_callback(obj_request);
1521                 break;
1522         case CEPH_OSD_OP_CALL:
1523         case CEPH_OSD_OP_NOTIFY_ACK:
1524         case CEPH_OSD_OP_WATCH:
1525                 rbd_osd_trivial_callback(obj_request);
1526                 break;
1527         default:
1528                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1529                         obj_request->object_name, (unsigned short) opcode);
1530                 break;
1531         }
1532
1533         if (obj_request_done_test(obj_request))
1534                 rbd_obj_request_complete(obj_request);
1535 }
1536
1537 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1538 {
1539         struct rbd_img_request *img_request = obj_request->img_request;
1540         struct ceph_osd_request *osd_req = obj_request->osd_req;
1541         u64 snap_id;
1542
1543         rbd_assert(osd_req != NULL);
1544
1545         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1546         ceph_osdc_build_request(osd_req, obj_request->offset,
1547                         NULL, snap_id, NULL);
1548 }
1549
1550 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1551 {
1552         struct rbd_img_request *img_request = obj_request->img_request;
1553         struct ceph_osd_request *osd_req = obj_request->osd_req;
1554         struct ceph_snap_context *snapc;
1555         struct timespec mtime = CURRENT_TIME;
1556
1557         rbd_assert(osd_req != NULL);
1558
1559         snapc = img_request ? img_request->snapc : NULL;
1560         ceph_osdc_build_request(osd_req, obj_request->offset,
1561                         snapc, CEPH_NOSNAP, &mtime);
1562 }
1563
1564 static struct ceph_osd_request *rbd_osd_req_create(
1565                                         struct rbd_device *rbd_dev,
1566                                         bool write_request,
1567                                         struct rbd_obj_request *obj_request)
1568 {
1569         struct ceph_snap_context *snapc = NULL;
1570         struct ceph_osd_client *osdc;
1571         struct ceph_osd_request *osd_req;
1572
1573         if (obj_request_img_data_test(obj_request)) {
1574                 struct rbd_img_request *img_request = obj_request->img_request;
1575
1576                 rbd_assert(write_request ==
1577                                 img_request_write_test(img_request));
1578                 if (write_request)
1579                         snapc = img_request->snapc;
1580         }
1581
1582         /* Allocate and initialize the request, for the single op */
1583
1584         osdc = &rbd_dev->rbd_client->client->osdc;
1585         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1586         if (!osd_req)
1587                 return NULL;    /* ENOMEM */
1588
1589         if (write_request)
1590                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1591         else
1592                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1593
1594         osd_req->r_callback = rbd_osd_req_callback;
1595         osd_req->r_priv = obj_request;
1596
1597         osd_req->r_oid_len = strlen(obj_request->object_name);
1598         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1599         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1600
1601         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1602
1603         return osd_req;
1604 }
1605
1606 /*
1607  * Create a copyup osd request based on the information in the
1608  * object request supplied.  A copyup request has two osd ops,
1609  * a copyup method call, and a "normal" write request.
1610  */
1611 static struct ceph_osd_request *
1612 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1613 {
1614         struct rbd_img_request *img_request;
1615         struct ceph_snap_context *snapc;
1616         struct rbd_device *rbd_dev;
1617         struct ceph_osd_client *osdc;
1618         struct ceph_osd_request *osd_req;
1619
1620         rbd_assert(obj_request_img_data_test(obj_request));
1621         img_request = obj_request->img_request;
1622         rbd_assert(img_request);
1623         rbd_assert(img_request_write_test(img_request));
1624
1625         /* Allocate and initialize the request, for the two ops */
1626
1627         snapc = img_request->snapc;
1628         rbd_dev = img_request->rbd_dev;
1629         osdc = &rbd_dev->rbd_client->client->osdc;
1630         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1631         if (!osd_req)
1632                 return NULL;    /* ENOMEM */
1633
1634         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1635         osd_req->r_callback = rbd_osd_req_callback;
1636         osd_req->r_priv = obj_request;
1637
1638         osd_req->r_oid_len = strlen(obj_request->object_name);
1639         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1640         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1641
1642         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1643
1644         return osd_req;
1645 }
1646
1647
1648 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1649 {
1650         ceph_osdc_put_request(osd_req);
1651 }
1652
1653 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1654
1655 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1656                                                 u64 offset, u64 length,
1657                                                 enum obj_request_type type)
1658 {
1659         struct rbd_obj_request *obj_request;
1660         size_t size;
1661         char *name;
1662
1663         rbd_assert(obj_request_type_valid(type));
1664
1665         size = strlen(object_name) + 1;
1666         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1667         if (!obj_request)
1668                 return NULL;
1669
1670         name = (char *)(obj_request + 1);
1671         obj_request->object_name = memcpy(name, object_name, size);
1672         obj_request->offset = offset;
1673         obj_request->length = length;
1674         obj_request->flags = 0;
1675         obj_request->which = BAD_WHICH;
1676         obj_request->type = type;
1677         INIT_LIST_HEAD(&obj_request->links);
1678         init_completion(&obj_request->completion);
1679         kref_init(&obj_request->kref);
1680
1681         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1682                 offset, length, (int)type, obj_request);
1683
1684         return obj_request;
1685 }
1686
1687 static void rbd_obj_request_destroy(struct kref *kref)
1688 {
1689         struct rbd_obj_request *obj_request;
1690
1691         obj_request = container_of(kref, struct rbd_obj_request, kref);
1692
1693         dout("%s: obj %p\n", __func__, obj_request);
1694
1695         rbd_assert(obj_request->img_request == NULL);
1696         rbd_assert(obj_request->which == BAD_WHICH);
1697
1698         if (obj_request->osd_req)
1699                 rbd_osd_req_destroy(obj_request->osd_req);
1700
1701         rbd_assert(obj_request_type_valid(obj_request->type));
1702         switch (obj_request->type) {
1703         case OBJ_REQUEST_NODATA:
1704                 break;          /* Nothing to do */
1705         case OBJ_REQUEST_BIO:
1706                 if (obj_request->bio_list)
1707                         bio_chain_put(obj_request->bio_list);
1708                 break;
1709         case OBJ_REQUEST_PAGES:
1710                 if (obj_request->pages)
1711                         ceph_release_page_vector(obj_request->pages,
1712                                                 obj_request->page_count);
1713                 break;
1714         }
1715
1716         kfree(obj_request);
1717 }
1718
1719 /*
1720  * Caller is responsible for filling in the list of object requests
1721  * that comprises the image request, and the Linux request pointer
1722  * (if there is one).
1723  */
1724 static struct rbd_img_request *rbd_img_request_create(
1725                                         struct rbd_device *rbd_dev,
1726                                         u64 offset, u64 length,
1727                                         bool write_request,
1728                                         bool child_request)
1729 {
1730         struct rbd_img_request *img_request;
1731         struct ceph_snap_context *snapc = NULL;
1732
1733         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1734         if (!img_request)
1735                 return NULL;
1736
1737         if (write_request) {
1738                 down_read(&rbd_dev->header_rwsem);
1739                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1740                 up_read(&rbd_dev->header_rwsem);
1741                 if (WARN_ON(!snapc)) {
1742                         kfree(img_request);
1743                         return NULL;    /* Shouldn't happen */
1744                 }
1745
1746         }
1747
1748         img_request->rq = NULL;
1749         img_request->rbd_dev = rbd_dev;
1750         img_request->offset = offset;
1751         img_request->length = length;
1752         img_request->flags = 0;
1753         if (write_request) {
1754                 img_request_write_set(img_request);
1755                 img_request->snapc = snapc;
1756         } else {
1757                 img_request->snap_id = rbd_dev->spec->snap_id;
1758         }
1759         if (child_request)
1760                 img_request_child_set(img_request);
1761         if (rbd_dev->parent_spec)
1762                 img_request_layered_set(img_request);
1763         spin_lock_init(&img_request->completion_lock);
1764         img_request->next_completion = 0;
1765         img_request->callback = NULL;
1766         img_request->result = 0;
1767         img_request->obj_request_count = 0;
1768         INIT_LIST_HEAD(&img_request->obj_requests);
1769         kref_init(&img_request->kref);
1770
1771         rbd_img_request_get(img_request);       /* Avoid a warning */
1772         rbd_img_request_put(img_request);       /* TEMPORARY */
1773
1774         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1775                 write_request ? "write" : "read", offset, length,
1776                 img_request);
1777
1778         return img_request;
1779 }
1780
1781 static void rbd_img_request_destroy(struct kref *kref)
1782 {
1783         struct rbd_img_request *img_request;
1784         struct rbd_obj_request *obj_request;
1785         struct rbd_obj_request *next_obj_request;
1786
1787         img_request = container_of(kref, struct rbd_img_request, kref);
1788
1789         dout("%s: img %p\n", __func__, img_request);
1790
1791         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1792                 rbd_img_obj_request_del(img_request, obj_request);
1793         rbd_assert(img_request->obj_request_count == 0);
1794
1795         if (img_request_write_test(img_request))
1796                 ceph_put_snap_context(img_request->snapc);
1797
1798         if (img_request_child_test(img_request))
1799                 rbd_obj_request_put(img_request->obj_request);
1800
1801         kfree(img_request);
1802 }
1803
1804 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1805 {
1806         struct rbd_img_request *img_request;
1807         unsigned int xferred;
1808         int result;
1809         bool more;
1810
1811         rbd_assert(obj_request_img_data_test(obj_request));
1812         img_request = obj_request->img_request;
1813
1814         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1815         xferred = (unsigned int)obj_request->xferred;
1816         result = obj_request->result;
1817         if (result) {
1818                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1819
1820                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1821                         img_request_write_test(img_request) ? "write" : "read",
1822                         obj_request->length, obj_request->img_offset,
1823                         obj_request->offset);
1824                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1825                         result, xferred);
1826                 if (!img_request->result)
1827                         img_request->result = result;
1828         }
1829
1830         /* Image object requests don't own their page array */
1831
1832         if (obj_request->type == OBJ_REQUEST_PAGES) {
1833                 obj_request->pages = NULL;
1834                 obj_request->page_count = 0;
1835         }
1836
1837         if (img_request_child_test(img_request)) {
1838                 rbd_assert(img_request->obj_request != NULL);
1839                 more = obj_request->which < img_request->obj_request_count - 1;
1840         } else {
1841                 rbd_assert(img_request->rq != NULL);
1842                 more = blk_end_request(img_request->rq, result, xferred);
1843         }
1844
1845         return more;
1846 }
1847
1848 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1849 {
1850         struct rbd_img_request *img_request;
1851         u32 which = obj_request->which;
1852         bool more = true;
1853
1854         rbd_assert(obj_request_img_data_test(obj_request));
1855         img_request = obj_request->img_request;
1856
1857         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1858         rbd_assert(img_request != NULL);
1859         rbd_assert(img_request->obj_request_count > 0);
1860         rbd_assert(which != BAD_WHICH);
1861         rbd_assert(which < img_request->obj_request_count);
1862         rbd_assert(which >= img_request->next_completion);
1863
1864         spin_lock_irq(&img_request->completion_lock);
1865         if (which != img_request->next_completion)
1866                 goto out;
1867
1868         for_each_obj_request_from(img_request, obj_request) {
1869                 rbd_assert(more);
1870                 rbd_assert(which < img_request->obj_request_count);
1871
1872                 if (!obj_request_done_test(obj_request))
1873                         break;
1874                 more = rbd_img_obj_end_request(obj_request);
1875                 which++;
1876         }
1877
1878         rbd_assert(more ^ (which == img_request->obj_request_count));
1879         img_request->next_completion = which;
1880 out:
1881         spin_unlock_irq(&img_request->completion_lock);
1882
1883         if (!more)
1884                 rbd_img_request_complete(img_request);
1885 }
1886
1887 /*
1888  * Split up an image request into one or more object requests, each
1889  * to a different object.  The "type" parameter indicates whether
1890  * "data_desc" is the pointer to the head of a list of bio
1891  * structures, or the base of a page array.  In either case this
1892  * function assumes data_desc describes memory sufficient to hold
1893  * all data described by the image request.
1894  */
1895 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1896                                         enum obj_request_type type,
1897                                         void *data_desc)
1898 {
1899         struct rbd_device *rbd_dev = img_request->rbd_dev;
1900         struct rbd_obj_request *obj_request = NULL;
1901         struct rbd_obj_request *next_obj_request;
1902         bool write_request = img_request_write_test(img_request);
1903         struct bio *bio_list;
1904         unsigned int bio_offset = 0;
1905         struct page **pages;
1906         u64 img_offset;
1907         u64 resid;
1908         u16 opcode;
1909
1910         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1911                 (int)type, data_desc);
1912
1913         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1914         img_offset = img_request->offset;
1915         resid = img_request->length;
1916         rbd_assert(resid > 0);
1917
1918         if (type == OBJ_REQUEST_BIO) {
1919                 bio_list = data_desc;
1920                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1921         } else {
1922                 rbd_assert(type == OBJ_REQUEST_PAGES);
1923                 pages = data_desc;
1924         }
1925
1926         while (resid) {
1927                 struct ceph_osd_request *osd_req;
1928                 const char *object_name;
1929                 u64 offset;
1930                 u64 length;
1931
1932                 object_name = rbd_segment_name(rbd_dev, img_offset);
1933                 if (!object_name)
1934                         goto out_unwind;
1935                 offset = rbd_segment_offset(rbd_dev, img_offset);
1936                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1937                 obj_request = rbd_obj_request_create(object_name,
1938                                                 offset, length, type);
1939                 kfree(object_name);     /* object request has its own copy */
1940                 if (!obj_request)
1941                         goto out_unwind;
1942
1943                 if (type == OBJ_REQUEST_BIO) {
1944                         unsigned int clone_size;
1945
1946                         rbd_assert(length <= (u64)UINT_MAX);
1947                         clone_size = (unsigned int)length;
1948                         obj_request->bio_list =
1949                                         bio_chain_clone_range(&bio_list,
1950                                                                 &bio_offset,
1951                                                                 clone_size,
1952                                                                 GFP_ATOMIC);
1953                         if (!obj_request->bio_list)
1954                                 goto out_partial;
1955                 } else {
1956                         unsigned int page_count;
1957
1958                         obj_request->pages = pages;
1959                         page_count = (u32)calc_pages_for(offset, length);
1960                         obj_request->page_count = page_count;
1961                         if ((offset + length) & ~PAGE_MASK)
1962                                 page_count--;   /* more on last page */
1963                         pages += page_count;
1964                 }
1965
1966                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1967                                                 obj_request);
1968                 if (!osd_req)
1969                         goto out_partial;
1970                 obj_request->osd_req = osd_req;
1971                 obj_request->callback = rbd_img_obj_callback;
1972
1973                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1974                                                 0, 0);
1975                 if (type == OBJ_REQUEST_BIO)
1976                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1977                                         obj_request->bio_list, length);
1978                 else
1979                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1980                                         obj_request->pages, length,
1981                                         offset & ~PAGE_MASK, false, false);
1982
1983                 if (write_request)
1984                         rbd_osd_req_format_write(obj_request);
1985                 else
1986                         rbd_osd_req_format_read(obj_request);
1987
1988                 obj_request->img_offset = img_offset;
1989                 rbd_img_obj_request_add(img_request, obj_request);
1990
1991                 img_offset += length;
1992                 resid -= length;
1993         }
1994
1995         return 0;
1996
1997 out_partial:
1998         rbd_obj_request_put(obj_request);
1999 out_unwind:
2000         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2001                 rbd_obj_request_put(obj_request);
2002
2003         return -ENOMEM;
2004 }
2005
2006 static void
2007 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2008 {
2009         struct rbd_img_request *img_request;
2010         struct rbd_device *rbd_dev;
2011         u64 length;
2012         u32 page_count;
2013
2014         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2015         rbd_assert(obj_request_img_data_test(obj_request));
2016         img_request = obj_request->img_request;
2017         rbd_assert(img_request);
2018
2019         rbd_dev = img_request->rbd_dev;
2020         rbd_assert(rbd_dev);
2021         length = (u64)1 << rbd_dev->header.obj_order;
2022         page_count = (u32)calc_pages_for(0, length);
2023
2024         rbd_assert(obj_request->copyup_pages);
2025         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2026         obj_request->copyup_pages = NULL;
2027
2028         /*
2029          * We want the transfer count to reflect the size of the
2030          * original write request.  There is no such thing as a
2031          * successful short write, so if the request was successful
2032          * we can just set it to the originally-requested length.
2033          */
2034         if (!obj_request->result)
2035                 obj_request->xferred = obj_request->length;
2036
2037         /* Finish up with the normal image object callback */
2038
2039         rbd_img_obj_callback(obj_request);
2040 }
2041
2042 static void
2043 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2044 {
2045         struct rbd_obj_request *orig_request;
2046         struct ceph_osd_request *osd_req;
2047         struct ceph_osd_client *osdc;
2048         struct rbd_device *rbd_dev;
2049         struct page **pages;
2050         int result;
2051         u64 obj_size;
2052         u64 xferred;
2053
2054         rbd_assert(img_request_child_test(img_request));
2055
2056         /* First get what we need from the image request */
2057
2058         pages = img_request->copyup_pages;
2059         rbd_assert(pages != NULL);
2060         img_request->copyup_pages = NULL;
2061
2062         orig_request = img_request->obj_request;
2063         rbd_assert(orig_request != NULL);
2064         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2065         result = img_request->result;
2066         obj_size = img_request->length;
2067         xferred = img_request->xferred;
2068
2069         rbd_dev = img_request->rbd_dev;
2070         rbd_assert(rbd_dev);
2071         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2072
2073         rbd_img_request_put(img_request);
2074
2075         if (result)
2076                 goto out_err;
2077
2078         /* Allocate the new copyup osd request for the original request */
2079
2080         result = -ENOMEM;
2081         rbd_assert(!orig_request->osd_req);
2082         osd_req = rbd_osd_req_create_copyup(orig_request);
2083         if (!osd_req)
2084                 goto out_err;
2085         orig_request->osd_req = osd_req;
2086         orig_request->copyup_pages = pages;
2087
2088         /* Initialize the copyup op */
2089
2090         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2091         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2092                                                 false, false);
2093
2094         /* Then the original write request op */
2095
2096         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2097                                         orig_request->offset,
2098                                         orig_request->length, 0, 0);
2099         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2100                                         orig_request->length);
2101
2102         rbd_osd_req_format_write(orig_request);
2103
2104         /* All set, send it off. */
2105
2106         orig_request->callback = rbd_img_obj_copyup_callback;
2107         osdc = &rbd_dev->rbd_client->client->osdc;
2108         result = rbd_obj_request_submit(osdc, orig_request);
2109         if (!result)
2110                 return;
2111 out_err:
2112         /* Record the error code and complete the request */
2113
2114         orig_request->result = result;
2115         orig_request->xferred = 0;
2116         obj_request_done_set(orig_request);
2117         rbd_obj_request_complete(orig_request);
2118 }
2119
2120 /*
2121  * Read from the parent image the range of data that covers the
2122  * entire target of the given object request.  This is used for
2123  * satisfying a layered image write request when the target of an
2124  * object request from the image request does not exist.
2125  *
2126  * A page array big enough to hold the returned data is allocated
2127  * and supplied to rbd_img_request_fill() as the "data descriptor."
2128  * When the read completes, this page array will be transferred to
2129  * the original object request for the copyup operation.
2130  *
2131  * If an error occurs, record it as the result of the original
2132  * object request and mark it done so it gets completed.
2133  */
2134 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2135 {
2136         struct rbd_img_request *img_request = NULL;
2137         struct rbd_img_request *parent_request = NULL;
2138         struct rbd_device *rbd_dev;
2139         u64 img_offset;
2140         u64 length;
2141         struct page **pages = NULL;
2142         u32 page_count;
2143         int result;
2144
2145         rbd_assert(obj_request_img_data_test(obj_request));
2146         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2147
2148         img_request = obj_request->img_request;
2149         rbd_assert(img_request != NULL);
2150         rbd_dev = img_request->rbd_dev;
2151         rbd_assert(rbd_dev->parent != NULL);
2152
2153         /*
2154          * First things first.  The original osd request is of no
2155          * use to use any more, we'll need a new one that can hold
2156          * the two ops in a copyup request.  We'll get that later,
2157          * but for now we can release the old one.
2158          */
2159         rbd_osd_req_destroy(obj_request->osd_req);
2160         obj_request->osd_req = NULL;
2161
2162         /*
2163          * Determine the byte range covered by the object in the
2164          * child image to which the original request was to be sent.
2165          */
2166         img_offset = obj_request->img_offset - obj_request->offset;
2167         length = (u64)1 << rbd_dev->header.obj_order;
2168
2169         /*
2170          * There is no defined parent data beyond the parent
2171          * overlap, so limit what we read at that boundary if
2172          * necessary.
2173          */
2174         if (img_offset + length > rbd_dev->parent_overlap) {
2175                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2176                 length = rbd_dev->parent_overlap - img_offset;
2177         }
2178
2179         /*
2180          * Allocate a page array big enough to receive the data read
2181          * from the parent.
2182          */
2183         page_count = (u32)calc_pages_for(0, length);
2184         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2185         if (IS_ERR(pages)) {
2186                 result = PTR_ERR(pages);
2187                 pages = NULL;
2188                 goto out_err;
2189         }
2190
2191         result = -ENOMEM;
2192         parent_request = rbd_img_request_create(rbd_dev->parent,
2193                                                 img_offset, length,
2194                                                 false, true);
2195         if (!parent_request)
2196                 goto out_err;
2197         rbd_obj_request_get(obj_request);
2198         parent_request->obj_request = obj_request;
2199
2200         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2201         if (result)
2202                 goto out_err;
2203         parent_request->copyup_pages = pages;
2204
2205         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2206         result = rbd_img_request_submit(parent_request);
2207         if (!result)
2208                 return 0;
2209
2210         parent_request->copyup_pages = NULL;
2211         parent_request->obj_request = NULL;
2212         rbd_obj_request_put(obj_request);
2213 out_err:
2214         if (pages)
2215                 ceph_release_page_vector(pages, page_count);
2216         if (parent_request)
2217                 rbd_img_request_put(parent_request);
2218         obj_request->result = result;
2219         obj_request->xferred = 0;
2220         obj_request_done_set(obj_request);
2221
2222         return result;
2223 }
2224
2225 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2226 {
2227         struct rbd_obj_request *orig_request;
2228         int result;
2229
2230         rbd_assert(!obj_request_img_data_test(obj_request));
2231
2232         /*
2233          * All we need from the object request is the original
2234          * request and the result of the STAT op.  Grab those, then
2235          * we're done with the request.
2236          */
2237         orig_request = obj_request->obj_request;
2238         obj_request->obj_request = NULL;
2239         rbd_assert(orig_request);
2240         rbd_assert(orig_request->img_request);
2241
2242         result = obj_request->result;
2243         obj_request->result = 0;
2244
2245         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2246                 obj_request, orig_request, result,
2247                 obj_request->xferred, obj_request->length);
2248         rbd_obj_request_put(obj_request);
2249
2250         rbd_assert(orig_request);
2251         rbd_assert(orig_request->img_request);
2252
2253         /*
2254          * Our only purpose here is to determine whether the object
2255          * exists, and we don't want to treat the non-existence as
2256          * an error.  If something else comes back, transfer the
2257          * error to the original request and complete it now.
2258          */
2259         if (!result) {
2260                 obj_request_existence_set(orig_request, true);
2261         } else if (result == -ENOENT) {
2262                 obj_request_existence_set(orig_request, false);
2263         } else if (result) {
2264                 orig_request->result = result;
2265                 goto out;
2266         }
2267
2268         /*
2269          * Resubmit the original request now that we have recorded
2270          * whether the target object exists.
2271          */
2272         orig_request->result = rbd_img_obj_request_submit(orig_request);
2273 out:
2274         if (orig_request->result)
2275                 rbd_obj_request_complete(orig_request);
2276         rbd_obj_request_put(orig_request);
2277 }
2278
2279 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2280 {
2281         struct rbd_obj_request *stat_request;
2282         struct rbd_device *rbd_dev;
2283         struct ceph_osd_client *osdc;
2284         struct page **pages = NULL;
2285         u32 page_count;
2286         size_t size;
2287         int ret;
2288
2289         /*
2290          * The response data for a STAT call consists of:
2291          *     le64 length;
2292          *     struct {
2293          *         le32 tv_sec;
2294          *         le32 tv_nsec;
2295          *     } mtime;
2296          */
2297         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2298         page_count = (u32)calc_pages_for(0, size);
2299         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2300         if (IS_ERR(pages))
2301                 return PTR_ERR(pages);
2302
2303         ret = -ENOMEM;
2304         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2305                                                         OBJ_REQUEST_PAGES);
2306         if (!stat_request)
2307                 goto out;
2308
2309         rbd_obj_request_get(obj_request);
2310         stat_request->obj_request = obj_request;
2311         stat_request->pages = pages;
2312         stat_request->page_count = page_count;
2313
2314         rbd_assert(obj_request->img_request);
2315         rbd_dev = obj_request->img_request->rbd_dev;
2316         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2317                                                 stat_request);
2318         if (!stat_request->osd_req)
2319                 goto out;
2320         stat_request->callback = rbd_img_obj_exists_callback;
2321
2322         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2323         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2324                                         false, false);
2325         rbd_osd_req_format_read(stat_request);
2326
2327         osdc = &rbd_dev->rbd_client->client->osdc;
2328         ret = rbd_obj_request_submit(osdc, stat_request);
2329 out:
2330         if (ret)
2331                 rbd_obj_request_put(obj_request);
2332
2333         return ret;
2334 }
2335
2336 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2337 {
2338         struct rbd_img_request *img_request;
2339         struct rbd_device *rbd_dev;
2340         bool known;
2341
2342         rbd_assert(obj_request_img_data_test(obj_request));
2343
2344         img_request = obj_request->img_request;
2345         rbd_assert(img_request);
2346         rbd_dev = img_request->rbd_dev;
2347
2348         /*
2349          * Only writes to layered images need special handling.
2350          * Reads and non-layered writes are simple object requests.
2351          * Layered writes that start beyond the end of the overlap
2352          * with the parent have no parent data, so they too are
2353          * simple object requests.  Finally, if the target object is
2354          * known to already exist, its parent data has already been
2355          * copied, so a write to the object can also be handled as a
2356          * simple object request.
2357          */
2358         if (!img_request_write_test(img_request) ||
2359                 !img_request_layered_test(img_request) ||
2360                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2361                 ((known = obj_request_known_test(obj_request)) &&
2362                         obj_request_exists_test(obj_request))) {
2363
2364                 struct rbd_device *rbd_dev;
2365                 struct ceph_osd_client *osdc;
2366
2367                 rbd_dev = obj_request->img_request->rbd_dev;
2368                 osdc = &rbd_dev->rbd_client->client->osdc;
2369
2370                 return rbd_obj_request_submit(osdc, obj_request);
2371         }
2372
2373         /*
2374          * It's a layered write.  The target object might exist but
2375          * we may not know that yet.  If we know it doesn't exist,
2376          * start by reading the data for the full target object from
2377          * the parent so we can use it for a copyup to the target.
2378          */
2379         if (known)
2380                 return rbd_img_obj_parent_read_full(obj_request);
2381
2382         /* We don't know whether the target exists.  Go find out. */
2383
2384         return rbd_img_obj_exists_submit(obj_request);
2385 }
2386
2387 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2388 {
2389         struct rbd_obj_request *obj_request;
2390         struct rbd_obj_request *next_obj_request;
2391
2392         dout("%s: img %p\n", __func__, img_request);
2393         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2394                 int ret;
2395
2396                 ret = rbd_img_obj_request_submit(obj_request);
2397                 if (ret)
2398                         return ret;
2399         }
2400
2401         return 0;
2402 }
2403
2404 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2405 {
2406         struct rbd_obj_request *obj_request;
2407         struct rbd_device *rbd_dev;
2408         u64 obj_end;
2409
2410         rbd_assert(img_request_child_test(img_request));
2411
2412         obj_request = img_request->obj_request;
2413         rbd_assert(obj_request);
2414         rbd_assert(obj_request->img_request);
2415
2416         obj_request->result = img_request->result;
2417         if (obj_request->result)
2418                 goto out;
2419
2420         /*
2421          * We need to zero anything beyond the parent overlap
2422          * boundary.  Since rbd_img_obj_request_read_callback()
2423          * will zero anything beyond the end of a short read, an
2424          * easy way to do this is to pretend the data from the
2425          * parent came up short--ending at the overlap boundary.
2426          */
2427         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2428         obj_end = obj_request->img_offset + obj_request->length;
2429         rbd_dev = obj_request->img_request->rbd_dev;
2430         if (obj_end > rbd_dev->parent_overlap) {
2431                 u64 xferred = 0;
2432
2433                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2434                         xferred = rbd_dev->parent_overlap -
2435                                         obj_request->img_offset;
2436
2437                 obj_request->xferred = min(img_request->xferred, xferred);
2438         } else {
2439                 obj_request->xferred = img_request->xferred;
2440         }
2441 out:
2442         rbd_img_obj_request_read_callback(obj_request);
2443         rbd_obj_request_complete(obj_request);
2444 }
2445
2446 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2447 {
2448         struct rbd_device *rbd_dev;
2449         struct rbd_img_request *img_request;
2450         int result;
2451
2452         rbd_assert(obj_request_img_data_test(obj_request));
2453         rbd_assert(obj_request->img_request != NULL);
2454         rbd_assert(obj_request->result == (s32) -ENOENT);
2455         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2456
2457         rbd_dev = obj_request->img_request->rbd_dev;
2458         rbd_assert(rbd_dev->parent != NULL);
2459         /* rbd_read_finish(obj_request, obj_request->length); */
2460         img_request = rbd_img_request_create(rbd_dev->parent,
2461                                                 obj_request->img_offset,
2462                                                 obj_request->length,
2463                                                 false, true);
2464         result = -ENOMEM;
2465         if (!img_request)
2466                 goto out_err;
2467
2468         rbd_obj_request_get(obj_request);
2469         img_request->obj_request = obj_request;
2470
2471         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2472                                         obj_request->bio_list);
2473         if (result)
2474                 goto out_err;
2475
2476         img_request->callback = rbd_img_parent_read_callback;
2477         result = rbd_img_request_submit(img_request);
2478         if (result)
2479                 goto out_err;
2480
2481         return;
2482 out_err:
2483         if (img_request)
2484                 rbd_img_request_put(img_request);
2485         obj_request->result = result;
2486         obj_request->xferred = 0;
2487         obj_request_done_set(obj_request);
2488 }
2489
2490 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2491                                    u64 ver, u64 notify_id)
2492 {
2493         struct rbd_obj_request *obj_request;
2494         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2495         int ret;
2496
2497         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2498                                                         OBJ_REQUEST_NODATA);
2499         if (!obj_request)
2500                 return -ENOMEM;
2501
2502         ret = -ENOMEM;
2503         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2504         if (!obj_request->osd_req)
2505                 goto out;
2506         obj_request->callback = rbd_obj_request_put;
2507
2508         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2509                                         notify_id, ver, 0);
2510         rbd_osd_req_format_read(obj_request);
2511
2512         ret = rbd_obj_request_submit(osdc, obj_request);
2513 out:
2514         if (ret)
2515                 rbd_obj_request_put(obj_request);
2516
2517         return ret;
2518 }
2519
2520 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2521 {
2522         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2523         u64 hver;
2524         int rc;
2525
2526         if (!rbd_dev)
2527                 return;
2528
2529         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2530                 rbd_dev->header_name, (unsigned long long) notify_id,
2531                 (unsigned int) opcode);
2532         rc = rbd_dev_refresh(rbd_dev, &hver);
2533         if (rc)
2534                 rbd_warn(rbd_dev, "got notification but failed to "
2535                            " update snaps: %d\n", rc);
2536
2537         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2538 }
2539
2540 /*
2541  * Request sync osd watch/unwatch.  The value of "start" determines
2542  * whether a watch request is being initiated or torn down.
2543  */
2544 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2545 {
2546         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2547         struct rbd_obj_request *obj_request;
2548         int ret;
2549
2550         rbd_assert(start ^ !!rbd_dev->watch_event);
2551         rbd_assert(start ^ !!rbd_dev->watch_request);
2552
2553         if (start) {
2554                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2555                                                 &rbd_dev->watch_event);
2556                 if (ret < 0)
2557                         return ret;
2558                 rbd_assert(rbd_dev->watch_event != NULL);
2559         }
2560
2561         ret = -ENOMEM;
2562         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2563                                                         OBJ_REQUEST_NODATA);
2564         if (!obj_request)
2565                 goto out_cancel;
2566
2567         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2568         if (!obj_request->osd_req)
2569                 goto out_cancel;
2570
2571         if (start)
2572                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2573         else
2574                 ceph_osdc_unregister_linger_request(osdc,
2575                                         rbd_dev->watch_request->osd_req);
2576
2577         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2578                                 rbd_dev->watch_event->cookie,
2579                                 rbd_dev->header.obj_version, start);
2580         rbd_osd_req_format_write(obj_request);
2581
2582         ret = rbd_obj_request_submit(osdc, obj_request);
2583         if (ret)
2584                 goto out_cancel;
2585         ret = rbd_obj_request_wait(obj_request);
2586         if (ret)
2587                 goto out_cancel;
2588         ret = obj_request->result;
2589         if (ret)
2590                 goto out_cancel;
2591
2592         /*
2593          * A watch request is set to linger, so the underlying osd
2594          * request won't go away until we unregister it.  We retain
2595          * a pointer to the object request during that time (in
2596          * rbd_dev->watch_request), so we'll keep a reference to
2597          * it.  We'll drop that reference (below) after we've
2598          * unregistered it.
2599          */
2600         if (start) {
2601                 rbd_dev->watch_request = obj_request;
2602
2603                 return 0;
2604         }
2605
2606         /* We have successfully torn down the watch request */
2607
2608         rbd_obj_request_put(rbd_dev->watch_request);
2609         rbd_dev->watch_request = NULL;
2610 out_cancel:
2611         /* Cancel the event if we're tearing down, or on error */
2612         ceph_osdc_cancel_event(rbd_dev->watch_event);
2613         rbd_dev->watch_event = NULL;
2614         if (obj_request)
2615                 rbd_obj_request_put(obj_request);
2616
2617         return ret;
2618 }
2619
2620 /*
2621  * Synchronous osd object method call
2622  */
2623 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2624                              const char *object_name,
2625                              const char *class_name,
2626                              const char *method_name,
2627                              const void *outbound,
2628                              size_t outbound_size,
2629                              void *inbound,
2630                              size_t inbound_size,
2631                              u64 *version)
2632 {
2633         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2634         struct rbd_obj_request *obj_request;
2635         struct page **pages;
2636         u32 page_count;
2637         int ret;
2638
2639         /*
2640          * Method calls are ultimately read operations.  The result
2641          * should placed into the inbound buffer provided.  They
2642          * also supply outbound data--parameters for the object
2643          * method.  Currently if this is present it will be a
2644          * snapshot id.
2645          */
2646         page_count = (u32)calc_pages_for(0, inbound_size);
2647         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2648         if (IS_ERR(pages))
2649                 return PTR_ERR(pages);
2650
2651         ret = -ENOMEM;
2652         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2653                                                         OBJ_REQUEST_PAGES);
2654         if (!obj_request)
2655                 goto out;
2656
2657         obj_request->pages = pages;
2658         obj_request->page_count = page_count;
2659
2660         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2661         if (!obj_request->osd_req)
2662                 goto out;
2663
2664         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2665                                         class_name, method_name);
2666         if (outbound_size) {
2667                 struct ceph_pagelist *pagelist;
2668
2669                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2670                 if (!pagelist)
2671                         goto out;
2672
2673                 ceph_pagelist_init(pagelist);
2674                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2675                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2676                                                 pagelist);
2677         }
2678         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2679                                         obj_request->pages, inbound_size,
2680                                         0, false, false);
2681         rbd_osd_req_format_read(obj_request);
2682
2683         ret = rbd_obj_request_submit(osdc, obj_request);
2684         if (ret)
2685                 goto out;
2686         ret = rbd_obj_request_wait(obj_request);
2687         if (ret)
2688                 goto out;
2689
2690         ret = obj_request->result;
2691         if (ret < 0)
2692                 goto out;
2693
2694         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2695         ret = (int)obj_request->xferred;
2696         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2697         if (version)
2698                 *version = obj_request->version;
2699 out:
2700         if (obj_request)
2701                 rbd_obj_request_put(obj_request);
2702         else
2703                 ceph_release_page_vector(pages, page_count);
2704
2705         return ret;
2706 }
2707
2708 static void rbd_request_fn(struct request_queue *q)
2709                 __releases(q->queue_lock) __acquires(q->queue_lock)
2710 {
2711         struct rbd_device *rbd_dev = q->queuedata;
2712         bool read_only = rbd_dev->mapping.read_only;
2713         struct request *rq;
2714         int result;
2715
2716         while ((rq = blk_fetch_request(q))) {
2717                 bool write_request = rq_data_dir(rq) == WRITE;
2718                 struct rbd_img_request *img_request;
2719                 u64 offset;
2720                 u64 length;
2721
2722                 /* Ignore any non-FS requests that filter through. */
2723
2724                 if (rq->cmd_type != REQ_TYPE_FS) {
2725                         dout("%s: non-fs request type %d\n", __func__,
2726                                 (int) rq->cmd_type);
2727                         __blk_end_request_all(rq, 0);
2728                         continue;
2729                 }
2730
2731                 /* Ignore/skip any zero-length requests */
2732
2733                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2734                 length = (u64) blk_rq_bytes(rq);
2735
2736                 if (!length) {
2737                         dout("%s: zero-length request\n", __func__);
2738                         __blk_end_request_all(rq, 0);
2739                         continue;
2740                 }
2741
2742                 spin_unlock_irq(q->queue_lock);
2743
2744                 /* Disallow writes to a read-only device */
2745
2746                 if (write_request) {
2747                         result = -EROFS;
2748                         if (read_only)
2749                                 goto end_request;
2750                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2751                 }
2752
2753                 /*
2754                  * Quit early if the mapped snapshot no longer
2755                  * exists.  It's still possible the snapshot will
2756                  * have disappeared by the time our request arrives
2757                  * at the osd, but there's no sense in sending it if
2758                  * we already know.
2759                  */
2760                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2761                         dout("request for non-existent snapshot");
2762                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2763                         result = -ENXIO;
2764                         goto end_request;
2765                 }
2766
2767                 result = -EINVAL;
2768                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2769                         goto end_request;       /* Shouldn't happen */
2770
2771                 result = -ENOMEM;
2772                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2773                                                         write_request, false);
2774                 if (!img_request)
2775                         goto end_request;
2776
2777                 img_request->rq = rq;
2778
2779                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2780                                                 rq->bio);
2781                 if (!result)
2782                         result = rbd_img_request_submit(img_request);
2783                 if (result)
2784                         rbd_img_request_put(img_request);
2785 end_request:
2786                 spin_lock_irq(q->queue_lock);
2787                 if (result < 0) {
2788                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2789                                 write_request ? "write" : "read",
2790                                 length, offset, result);
2791
2792                         __blk_end_request_all(rq, result);
2793                 }
2794         }
2795 }
2796
2797 /*
2798  * a queue callback. Makes sure that we don't create a bio that spans across
2799  * multiple osd objects. One exception would be with a single page bios,
2800  * which we handle later at bio_chain_clone_range()
2801  */
2802 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2803                           struct bio_vec *bvec)
2804 {
2805         struct rbd_device *rbd_dev = q->queuedata;
2806         sector_t sector_offset;
2807         sector_t sectors_per_obj;
2808         sector_t obj_sector_offset;
2809         int ret;
2810
2811         /*
2812          * Find how far into its rbd object the partition-relative
2813          * bio start sector is to offset relative to the enclosing
2814          * device.
2815          */
2816         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2817         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2818         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2819
2820         /*
2821          * Compute the number of bytes from that offset to the end
2822          * of the object.  Account for what's already used by the bio.
2823          */
2824         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2825         if (ret > bmd->bi_size)
2826                 ret -= bmd->bi_size;
2827         else
2828                 ret = 0;
2829
2830         /*
2831          * Don't send back more than was asked for.  And if the bio
2832          * was empty, let the whole thing through because:  "Note
2833          * that a block device *must* allow a single page to be
2834          * added to an empty bio."
2835          */
2836         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2837         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2838                 ret = (int) bvec->bv_len;
2839
2840         return ret;
2841 }
2842
2843 static void rbd_free_disk(struct rbd_device *rbd_dev)
2844 {
2845         struct gendisk *disk = rbd_dev->disk;
2846
2847         if (!disk)
2848                 return;
2849
2850         if (disk->flags & GENHD_FL_UP)
2851                 del_gendisk(disk);
2852         if (disk->queue)
2853                 blk_cleanup_queue(disk->queue);
2854         put_disk(disk);
2855 }
2856
2857 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2858                                 const char *object_name,
2859                                 u64 offset, u64 length,
2860                                 void *buf, u64 *version)
2861
2862 {
2863         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2864         struct rbd_obj_request *obj_request;
2865         struct page **pages = NULL;
2866         u32 page_count;
2867         size_t size;
2868         int ret;
2869
2870         page_count = (u32) calc_pages_for(offset, length);
2871         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2872         if (IS_ERR(pages))
2873                 ret = PTR_ERR(pages);
2874
2875         ret = -ENOMEM;
2876         obj_request = rbd_obj_request_create(object_name, offset, length,
2877                                                         OBJ_REQUEST_PAGES);
2878         if (!obj_request)
2879                 goto out;
2880
2881         obj_request->pages = pages;
2882         obj_request->page_count = page_count;
2883
2884         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2885         if (!obj_request->osd_req)
2886                 goto out;
2887
2888         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2889                                         offset, length, 0, 0);
2890         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2891                                         obj_request->pages,
2892                                         obj_request->length,
2893                                         obj_request->offset & ~PAGE_MASK,
2894                                         false, false);
2895         rbd_osd_req_format_read(obj_request);
2896
2897         ret = rbd_obj_request_submit(osdc, obj_request);
2898         if (ret)
2899                 goto out;
2900         ret = rbd_obj_request_wait(obj_request);
2901         if (ret)
2902                 goto out;
2903
2904         ret = obj_request->result;
2905         if (ret < 0)
2906                 goto out;
2907
2908         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2909         size = (size_t) obj_request->xferred;
2910         ceph_copy_from_page_vector(pages, buf, 0, size);
2911         rbd_assert(size <= (size_t) INT_MAX);
2912         ret = (int) size;
2913         if (version)
2914                 *version = obj_request->version;
2915 out:
2916         if (obj_request)
2917                 rbd_obj_request_put(obj_request);
2918         else
2919                 ceph_release_page_vector(pages, page_count);
2920
2921         return ret;
2922 }
2923
2924 /*
2925  * Read the complete header for the given rbd device.
2926  *
2927  * Returns a pointer to a dynamically-allocated buffer containing
2928  * the complete and validated header.  Caller can pass the address
2929  * of a variable that will be filled in with the version of the
2930  * header object at the time it was read.
2931  *
2932  * Returns a pointer-coded errno if a failure occurs.
2933  */
2934 static struct rbd_image_header_ondisk *
2935 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2936 {
2937         struct rbd_image_header_ondisk *ondisk = NULL;
2938         u32 snap_count = 0;
2939         u64 names_size = 0;
2940         u32 want_count;
2941         int ret;
2942
2943         /*
2944          * The complete header will include an array of its 64-bit
2945          * snapshot ids, followed by the names of those snapshots as
2946          * a contiguous block of NUL-terminated strings.  Note that
2947          * the number of snapshots could change by the time we read
2948          * it in, in which case we re-read it.
2949          */
2950         do {
2951                 size_t size;
2952
2953                 kfree(ondisk);
2954
2955                 size = sizeof (*ondisk);
2956                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2957                 size += names_size;
2958                 ondisk = kmalloc(size, GFP_KERNEL);
2959                 if (!ondisk)
2960                         return ERR_PTR(-ENOMEM);
2961
2962                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2963                                        0, size, ondisk, version);
2964                 if (ret < 0)
2965                         goto out_err;
2966                 if (WARN_ON((size_t) ret < size)) {
2967                         ret = -ENXIO;
2968                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2969                                 size, ret);
2970                         goto out_err;
2971                 }
2972                 if (!rbd_dev_ondisk_valid(ondisk)) {
2973                         ret = -ENXIO;
2974                         rbd_warn(rbd_dev, "invalid header");
2975                         goto out_err;
2976                 }
2977
2978                 names_size = le64_to_cpu(ondisk->snap_names_len);
2979                 want_count = snap_count;
2980                 snap_count = le32_to_cpu(ondisk->snap_count);
2981         } while (snap_count != want_count);
2982
2983         return ondisk;
2984
2985 out_err:
2986         kfree(ondisk);
2987
2988         return ERR_PTR(ret);
2989 }
2990
2991 /*
2992  * reload the ondisk the header
2993  */
2994 static int rbd_read_header(struct rbd_device *rbd_dev,
2995                            struct rbd_image_header *header)
2996 {
2997         struct rbd_image_header_ondisk *ondisk;
2998         u64 ver = 0;
2999         int ret;
3000
3001         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3002         if (IS_ERR(ondisk))
3003                 return PTR_ERR(ondisk);
3004         ret = rbd_header_from_disk(header, ondisk);
3005         if (ret >= 0)
3006                 header->obj_version = ver;
3007         kfree(ondisk);
3008
3009         return ret;
3010 }
3011
3012 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3013 {
3014         struct rbd_snap *snap;
3015         struct rbd_snap *next;
3016
3017         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
3018                 rbd_remove_snap_dev(snap);
3019 }
3020
3021 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3022 {
3023         sector_t size;
3024
3025         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3026                 return;
3027
3028         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
3029         dout("setting size to %llu sectors", (unsigned long long) size);
3030         rbd_dev->mapping.size = (u64) size;
3031         set_capacity(rbd_dev->disk, size);
3032 }
3033
3034 /*
3035  * only read the first part of the ondisk header, without the snaps info
3036  */
3037 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3038 {
3039         int ret;
3040         struct rbd_image_header h;
3041
3042         ret = rbd_read_header(rbd_dev, &h);
3043         if (ret < 0)
3044                 return ret;
3045
3046         down_write(&rbd_dev->header_rwsem);
3047
3048         /* Update image size, and check for resize of mapped image */
3049         rbd_dev->header.image_size = h.image_size;
3050         rbd_update_mapping_size(rbd_dev);
3051
3052         /* rbd_dev->header.object_prefix shouldn't change */
3053         kfree(rbd_dev->header.snap_sizes);
3054         kfree(rbd_dev->header.snap_names);
3055         /* osd requests may still refer to snapc */
3056         ceph_put_snap_context(rbd_dev->header.snapc);
3057
3058         if (hver)
3059                 *hver = h.obj_version;
3060         rbd_dev->header.obj_version = h.obj_version;
3061         rbd_dev->header.image_size = h.image_size;
3062         rbd_dev->header.snapc = h.snapc;
3063         rbd_dev->header.snap_names = h.snap_names;
3064         rbd_dev->header.snap_sizes = h.snap_sizes;
3065         /* Free the extra copy of the object prefix */
3066         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
3067         kfree(h.object_prefix);
3068
3069         ret = rbd_dev_snaps_update(rbd_dev);
3070
3071         up_write(&rbd_dev->header_rwsem);
3072
3073         return ret;
3074 }
3075
3076 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3077 {
3078         int ret;
3079
3080         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3081         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3082         if (rbd_dev->image_format == 1)
3083                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3084         else
3085                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3086         mutex_unlock(&ctl_mutex);
3087         revalidate_disk(rbd_dev->disk);
3088
3089         return ret;
3090 }
3091
3092 static int rbd_init_disk(struct rbd_device *rbd_dev)
3093 {
3094         struct gendisk *disk;
3095         struct request_queue *q;
3096         u64 segment_size;
3097
3098         /* create gendisk info */
3099         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3100         if (!disk)
3101                 return -ENOMEM;
3102
3103         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3104                  rbd_dev->dev_id);
3105         disk->major = rbd_dev->major;
3106         disk->first_minor = 0;
3107         disk->fops = &rbd_bd_ops;
3108         disk->private_data = rbd_dev;
3109
3110         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3111         if (!q)
3112                 goto out_disk;
3113
3114         /* We use the default size, but let's be explicit about it. */
3115         blk_queue_physical_block_size(q, SECTOR_SIZE);
3116
3117         /* set io sizes to object size */
3118         segment_size = rbd_obj_bytes(&rbd_dev->header);
3119         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3120         blk_queue_max_segment_size(q, segment_size);
3121         blk_queue_io_min(q, segment_size);
3122         blk_queue_io_opt(q, segment_size);
3123
3124         blk_queue_merge_bvec(q, rbd_merge_bvec);
3125         disk->queue = q;
3126
3127         q->queuedata = rbd_dev;
3128
3129         rbd_dev->disk = disk;
3130
3131         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3132
3133         return 0;
3134 out_disk:
3135         put_disk(disk);
3136
3137         return -ENOMEM;
3138 }
3139
3140 /*
3141   sysfs
3142 */
3143
3144 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3145 {
3146         return container_of(dev, struct rbd_device, dev);
3147 }
3148
3149 static ssize_t rbd_size_show(struct device *dev,
3150                              struct device_attribute *attr, char *buf)
3151 {
3152         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3153         sector_t size;
3154
3155         down_read(&rbd_dev->header_rwsem);
3156         size = get_capacity(rbd_dev->disk);
3157         up_read(&rbd_dev->header_rwsem);
3158
3159         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
3160 }
3161
3162 /*
3163  * Note this shows the features for whatever's mapped, which is not
3164  * necessarily the base image.
3165  */
3166 static ssize_t rbd_features_show(struct device *dev,
3167                              struct device_attribute *attr, char *buf)
3168 {
3169         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3170
3171         return sprintf(buf, "0x%016llx\n",
3172                         (unsigned long long) rbd_dev->mapping.features);
3173 }
3174
3175 static ssize_t rbd_major_show(struct device *dev,
3176                               struct device_attribute *attr, char *buf)
3177 {
3178         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3179
3180         return sprintf(buf, "%d\n", rbd_dev->major);
3181 }
3182
3183 static ssize_t rbd_client_id_show(struct device *dev,
3184                                   struct device_attribute *attr, char *buf)
3185 {
3186         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3187
3188         return sprintf(buf, "client%lld\n",
3189                         ceph_client_id(rbd_dev->rbd_client->client));
3190 }
3191
3192 static ssize_t rbd_pool_show(struct device *dev,
3193                              struct device_attribute *attr, char *buf)
3194 {
3195         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3196
3197         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3198 }
3199
3200 static ssize_t rbd_pool_id_show(struct device *dev,
3201                              struct device_attribute *attr, char *buf)
3202 {
3203         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3204
3205         return sprintf(buf, "%llu\n",
3206                 (unsigned long long) rbd_dev->spec->pool_id);
3207 }
3208
3209 static ssize_t rbd_name_show(struct device *dev,
3210                              struct device_attribute *attr, char *buf)
3211 {
3212         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3213
3214         if (rbd_dev->spec->image_name)
3215                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3216
3217         return sprintf(buf, "(unknown)\n");
3218 }
3219
3220 static ssize_t rbd_image_id_show(struct device *dev,
3221                              struct device_attribute *attr, char *buf)
3222 {
3223         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3224
3225         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3226 }
3227
3228 /*
3229  * Shows the name of the currently-mapped snapshot (or
3230  * RBD_SNAP_HEAD_NAME for the base image).
3231  */
3232 static ssize_t rbd_snap_show(struct device *dev,
3233                              struct device_attribute *attr,
3234                              char *buf)
3235 {
3236         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3237
3238         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3239 }
3240
3241 /*
3242  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3243  * for the parent image.  If there is no parent, simply shows
3244  * "(no parent image)".
3245  */
3246 static ssize_t rbd_parent_show(struct device *dev,
3247                              struct device_attribute *attr,
3248                              char *buf)
3249 {
3250         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3251         struct rbd_spec *spec = rbd_dev->parent_spec;
3252         int count;
3253         char *bufp = buf;
3254
3255         if (!spec)
3256                 return sprintf(buf, "(no parent image)\n");
3257
3258         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3259                         (unsigned long long) spec->pool_id, spec->pool_name);
3260         if (count < 0)
3261                 return count;
3262         bufp += count;
3263
3264         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3265                         spec->image_name ? spec->image_name : "(unknown)");
3266         if (count < 0)
3267                 return count;
3268         bufp += count;
3269
3270         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3271                         (unsigned long long) spec->snap_id, spec->snap_name);
3272         if (count < 0)
3273                 return count;
3274         bufp += count;
3275
3276         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3277         if (count < 0)
3278                 return count;
3279         bufp += count;
3280
3281         return (ssize_t) (bufp - buf);
3282 }
3283
3284 static ssize_t rbd_image_refresh(struct device *dev,
3285                                  struct device_attribute *attr,
3286                                  const char *buf,
3287                                  size_t size)
3288 {
3289         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3290         int ret;
3291
3292         ret = rbd_dev_refresh(rbd_dev, NULL);
3293
3294         return ret < 0 ? ret : size;
3295 }
3296
3297 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3298 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3299 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3300 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3301 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3302 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3303 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3304 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3305 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3306 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3307 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3308
3309 static struct attribute *rbd_attrs[] = {
3310         &dev_attr_size.attr,
3311         &dev_attr_features.attr,
3312         &dev_attr_major.attr,
3313         &dev_attr_client_id.attr,
3314         &dev_attr_pool.attr,
3315         &dev_attr_pool_id.attr,
3316         &dev_attr_name.attr,
3317         &dev_attr_image_id.attr,
3318         &dev_attr_current_snap.attr,
3319         &dev_attr_parent.attr,
3320         &dev_attr_refresh.attr,
3321         NULL
3322 };
3323
3324 static struct attribute_group rbd_attr_group = {
3325         .attrs = rbd_attrs,
3326 };
3327
3328 static const struct attribute_group *rbd_attr_groups[] = {
3329         &rbd_attr_group,
3330         NULL
3331 };
3332
3333 static void rbd_sysfs_dev_release(struct device *dev)
3334 {
3335 }
3336
3337 static struct device_type rbd_device_type = {
3338         .name           = "rbd",
3339         .groups         = rbd_attr_groups,
3340         .release        = rbd_sysfs_dev_release,
3341 };
3342
3343 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3344 {
3345         kref_get(&spec->kref);
3346
3347         return spec;
3348 }
3349
3350 static void rbd_spec_free(struct kref *kref);
3351 static void rbd_spec_put(struct rbd_spec *spec)
3352 {
3353         if (spec)
3354                 kref_put(&spec->kref, rbd_spec_free);
3355 }
3356
3357 static struct rbd_spec *rbd_spec_alloc(void)
3358 {
3359         struct rbd_spec *spec;
3360
3361         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3362         if (!spec)
3363                 return NULL;
3364         kref_init(&spec->kref);
3365
3366         return spec;
3367 }
3368
3369 static void rbd_spec_free(struct kref *kref)
3370 {
3371         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3372
3373         kfree(spec->pool_name);
3374         kfree(spec->image_id);
3375         kfree(spec->image_name);
3376         kfree(spec->snap_name);
3377         kfree(spec);
3378 }
3379
3380 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3381                                 struct rbd_spec *spec)
3382 {
3383         struct rbd_device *rbd_dev;
3384
3385         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3386         if (!rbd_dev)
3387                 return NULL;
3388
3389         spin_lock_init(&rbd_dev->lock);
3390         rbd_dev->flags = 0;
3391         INIT_LIST_HEAD(&rbd_dev->node);
3392         INIT_LIST_HEAD(&rbd_dev->snaps);
3393         init_rwsem(&rbd_dev->header_rwsem);
3394
3395         rbd_dev->spec = spec;
3396         rbd_dev->rbd_client = rbdc;
3397
3398         /* Initialize the layout used for all rbd requests */
3399
3400         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3401         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3402         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3403         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3404
3405         return rbd_dev;
3406 }
3407
3408 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3409 {
3410         rbd_spec_put(rbd_dev->parent_spec);
3411         kfree(rbd_dev->header_name);
3412         rbd_put_client(rbd_dev->rbd_client);
3413         rbd_spec_put(rbd_dev->spec);
3414         kfree(rbd_dev);
3415 }
3416
3417 static void rbd_remove_snap_dev(struct rbd_snap *snap)
3418 {
3419         list_del(&snap->node);
3420         kfree(snap->name);
3421         kfree(snap);
3422 }
3423
3424 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
3425                                                 const char *snap_name,
3426                                                 u64 snap_id, u64 snap_size,
3427                                                 u64 snap_features)
3428 {
3429         struct rbd_snap *snap;
3430         int ret;
3431
3432         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3433         if (!snap)
3434                 return ERR_PTR(-ENOMEM);
3435
3436         ret = -ENOMEM;
3437         snap->name = kstrdup(snap_name, GFP_KERNEL);
3438         if (!snap->name)
3439                 goto err;
3440
3441         snap->id = snap_id;
3442         snap->size = snap_size;
3443         snap->features = snap_features;
3444
3445         return snap;
3446
3447 err:
3448         kfree(snap->name);
3449         kfree(snap);
3450
3451         return ERR_PTR(ret);
3452 }
3453
3454 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3455                 u64 *snap_size, u64 *snap_features)
3456 {
3457         char *snap_name;
3458
3459         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3460
3461         *snap_size = rbd_dev->header.snap_sizes[which];
3462         *snap_features = 0;     /* No features for v1 */
3463
3464         /* Skip over names until we find the one we are looking for */
3465
3466         snap_name = rbd_dev->header.snap_names;
3467         while (which--)
3468                 snap_name += strlen(snap_name) + 1;
3469
3470         return snap_name;
3471 }
3472
3473 /*
3474  * Get the size and object order for an image snapshot, or if
3475  * snap_id is CEPH_NOSNAP, gets this information for the base
3476  * image.
3477  */
3478 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3479                                 u8 *order, u64 *snap_size)
3480 {
3481         __le64 snapid = cpu_to_le64(snap_id);
3482         int ret;
3483         struct {
3484                 u8 order;
3485                 __le64 size;
3486         } __attribute__ ((packed)) size_buf = { 0 };
3487
3488         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3489                                 "rbd", "get_size",
3490                                 &snapid, sizeof (snapid),
3491                                 &size_buf, sizeof (size_buf), NULL);
3492         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3493         if (ret < 0)
3494                 return ret;
3495         if (ret < sizeof (size_buf))
3496                 return -ERANGE;
3497
3498         *order = size_buf.order;
3499         *snap_size = le64_to_cpu(size_buf.size);
3500
3501         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3502                 (unsigned long long)snap_id, (unsigned int)*order,
3503                 (unsigned long long)*snap_size);
3504
3505         return 0;
3506 }
3507
3508 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3509 {
3510         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3511                                         &rbd_dev->header.obj_order,
3512                                         &rbd_dev->header.image_size);
3513 }
3514
3515 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3516 {
3517         void *reply_buf;
3518         int ret;
3519         void *p;
3520
3521         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3522         if (!reply_buf)
3523                 return -ENOMEM;
3524
3525         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3526                                 "rbd", "get_object_prefix", NULL, 0,
3527                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3528         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3529         if (ret < 0)
3530                 goto out;
3531
3532         p = reply_buf;
3533         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3534                                                 p + ret, NULL, GFP_NOIO);
3535         ret = 0;
3536
3537         if (IS_ERR(rbd_dev->header.object_prefix)) {
3538                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3539                 rbd_dev->header.object_prefix = NULL;
3540         } else {
3541                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3542         }
3543 out:
3544         kfree(reply_buf);
3545
3546         return ret;
3547 }
3548
3549 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3550                 u64 *snap_features)
3551 {
3552         __le64 snapid = cpu_to_le64(snap_id);
3553         struct {
3554                 __le64 features;
3555                 __le64 incompat;
3556         } __attribute__ ((packed)) features_buf = { 0 };
3557         u64 incompat;
3558         int ret;
3559
3560         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3561                                 "rbd", "get_features",
3562                                 &snapid, sizeof (snapid),
3563                                 &features_buf, sizeof (features_buf), NULL);
3564         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3565         if (ret < 0)
3566                 return ret;
3567         if (ret < sizeof (features_buf))
3568                 return -ERANGE;
3569
3570         incompat = le64_to_cpu(features_buf.incompat);
3571         if (incompat & ~RBD_FEATURES_SUPPORTED)
3572                 return -ENXIO;
3573
3574         *snap_features = le64_to_cpu(features_buf.features);
3575
3576         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3577                 (unsigned long long)snap_id,
3578                 (unsigned long long)*snap_features,
3579                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3580
3581         return 0;
3582 }
3583
3584 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3585 {
3586         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3587                                                 &rbd_dev->header.features);
3588 }
3589
3590 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3591 {
3592         struct rbd_spec *parent_spec;
3593         size_t size;
3594         void *reply_buf = NULL;
3595         __le64 snapid;
3596         void *p;
3597         void *end;
3598         char *image_id;
3599         u64 overlap;
3600         int ret;
3601
3602         parent_spec = rbd_spec_alloc();
3603         if (!parent_spec)
3604                 return -ENOMEM;
3605
3606         size = sizeof (__le64) +                                /* pool_id */
3607                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3608                 sizeof (__le64) +                               /* snap_id */
3609                 sizeof (__le64);                                /* overlap */
3610         reply_buf = kmalloc(size, GFP_KERNEL);
3611         if (!reply_buf) {
3612                 ret = -ENOMEM;
3613                 goto out_err;
3614         }
3615
3616         snapid = cpu_to_le64(CEPH_NOSNAP);
3617         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3618                                 "rbd", "get_parent",
3619                                 &snapid, sizeof (snapid),
3620                                 reply_buf, size, NULL);
3621         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3622         if (ret < 0)
3623                 goto out_err;
3624
3625         p = reply_buf;
3626         end = reply_buf + ret;
3627         ret = -ERANGE;
3628         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3629         if (parent_spec->pool_id == CEPH_NOPOOL)
3630                 goto out;       /* No parent?  No problem. */
3631
3632         /* The ceph file layout needs to fit pool id in 32 bits */
3633
3634         ret = -EIO;
3635         if (WARN_ON(parent_spec->pool_id > (u64)U32_MAX))
3636                 goto out_err;
3637
3638         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3639         if (IS_ERR(image_id)) {
3640                 ret = PTR_ERR(image_id);
3641                 goto out_err;
3642         }
3643         parent_spec->image_id = image_id;
3644         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3645         ceph_decode_64_safe(&p, end, overlap, out_err);
3646
3647         rbd_dev->parent_overlap = overlap;
3648         rbd_dev->parent_spec = parent_spec;
3649         parent_spec = NULL;     /* rbd_dev now owns this */
3650 out:
3651         ret = 0;
3652 out_err:
3653         kfree(reply_buf);
3654         rbd_spec_put(parent_spec);
3655
3656         return ret;
3657 }
3658
3659 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3660 {
3661         struct {
3662                 __le64 stripe_unit;
3663                 __le64 stripe_count;
3664         } __attribute__ ((packed)) striping_info_buf = { 0 };
3665         size_t size = sizeof (striping_info_buf);
3666         void *p;
3667         u64 obj_size;
3668         u64 stripe_unit;
3669         u64 stripe_count;
3670         int ret;
3671
3672         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3673                                 "rbd", "get_stripe_unit_count", NULL, 0,
3674                                 (char *)&striping_info_buf, size, NULL);
3675         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3676         if (ret < 0)
3677                 return ret;
3678         if (ret < size)
3679                 return -ERANGE;
3680
3681         /*
3682          * We don't actually support the "fancy striping" feature
3683          * (STRIPINGV2) yet, but if the striping sizes are the
3684          * defaults the behavior is the same as before.  So find
3685          * out, and only fail if the image has non-default values.
3686          */
3687         ret = -EINVAL;
3688         obj_size = (u64)1 << rbd_dev->header.obj_order;
3689         p = &striping_info_buf;
3690         stripe_unit = ceph_decode_64(&p);
3691         if (stripe_unit != obj_size) {
3692                 rbd_warn(rbd_dev, "unsupported stripe unit "
3693                                 "(got %llu want %llu)",
3694                                 stripe_unit, obj_size);
3695                 return -EINVAL;
3696         }
3697         stripe_count = ceph_decode_64(&p);
3698         if (stripe_count != 1) {
3699                 rbd_warn(rbd_dev, "unsupported stripe count "
3700                                 "(got %llu want 1)", stripe_count);
3701                 return -EINVAL;
3702         }
3703         rbd_dev->stripe_unit = stripe_unit;
3704         rbd_dev->stripe_count = stripe_count;
3705
3706         return 0;
3707 }
3708
3709 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3710 {
3711         size_t image_id_size;
3712         char *image_id;
3713         void *p;
3714         void *end;
3715         size_t size;
3716         void *reply_buf = NULL;
3717         size_t len = 0;
3718         char *image_name = NULL;
3719         int ret;
3720
3721         rbd_assert(!rbd_dev->spec->image_name);
3722
3723         len = strlen(rbd_dev->spec->image_id);
3724         image_id_size = sizeof (__le32) + len;
3725         image_id = kmalloc(image_id_size, GFP_KERNEL);
3726         if (!image_id)
3727                 return NULL;
3728
3729         p = image_id;
3730         end = image_id + image_id_size;
3731         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3732
3733         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3734         reply_buf = kmalloc(size, GFP_KERNEL);
3735         if (!reply_buf)
3736                 goto out;
3737
3738         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3739                                 "rbd", "dir_get_name",
3740                                 image_id, image_id_size,
3741                                 reply_buf, size, NULL);
3742         if (ret < 0)
3743                 goto out;
3744         p = reply_buf;
3745         end = reply_buf + size;
3746         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3747         if (IS_ERR(image_name))
3748                 image_name = NULL;
3749         else
3750                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3751 out:
3752         kfree(reply_buf);
3753         kfree(image_id);
3754
3755         return image_name;
3756 }
3757
3758 /*
3759  * When a parent image gets probed, we only have the pool, image,
3760  * and snapshot ids but not the names of any of them.  This call
3761  * is made later to fill in those names.  It has to be done after
3762  * rbd_dev_snaps_update() has completed because some of the
3763  * information (in particular, snapshot name) is not available
3764  * until then.
3765  */
3766 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3767 {
3768         struct ceph_osd_client *osdc;
3769         const char *name;
3770         void *reply_buf = NULL;
3771         int ret;
3772
3773         if (rbd_dev->spec->pool_name)
3774                 return 0;       /* Already have the names */
3775
3776         /* Look up the pool name */
3777
3778         osdc = &rbd_dev->rbd_client->client->osdc;
3779         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3780         if (!name) {
3781                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3782                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3783                 return -EIO;
3784         }
3785
3786         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3787         if (!rbd_dev->spec->pool_name)
3788                 return -ENOMEM;
3789
3790         /* Fetch the image name; tolerate failure here */
3791
3792         name = rbd_dev_image_name(rbd_dev);
3793         if (name)
3794                 rbd_dev->spec->image_name = (char *)name;
3795         else
3796                 rbd_warn(rbd_dev, "unable to get image name");
3797
3798         /* Look up the snapshot name. */
3799
3800         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3801         if (!name) {
3802                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3803                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3804                 ret = -EIO;
3805                 goto out_err;
3806         }
3807         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3808         if(!rbd_dev->spec->snap_name)
3809                 goto out_err;
3810
3811         return 0;
3812 out_err:
3813         kfree(reply_buf);
3814         kfree(rbd_dev->spec->pool_name);
3815         rbd_dev->spec->pool_name = NULL;
3816
3817         return ret;
3818 }
3819
3820 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3821 {
3822         size_t size;
3823         int ret;
3824         void *reply_buf;
3825         void *p;
3826         void *end;
3827         u64 seq;
3828         u32 snap_count;
3829         struct ceph_snap_context *snapc;
3830         u32 i;
3831
3832         /*
3833          * We'll need room for the seq value (maximum snapshot id),
3834          * snapshot count, and array of that many snapshot ids.
3835          * For now we have a fixed upper limit on the number we're
3836          * prepared to receive.
3837          */
3838         size = sizeof (__le64) + sizeof (__le32) +
3839                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3840         reply_buf = kzalloc(size, GFP_KERNEL);
3841         if (!reply_buf)
3842                 return -ENOMEM;
3843
3844         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3845                                 "rbd", "get_snapcontext", NULL, 0,
3846                                 reply_buf, size, ver);
3847         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3848         if (ret < 0)
3849                 goto out;
3850
3851         p = reply_buf;
3852         end = reply_buf + ret;
3853         ret = -ERANGE;
3854         ceph_decode_64_safe(&p, end, seq, out);
3855         ceph_decode_32_safe(&p, end, snap_count, out);
3856
3857         /*
3858          * Make sure the reported number of snapshot ids wouldn't go
3859          * beyond the end of our buffer.  But before checking that,
3860          * make sure the computed size of the snapshot context we
3861          * allocate is representable in a size_t.
3862          */
3863         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3864                                  / sizeof (u64)) {
3865                 ret = -EINVAL;
3866                 goto out;
3867         }
3868         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3869                 goto out;
3870
3871         size = sizeof (struct ceph_snap_context) +
3872                                 snap_count * sizeof (snapc->snaps[0]);
3873         snapc = kmalloc(size, GFP_KERNEL);
3874         if (!snapc) {
3875                 ret = -ENOMEM;
3876                 goto out;
3877         }
3878         ret = 0;
3879
3880         atomic_set(&snapc->nref, 1);
3881         snapc->seq = seq;
3882         snapc->num_snaps = snap_count;
3883         for (i = 0; i < snap_count; i++)
3884                 snapc->snaps[i] = ceph_decode_64(&p);
3885
3886         rbd_dev->header.snapc = snapc;
3887
3888         dout("  snap context seq = %llu, snap_count = %u\n",
3889                 (unsigned long long)seq, (unsigned int)snap_count);
3890 out:
3891         kfree(reply_buf);
3892
3893         return ret;
3894 }
3895
3896 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3897 {
3898         size_t size;
3899         void *reply_buf;
3900         __le64 snap_id;
3901         int ret;
3902         void *p;
3903         void *end;
3904         char *snap_name;
3905
3906         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3907         reply_buf = kmalloc(size, GFP_KERNEL);
3908         if (!reply_buf)
3909                 return ERR_PTR(-ENOMEM);
3910
3911         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3912         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3913                                 "rbd", "get_snapshot_name",
3914                                 &snap_id, sizeof (snap_id),
3915                                 reply_buf, size, NULL);
3916         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3917         if (ret < 0)
3918                 goto out;
3919
3920         p = reply_buf;
3921         end = reply_buf + size;
3922         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3923         if (IS_ERR(snap_name)) {
3924                 ret = PTR_ERR(snap_name);
3925                 goto out;
3926         } else {
3927                 dout("  snap_id 0x%016llx snap_name = %s\n",
3928                         (unsigned long long)le64_to_cpu(snap_id), snap_name);
3929         }
3930         kfree(reply_buf);
3931
3932         return snap_name;
3933 out:
3934         kfree(reply_buf);
3935
3936         return ERR_PTR(ret);
3937 }
3938
3939 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3940                 u64 *snap_size, u64 *snap_features)
3941 {
3942         u64 snap_id;
3943         u8 order;
3944         int ret;
3945
3946         snap_id = rbd_dev->header.snapc->snaps[which];
3947         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3948         if (ret)
3949                 return ERR_PTR(ret);
3950         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3951         if (ret)
3952                 return ERR_PTR(ret);
3953
3954         return rbd_dev_v2_snap_name(rbd_dev, which);
3955 }
3956
3957 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3958                 u64 *snap_size, u64 *snap_features)
3959 {
3960         if (rbd_dev->image_format == 1)
3961                 return rbd_dev_v1_snap_info(rbd_dev, which,
3962                                         snap_size, snap_features);
3963         if (rbd_dev->image_format == 2)
3964                 return rbd_dev_v2_snap_info(rbd_dev, which,
3965                                         snap_size, snap_features);
3966         return ERR_PTR(-EINVAL);
3967 }
3968
3969 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3970 {
3971         int ret;
3972         __u8 obj_order;
3973
3974         down_write(&rbd_dev->header_rwsem);
3975
3976         /* Grab old order first, to see if it changes */
3977
3978         obj_order = rbd_dev->header.obj_order,
3979         ret = rbd_dev_v2_image_size(rbd_dev);
3980         if (ret)
3981                 goto out;
3982         if (rbd_dev->header.obj_order != obj_order) {
3983                 ret = -EIO;
3984                 goto out;
3985         }
3986         rbd_update_mapping_size(rbd_dev);
3987
3988         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3989         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3990         if (ret)
3991                 goto out;
3992         ret = rbd_dev_snaps_update(rbd_dev);
3993         dout("rbd_dev_snaps_update returned %d\n", ret);
3994         if (ret)
3995                 goto out;
3996 out:
3997         up_write(&rbd_dev->header_rwsem);
3998
3999         return ret;
4000 }
4001
4002 /*
4003  * Scan the rbd device's current snapshot list and compare it to the
4004  * newly-received snapshot context.  Remove any existing snapshots
4005  * not present in the new snapshot context.  Add a new snapshot for
4006  * any snaphots in the snapshot context not in the current list.
4007  * And verify there are no changes to snapshots we already know
4008  * about.
4009  *
4010  * Assumes the snapshots in the snapshot context are sorted by
4011  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4012  * are also maintained in that order.)
4013  */
4014 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4015 {
4016         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4017         const u32 snap_count = snapc->num_snaps;
4018         struct list_head *head = &rbd_dev->snaps;
4019         struct list_head *links = head->next;
4020         u32 index = 0;
4021
4022         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
4023         while (index < snap_count || links != head) {
4024                 u64 snap_id;
4025                 struct rbd_snap *snap;
4026                 char *snap_name;
4027                 u64 snap_size = 0;
4028                 u64 snap_features = 0;
4029
4030                 snap_id = index < snap_count ? snapc->snaps[index]
4031                                              : CEPH_NOSNAP;
4032                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4033                                      : NULL;
4034                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4035
4036                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4037                         struct list_head *next = links->next;
4038
4039                         /*
4040                          * A previously-existing snapshot is not in
4041                          * the new snap context.
4042                          *
4043                          * If the now missing snapshot is the one the
4044                          * image is mapped to, clear its exists flag
4045                          * so we can avoid sending any more requests
4046                          * to it.
4047                          */
4048                         if (rbd_dev->spec->snap_id == snap->id)
4049                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4050                         dout("removing %ssnap id %llu\n",
4051                                 rbd_dev->spec->snap_id == snap->id ?
4052                                                         "mapped " : "",
4053                                 (unsigned long long) snap->id);
4054                         rbd_remove_snap_dev(snap);
4055
4056                         /* Done with this list entry; advance */
4057
4058                         links = next;
4059                         continue;
4060                 }
4061
4062                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4063                                         &snap_size, &snap_features);
4064                 if (IS_ERR(snap_name))
4065                         return PTR_ERR(snap_name);
4066
4067                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
4068                         (unsigned long long) snap_id);
4069                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4070                         struct rbd_snap *new_snap;
4071
4072                         /* We haven't seen this snapshot before */
4073
4074                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
4075                                         snap_id, snap_size, snap_features);
4076                         if (IS_ERR(new_snap)) {
4077                                 int err = PTR_ERR(new_snap);
4078
4079                                 dout("  failed to add dev, error %d\n", err);
4080
4081                                 return err;
4082                         }
4083
4084                         /* New goes before existing, or at end of list */
4085
4086                         dout("  added dev%s\n", snap ? "" : " at end\n");
4087                         if (snap)
4088                                 list_add_tail(&new_snap->node, &snap->node);
4089                         else
4090                                 list_add_tail(&new_snap->node, head);
4091                 } else {
4092                         /* Already have this one */
4093
4094                         dout("  already present\n");
4095
4096                         rbd_assert(snap->size == snap_size);
4097                         rbd_assert(!strcmp(snap->name, snap_name));
4098                         rbd_assert(snap->features == snap_features);
4099
4100                         /* Done with this list entry; advance */
4101
4102                         links = links->next;
4103                 }
4104
4105                 /* Advance to the next entry in the snapshot context */
4106
4107                 index++;
4108         }
4109         dout("%s: done\n", __func__);
4110
4111         return 0;
4112 }
4113
4114 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4115 {
4116         struct device *dev;
4117         int ret;
4118
4119         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4120
4121         dev = &rbd_dev->dev;
4122         dev->bus = &rbd_bus_type;
4123         dev->type = &rbd_device_type;
4124         dev->parent = &rbd_root_dev;
4125         dev->release = rbd_dev_release;
4126         dev_set_name(dev, "%d", rbd_dev->dev_id);
4127         ret = device_register(dev);
4128
4129         mutex_unlock(&ctl_mutex);
4130
4131         return ret;
4132 }
4133
4134 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4135 {
4136         device_unregister(&rbd_dev->dev);
4137 }
4138
4139 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4140
4141 /*
4142  * Get a unique rbd identifier for the given new rbd_dev, and add
4143  * the rbd_dev to the global list.  The minimum rbd id is 1.
4144  */
4145 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4146 {
4147         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4148
4149         spin_lock(&rbd_dev_list_lock);
4150         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4151         spin_unlock(&rbd_dev_list_lock);
4152         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4153                 (unsigned long long) rbd_dev->dev_id);
4154 }
4155
4156 /*
4157  * Remove an rbd_dev from the global list, and record that its
4158  * identifier is no longer in use.
4159  */
4160 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4161 {
4162         struct list_head *tmp;
4163         int rbd_id = rbd_dev->dev_id;
4164         int max_id;
4165
4166         rbd_assert(rbd_id > 0);
4167
4168         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4169                 (unsigned long long) rbd_dev->dev_id);
4170         spin_lock(&rbd_dev_list_lock);
4171         list_del_init(&rbd_dev->node);
4172
4173         /*
4174          * If the id being "put" is not the current maximum, there
4175          * is nothing special we need to do.
4176          */
4177         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4178                 spin_unlock(&rbd_dev_list_lock);
4179                 return;
4180         }
4181
4182         /*
4183          * We need to update the current maximum id.  Search the
4184          * list to find out what it is.  We're more likely to find
4185          * the maximum at the end, so search the list backward.
4186          */
4187         max_id = 0;
4188         list_for_each_prev(tmp, &rbd_dev_list) {
4189                 struct rbd_device *rbd_dev;
4190
4191                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4192                 if (rbd_dev->dev_id > max_id)
4193                         max_id = rbd_dev->dev_id;
4194         }
4195         spin_unlock(&rbd_dev_list_lock);
4196
4197         /*
4198          * The max id could have been updated by rbd_dev_id_get(), in
4199          * which case it now accurately reflects the new maximum.
4200          * Be careful not to overwrite the maximum value in that
4201          * case.
4202          */
4203         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4204         dout("  max dev id has been reset\n");
4205 }
4206
4207 /*
4208  * Skips over white space at *buf, and updates *buf to point to the
4209  * first found non-space character (if any). Returns the length of
4210  * the token (string of non-white space characters) found.  Note
4211  * that *buf must be terminated with '\0'.
4212  */
4213 static inline size_t next_token(const char **buf)
4214 {
4215         /*
4216         * These are the characters that produce nonzero for
4217         * isspace() in the "C" and "POSIX" locales.
4218         */
4219         const char *spaces = " \f\n\r\t\v";
4220
4221         *buf += strspn(*buf, spaces);   /* Find start of token */
4222
4223         return strcspn(*buf, spaces);   /* Return token length */
4224 }
4225
4226 /*
4227  * Finds the next token in *buf, and if the provided token buffer is
4228  * big enough, copies the found token into it.  The result, if
4229  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4230  * must be terminated with '\0' on entry.
4231  *
4232  * Returns the length of the token found (not including the '\0').
4233  * Return value will be 0 if no token is found, and it will be >=
4234  * token_size if the token would not fit.
4235  *
4236  * The *buf pointer will be updated to point beyond the end of the
4237  * found token.  Note that this occurs even if the token buffer is
4238  * too small to hold it.
4239  */
4240 static inline size_t copy_token(const char **buf,
4241                                 char *token,
4242                                 size_t token_size)
4243 {
4244         size_t len;
4245
4246         len = next_token(buf);
4247         if (len < token_size) {
4248                 memcpy(token, *buf, len);
4249                 *(token + len) = '\0';
4250         }
4251         *buf += len;
4252
4253         return len;
4254 }
4255
4256 /*
4257  * Finds the next token in *buf, dynamically allocates a buffer big
4258  * enough to hold a copy of it, and copies the token into the new
4259  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4260  * that a duplicate buffer is created even for a zero-length token.
4261  *
4262  * Returns a pointer to the newly-allocated duplicate, or a null
4263  * pointer if memory for the duplicate was not available.  If
4264  * the lenp argument is a non-null pointer, the length of the token
4265  * (not including the '\0') is returned in *lenp.
4266  *
4267  * If successful, the *buf pointer will be updated to point beyond
4268  * the end of the found token.
4269  *
4270  * Note: uses GFP_KERNEL for allocation.
4271  */
4272 static inline char *dup_token(const char **buf, size_t *lenp)
4273 {
4274         char *dup;
4275         size_t len;
4276
4277         len = next_token(buf);
4278         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4279         if (!dup)
4280                 return NULL;
4281         *(dup + len) = '\0';
4282         *buf += len;
4283
4284         if (lenp)
4285                 *lenp = len;
4286
4287         return dup;
4288 }
4289
4290 /*
4291  * Parse the options provided for an "rbd add" (i.e., rbd image
4292  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4293  * and the data written is passed here via a NUL-terminated buffer.
4294  * Returns 0 if successful or an error code otherwise.
4295  *
4296  * The information extracted from these options is recorded in
4297  * the other parameters which return dynamically-allocated
4298  * structures:
4299  *  ceph_opts
4300  *      The address of a pointer that will refer to a ceph options
4301  *      structure.  Caller must release the returned pointer using
4302  *      ceph_destroy_options() when it is no longer needed.
4303  *  rbd_opts
4304  *      Address of an rbd options pointer.  Fully initialized by
4305  *      this function; caller must release with kfree().
4306  *  spec
4307  *      Address of an rbd image specification pointer.  Fully
4308  *      initialized by this function based on parsed options.
4309  *      Caller must release with rbd_spec_put().
4310  *
4311  * The options passed take this form:
4312  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4313  * where:
4314  *  <mon_addrs>
4315  *      A comma-separated list of one or more monitor addresses.
4316  *      A monitor address is an ip address, optionally followed
4317  *      by a port number (separated by a colon).
4318  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4319  *  <options>
4320  *      A comma-separated list of ceph and/or rbd options.
4321  *  <pool_name>
4322  *      The name of the rados pool containing the rbd image.
4323  *  <image_name>
4324  *      The name of the image in that pool to map.
4325  *  <snap_id>
4326  *      An optional snapshot id.  If provided, the mapping will
4327  *      present data from the image at the time that snapshot was
4328  *      created.  The image head is used if no snapshot id is
4329  *      provided.  Snapshot mappings are always read-only.
4330  */
4331 static int rbd_add_parse_args(const char *buf,
4332                                 struct ceph_options **ceph_opts,
4333                                 struct rbd_options **opts,
4334                                 struct rbd_spec **rbd_spec)
4335 {
4336         size_t len;
4337         char *options;
4338         const char *mon_addrs;
4339         size_t mon_addrs_size;
4340         struct rbd_spec *spec = NULL;
4341         struct rbd_options *rbd_opts = NULL;
4342         struct ceph_options *copts;
4343         int ret;
4344
4345         /* The first four tokens are required */
4346
4347         len = next_token(&buf);
4348         if (!len) {
4349                 rbd_warn(NULL, "no monitor address(es) provided");
4350                 return -EINVAL;
4351         }
4352         mon_addrs = buf;
4353         mon_addrs_size = len + 1;
4354         buf += len;
4355
4356         ret = -EINVAL;
4357         options = dup_token(&buf, NULL);
4358         if (!options)
4359                 return -ENOMEM;
4360         if (!*options) {
4361                 rbd_warn(NULL, "no options provided");
4362                 goto out_err;
4363         }
4364
4365         spec = rbd_spec_alloc();
4366         if (!spec)
4367                 goto out_mem;
4368
4369         spec->pool_name = dup_token(&buf, NULL);
4370         if (!spec->pool_name)
4371                 goto out_mem;
4372         if (!*spec->pool_name) {
4373                 rbd_warn(NULL, "no pool name provided");
4374                 goto out_err;
4375         }
4376
4377         spec->image_name = dup_token(&buf, NULL);
4378         if (!spec->image_name)
4379                 goto out_mem;
4380         if (!*spec->image_name) {
4381                 rbd_warn(NULL, "no image name provided");
4382                 goto out_err;
4383         }
4384
4385         /*
4386          * Snapshot name is optional; default is to use "-"
4387          * (indicating the head/no snapshot).
4388          */
4389         len = next_token(&buf);
4390         if (!len) {
4391                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4392                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4393         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4394                 ret = -ENAMETOOLONG;
4395                 goto out_err;
4396         }
4397         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4398         if (!spec->snap_name)
4399                 goto out_mem;
4400         *(spec->snap_name + len) = '\0';
4401
4402         /* Initialize all rbd options to the defaults */
4403
4404         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4405         if (!rbd_opts)
4406                 goto out_mem;
4407
4408         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4409
4410         copts = ceph_parse_options(options, mon_addrs,
4411                                         mon_addrs + mon_addrs_size - 1,
4412                                         parse_rbd_opts_token, rbd_opts);
4413         if (IS_ERR(copts)) {
4414                 ret = PTR_ERR(copts);
4415                 goto out_err;
4416         }
4417         kfree(options);
4418
4419         *ceph_opts = copts;
4420         *opts = rbd_opts;
4421         *rbd_spec = spec;
4422
4423         return 0;
4424 out_mem:
4425         ret = -ENOMEM;
4426 out_err:
4427         kfree(rbd_opts);
4428         rbd_spec_put(spec);
4429         kfree(options);
4430
4431         return ret;
4432 }
4433
4434 /*
4435  * An rbd format 2 image has a unique identifier, distinct from the
4436  * name given to it by the user.  Internally, that identifier is
4437  * what's used to specify the names of objects related to the image.
4438  *
4439  * A special "rbd id" object is used to map an rbd image name to its
4440  * id.  If that object doesn't exist, then there is no v2 rbd image
4441  * with the supplied name.
4442  *
4443  * This function will record the given rbd_dev's image_id field if
4444  * it can be determined, and in that case will return 0.  If any
4445  * errors occur a negative errno will be returned and the rbd_dev's
4446  * image_id field will be unchanged (and should be NULL).
4447  */
4448 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4449 {
4450         int ret;
4451         size_t size;
4452         char *object_name;
4453         void *response;
4454         void *p;
4455
4456         /* If we already have it we don't need to look it up */
4457
4458         if (rbd_dev->spec->image_id)
4459                 return 0;
4460
4461         /*
4462          * When probing a parent image, the image id is already
4463          * known (and the image name likely is not).  There's no
4464          * need to fetch the image id again in this case.
4465          */
4466         if (rbd_dev->spec->image_id)
4467                 return 0;
4468
4469         /*
4470          * First, see if the format 2 image id file exists, and if
4471          * so, get the image's persistent id from it.
4472          */
4473         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4474         object_name = kmalloc(size, GFP_NOIO);
4475         if (!object_name)
4476                 return -ENOMEM;
4477         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4478         dout("rbd id object name is %s\n", object_name);
4479
4480         /* Response will be an encoded string, which includes a length */
4481
4482         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4483         response = kzalloc(size, GFP_NOIO);
4484         if (!response) {
4485                 ret = -ENOMEM;
4486                 goto out;
4487         }
4488
4489         ret = rbd_obj_method_sync(rbd_dev, object_name,
4490                                 "rbd", "get_id", NULL, 0,
4491                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4492         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4493         if (ret < 0)
4494                 goto out;
4495
4496         p = response;
4497         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4498                                                 p + ret,
4499                                                 NULL, GFP_NOIO);
4500         ret = 0;
4501
4502         if (IS_ERR(rbd_dev->spec->image_id)) {
4503                 ret = PTR_ERR(rbd_dev->spec->image_id);
4504                 rbd_dev->spec->image_id = NULL;
4505         } else {
4506                 dout("image_id is %s\n", rbd_dev->spec->image_id);
4507         }
4508 out:
4509         kfree(response);
4510         kfree(object_name);
4511
4512         return ret;
4513 }
4514
4515 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4516 {
4517         int ret;
4518         size_t size;
4519
4520         /* Version 1 images have no id; empty string is used */
4521
4522         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4523         if (!rbd_dev->spec->image_id)
4524                 return -ENOMEM;
4525
4526         /* Record the header object name for this rbd image. */
4527
4528         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4529         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4530         if (!rbd_dev->header_name) {
4531                 ret = -ENOMEM;
4532                 goto out_err;
4533         }
4534         sprintf(rbd_dev->header_name, "%s%s",
4535                 rbd_dev->spec->image_name, RBD_SUFFIX);
4536
4537         /* Populate rbd image metadata */
4538
4539         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4540         if (ret < 0)
4541                 goto out_err;
4542
4543         /* Version 1 images have no parent (no layering) */
4544
4545         rbd_dev->parent_spec = NULL;
4546         rbd_dev->parent_overlap = 0;
4547
4548         rbd_dev->image_format = 1;
4549
4550         dout("discovered version 1 image, header name is %s\n",
4551                 rbd_dev->header_name);
4552
4553         return 0;
4554
4555 out_err:
4556         kfree(rbd_dev->header_name);
4557         rbd_dev->header_name = NULL;
4558         kfree(rbd_dev->spec->image_id);
4559         rbd_dev->spec->image_id = NULL;
4560
4561         return ret;
4562 }
4563
4564 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4565 {
4566         size_t size;
4567         int ret;
4568         u64 ver = 0;
4569
4570         /*
4571          * Image id was filled in by the caller.  Record the header
4572          * object name for this rbd image.
4573          */
4574         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4575         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4576         if (!rbd_dev->header_name)
4577                 return -ENOMEM;
4578         sprintf(rbd_dev->header_name, "%s%s",
4579                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4580
4581         /* Get the size and object order for the image */
4582         ret = rbd_dev_v2_image_size(rbd_dev);
4583         if (ret)
4584                 goto out_err;
4585
4586         /* Get the object prefix (a.k.a. block_name) for the image */
4587
4588         ret = rbd_dev_v2_object_prefix(rbd_dev);
4589         if (ret)
4590                 goto out_err;
4591
4592         /* Get the and check features for the image */
4593
4594         ret = rbd_dev_v2_features(rbd_dev);
4595         if (ret)
4596                 goto out_err;
4597
4598         /* If the image supports layering, get the parent info */
4599
4600         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4601                 ret = rbd_dev_v2_parent_info(rbd_dev);
4602                 if (ret)
4603                         goto out_err;
4604                 rbd_warn(rbd_dev, "WARNING: kernel support for "
4605                                         "layered rbd images is EXPERIMENTAL!");
4606         }
4607
4608         /* If the image supports fancy striping, get its parameters */
4609
4610         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4611                 ret = rbd_dev_v2_striping_info(rbd_dev);
4612                 if (ret < 0)
4613                         goto out_err;
4614         }
4615
4616         /* crypto and compression type aren't (yet) supported for v2 images */
4617
4618         rbd_dev->header.crypt_type = 0;
4619         rbd_dev->header.comp_type = 0;
4620
4621         /* Get the snapshot context, plus the header version */
4622
4623         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4624         if (ret)
4625                 goto out_err;
4626         rbd_dev->header.obj_version = ver;
4627
4628         rbd_dev->image_format = 2;
4629
4630         dout("discovered version 2 image, header name is %s\n",
4631                 rbd_dev->header_name);
4632
4633         return 0;
4634 out_err:
4635         rbd_dev->parent_overlap = 0;
4636         rbd_spec_put(rbd_dev->parent_spec);
4637         rbd_dev->parent_spec = NULL;
4638         kfree(rbd_dev->header_name);
4639         rbd_dev->header_name = NULL;
4640         kfree(rbd_dev->header.object_prefix);
4641         rbd_dev->header.object_prefix = NULL;
4642
4643         return ret;
4644 }
4645
4646 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4647 {
4648         struct rbd_device *parent = NULL;
4649         struct rbd_spec *parent_spec = NULL;
4650         struct rbd_client *rbdc = NULL;
4651         int ret;
4652
4653         /* no need to lock here, as rbd_dev is not registered yet */
4654         ret = rbd_dev_snaps_update(rbd_dev);
4655         if (ret)
4656                 return ret;
4657
4658         ret = rbd_dev_probe_update_spec(rbd_dev);
4659         if (ret)
4660                 goto err_out_snaps;
4661
4662         ret = rbd_dev_set_mapping(rbd_dev);
4663         if (ret)
4664                 goto err_out_snaps;
4665
4666         /* generate unique id: find highest unique id, add one */
4667         rbd_dev_id_get(rbd_dev);
4668
4669         /* Fill in the device name, now that we have its id. */
4670         BUILD_BUG_ON(DEV_NAME_LEN
4671                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4672         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4673
4674         /* Get our block major device number. */
4675
4676         ret = register_blkdev(0, rbd_dev->name);
4677         if (ret < 0)
4678                 goto err_out_id;
4679         rbd_dev->major = ret;
4680
4681         /* Set up the blkdev mapping. */
4682
4683         ret = rbd_init_disk(rbd_dev);
4684         if (ret)
4685                 goto err_out_blkdev;
4686
4687         ret = rbd_bus_add_dev(rbd_dev);
4688         if (ret)
4689                 goto err_out_disk;
4690
4691         /*
4692          * At this point cleanup in the event of an error is the job
4693          * of the sysfs code (initiated by rbd_bus_del_dev()).
4694          */
4695         /* Probe the parent if there is one */
4696
4697         if (rbd_dev->parent_spec) {
4698                 /*
4699                  * We need to pass a reference to the client and the
4700                  * parent spec when creating the parent rbd_dev.
4701                  * Images related by parent/child relationships
4702                  * always share both.
4703                  */
4704                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4705                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4706
4707                 parent = rbd_dev_create(rbdc, parent_spec);
4708                 if (!parent) {
4709                         ret = -ENOMEM;
4710                         goto err_out_spec;
4711                 }
4712                 rbdc = NULL;            /* parent now owns reference */
4713                 parent_spec = NULL;     /* parent now owns reference */
4714                 ret = rbd_dev_probe(parent);
4715                 if (ret < 0)
4716                         goto err_out_parent;
4717                 rbd_dev->parent = parent;
4718         }
4719
4720         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4721         if (ret)
4722                 goto err_out_bus;
4723
4724         /* Everything's ready.  Announce the disk to the world. */
4725
4726         add_disk(rbd_dev->disk);
4727
4728         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4729                 (unsigned long long) rbd_dev->mapping.size);
4730
4731         return ret;
4732
4733 err_out_parent:
4734         rbd_dev_destroy(parent);
4735 err_out_spec:
4736         rbd_spec_put(parent_spec);
4737         rbd_put_client(rbdc);
4738 err_out_bus:
4739         /* this will also clean up rest of rbd_dev stuff */
4740
4741         rbd_bus_del_dev(rbd_dev);
4742
4743         return ret;
4744 err_out_disk:
4745         rbd_free_disk(rbd_dev);
4746 err_out_blkdev:
4747         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4748 err_out_id:
4749         rbd_dev_id_put(rbd_dev);
4750 err_out_snaps:
4751         rbd_remove_all_snaps(rbd_dev);
4752
4753         return ret;
4754 }
4755
4756 /*
4757  * Probe for the existence of the header object for the given rbd
4758  * device.  For format 2 images this includes determining the image
4759  * id.
4760  */
4761 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4762 {
4763         int ret;
4764
4765         /*
4766          * Get the id from the image id object.  If it's not a
4767          * format 2 image, we'll get ENOENT back, and we'll assume
4768          * it's a format 1 image.
4769          */
4770         ret = rbd_dev_image_id(rbd_dev);
4771         if (ret)
4772                 ret = rbd_dev_v1_probe(rbd_dev);
4773         else
4774                 ret = rbd_dev_v2_probe(rbd_dev);
4775         if (ret) {
4776                 dout("probe failed, returning %d\n", ret);
4777
4778                 return ret;
4779         }
4780
4781         ret = rbd_dev_probe_finish(rbd_dev);
4782         if (ret)
4783                 rbd_header_free(&rbd_dev->header);
4784
4785         return ret;
4786 }
4787
4788 static ssize_t rbd_add(struct bus_type *bus,
4789                        const char *buf,
4790                        size_t count)
4791 {
4792         struct rbd_device *rbd_dev = NULL;
4793         struct ceph_options *ceph_opts = NULL;
4794         struct rbd_options *rbd_opts = NULL;
4795         struct rbd_spec *spec = NULL;
4796         struct rbd_client *rbdc;
4797         struct ceph_osd_client *osdc;
4798         int rc = -ENOMEM;
4799
4800         if (!try_module_get(THIS_MODULE))
4801                 return -ENODEV;
4802
4803         /* parse add command */
4804         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4805         if (rc < 0)
4806                 goto err_out_module;
4807
4808         rbdc = rbd_get_client(ceph_opts);
4809         if (IS_ERR(rbdc)) {
4810                 rc = PTR_ERR(rbdc);
4811                 goto err_out_args;
4812         }
4813         ceph_opts = NULL;       /* rbd_dev client now owns this */
4814
4815         /* pick the pool */
4816         osdc = &rbdc->client->osdc;
4817         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4818         if (rc < 0)
4819                 goto err_out_client;
4820         spec->pool_id = (u64) rc;
4821
4822         /* The ceph file layout needs to fit pool id in 32 bits */
4823
4824         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4825                 rc = -EIO;
4826                 goto err_out_client;
4827         }
4828
4829         rbd_dev = rbd_dev_create(rbdc, spec);
4830         if (!rbd_dev)
4831                 goto err_out_client;
4832         rbdc = NULL;            /* rbd_dev now owns this */
4833         spec = NULL;            /* rbd_dev now owns this */
4834
4835         rbd_dev->mapping.read_only = rbd_opts->read_only;
4836         kfree(rbd_opts);
4837         rbd_opts = NULL;        /* done with this */
4838
4839         rc = rbd_dev_probe(rbd_dev);
4840         if (rc < 0)
4841                 goto err_out_rbd_dev;
4842
4843         return count;
4844 err_out_rbd_dev:
4845         rbd_dev_destroy(rbd_dev);
4846 err_out_client:
4847         rbd_put_client(rbdc);
4848 err_out_args:
4849         if (ceph_opts)
4850                 ceph_destroy_options(ceph_opts);
4851         kfree(rbd_opts);
4852         rbd_spec_put(spec);
4853 err_out_module:
4854         module_put(THIS_MODULE);
4855
4856         dout("Error adding device %s\n", buf);
4857
4858         return (ssize_t) rc;
4859 }
4860
4861 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4862 {
4863         struct list_head *tmp;
4864         struct rbd_device *rbd_dev;
4865
4866         spin_lock(&rbd_dev_list_lock);
4867         list_for_each(tmp, &rbd_dev_list) {
4868                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4869                 if (rbd_dev->dev_id == dev_id) {
4870                         spin_unlock(&rbd_dev_list_lock);
4871                         return rbd_dev;
4872                 }
4873         }
4874         spin_unlock(&rbd_dev_list_lock);
4875         return NULL;
4876 }
4877
4878 static void rbd_dev_release(struct device *dev)
4879 {
4880         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4881
4882         if (rbd_dev->watch_event)
4883                 rbd_dev_header_watch_sync(rbd_dev, 0);
4884
4885         /* clean up and free blkdev */
4886         rbd_free_disk(rbd_dev);
4887         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4888
4889         /* release allocated disk header fields */
4890         rbd_header_free(&rbd_dev->header);
4891
4892         /* done with the id, and with the rbd_dev */
4893         rbd_dev_id_put(rbd_dev);
4894         rbd_assert(rbd_dev->rbd_client != NULL);
4895         rbd_dev_destroy(rbd_dev);
4896
4897         /* release module ref */
4898         module_put(THIS_MODULE);
4899 }
4900
4901 static void __rbd_remove(struct rbd_device *rbd_dev)
4902 {
4903         rbd_remove_all_snaps(rbd_dev);
4904         rbd_bus_del_dev(rbd_dev);
4905 }
4906
4907 static ssize_t rbd_remove(struct bus_type *bus,
4908                           const char *buf,
4909                           size_t count)
4910 {
4911         struct rbd_device *rbd_dev = NULL;
4912         int target_id, rc;
4913         unsigned long ul;
4914         int ret = count;
4915
4916         rc = strict_strtoul(buf, 10, &ul);
4917         if (rc)
4918                 return rc;
4919
4920         /* convert to int; abort if we lost anything in the conversion */
4921         target_id = (int) ul;
4922         if (target_id != ul)
4923                 return -EINVAL;
4924
4925         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4926
4927         rbd_dev = __rbd_get_dev(target_id);
4928         if (!rbd_dev) {
4929                 ret = -ENOENT;
4930                 goto done;
4931         }
4932
4933         spin_lock_irq(&rbd_dev->lock);
4934         if (rbd_dev->open_count)
4935                 ret = -EBUSY;
4936         else
4937                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4938         spin_unlock_irq(&rbd_dev->lock);
4939         if (ret < 0)
4940                 goto done;
4941
4942         while (rbd_dev->parent_spec) {
4943                 struct rbd_device *first = rbd_dev;
4944                 struct rbd_device *second = first->parent;
4945                 struct rbd_device *third;
4946
4947                 /*
4948                  * Follow to the parent with no grandparent and
4949                  * remove it.
4950                  */
4951                 while (second && (third = second->parent)) {
4952                         first = second;
4953                         second = third;
4954                 }
4955                 __rbd_remove(second);
4956                 rbd_spec_put(first->parent_spec);
4957                 first->parent_spec = NULL;
4958                 first->parent_overlap = 0;
4959                 first->parent = NULL;
4960         }
4961         __rbd_remove(rbd_dev);
4962
4963 done:
4964         mutex_unlock(&ctl_mutex);
4965
4966         return ret;
4967 }
4968
4969 /*
4970  * create control files in sysfs
4971  * /sys/bus/rbd/...
4972  */
4973 static int rbd_sysfs_init(void)
4974 {
4975         int ret;
4976
4977         ret = device_register(&rbd_root_dev);
4978         if (ret < 0)
4979                 return ret;
4980
4981         ret = bus_register(&rbd_bus_type);
4982         if (ret < 0)
4983                 device_unregister(&rbd_root_dev);
4984
4985         return ret;
4986 }
4987
4988 static void rbd_sysfs_cleanup(void)
4989 {
4990         bus_unregister(&rbd_bus_type);
4991         device_unregister(&rbd_root_dev);
4992 }
4993
4994 static int __init rbd_init(void)
4995 {
4996         int rc;
4997
4998         if (!libceph_compatible(NULL)) {
4999                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5000
5001                 return -EINVAL;
5002         }
5003         rc = rbd_sysfs_init();
5004         if (rc)
5005                 return rc;
5006         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5007         return 0;
5008 }
5009
5010 static void __exit rbd_exit(void)
5011 {
5012         rbd_sysfs_cleanup();
5013 }
5014
5015 module_init(rbd_init);
5016 module_exit(rbd_exit);
5017
5018 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5019 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5020 MODULE_DESCRIPTION("rados block device");
5021
5022 /* following authorship retained from original osdblk.c */
5023 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5024
5025 MODULE_LICENSE("GPL");