]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: only set device exists flag when ready
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG	/* When defined, rbd_assert() calls are active */

/*
 * Block I/O is expressed in sectors.  Several Linux layers (blk,
 * bio, genhd) each have their own view of a sector, but the default
 * is 512 bytes everywhere.  The names below are a little more
 * descriptive than the raw numbers.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Short and long driver names */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Snapshot device names carry this prefix, which limits the name length */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof(RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

/* Snapshot name used to refer to the image head (see rbd_snap_name()) */
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof(__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1 << 0)
#define RBD_FEATURE_STRIPINGV2	(1 << 1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device is named "rbd#": RBD_DRV_NAME followed by a unique
 * integer identifier.  MAX_INT_FORMAT_WIDTH is used in ensuring
 * DEV_NAME_LEN is large enough for every possible device name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof(int)) / 2 + 1)
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 stripe_unit;
112         u64 stripe_count;
113
114         u64 obj_version;
115 };
116
117 /*
118  * An rbd image specification.
119  *
120  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
121  * identify an image.  Each rbd_dev structure includes a pointer to
122  * an rbd_spec structure that encapsulates this identity.
123  *
124  * Each of the id's in an rbd_spec has an associated name.  For a
125  * user-mapped image, the names are supplied and the id's associated
126  * with them are looked up.  For a layered image, a parent image is
127  * defined by the tuple, and the names are looked up.
128  *
129  * An rbd_dev structure contains a parent_spec pointer which is
130  * non-null if the image it represents is a child in a layered
131  * image.  This pointer will refer to the rbd_spec structure used
132  * by the parent rbd_dev for its own identity (i.e., the structure
133  * is shared between the parent and child).
134  *
135  * Since these structures are populated once, during the discovery
136  * phase of image construction, they are effectively immutable so
137  * we make no effort to synchronize access to them.
138  *
139  * Note that code herein does not assume the image name is known (it
140  * could be a null pointer).
141  */
142 struct rbd_spec {
143         u64             pool_id;
144         const char      *pool_name;
145
146         const char      *image_id;
147         const char      *image_name;
148
149         u64             snap_id;
150         const char      *snap_name;
151
152         struct kref     kref;
153 };
154
155 /*
156  * an instance of the client.  multiple devices may share an rbd client.
157  */
158 struct rbd_client {
159         struct ceph_client      *client;
160         struct kref             kref;
161         struct list_head        node;
162 };
163
/* Callback type taking an image request */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

/* Callback type taking an object request */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* Kind of data attached to an object request */
enum obj_request_type {
	OBJ_REQUEST_NODATA,
	OBJ_REQUEST_BIO,
	OBJ_REQUEST_PAGES
};

/* Bit numbers used in rbd_obj_request->flags */
enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
182
183 struct rbd_obj_request {
184         const char              *object_name;
185         u64                     offset;         /* object start byte */
186         u64                     length;         /* bytes from offset */
187         unsigned long           flags;
188
189         /*
190          * An object request associated with an image will have its
191          * img_data flag set; a standalone object request will not.
192          *
193          * A standalone object request will have which == BAD_WHICH
194          * and a null obj_request pointer.
195          *
196          * An object request initiated in support of a layered image
197          * object (to check for its existence before a write) will
198          * have which == BAD_WHICH and a non-null obj_request pointer.
199          *
200          * Finally, an object request for rbd image data will have
201          * which != BAD_WHICH, and will have a non-null img_request
202          * pointer.  The value of which will be in the range
203          * 0..(img_request->obj_request_count-1).
204          */
205         union {
206                 struct rbd_obj_request  *obj_request;   /* STAT op */
207                 struct {
208                         struct rbd_img_request  *img_request;
209                         u64                     img_offset;
210                         /* links for img_request->obj_requests list */
211                         struct list_head        links;
212                 };
213         };
214         u32                     which;          /* posn image request list */
215
216         enum obj_request_type   type;
217         union {
218                 struct bio      *bio_list;
219                 struct {
220                         struct page     **pages;
221                         u32             page_count;
222                 };
223         };
224         struct page             **copyup_pages;
225
226         struct ceph_osd_request *osd_req;
227
228         u64                     xferred;        /* bytes transferred */
229         u64                     version;
230         int                     result;
231
232         rbd_obj_callback_t      callback;
233         struct completion       completion;
234
235         struct kref             kref;
236 };
237
/* Bit numbers used in rbd_img_request->flags */
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
243
244 struct rbd_img_request {
245         struct rbd_device       *rbd_dev;
246         u64                     offset; /* starting image byte offset */
247         u64                     length; /* byte count from offset */
248         unsigned long           flags;
249         union {
250                 u64                     snap_id;        /* for reads */
251                 struct ceph_snap_context *snapc;        /* for writes */
252         };
253         union {
254                 struct request          *rq;            /* block request */
255                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
256         };
257         struct page             **copyup_pages;
258         spinlock_t              completion_lock;/* protects next_completion */
259         u32                     next_completion;
260         rbd_img_callback_t      callback;
261         u64                     xferred;/* aggregate bytes transferred */
262         int                     result; /* first nonzero obj_request result */
263
264         u32                     obj_request_count;
265         struct list_head        obj_requests;   /* rbd_obj_request structs */
266
267         struct kref             kref;
268 };
269
/*
 * Iterators over the object requests linked on an image request's
 * obj_requests list.  The "_from" form continues from the current
 * value of oreq.  Note that the "_safe" form walks the list in
 * reverse order.
 */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
276
277 struct rbd_snap {
278         const char              *name;
279         u64                     size;
280         struct list_head        node;
281         u64                     id;
282         u64                     features;
283 };
284
285 struct rbd_mapping {
286         u64                     size;
287         u64                     features;
288         bool                    read_only;
289 };
290
291 /*
292  * a single device
293  */
294 struct rbd_device {
295         int                     dev_id;         /* blkdev unique id */
296
297         int                     major;          /* blkdev assigned major */
298         struct gendisk          *disk;          /* blkdev's gendisk and rq */
299
300         u32                     image_format;   /* Either 1 or 2 */
301         struct rbd_client       *rbd_client;
302
303         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
304
305         spinlock_t              lock;           /* queue, flags, open_count */
306
307         struct rbd_image_header header;
308         unsigned long           flags;          /* possibly lock protected */
309         struct rbd_spec         *spec;
310
311         char                    *header_name;
312
313         struct ceph_file_layout layout;
314
315         struct ceph_osd_event   *watch_event;
316         struct rbd_obj_request  *watch_request;
317
318         struct rbd_spec         *parent_spec;
319         u64                     parent_overlap;
320         struct rbd_device       *parent;
321
322         /* protects updating the header */
323         struct rw_semaphore     header_rwsem;
324
325         struct rbd_mapping      mapping;
326
327         struct list_head        node;
328
329         /* list of snapshots */
330         struct list_head        snaps;
331
332         /* sysfs related */
333         struct device           dev;
334         unsigned long           open_count;     /* protected by lock */
335 };
336
/*
 * Bit numbers for rbd_dev->flags.  Where atomic access is needed,
 * it is provided by holding rbd_dev->lock.
 *
 * At present only the "removing" flag needs atomic access; it is
 * coupled with the "open_count" field.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
348
/* Serialize open/close/setup/teardown */
static DEFINE_MUTEX(ctl_mutex);

/* devices (presumably guarded by rbd_dev_list_lock -- confirm at users) */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* clients; rbd_client_list_lock is held while the list is used */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);
356
357 static int rbd_img_request_submit(struct rbd_img_request *img_request);
358
359 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
360
361 static void rbd_dev_release(struct device *dev);
362 static void rbd_snap_destroy(struct rbd_snap *snap);
363
364 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
365                        size_t count);
366 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
367                           size_t count);
368 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
369
370 static struct bus_attribute rbd_bus_attrs[] = {
371         __ATTR(add, S_IWUSR, NULL, rbd_add),
372         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
373         __ATTR_NULL
374 };
375
376 static struct bus_type rbd_bus_type = {
377         .name           = "rbd",
378         .bus_attrs      = rbd_bus_attrs,
379 };
380
381 static void rbd_root_dev_release(struct device *dev)
382 {
383 }
384
385 static struct device rbd_root_dev = {
386         .init_name =    "rbd",
387         .release =      rbd_root_dev_release,
388 };
389
390 static __printf(2, 3)
391 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
392 {
393         struct va_format vaf;
394         va_list args;
395
396         va_start(args, fmt);
397         vaf.fmt = fmt;
398         vaf.va = &args;
399
400         if (!rbd_dev)
401                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
402         else if (rbd_dev->disk)
403                 printk(KERN_WARNING "%s: %s: %pV\n",
404                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
405         else if (rbd_dev->spec && rbd_dev->spec->image_name)
406                 printk(KERN_WARNING "%s: image %s: %pV\n",
407                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
408         else if (rbd_dev->spec && rbd_dev->spec->image_id)
409                 printk(KERN_WARNING "%s: id %s: %pV\n",
410                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
411         else    /* punt */
412                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
413                         RBD_DRV_NAME, rbd_dev, &vaf);
414         va_end(args);
415 }
416
#ifdef RBD_DEBUG
/*
 * Report a failed assertion (function, line and expression text) and
 * then BUG().  The body is wrapped in do { } while (0) so the macro
 * expands to exactly one statement; a bare "if { }" would capture a
 * following "else" when used as "if (x) rbd_assert(y); else ...".
 */
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
/* Assertions are compiled out; expr is not evaluated */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
429
430 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
431 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
432
433 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
434 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
435
436 static int rbd_open(struct block_device *bdev, fmode_t mode)
437 {
438         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
439         bool removing = false;
440
441         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
442                 return -EROFS;
443
444         spin_lock_irq(&rbd_dev->lock);
445         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
446                 removing = true;
447         else
448                 rbd_dev->open_count++;
449         spin_unlock_irq(&rbd_dev->lock);
450         if (removing)
451                 return -ENOENT;
452
453         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
454         (void) get_device(&rbd_dev->dev);
455         set_device_ro(bdev, rbd_dev->mapping.read_only);
456         mutex_unlock(&ctl_mutex);
457
458         return 0;
459 }
460
461 static int rbd_release(struct gendisk *disk, fmode_t mode)
462 {
463         struct rbd_device *rbd_dev = disk->private_data;
464         unsigned long open_count_before;
465
466         spin_lock_irq(&rbd_dev->lock);
467         open_count_before = rbd_dev->open_count--;
468         spin_unlock_irq(&rbd_dev->lock);
469         rbd_assert(open_count_before > 0);
470
471         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
472         put_device(&rbd_dev->dev);
473         mutex_unlock(&ctl_mutex);
474
475         return 0;
476 }
477
478 static const struct block_device_operations rbd_bd_ops = {
479         .owner                  = THIS_MODULE,
480         .open                   = rbd_open,
481         .release                = rbd_release,
482 };
483
484 /*
485  * Initialize an rbd client instance.
486  * We own *ceph_opts.
487  */
488 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
489 {
490         struct rbd_client *rbdc;
491         int ret = -ENOMEM;
492
493         dout("%s:\n", __func__);
494         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
495         if (!rbdc)
496                 goto out_opt;
497
498         kref_init(&rbdc->kref);
499         INIT_LIST_HEAD(&rbdc->node);
500
501         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
502
503         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
504         if (IS_ERR(rbdc->client))
505                 goto out_mutex;
506         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
507
508         ret = ceph_open_session(rbdc->client);
509         if (ret < 0)
510                 goto out_err;
511
512         spin_lock(&rbd_client_list_lock);
513         list_add_tail(&rbdc->node, &rbd_client_list);
514         spin_unlock(&rbd_client_list_lock);
515
516         mutex_unlock(&ctl_mutex);
517         dout("%s: rbdc %p\n", __func__, rbdc);
518
519         return rbdc;
520
521 out_err:
522         ceph_destroy_client(rbdc->client);
523 out_mutex:
524         mutex_unlock(&ctl_mutex);
525         kfree(rbdc);
526 out_opt:
527         if (ceph_opts)
528                 ceph_destroy_options(ceph_opts);
529         dout("%s: error %d\n", __func__, ret);
530
531         return ERR_PTR(ret);
532 }
533
534 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
535 {
536         kref_get(&rbdc->kref);
537
538         return rbdc;
539 }
540
541 /*
542  * Find a ceph client with specific addr and configuration.  If
543  * found, bump its reference count.
544  */
545 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
546 {
547         struct rbd_client *client_node;
548         bool found = false;
549
550         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
551                 return NULL;
552
553         spin_lock(&rbd_client_list_lock);
554         list_for_each_entry(client_node, &rbd_client_list, node) {
555                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
556                         __rbd_get_client(client_node);
557
558                         found = true;
559                         break;
560                 }
561         }
562         spin_unlock(&rbd_client_list_lock);
563
564         return found ? client_node : NULL;
565 }
566
567 /*
568  * mount options
569  */
570 enum {
571         Opt_last_int,
572         /* int args above */
573         Opt_last_string,
574         /* string args above */
575         Opt_read_only,
576         Opt_read_write,
577         /* Boolean args above */
578         Opt_last_bool,
579 };
580
581 static match_table_t rbd_opts_tokens = {
582         /* int args above */
583         /* string args above */
584         {Opt_read_only, "read_only"},
585         {Opt_read_only, "ro"},          /* Alternate spelling */
586         {Opt_read_write, "read_write"},
587         {Opt_read_write, "rw"},         /* Alternate spelling */
588         /* Boolean args above */
589         {-1, NULL}
590 };
591
592 struct rbd_options {
593         bool    read_only;
594 };
595
596 #define RBD_READ_ONLY_DEFAULT   false
597
598 static int parse_rbd_opts_token(char *c, void *private)
599 {
600         struct rbd_options *rbd_opts = private;
601         substring_t argstr[MAX_OPT_ARGS];
602         int token, intval, ret;
603
604         token = match_token(c, rbd_opts_tokens, argstr);
605         if (token < 0)
606                 return -EINVAL;
607
608         if (token < Opt_last_int) {
609                 ret = match_int(&argstr[0], &intval);
610                 if (ret < 0) {
611                         pr_err("bad mount option arg (not int) "
612                                "at '%s'\n", c);
613                         return ret;
614                 }
615                 dout("got int token %d val %d\n", token, intval);
616         } else if (token > Opt_last_int && token < Opt_last_string) {
617                 dout("got string token %d val %s\n", token,
618                      argstr[0].from);
619         } else if (token > Opt_last_string && token < Opt_last_bool) {
620                 dout("got Boolean token %d\n", token);
621         } else {
622                 dout("got token %d\n", token);
623         }
624
625         switch (token) {
626         case Opt_read_only:
627                 rbd_opts->read_only = true;
628                 break;
629         case Opt_read_write:
630                 rbd_opts->read_only = false;
631                 break;
632         default:
633                 rbd_assert(false);
634                 break;
635         }
636         return 0;
637 }
638
/*
 * Return a client matching ceph_opts: an existing one if
 * rbd_client_find() locates it (ceph_opts is then destroyed, since
 * it is not needed), otherwise the result of rbd_client_create(),
 * which may be an ERR_PTR().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (!rbdc)
		return rbd_client_create(ceph_opts);

	/* using an existing client */
	ceph_destroy_options(ceph_opts);

	return rbdc;
}
655
656 /*
657  * Destroy ceph client
658  *
659  * Caller must hold rbd_client_list_lock.
660  */
661 static void rbd_client_release(struct kref *kref)
662 {
663         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
664
665         dout("%s: rbdc %p\n", __func__, rbdc);
666         spin_lock(&rbd_client_list_lock);
667         list_del(&rbdc->node);
668         spin_unlock(&rbd_client_list_lock);
669
670         ceph_destroy_client(rbdc->client);
671         kfree(rbdc);
672 }
673
674 /* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */
675
676 static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
677 {
678         struct ceph_snap_context *snapc;
679         size_t size;
680
681         size = sizeof (struct ceph_snap_context);
682         size += snap_count * sizeof (snapc->snaps[0]);
683         snapc = kzalloc(size, GFP_KERNEL);
684         if (!snapc)
685                 return NULL;
686
687         atomic_set(&snapc->nref, 1);
688         snapc->num_snaps = snap_count;
689
690         return snapc;
691 }
692
/* Wrapper for ceph_get_snap_context(); its return value is ignored */
static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
{
	(void) ceph_get_snap_context(snapc);
}

/* Wrapper for ceph_put_snap_context() */
static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
{
	ceph_put_snap_context(snapc);
}
702
703 /*
704  * Drop reference to ceph client node. If it's not referenced anymore, release
705  * it.
706  */
707 static void rbd_put_client(struct rbd_client *rbdc)
708 {
709         if (rbdc)
710                 kref_put(&rbdc->kref, rbd_client_release);
711 }
712
713 static bool rbd_image_format_valid(u32 image_format)
714 {
715         return image_format == 1 || image_format == 2;
716 }
717
718 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
719 {
720         size_t size;
721         u32 snap_count;
722
723         /* The header has to start with the magic rbd header text */
724         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
725                 return false;
726
727         /* The bio layer requires at least sector-sized I/O */
728
729         if (ondisk->options.order < SECTOR_SHIFT)
730                 return false;
731
732         /* If we use u64 in a few spots we may be able to loosen this */
733
734         if (ondisk->options.order > 8 * sizeof (int) - 1)
735                 return false;
736
737         /*
738          * The size of a snapshot header has to fit in a size_t, and
739          * that limits the number of snapshots.
740          */
741         snap_count = le32_to_cpu(ondisk->snap_count);
742         size = SIZE_MAX - sizeof (struct ceph_snap_context);
743         if (snap_count > size / sizeof (__le64))
744                 return false;
745
746         /*
747          * Not only that, but the size of the entire the snapshot
748          * header must also be representable in a size_t.
749          */
750         size -= snap_count * sizeof (__le64);
751         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
752                 return false;
753
754         return true;
755 }
756
757 /*
758  * Create a new header structure, translate header format from the on-disk
759  * header.
760  */
761 static int rbd_header_from_disk(struct rbd_image_header *header,
762                                  struct rbd_image_header_ondisk *ondisk)
763 {
764         u32 snap_count;
765         size_t len;
766         size_t size;
767         u32 i;
768
769         memset(header, 0, sizeof (*header));
770
771         snap_count = le32_to_cpu(ondisk->snap_count);
772
773         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
774         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
775         if (!header->object_prefix)
776                 return -ENOMEM;
777         memcpy(header->object_prefix, ondisk->object_prefix, len);
778         header->object_prefix[len] = '\0';
779
780         if (snap_count) {
781                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
782
783                 /* Save a copy of the snapshot names */
784
785                 if (snap_names_len > (u64) SIZE_MAX)
786                         return -EIO;
787                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
788                 if (!header->snap_names)
789                         goto out_err;
790                 /*
791                  * Note that rbd_dev_v1_header_read() guarantees
792                  * the ondisk buffer we're working with has
793                  * snap_names_len bytes beyond the end of the
794                  * snapshot id array, this memcpy() is safe.
795                  */
796                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
797                         snap_names_len);
798
799                 /* Record each snapshot's size */
800
801                 size = snap_count * sizeof (*header->snap_sizes);
802                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
803                 if (!header->snap_sizes)
804                         goto out_err;
805                 for (i = 0; i < snap_count; i++)
806                         header->snap_sizes[i] =
807                                 le64_to_cpu(ondisk->snaps[i].image_size);
808         } else {
809                 header->snap_names = NULL;
810                 header->snap_sizes = NULL;
811         }
812
813         header->features = 0;   /* No features support in v1 images */
814         header->obj_order = ondisk->options.order;
815         header->crypt_type = ondisk->options.crypt_type;
816         header->comp_type = ondisk->options.comp_type;
817
818         /* Allocate and fill in the snapshot context */
819
820         header->image_size = le64_to_cpu(ondisk->image_size);
821
822         header->snapc = rbd_snap_context_create(snap_count);
823         if (!header->snapc)
824                 goto out_err;
825         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
826         for (i = 0; i < snap_count; i++)
827                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
828
829         return 0;
830
831 out_err:
832         kfree(header->snap_sizes);
833         header->snap_sizes = NULL;
834         kfree(header->snap_names);
835         header->snap_names = NULL;
836         kfree(header->object_prefix);
837         header->object_prefix = NULL;
838
839         return -ENOMEM;
840 }
841
842 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
843 {
844         struct rbd_snap *snap;
845
846         if (snap_id == CEPH_NOSNAP)
847                 return RBD_SNAP_HEAD_NAME;
848
849         list_for_each_entry(snap, &rbd_dev->snaps, node)
850                 if (snap_id == snap->id)
851                         return snap->name;
852
853         return NULL;
854 }
855
856 static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
857                                         const char *snap_name)
858 {
859         struct rbd_snap *snap;
860
861         list_for_each_entry(snap, &rbd_dev->snaps, node)
862                 if (!strcmp(snap_name, snap->name))
863                         return snap;
864
865         return NULL;
866 }
867
868 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
869 {
870         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
871                     sizeof (RBD_SNAP_HEAD_NAME))) {
872                 rbd_dev->mapping.size = rbd_dev->header.image_size;
873                 rbd_dev->mapping.features = rbd_dev->header.features;
874         } else {
875                 struct rbd_snap *snap;
876
877                 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
878                 if (!snap)
879                         return -ENOENT;
880                 rbd_dev->mapping.size = snap->size;
881                 rbd_dev->mapping.features = snap->features;
882                 rbd_dev->mapping.read_only = true;
883         }
884
885         return 0;
886 }
887
888 static void rbd_header_free(struct rbd_image_header *header)
889 {
890         kfree(header->object_prefix);
891         header->object_prefix = NULL;
892         kfree(header->snap_sizes);
893         header->snap_sizes = NULL;
894         kfree(header->snap_names);
895         header->snap_names = NULL;
896         rbd_snap_context_put(header->snapc);
897         header->snapc = NULL;
898 }
899
900 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
901 {
902         char *name;
903         u64 segment;
904         int ret;
905
906         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
907         if (!name)
908                 return NULL;
909         segment = offset >> rbd_dev->header.obj_order;
910         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
911                         rbd_dev->header.object_prefix, segment);
912         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
913                 pr_err("error formatting segment name for #%llu (%d)\n",
914                         segment, ret);
915                 kfree(name);
916                 name = NULL;
917         }
918
919         return name;
920 }
921
922 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
923 {
924         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
925
926         return offset & (segment_size - 1);
927 }
928
929 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
930                                 u64 offset, u64 length)
931 {
932         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
933
934         offset &= segment_size - 1;
935
936         rbd_assert(length <= U64_MAX - offset);
937         if (offset + length > segment_size)
938                 length = segment_size - offset;
939
940         return length;
941 }
942
943 /*
944  * returns the size of an object in the image
945  */
946 static u64 rbd_obj_bytes(struct rbd_image_header *header)
947 {
948         return 1 << header->obj_order;
949 }
950
951 /*
952  * bio helpers
953  */
954
955 static void bio_chain_put(struct bio *chain)
956 {
957         struct bio *tmp;
958
959         while (chain) {
960                 tmp = chain;
961                 chain = chain->bi_next;
962                 bio_put(tmp);
963         }
964 }
965
966 /*
967  * zeros a bio chain, starting at specific offset
968  */
969 static void zero_bio_chain(struct bio *chain, int start_ofs)
970 {
971         struct bio_vec *bv;
972         unsigned long flags;
973         void *buf;
974         int i;
975         int pos = 0;
976
977         while (chain) {
978                 bio_for_each_segment(bv, chain, i) {
979                         if (pos + bv->bv_len > start_ofs) {
980                                 int remainder = max(start_ofs - pos, 0);
981                                 buf = bvec_kmap_irq(bv, &flags);
982                                 memset(buf + remainder, 0,
983                                        bv->bv_len - remainder);
984                                 bvec_kunmap_irq(buf, &flags);
985                         }
986                         pos += bv->bv_len;
987                 }
988
989                 chain = chain->bi_next;
990         }
991 }
992
993 /*
994  * similar to zero_bio_chain(), zeros data defined by a page array,
995  * starting at the given byte offset from the start of the array and
996  * continuing up to the given end offset.  The pages array is
997  * assumed to be big enough to hold all bytes up to the end.
998  */
999 static void zero_pages(struct page **pages, u64 offset, u64 end)
1000 {
1001         struct page **page = &pages[offset >> PAGE_SHIFT];
1002
1003         rbd_assert(end > offset);
1004         rbd_assert(end - offset <= (u64)SIZE_MAX);
1005         while (offset < end) {
1006                 size_t page_offset;
1007                 size_t length;
1008                 unsigned long flags;
1009                 void *kaddr;
1010
1011                 page_offset = (size_t)(offset & ~PAGE_MASK);
1012                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1013                 local_irq_save(flags);
1014                 kaddr = kmap_atomic(*page);
1015                 memset(kaddr + page_offset, 0, length);
1016                 kunmap_atomic(kaddr);
1017                 local_irq_restore(flags);
1018
1019                 offset += length;
1020                 page++;
1021         }
1022 }
1023
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * If the range covers the whole source bio this simply returns
 * bio_clone(bio_src, gfpmask).  Otherwise a new bio is allocated
 * whose bio_vec entries are copies of the affected source entries,
 * with the first and last trimmed to the requested range.
 *
 * Returns the new bio, or a null pointer if len is zero, the range
 * does not fit within bio_src->bi_size (both trigger a one-time
 * warning), or the allocation fails.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        /* Reject empty or out-of-bounds ranges */
        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        /* Written as a subtraction so offset + len cannot wrap */
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        /*
         * The scan is started at index 0 (not bi_idx).  On exit idx
         * is the first affected entry and resid is how far into that
         * entry the range begins.
         */
        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        /*
         * resid now counts bytes from the start of entry idx to the
         * end of the range; when the loop stops it is the number of
         * bytes of entry end_idx that belong to the range.
         */
        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        /* Whole sectors only; any sub-sector part of offset is dropped */
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                /* First entry loses its head, last entry loses its tail */
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                /* The whole range lives in a single entry */
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
1104
1105 /*
1106  * Clone a portion of a bio chain, starting at the given byte offset
1107  * into the first bio in the source chain and continuing for the
1108  * number of bytes indicated.  The result is another bio chain of
1109  * exactly the given length, or a null pointer on error.
1110  *
1111  * The bio_src and offset parameters are both in-out.  On entry they
1112  * refer to the first source bio and the offset into that bio where
1113  * the start of data to be cloned is located.
1114  *
1115  * On return, bio_src is updated to refer to the bio in the source
1116  * chain that contains first un-cloned byte, and *offset will
1117  * contain the offset of that byte within that bio.
1118  */
1119 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1120                                         unsigned int *offset,
1121                                         unsigned int len,
1122                                         gfp_t gfpmask)
1123 {
1124         struct bio *bi = *bio_src;
1125         unsigned int off = *offset;
1126         struct bio *chain = NULL;
1127         struct bio **end;
1128
1129         /* Build up a chain of clone bios up to the limit */
1130
1131         if (!bi || off >= bi->bi_size || !len)
1132                 return NULL;            /* Nothing to clone */
1133
1134         end = &chain;
1135         while (len) {
1136                 unsigned int bi_size;
1137                 struct bio *bio;
1138
1139                 if (!bi) {
1140                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1141                         goto out_err;   /* EINVAL; ran out of bio's */
1142                 }
1143                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1144                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1145                 if (!bio)
1146                         goto out_err;   /* ENOMEM */
1147
1148                 *end = bio;
1149                 end = &bio->bi_next;
1150
1151                 off += bi_size;
1152                 if (off == bi->bi_size) {
1153                         bi = bi->bi_next;
1154                         off = 0;
1155                 }
1156                 len -= bi_size;
1157         }
1158         *bio_src = bi;
1159         *offset = off;
1160
1161         return chain;
1162 out_err:
1163         bio_chain_put(chain);
1164
1165         return NULL;
1166 }
1167
1168 /*
1169  * The default/initial value for all object request flags is 0.  For
1170  * each flag, once its value is set to 1 it is never reset to 0
1171  * again.
1172  */
1173 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1174 {
1175         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1176                 struct rbd_device *rbd_dev;
1177
1178                 rbd_dev = obj_request->img_request->rbd_dev;
1179                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1180                         obj_request);
1181         }
1182 }
1183
1184 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1185 {
1186         smp_mb();
1187         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1188 }
1189
1190 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1191 {
1192         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1193                 struct rbd_device *rbd_dev = NULL;
1194
1195                 if (obj_request_img_data_test(obj_request))
1196                         rbd_dev = obj_request->img_request->rbd_dev;
1197                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1198                         obj_request);
1199         }
1200 }
1201
1202 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1203 {
1204         smp_mb();
1205         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1206 }
1207
1208 /*
1209  * This sets the KNOWN flag after (possibly) setting the EXISTS
1210  * flag.  The latter is set based on the "exists" value provided.
1211  *
1212  * Note that for our purposes once an object exists it never goes
1213  * away again.  It's possible that the response from two existence
1214  * checks are separated by the creation of the target object, and
1215  * the first ("doesn't exist") response arrives *after* the second
1216  * ("does exist").  In that case we ignore the second one.
1217  */
1218 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1219                                 bool exists)
1220 {
1221         if (exists)
1222                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1223         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1224         smp_mb();
1225 }
1226
1227 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1228 {
1229         smp_mb();
1230         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1231 }
1232
1233 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1234 {
1235         smp_mb();
1236         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1237 }
1238
1239 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1240 {
1241         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1242                 atomic_read(&obj_request->kref.refcount));
1243         kref_get(&obj_request->kref);
1244 }
1245
1246 static void rbd_obj_request_destroy(struct kref *kref);
1247 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1248 {
1249         rbd_assert(obj_request != NULL);
1250         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1251                 atomic_read(&obj_request->kref.refcount));
1252         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1253 }
1254
1255 static void rbd_img_request_get(struct rbd_img_request *img_request)
1256 {
1257         dout("%s: img %p (was %d)\n", __func__, img_request,
1258                 atomic_read(&img_request->kref.refcount));
1259         kref_get(&img_request->kref);
1260 }
1261
1262 static void rbd_img_request_destroy(struct kref *kref);
1263 static void rbd_img_request_put(struct rbd_img_request *img_request)
1264 {
1265         rbd_assert(img_request != NULL);
1266         dout("%s: img %p (was %d)\n", __func__, img_request,
1267                 atomic_read(&img_request->kref.refcount));
1268         kref_put(&img_request->kref, rbd_img_request_destroy);
1269 }
1270
1271 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1272                                         struct rbd_obj_request *obj_request)
1273 {
1274         rbd_assert(obj_request->img_request == NULL);
1275
1276         /* Image request now owns object's original reference */
1277         obj_request->img_request = img_request;
1278         obj_request->which = img_request->obj_request_count;
1279         rbd_assert(!obj_request_img_data_test(obj_request));
1280         obj_request_img_data_set(obj_request);
1281         rbd_assert(obj_request->which != BAD_WHICH);
1282         img_request->obj_request_count++;
1283         list_add_tail(&obj_request->links, &img_request->obj_requests);
1284         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1285                 obj_request->which);
1286 }
1287
1288 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1289                                         struct rbd_obj_request *obj_request)
1290 {
1291         rbd_assert(obj_request->which != BAD_WHICH);
1292
1293         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1294                 obj_request->which);
1295         list_del(&obj_request->links);
1296         rbd_assert(img_request->obj_request_count > 0);
1297         img_request->obj_request_count--;
1298         rbd_assert(obj_request->which == img_request->obj_request_count);
1299         obj_request->which = BAD_WHICH;
1300         rbd_assert(obj_request_img_data_test(obj_request));
1301         rbd_assert(obj_request->img_request == img_request);
1302         obj_request->img_request = NULL;
1303         obj_request->callback = NULL;
1304         rbd_obj_request_put(obj_request);
1305 }
1306
1307 static bool obj_request_type_valid(enum obj_request_type type)
1308 {
1309         switch (type) {
1310         case OBJ_REQUEST_NODATA:
1311         case OBJ_REQUEST_BIO:
1312         case OBJ_REQUEST_PAGES:
1313                 return true;
1314         default:
1315                 return false;
1316         }
1317 }
1318
1319 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1320                                 struct rbd_obj_request *obj_request)
1321 {
1322         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1323
1324         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1325 }
1326
1327 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1328 {
1329
1330         dout("%s: img %p\n", __func__, img_request);
1331
1332         /*
1333          * If no error occurred, compute the aggregate transfer
1334          * count for the image request.  We could instead use
1335          * atomic64_cmpxchg() to update it as each object request
1336          * completes; not clear which way is better off hand.
1337          */
1338         if (!img_request->result) {
1339                 struct rbd_obj_request *obj_request;
1340                 u64 xferred = 0;
1341
1342                 for_each_obj_request(img_request, obj_request)
1343                         xferred += obj_request->xferred;
1344                 img_request->xferred = xferred;
1345         }
1346
1347         if (img_request->callback)
1348                 img_request->callback(img_request);
1349         else
1350                 rbd_img_request_put(img_request);
1351 }
1352
1353 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1354
1355 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1356 {
1357         dout("%s: obj %p\n", __func__, obj_request);
1358
1359         return wait_for_completion_interruptible(&obj_request->completion);
1360 }
1361
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
1367 static void img_request_write_set(struct rbd_img_request *img_request)
1368 {
1369         set_bit(IMG_REQ_WRITE, &img_request->flags);
1370         smp_mb();
1371 }
1372
1373 static bool img_request_write_test(struct rbd_img_request *img_request)
1374 {
1375         smp_mb();
1376         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1377 }
1378
1379 static void img_request_child_set(struct rbd_img_request *img_request)
1380 {
1381         set_bit(IMG_REQ_CHILD, &img_request->flags);
1382         smp_mb();
1383 }
1384
1385 static bool img_request_child_test(struct rbd_img_request *img_request)
1386 {
1387         smp_mb();
1388         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1389 }
1390
1391 static void img_request_layered_set(struct rbd_img_request *img_request)
1392 {
1393         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1394         smp_mb();
1395 }
1396
1397 static bool img_request_layered_test(struct rbd_img_request *img_request)
1398 {
1399         smp_mb();
1400         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1401 }
1402
1403 static void
1404 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1405 {
1406         u64 xferred = obj_request->xferred;
1407         u64 length = obj_request->length;
1408
1409         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1410                 obj_request, obj_request->img_request, obj_request->result,
1411                 xferred, length);
1412         /*
1413          * ENOENT means a hole in the image.  We zero-fill the
1414          * entire length of the request.  A short read also implies
1415          * zero-fill to the end of the request.  Either way we
1416          * update the xferred count to indicate the whole request
1417          * was satisfied.
1418          */
1419         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1420         if (obj_request->result == -ENOENT) {
1421                 if (obj_request->type == OBJ_REQUEST_BIO)
1422                         zero_bio_chain(obj_request->bio_list, 0);
1423                 else
1424                         zero_pages(obj_request->pages, 0, length);
1425                 obj_request->result = 0;
1426                 obj_request->xferred = length;
1427         } else if (xferred < length && !obj_request->result) {
1428                 if (obj_request->type == OBJ_REQUEST_BIO)
1429                         zero_bio_chain(obj_request->bio_list, xferred);
1430                 else
1431                         zero_pages(obj_request->pages, xferred, length);
1432                 obj_request->xferred = length;
1433         }
1434         obj_request_done_set(obj_request);
1435 }
1436
1437 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1438 {
1439         dout("%s: obj %p cb %p\n", __func__, obj_request,
1440                 obj_request->callback);
1441         if (obj_request->callback)
1442                 obj_request->callback(obj_request);
1443         else
1444                 complete_all(&obj_request->completion);
1445 }
1446
/*
 * Completion handling for osd ops that need no post-processing:
 * the object request is simply marked done.
 */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1452
1453 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1454 {
1455         struct rbd_img_request *img_request = NULL;
1456         struct rbd_device *rbd_dev = NULL;
1457         bool layered = false;
1458
1459         if (obj_request_img_data_test(obj_request)) {
1460                 img_request = obj_request->img_request;
1461                 layered = img_request && img_request_layered_test(img_request);
1462                 rbd_dev = img_request->rbd_dev;
1463         }
1464
1465         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1466                 obj_request, img_request, obj_request->result,
1467                 obj_request->xferred, obj_request->length);
1468         if (layered && obj_request->result == -ENOENT &&
1469                         obj_request->img_offset < rbd_dev->parent_overlap)
1470                 rbd_img_parent_read(obj_request);
1471         else if (img_request)
1472                 rbd_img_obj_request_read_callback(obj_request);
1473         else
1474                 obj_request_done_set(obj_request);
1475 }
1476
1477 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1478 {
1479         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1480                 obj_request->result, obj_request->length);
1481         /*
1482          * There is no such thing as a successful short write.  Set
1483          * it to our originally-requested length.
1484          */
1485         obj_request->xferred = obj_request->length;
1486         obj_request_done_set(obj_request);
1487 }
1488
/*
 * Completion handling for an osd stat.  A simple stat call needs
 * nothing beyond marking the request done.  We'll do more if this
 * is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1498
/*
 * Callback installed on every osd request created in this file
 * (see r_callback assignments).  It copies the outcome of the osd
 * request into the owning object request (found via r_priv),
 * dispatches to the handler for the request's first op, and, if
 * that handler marked the object request done, completes it.
 */
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        /* "which" is valid exactly when the request is image data */
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        /* Only a negative (error) result is recorded */
        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        /* Dispatch is based on the first op only */
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                /*
                 * NOTE(review): nothing marks the request done on
                 * this path, so it is not completed below.
                 */
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        /* A handler may leave the request not-done; then it is not completed here */
        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}
1551
1552 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1553 {
1554         struct rbd_img_request *img_request = obj_request->img_request;
1555         struct ceph_osd_request *osd_req = obj_request->osd_req;
1556         u64 snap_id;
1557
1558         rbd_assert(osd_req != NULL);
1559
1560         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1561         ceph_osdc_build_request(osd_req, obj_request->offset,
1562                         NULL, snap_id, NULL);
1563 }
1564
1565 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1566 {
1567         struct rbd_img_request *img_request = obj_request->img_request;
1568         struct ceph_osd_request *osd_req = obj_request->osd_req;
1569         struct ceph_snap_context *snapc;
1570         struct timespec mtime = CURRENT_TIME;
1571
1572         rbd_assert(osd_req != NULL);
1573
1574         snapc = img_request ? img_request->snapc : NULL;
1575         ceph_osdc_build_request(osd_req, obj_request->offset,
1576                         snapc, CEPH_NOSNAP, &mtime);
1577 }
1578
1579 static struct ceph_osd_request *rbd_osd_req_create(
1580                                         struct rbd_device *rbd_dev,
1581                                         bool write_request,
1582                                         struct rbd_obj_request *obj_request)
1583 {
1584         struct ceph_snap_context *snapc = NULL;
1585         struct ceph_osd_client *osdc;
1586         struct ceph_osd_request *osd_req;
1587
1588         if (obj_request_img_data_test(obj_request)) {
1589                 struct rbd_img_request *img_request = obj_request->img_request;
1590
1591                 rbd_assert(write_request ==
1592                                 img_request_write_test(img_request));
1593                 if (write_request)
1594                         snapc = img_request->snapc;
1595         }
1596
1597         /* Allocate and initialize the request, for the single op */
1598
1599         osdc = &rbd_dev->rbd_client->client->osdc;
1600         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1601         if (!osd_req)
1602                 return NULL;    /* ENOMEM */
1603
1604         if (write_request)
1605                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1606         else
1607                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1608
1609         osd_req->r_callback = rbd_osd_req_callback;
1610         osd_req->r_priv = obj_request;
1611
1612         osd_req->r_oid_len = strlen(obj_request->object_name);
1613         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1614         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1615
1616         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1617
1618         return osd_req;
1619 }
1620
1621 /*
1622  * Create a copyup osd request based on the information in the
1623  * object request supplied.  A copyup request has two osd ops,
1624  * a copyup method call, and a "normal" write request.
1625  */
1626 static struct ceph_osd_request *
1627 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1628 {
1629         struct rbd_img_request *img_request;
1630         struct ceph_snap_context *snapc;
1631         struct rbd_device *rbd_dev;
1632         struct ceph_osd_client *osdc;
1633         struct ceph_osd_request *osd_req;
1634
1635         rbd_assert(obj_request_img_data_test(obj_request));
1636         img_request = obj_request->img_request;
1637         rbd_assert(img_request);
1638         rbd_assert(img_request_write_test(img_request));
1639
1640         /* Allocate and initialize the request, for the two ops */
1641
1642         snapc = img_request->snapc;
1643         rbd_dev = img_request->rbd_dev;
1644         osdc = &rbd_dev->rbd_client->client->osdc;
1645         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1646         if (!osd_req)
1647                 return NULL;    /* ENOMEM */
1648
1649         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1650         osd_req->r_callback = rbd_osd_req_callback;
1651         osd_req->r_priv = obj_request;
1652
1653         osd_req->r_oid_len = strlen(obj_request->object_name);
1654         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1655         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1656
1657         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1658
1659         return osd_req;
1660 }
1661
1662
/*
 * Release an osd request by dropping a reference on it through the
 * osd client.
 */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}
1667
1668 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1669
1670 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1671                                                 u64 offset, u64 length,
1672                                                 enum obj_request_type type)
1673 {
1674         struct rbd_obj_request *obj_request;
1675         size_t size;
1676         char *name;
1677
1678         rbd_assert(obj_request_type_valid(type));
1679
1680         size = strlen(object_name) + 1;
1681         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1682         if (!obj_request)
1683                 return NULL;
1684
1685         name = (char *)(obj_request + 1);
1686         obj_request->object_name = memcpy(name, object_name, size);
1687         obj_request->offset = offset;
1688         obj_request->length = length;
1689         obj_request->flags = 0;
1690         obj_request->which = BAD_WHICH;
1691         obj_request->type = type;
1692         INIT_LIST_HEAD(&obj_request->links);
1693         init_completion(&obj_request->completion);
1694         kref_init(&obj_request->kref);
1695
1696         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1697                 offset, length, (int)type, obj_request);
1698
1699         return obj_request;
1700 }
1701
1702 static void rbd_obj_request_destroy(struct kref *kref)
1703 {
1704         struct rbd_obj_request *obj_request;
1705
1706         obj_request = container_of(kref, struct rbd_obj_request, kref);
1707
1708         dout("%s: obj %p\n", __func__, obj_request);
1709
1710         rbd_assert(obj_request->img_request == NULL);
1711         rbd_assert(obj_request->which == BAD_WHICH);
1712
1713         if (obj_request->osd_req)
1714                 rbd_osd_req_destroy(obj_request->osd_req);
1715
1716         rbd_assert(obj_request_type_valid(obj_request->type));
1717         switch (obj_request->type) {
1718         case OBJ_REQUEST_NODATA:
1719                 break;          /* Nothing to do */
1720         case OBJ_REQUEST_BIO:
1721                 if (obj_request->bio_list)
1722                         bio_chain_put(obj_request->bio_list);
1723                 break;
1724         case OBJ_REQUEST_PAGES:
1725                 if (obj_request->pages)
1726                         ceph_release_page_vector(obj_request->pages,
1727                                                 obj_request->page_count);
1728                 break;
1729         }
1730
1731         kfree(obj_request);
1732 }
1733
1734 /*
1735  * Caller is responsible for filling in the list of object requests
1736  * that comprises the image request, and the Linux request pointer
1737  * (if there is one).
1738  */
1739 static struct rbd_img_request *rbd_img_request_create(
1740                                         struct rbd_device *rbd_dev,
1741                                         u64 offset, u64 length,
1742                                         bool write_request,
1743                                         bool child_request)
1744 {
1745         struct rbd_img_request *img_request;
1746
1747         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1748         if (!img_request)
1749                 return NULL;
1750
1751         if (write_request) {
1752                 down_read(&rbd_dev->header_rwsem);
1753                 rbd_snap_context_get(rbd_dev->header.snapc);
1754                 up_read(&rbd_dev->header_rwsem);
1755         }
1756
1757         img_request->rq = NULL;
1758         img_request->rbd_dev = rbd_dev;
1759         img_request->offset = offset;
1760         img_request->length = length;
1761         img_request->flags = 0;
1762         if (write_request) {
1763                 img_request_write_set(img_request);
1764                 img_request->snapc = rbd_dev->header.snapc;
1765         } else {
1766                 img_request->snap_id = rbd_dev->spec->snap_id;
1767         }
1768         if (child_request)
1769                 img_request_child_set(img_request);
1770         if (rbd_dev->parent_spec)
1771                 img_request_layered_set(img_request);
1772         spin_lock_init(&img_request->completion_lock);
1773         img_request->next_completion = 0;
1774         img_request->callback = NULL;
1775         img_request->result = 0;
1776         img_request->obj_request_count = 0;
1777         INIT_LIST_HEAD(&img_request->obj_requests);
1778         kref_init(&img_request->kref);
1779
1780         rbd_img_request_get(img_request);       /* Avoid a warning */
1781         rbd_img_request_put(img_request);       /* TEMPORARY */
1782
1783         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1784                 write_request ? "write" : "read", offset, length,
1785                 img_request);
1786
1787         return img_request;
1788 }
1789
1790 static void rbd_img_request_destroy(struct kref *kref)
1791 {
1792         struct rbd_img_request *img_request;
1793         struct rbd_obj_request *obj_request;
1794         struct rbd_obj_request *next_obj_request;
1795
1796         img_request = container_of(kref, struct rbd_img_request, kref);
1797
1798         dout("%s: img %p\n", __func__, img_request);
1799
1800         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1801                 rbd_img_obj_request_del(img_request, obj_request);
1802         rbd_assert(img_request->obj_request_count == 0);
1803
1804         if (img_request_write_test(img_request))
1805                 rbd_snap_context_put(img_request->snapc);
1806
1807         if (img_request_child_test(img_request))
1808                 rbd_obj_request_put(img_request->obj_request);
1809
1810         kfree(img_request);
1811 }
1812
1813 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1814 {
1815         struct rbd_img_request *img_request;
1816         unsigned int xferred;
1817         int result;
1818         bool more;
1819
1820         rbd_assert(obj_request_img_data_test(obj_request));
1821         img_request = obj_request->img_request;
1822
1823         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1824         xferred = (unsigned int)obj_request->xferred;
1825         result = obj_request->result;
1826         if (result) {
1827                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1828
1829                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1830                         img_request_write_test(img_request) ? "write" : "read",
1831                         obj_request->length, obj_request->img_offset,
1832                         obj_request->offset);
1833                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1834                         result, xferred);
1835                 if (!img_request->result)
1836                         img_request->result = result;
1837         }
1838
1839         /* Image object requests don't own their page array */
1840
1841         if (obj_request->type == OBJ_REQUEST_PAGES) {
1842                 obj_request->pages = NULL;
1843                 obj_request->page_count = 0;
1844         }
1845
1846         if (img_request_child_test(img_request)) {
1847                 rbd_assert(img_request->obj_request != NULL);
1848                 more = obj_request->which < img_request->obj_request_count - 1;
1849         } else {
1850                 rbd_assert(img_request->rq != NULL);
1851                 more = blk_end_request(img_request->rq, result, xferred);
1852         }
1853
1854         return more;
1855 }
1856
1857 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1858 {
1859         struct rbd_img_request *img_request;
1860         u32 which = obj_request->which;
1861         bool more = true;
1862
1863         rbd_assert(obj_request_img_data_test(obj_request));
1864         img_request = obj_request->img_request;
1865
1866         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1867         rbd_assert(img_request != NULL);
1868         rbd_assert(img_request->obj_request_count > 0);
1869         rbd_assert(which != BAD_WHICH);
1870         rbd_assert(which < img_request->obj_request_count);
1871         rbd_assert(which >= img_request->next_completion);
1872
1873         spin_lock_irq(&img_request->completion_lock);
1874         if (which != img_request->next_completion)
1875                 goto out;
1876
1877         for_each_obj_request_from(img_request, obj_request) {
1878                 rbd_assert(more);
1879                 rbd_assert(which < img_request->obj_request_count);
1880
1881                 if (!obj_request_done_test(obj_request))
1882                         break;
1883                 more = rbd_img_obj_end_request(obj_request);
1884                 which++;
1885         }
1886
1887         rbd_assert(more ^ (which == img_request->obj_request_count));
1888         img_request->next_completion = which;
1889 out:
1890         spin_unlock_irq(&img_request->completion_lock);
1891
1892         if (!more)
1893                 rbd_img_request_complete(img_request);
1894 }
1895
1896 /*
1897  * Split up an image request into one or more object requests, each
1898  * to a different object.  The "type" parameter indicates whether
1899  * "data_desc" is the pointer to the head of a list of bio
1900  * structures, or the base of a page array.  In either case this
1901  * function assumes data_desc describes memory sufficient to hold
1902  * all data described by the image request.
1903  */
1904 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1905                                         enum obj_request_type type,
1906                                         void *data_desc)
1907 {
1908         struct rbd_device *rbd_dev = img_request->rbd_dev;
1909         struct rbd_obj_request *obj_request = NULL;
1910         struct rbd_obj_request *next_obj_request;
1911         bool write_request = img_request_write_test(img_request);
1912         struct bio *bio_list;
1913         unsigned int bio_offset = 0;
1914         struct page **pages;
1915         u64 img_offset;
1916         u64 resid;
1917         u16 opcode;
1918
1919         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1920                 (int)type, data_desc);
1921
1922         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1923         img_offset = img_request->offset;
1924         resid = img_request->length;
1925         rbd_assert(resid > 0);
1926
1927         if (type == OBJ_REQUEST_BIO) {
1928                 bio_list = data_desc;
1929                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1930         } else {
1931                 rbd_assert(type == OBJ_REQUEST_PAGES);
1932                 pages = data_desc;
1933         }
1934
1935         while (resid) {
1936                 struct ceph_osd_request *osd_req;
1937                 const char *object_name;
1938                 u64 offset;
1939                 u64 length;
1940
1941                 object_name = rbd_segment_name(rbd_dev, img_offset);
1942                 if (!object_name)
1943                         goto out_unwind;
1944                 offset = rbd_segment_offset(rbd_dev, img_offset);
1945                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1946                 obj_request = rbd_obj_request_create(object_name,
1947                                                 offset, length, type);
1948                 kfree(object_name);     /* object request has its own copy */
1949                 if (!obj_request)
1950                         goto out_unwind;
1951
1952                 if (type == OBJ_REQUEST_BIO) {
1953                         unsigned int clone_size;
1954
1955                         rbd_assert(length <= (u64)UINT_MAX);
1956                         clone_size = (unsigned int)length;
1957                         obj_request->bio_list =
1958                                         bio_chain_clone_range(&bio_list,
1959                                                                 &bio_offset,
1960                                                                 clone_size,
1961                                                                 GFP_ATOMIC);
1962                         if (!obj_request->bio_list)
1963                                 goto out_partial;
1964                 } else {
1965                         unsigned int page_count;
1966
1967                         obj_request->pages = pages;
1968                         page_count = (u32)calc_pages_for(offset, length);
1969                         obj_request->page_count = page_count;
1970                         if ((offset + length) & ~PAGE_MASK)
1971                                 page_count--;   /* more on last page */
1972                         pages += page_count;
1973                 }
1974
1975                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1976                                                 obj_request);
1977                 if (!osd_req)
1978                         goto out_partial;
1979                 obj_request->osd_req = osd_req;
1980                 obj_request->callback = rbd_img_obj_callback;
1981
1982                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1983                                                 0, 0);
1984                 if (type == OBJ_REQUEST_BIO)
1985                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1986                                         obj_request->bio_list, length);
1987                 else
1988                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1989                                         obj_request->pages, length,
1990                                         offset & ~PAGE_MASK, false, false);
1991
1992                 if (write_request)
1993                         rbd_osd_req_format_write(obj_request);
1994                 else
1995                         rbd_osd_req_format_read(obj_request);
1996
1997                 obj_request->img_offset = img_offset;
1998                 rbd_img_obj_request_add(img_request, obj_request);
1999
2000                 img_offset += length;
2001                 resid -= length;
2002         }
2003
2004         return 0;
2005
2006 out_partial:
2007         rbd_obj_request_put(obj_request);
2008 out_unwind:
2009         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2010                 rbd_obj_request_put(obj_request);
2011
2012         return -ENOMEM;
2013 }
2014
2015 static void
2016 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2017 {
2018         struct rbd_img_request *img_request;
2019         struct rbd_device *rbd_dev;
2020         u64 length;
2021         u32 page_count;
2022
2023         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2024         rbd_assert(obj_request_img_data_test(obj_request));
2025         img_request = obj_request->img_request;
2026         rbd_assert(img_request);
2027
2028         rbd_dev = img_request->rbd_dev;
2029         rbd_assert(rbd_dev);
2030         length = (u64)1 << rbd_dev->header.obj_order;
2031         page_count = (u32)calc_pages_for(0, length);
2032
2033         rbd_assert(obj_request->copyup_pages);
2034         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2035         obj_request->copyup_pages = NULL;
2036
2037         /*
2038          * We want the transfer count to reflect the size of the
2039          * original write request.  There is no such thing as a
2040          * successful short write, so if the request was successful
2041          * we can just set it to the originally-requested length.
2042          */
2043         if (!obj_request->result)
2044                 obj_request->xferred = obj_request->length;
2045
2046         /* Finish up with the normal image object callback */
2047
2048         rbd_img_obj_callback(obj_request);
2049 }
2050
2051 static void
2052 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2053 {
2054         struct rbd_obj_request *orig_request;
2055         struct ceph_osd_request *osd_req;
2056         struct ceph_osd_client *osdc;
2057         struct rbd_device *rbd_dev;
2058         struct page **pages;
2059         int result;
2060         u64 obj_size;
2061         u64 xferred;
2062
2063         rbd_assert(img_request_child_test(img_request));
2064
2065         /* First get what we need from the image request */
2066
2067         pages = img_request->copyup_pages;
2068         rbd_assert(pages != NULL);
2069         img_request->copyup_pages = NULL;
2070
2071         orig_request = img_request->obj_request;
2072         rbd_assert(orig_request != NULL);
2073         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2074         result = img_request->result;
2075         obj_size = img_request->length;
2076         xferred = img_request->xferred;
2077
2078         rbd_dev = img_request->rbd_dev;
2079         rbd_assert(rbd_dev);
2080         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2081
2082         rbd_img_request_put(img_request);
2083
2084         if (result)
2085                 goto out_err;
2086
2087         /* Allocate the new copyup osd request for the original request */
2088
2089         result = -ENOMEM;
2090         rbd_assert(!orig_request->osd_req);
2091         osd_req = rbd_osd_req_create_copyup(orig_request);
2092         if (!osd_req)
2093                 goto out_err;
2094         orig_request->osd_req = osd_req;
2095         orig_request->copyup_pages = pages;
2096
2097         /* Initialize the copyup op */
2098
2099         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2100         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2101                                                 false, false);
2102
2103         /* Then the original write request op */
2104
2105         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2106                                         orig_request->offset,
2107                                         orig_request->length, 0, 0);
2108         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2109                                         orig_request->length);
2110
2111         rbd_osd_req_format_write(orig_request);
2112
2113         /* All set, send it off. */
2114
2115         orig_request->callback = rbd_img_obj_copyup_callback;
2116         osdc = &rbd_dev->rbd_client->client->osdc;
2117         result = rbd_obj_request_submit(osdc, orig_request);
2118         if (!result)
2119                 return;
2120 out_err:
2121         /* Record the error code and complete the request */
2122
2123         orig_request->result = result;
2124         orig_request->xferred = 0;
2125         obj_request_done_set(orig_request);
2126         rbd_obj_request_complete(orig_request);
2127 }
2128
2129 /*
2130  * Read from the parent image the range of data that covers the
2131  * entire target of the given object request.  This is used for
2132  * satisfying a layered image write request when the target of an
2133  * object request from the image request does not exist.
2134  *
2135  * A page array big enough to hold the returned data is allocated
2136  * and supplied to rbd_img_request_fill() as the "data descriptor."
2137  * When the read completes, this page array will be transferred to
2138  * the original object request for the copyup operation.
2139  *
2140  * If an error occurs, record it as the result of the original
2141  * object request and mark it done so it gets completed.
2142  */
2143 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2144 {
2145         struct rbd_img_request *img_request = NULL;
2146         struct rbd_img_request *parent_request = NULL;
2147         struct rbd_device *rbd_dev;
2148         u64 img_offset;
2149         u64 length;
2150         struct page **pages = NULL;
2151         u32 page_count;
2152         int result;
2153
2154         rbd_assert(obj_request_img_data_test(obj_request));
2155         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2156
2157         img_request = obj_request->img_request;
2158         rbd_assert(img_request != NULL);
2159         rbd_dev = img_request->rbd_dev;
2160         rbd_assert(rbd_dev->parent != NULL);
2161
2162         /*
2163          * First things first.  The original osd request is of no
2164          * use to use any more, we'll need a new one that can hold
2165          * the two ops in a copyup request.  We'll get that later,
2166          * but for now we can release the old one.
2167          */
2168         rbd_osd_req_destroy(obj_request->osd_req);
2169         obj_request->osd_req = NULL;
2170
2171         /*
2172          * Determine the byte range covered by the object in the
2173          * child image to which the original request was to be sent.
2174          */
2175         img_offset = obj_request->img_offset - obj_request->offset;
2176         length = (u64)1 << rbd_dev->header.obj_order;
2177
2178         /*
2179          * There is no defined parent data beyond the parent
2180          * overlap, so limit what we read at that boundary if
2181          * necessary.
2182          */
2183         if (img_offset + length > rbd_dev->parent_overlap) {
2184                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2185                 length = rbd_dev->parent_overlap - img_offset;
2186         }
2187
2188         /*
2189          * Allocate a page array big enough to receive the data read
2190          * from the parent.
2191          */
2192         page_count = (u32)calc_pages_for(0, length);
2193         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2194         if (IS_ERR(pages)) {
2195                 result = PTR_ERR(pages);
2196                 pages = NULL;
2197                 goto out_err;
2198         }
2199
2200         result = -ENOMEM;
2201         parent_request = rbd_img_request_create(rbd_dev->parent,
2202                                                 img_offset, length,
2203                                                 false, true);
2204         if (!parent_request)
2205                 goto out_err;
2206         rbd_obj_request_get(obj_request);
2207         parent_request->obj_request = obj_request;
2208
2209         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2210         if (result)
2211                 goto out_err;
2212         parent_request->copyup_pages = pages;
2213
2214         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2215         result = rbd_img_request_submit(parent_request);
2216         if (!result)
2217                 return 0;
2218
2219         parent_request->copyup_pages = NULL;
2220         parent_request->obj_request = NULL;
2221         rbd_obj_request_put(obj_request);
2222 out_err:
2223         if (pages)
2224                 ceph_release_page_vector(pages, page_count);
2225         if (parent_request)
2226                 rbd_img_request_put(parent_request);
2227         obj_request->result = result;
2228         obj_request->xferred = 0;
2229         obj_request_done_set(obj_request);
2230
2231         return result;
2232 }
2233
2234 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2235 {
2236         struct rbd_obj_request *orig_request;
2237         int result;
2238
2239         rbd_assert(!obj_request_img_data_test(obj_request));
2240
2241         /*
2242          * All we need from the object request is the original
2243          * request and the result of the STAT op.  Grab those, then
2244          * we're done with the request.
2245          */
2246         orig_request = obj_request->obj_request;
2247         obj_request->obj_request = NULL;
2248         rbd_assert(orig_request);
2249         rbd_assert(orig_request->img_request);
2250
2251         result = obj_request->result;
2252         obj_request->result = 0;
2253
2254         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2255                 obj_request, orig_request, result,
2256                 obj_request->xferred, obj_request->length);
2257         rbd_obj_request_put(obj_request);
2258
2259         rbd_assert(orig_request);
2260         rbd_assert(orig_request->img_request);
2261
2262         /*
2263          * Our only purpose here is to determine whether the object
2264          * exists, and we don't want to treat the non-existence as
2265          * an error.  If something else comes back, transfer the
2266          * error to the original request and complete it now.
2267          */
2268         if (!result) {
2269                 obj_request_existence_set(orig_request, true);
2270         } else if (result == -ENOENT) {
2271                 obj_request_existence_set(orig_request, false);
2272         } else if (result) {
2273                 orig_request->result = result;
2274                 goto out;
2275         }
2276
2277         /*
2278          * Resubmit the original request now that we have recorded
2279          * whether the target object exists.
2280          */
2281         orig_request->result = rbd_img_obj_request_submit(orig_request);
2282 out:
2283         if (orig_request->result)
2284                 rbd_obj_request_complete(orig_request);
2285         rbd_obj_request_put(orig_request);
2286 }
2287
2288 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2289 {
2290         struct rbd_obj_request *stat_request;
2291         struct rbd_device *rbd_dev;
2292         struct ceph_osd_client *osdc;
2293         struct page **pages = NULL;
2294         u32 page_count;
2295         size_t size;
2296         int ret;
2297
2298         /*
2299          * The response data for a STAT call consists of:
2300          *     le64 length;
2301          *     struct {
2302          *         le32 tv_sec;
2303          *         le32 tv_nsec;
2304          *     } mtime;
2305          */
2306         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2307         page_count = (u32)calc_pages_for(0, size);
2308         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2309         if (IS_ERR(pages))
2310                 return PTR_ERR(pages);
2311
2312         ret = -ENOMEM;
2313         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2314                                                         OBJ_REQUEST_PAGES);
2315         if (!stat_request)
2316                 goto out;
2317
2318         rbd_obj_request_get(obj_request);
2319         stat_request->obj_request = obj_request;
2320         stat_request->pages = pages;
2321         stat_request->page_count = page_count;
2322
2323         rbd_assert(obj_request->img_request);
2324         rbd_dev = obj_request->img_request->rbd_dev;
2325         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2326                                                 stat_request);
2327         if (!stat_request->osd_req)
2328                 goto out;
2329         stat_request->callback = rbd_img_obj_exists_callback;
2330
2331         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2332         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2333                                         false, false);
2334         rbd_osd_req_format_read(stat_request);
2335
2336         osdc = &rbd_dev->rbd_client->client->osdc;
2337         ret = rbd_obj_request_submit(osdc, stat_request);
2338 out:
2339         if (ret)
2340                 rbd_obj_request_put(obj_request);
2341
2342         return ret;
2343 }
2344
2345 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2346 {
2347         struct rbd_img_request *img_request;
2348         struct rbd_device *rbd_dev;
2349         bool known;
2350
2351         rbd_assert(obj_request_img_data_test(obj_request));
2352
2353         img_request = obj_request->img_request;
2354         rbd_assert(img_request);
2355         rbd_dev = img_request->rbd_dev;
2356
2357         /*
2358          * Only writes to layered images need special handling.
2359          * Reads and non-layered writes are simple object requests.
2360          * Layered writes that start beyond the end of the overlap
2361          * with the parent have no parent data, so they too are
2362          * simple object requests.  Finally, if the target object is
2363          * known to already exist, its parent data has already been
2364          * copied, so a write to the object can also be handled as a
2365          * simple object request.
2366          */
2367         if (!img_request_write_test(img_request) ||
2368                 !img_request_layered_test(img_request) ||
2369                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2370                 ((known = obj_request_known_test(obj_request)) &&
2371                         obj_request_exists_test(obj_request))) {
2372
2373                 struct rbd_device *rbd_dev;
2374                 struct ceph_osd_client *osdc;
2375
2376                 rbd_dev = obj_request->img_request->rbd_dev;
2377                 osdc = &rbd_dev->rbd_client->client->osdc;
2378
2379                 return rbd_obj_request_submit(osdc, obj_request);
2380         }
2381
2382         /*
2383          * It's a layered write.  The target object might exist but
2384          * we may not know that yet.  If we know it doesn't exist,
2385          * start by reading the data for the full target object from
2386          * the parent so we can use it for a copyup to the target.
2387          */
2388         if (known)
2389                 return rbd_img_obj_parent_read_full(obj_request);
2390
2391         /* We don't know whether the target exists.  Go find out. */
2392
2393         return rbd_img_obj_exists_submit(obj_request);
2394 }
2395
2396 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2397 {
2398         struct rbd_obj_request *obj_request;
2399         struct rbd_obj_request *next_obj_request;
2400
2401         dout("%s: img %p\n", __func__, img_request);
2402         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2403                 int ret;
2404
2405                 ret = rbd_img_obj_request_submit(obj_request);
2406                 if (ret)
2407                         return ret;
2408         }
2409
2410         return 0;
2411 }
2412
2413 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2414 {
2415         struct rbd_obj_request *obj_request;
2416         struct rbd_device *rbd_dev;
2417         u64 obj_end;
2418
2419         rbd_assert(img_request_child_test(img_request));
2420
2421         obj_request = img_request->obj_request;
2422         rbd_assert(obj_request);
2423         rbd_assert(obj_request->img_request);
2424
2425         obj_request->result = img_request->result;
2426         if (obj_request->result)
2427                 goto out;
2428
2429         /*
2430          * We need to zero anything beyond the parent overlap
2431          * boundary.  Since rbd_img_obj_request_read_callback()
2432          * will zero anything beyond the end of a short read, an
2433          * easy way to do this is to pretend the data from the
2434          * parent came up short--ending at the overlap boundary.
2435          */
2436         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2437         obj_end = obj_request->img_offset + obj_request->length;
2438         rbd_dev = obj_request->img_request->rbd_dev;
2439         if (obj_end > rbd_dev->parent_overlap) {
2440                 u64 xferred = 0;
2441
2442                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2443                         xferred = rbd_dev->parent_overlap -
2444                                         obj_request->img_offset;
2445
2446                 obj_request->xferred = min(img_request->xferred, xferred);
2447         } else {
2448                 obj_request->xferred = img_request->xferred;
2449         }
2450 out:
2451         rbd_img_obj_request_read_callback(obj_request);
2452         rbd_obj_request_complete(obj_request);
2453 }
2454
2455 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2456 {
2457         struct rbd_device *rbd_dev;
2458         struct rbd_img_request *img_request;
2459         int result;
2460
2461         rbd_assert(obj_request_img_data_test(obj_request));
2462         rbd_assert(obj_request->img_request != NULL);
2463         rbd_assert(obj_request->result == (s32) -ENOENT);
2464         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2465
2466         rbd_dev = obj_request->img_request->rbd_dev;
2467         rbd_assert(rbd_dev->parent != NULL);
2468         /* rbd_read_finish(obj_request, obj_request->length); */
2469         img_request = rbd_img_request_create(rbd_dev->parent,
2470                                                 obj_request->img_offset,
2471                                                 obj_request->length,
2472                                                 false, true);
2473         result = -ENOMEM;
2474         if (!img_request)
2475                 goto out_err;
2476
2477         rbd_obj_request_get(obj_request);
2478         img_request->obj_request = obj_request;
2479
2480         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2481                                         obj_request->bio_list);
2482         if (result)
2483                 goto out_err;
2484
2485         img_request->callback = rbd_img_parent_read_callback;
2486         result = rbd_img_request_submit(img_request);
2487         if (result)
2488                 goto out_err;
2489
2490         return;
2491 out_err:
2492         if (img_request)
2493                 rbd_img_request_put(img_request);
2494         obj_request->result = result;
2495         obj_request->xferred = 0;
2496         obj_request_done_set(obj_request);
2497 }
2498
2499 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2500                                    u64 ver, u64 notify_id)
2501 {
2502         struct rbd_obj_request *obj_request;
2503         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2504         int ret;
2505
2506         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2507                                                         OBJ_REQUEST_NODATA);
2508         if (!obj_request)
2509                 return -ENOMEM;
2510
2511         ret = -ENOMEM;
2512         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2513         if (!obj_request->osd_req)
2514                 goto out;
2515         obj_request->callback = rbd_obj_request_put;
2516
2517         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2518                                         notify_id, ver, 0);
2519         rbd_osd_req_format_read(obj_request);
2520
2521         ret = rbd_obj_request_submit(osdc, obj_request);
2522 out:
2523         if (ret)
2524                 rbd_obj_request_put(obj_request);
2525
2526         return ret;
2527 }
2528
2529 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2530 {
2531         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2532         u64 hver;
2533
2534         if (!rbd_dev)
2535                 return;
2536
2537         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2538                 rbd_dev->header_name, (unsigned long long) notify_id,
2539                 (unsigned int) opcode);
2540         (void)rbd_dev_refresh(rbd_dev, &hver);
2541
2542         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2543 }
2544
2545 /*
2546  * Request sync osd watch/unwatch.  The value of "start" determines
2547  * whether a watch request is being initiated or torn down.
2548  */
2549 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2550 {
2551         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2552         struct rbd_obj_request *obj_request;
2553         int ret;
2554
2555         rbd_assert(start ^ !!rbd_dev->watch_event);
2556         rbd_assert(start ^ !!rbd_dev->watch_request);
2557
2558         if (start) {
2559                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2560                                                 &rbd_dev->watch_event);
2561                 if (ret < 0)
2562                         return ret;
2563                 rbd_assert(rbd_dev->watch_event != NULL);
2564         }
2565
2566         ret = -ENOMEM;
2567         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2568                                                         OBJ_REQUEST_NODATA);
2569         if (!obj_request)
2570                 goto out_cancel;
2571
2572         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2573         if (!obj_request->osd_req)
2574                 goto out_cancel;
2575
2576         if (start)
2577                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2578         else
2579                 ceph_osdc_unregister_linger_request(osdc,
2580                                         rbd_dev->watch_request->osd_req);
2581
2582         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2583                                 rbd_dev->watch_event->cookie,
2584                                 rbd_dev->header.obj_version, start);
2585         rbd_osd_req_format_write(obj_request);
2586
2587         ret = rbd_obj_request_submit(osdc, obj_request);
2588         if (ret)
2589                 goto out_cancel;
2590         ret = rbd_obj_request_wait(obj_request);
2591         if (ret)
2592                 goto out_cancel;
2593         ret = obj_request->result;
2594         if (ret)
2595                 goto out_cancel;
2596
2597         /*
2598          * A watch request is set to linger, so the underlying osd
2599          * request won't go away until we unregister it.  We retain
2600          * a pointer to the object request during that time (in
2601          * rbd_dev->watch_request), so we'll keep a reference to
2602          * it.  We'll drop that reference (below) after we've
2603          * unregistered it.
2604          */
2605         if (start) {
2606                 rbd_dev->watch_request = obj_request;
2607
2608                 return 0;
2609         }
2610
2611         /* We have successfully torn down the watch request */
2612
2613         rbd_obj_request_put(rbd_dev->watch_request);
2614         rbd_dev->watch_request = NULL;
2615 out_cancel:
2616         /* Cancel the event if we're tearing down, or on error */
2617         ceph_osdc_cancel_event(rbd_dev->watch_event);
2618         rbd_dev->watch_event = NULL;
2619         if (obj_request)
2620                 rbd_obj_request_put(obj_request);
2621
2622         return ret;
2623 }
2624
2625 /*
2626  * Synchronous osd object method call.  Returns the number of bytes
2627  * returned in the outbound buffer, or a negative error code.
2628  */
2629 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2630                              const char *object_name,
2631                              const char *class_name,
2632                              const char *method_name,
2633                              const void *outbound,
2634                              size_t outbound_size,
2635                              void *inbound,
2636                              size_t inbound_size,
2637                              u64 *version)
2638 {
2639         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2640         struct rbd_obj_request *obj_request;
2641         struct page **pages;
2642         u32 page_count;
2643         int ret;
2644
2645         /*
2646          * Method calls are ultimately read operations.  The result
2647          * should placed into the inbound buffer provided.  They
2648          * also supply outbound data--parameters for the object
2649          * method.  Currently if this is present it will be a
2650          * snapshot id.
2651          */
2652         page_count = (u32)calc_pages_for(0, inbound_size);
2653         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2654         if (IS_ERR(pages))
2655                 return PTR_ERR(pages);
2656
2657         ret = -ENOMEM;
2658         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2659                                                         OBJ_REQUEST_PAGES);
2660         if (!obj_request)
2661                 goto out;
2662
2663         obj_request->pages = pages;
2664         obj_request->page_count = page_count;
2665
2666         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2667         if (!obj_request->osd_req)
2668                 goto out;
2669
2670         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2671                                         class_name, method_name);
2672         if (outbound_size) {
2673                 struct ceph_pagelist *pagelist;
2674
2675                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2676                 if (!pagelist)
2677                         goto out;
2678
2679                 ceph_pagelist_init(pagelist);
2680                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2681                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2682                                                 pagelist);
2683         }
2684         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2685                                         obj_request->pages, inbound_size,
2686                                         0, false, false);
2687         rbd_osd_req_format_read(obj_request);
2688
2689         ret = rbd_obj_request_submit(osdc, obj_request);
2690         if (ret)
2691                 goto out;
2692         ret = rbd_obj_request_wait(obj_request);
2693         if (ret)
2694                 goto out;
2695
2696         ret = obj_request->result;
2697         if (ret < 0)
2698                 goto out;
2699
2700         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2701         ret = (int)obj_request->xferred;
2702         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2703         if (version)
2704                 *version = obj_request->version;
2705 out:
2706         if (obj_request)
2707                 rbd_obj_request_put(obj_request);
2708         else
2709                 ceph_release_page_vector(pages, page_count);
2710
2711         return ret;
2712 }
2713
2714 static void rbd_request_fn(struct request_queue *q)
2715                 __releases(q->queue_lock) __acquires(q->queue_lock)
2716 {
2717         struct rbd_device *rbd_dev = q->queuedata;
2718         bool read_only = rbd_dev->mapping.read_only;
2719         struct request *rq;
2720         int result;
2721
2722         while ((rq = blk_fetch_request(q))) {
2723                 bool write_request = rq_data_dir(rq) == WRITE;
2724                 struct rbd_img_request *img_request;
2725                 u64 offset;
2726                 u64 length;
2727
2728                 /* Ignore any non-FS requests that filter through. */
2729
2730                 if (rq->cmd_type != REQ_TYPE_FS) {
2731                         dout("%s: non-fs request type %d\n", __func__,
2732                                 (int) rq->cmd_type);
2733                         __blk_end_request_all(rq, 0);
2734                         continue;
2735                 }
2736
2737                 /* Ignore/skip any zero-length requests */
2738
2739                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2740                 length = (u64) blk_rq_bytes(rq);
2741
2742                 if (!length) {
2743                         dout("%s: zero-length request\n", __func__);
2744                         __blk_end_request_all(rq, 0);
2745                         continue;
2746                 }
2747
2748                 spin_unlock_irq(q->queue_lock);
2749
2750                 /* Disallow writes to a read-only device */
2751
2752                 if (write_request) {
2753                         result = -EROFS;
2754                         if (read_only)
2755                                 goto end_request;
2756                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2757                 }
2758
2759                 /*
2760                  * Quit early if the mapped snapshot no longer
2761                  * exists.  It's still possible the snapshot will
2762                  * have disappeared by the time our request arrives
2763                  * at the osd, but there's no sense in sending it if
2764                  * we already know.
2765                  */
2766                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2767                         dout("request for non-existent snapshot");
2768                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2769                         result = -ENXIO;
2770                         goto end_request;
2771                 }
2772
2773                 result = -EINVAL;
2774                 if (offset && length > U64_MAX - offset + 1) {
2775                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2776                                 offset, length);
2777                         goto end_request;       /* Shouldn't happen */
2778                 }
2779
2780                 result = -ENOMEM;
2781                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2782                                                         write_request, false);
2783                 if (!img_request)
2784                         goto end_request;
2785
2786                 img_request->rq = rq;
2787
2788                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2789                                                 rq->bio);
2790                 if (!result)
2791                         result = rbd_img_request_submit(img_request);
2792                 if (result)
2793                         rbd_img_request_put(img_request);
2794 end_request:
2795                 spin_lock_irq(q->queue_lock);
2796                 if (result < 0) {
2797                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2798                                 write_request ? "write" : "read",
2799                                 length, offset, result);
2800
2801                         __blk_end_request_all(rq, result);
2802                 }
2803         }
2804 }
2805
2806 /*
2807  * a queue callback. Makes sure that we don't create a bio that spans across
2808  * multiple osd objects. One exception would be with a single page bios,
2809  * which we handle later at bio_chain_clone_range()
2810  */
2811 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2812                           struct bio_vec *bvec)
2813 {
2814         struct rbd_device *rbd_dev = q->queuedata;
2815         sector_t sector_offset;
2816         sector_t sectors_per_obj;
2817         sector_t obj_sector_offset;
2818         int ret;
2819
2820         /*
2821          * Find how far into its rbd object the partition-relative
2822          * bio start sector is to offset relative to the enclosing
2823          * device.
2824          */
2825         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2826         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2827         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2828
2829         /*
2830          * Compute the number of bytes from that offset to the end
2831          * of the object.  Account for what's already used by the bio.
2832          */
2833         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2834         if (ret > bmd->bi_size)
2835                 ret -= bmd->bi_size;
2836         else
2837                 ret = 0;
2838
2839         /*
2840          * Don't send back more than was asked for.  And if the bio
2841          * was empty, let the whole thing through because:  "Note
2842          * that a block device *must* allow a single page to be
2843          * added to an empty bio."
2844          */
2845         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2846         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2847                 ret = (int) bvec->bv_len;
2848
2849         return ret;
2850 }
2851
2852 static void rbd_free_disk(struct rbd_device *rbd_dev)
2853 {
2854         struct gendisk *disk = rbd_dev->disk;
2855
2856         if (!disk)
2857                 return;
2858
2859         rbd_dev->disk = NULL;
2860         if (disk->flags & GENHD_FL_UP) {
2861                 del_gendisk(disk);
2862                 if (disk->queue)
2863                         blk_cleanup_queue(disk->queue);
2864         }
2865         put_disk(disk);
2866 }
2867
2868 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2869                                 const char *object_name,
2870                                 u64 offset, u64 length,
2871                                 void *buf, u64 *version)
2872
2873 {
2874         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2875         struct rbd_obj_request *obj_request;
2876         struct page **pages = NULL;
2877         u32 page_count;
2878         size_t size;
2879         int ret;
2880
2881         page_count = (u32) calc_pages_for(offset, length);
2882         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2883         if (IS_ERR(pages))
2884                 ret = PTR_ERR(pages);
2885
2886         ret = -ENOMEM;
2887         obj_request = rbd_obj_request_create(object_name, offset, length,
2888                                                         OBJ_REQUEST_PAGES);
2889         if (!obj_request)
2890                 goto out;
2891
2892         obj_request->pages = pages;
2893         obj_request->page_count = page_count;
2894
2895         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2896         if (!obj_request->osd_req)
2897                 goto out;
2898
2899         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2900                                         offset, length, 0, 0);
2901         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2902                                         obj_request->pages,
2903                                         obj_request->length,
2904                                         obj_request->offset & ~PAGE_MASK,
2905                                         false, false);
2906         rbd_osd_req_format_read(obj_request);
2907
2908         ret = rbd_obj_request_submit(osdc, obj_request);
2909         if (ret)
2910                 goto out;
2911         ret = rbd_obj_request_wait(obj_request);
2912         if (ret)
2913                 goto out;
2914
2915         ret = obj_request->result;
2916         if (ret < 0)
2917                 goto out;
2918
2919         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2920         size = (size_t) obj_request->xferred;
2921         ceph_copy_from_page_vector(pages, buf, 0, size);
2922         rbd_assert(size <= (size_t) INT_MAX);
2923         ret = (int) size;
2924         if (version)
2925                 *version = obj_request->version;
2926 out:
2927         if (obj_request)
2928                 rbd_obj_request_put(obj_request);
2929         else
2930                 ceph_release_page_vector(pages, page_count);
2931
2932         return ret;
2933 }
2934
2935 /*
2936  * Read the complete header for the given rbd device.
2937  *
2938  * Returns a pointer to a dynamically-allocated buffer containing
2939  * the complete and validated header.  Caller can pass the address
2940  * of a variable that will be filled in with the version of the
2941  * header object at the time it was read.
2942  *
2943  * Returns a pointer-coded errno if a failure occurs.
2944  */
2945 static struct rbd_image_header_ondisk *
2946 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2947 {
2948         struct rbd_image_header_ondisk *ondisk = NULL;
2949         u32 snap_count = 0;
2950         u64 names_size = 0;
2951         u32 want_count;
2952         int ret;
2953
2954         /*
2955          * The complete header will include an array of its 64-bit
2956          * snapshot ids, followed by the names of those snapshots as
2957          * a contiguous block of NUL-terminated strings.  Note that
2958          * the number of snapshots could change by the time we read
2959          * it in, in which case we re-read it.
2960          */
2961         do {
2962                 size_t size;
2963
2964                 kfree(ondisk);
2965
2966                 size = sizeof (*ondisk);
2967                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2968                 size += names_size;
2969                 ondisk = kmalloc(size, GFP_KERNEL);
2970                 if (!ondisk)
2971                         return ERR_PTR(-ENOMEM);
2972
2973                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2974                                        0, size, ondisk, version);
2975                 if (ret < 0)
2976                         goto out_err;
2977                 if ((size_t)ret < size) {
2978                         ret = -ENXIO;
2979                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2980                                 size, ret);
2981                         goto out_err;
2982                 }
2983                 if (!rbd_dev_ondisk_valid(ondisk)) {
2984                         ret = -ENXIO;
2985                         rbd_warn(rbd_dev, "invalid header");
2986                         goto out_err;
2987                 }
2988
2989                 names_size = le64_to_cpu(ondisk->snap_names_len);
2990                 want_count = snap_count;
2991                 snap_count = le32_to_cpu(ondisk->snap_count);
2992         } while (snap_count != want_count);
2993
2994         return ondisk;
2995
2996 out_err:
2997         kfree(ondisk);
2998
2999         return ERR_PTR(ret);
3000 }
3001
3002 /*
3003  * reload the ondisk the header
3004  */
3005 static int rbd_read_header(struct rbd_device *rbd_dev,
3006                            struct rbd_image_header *header)
3007 {
3008         struct rbd_image_header_ondisk *ondisk;
3009         u64 ver = 0;
3010         int ret;
3011
3012         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3013         if (IS_ERR(ondisk))
3014                 return PTR_ERR(ondisk);
3015         ret = rbd_header_from_disk(header, ondisk);
3016         if (ret >= 0)
3017                 header->obj_version = ver;
3018         kfree(ondisk);
3019
3020         return ret;
3021 }
3022
3023 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3024 {
3025         struct rbd_snap *snap;
3026         struct rbd_snap *next;
3027
3028         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3029                 list_del(&snap->node);
3030                 rbd_snap_destroy(snap);
3031         }
3032 }
3033
3034 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3035 {
3036         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3037                 return;
3038
3039         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3040                 sector_t size;
3041
3042                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3043                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3044                 dout("setting size to %llu sectors", (unsigned long long)size);
3045                 set_capacity(rbd_dev->disk, size);
3046         }
3047 }
3048
3049 /*
3050  * only read the first part of the ondisk header, without the snaps info
3051  */
3052 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3053 {
3054         int ret;
3055         struct rbd_image_header h;
3056
3057         ret = rbd_read_header(rbd_dev, &h);
3058         if (ret < 0)
3059                 return ret;
3060
3061         down_write(&rbd_dev->header_rwsem);
3062
3063         /* Update image size, and check for resize of mapped image */
3064         rbd_dev->header.image_size = h.image_size;
3065         rbd_update_mapping_size(rbd_dev);
3066
3067         /* rbd_dev->header.object_prefix shouldn't change */
3068         kfree(rbd_dev->header.snap_sizes);
3069         kfree(rbd_dev->header.snap_names);
3070         /* osd requests may still refer to snapc */
3071         rbd_snap_context_put(rbd_dev->header.snapc);
3072
3073         if (hver)
3074                 *hver = h.obj_version;
3075         rbd_dev->header.obj_version = h.obj_version;
3076         rbd_dev->header.image_size = h.image_size;
3077         rbd_dev->header.snapc = h.snapc;
3078         rbd_dev->header.snap_names = h.snap_names;
3079         rbd_dev->header.snap_sizes = h.snap_sizes;
3080         /* Free the extra copy of the object prefix */
3081         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3082                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3083         kfree(h.object_prefix);
3084
3085         ret = rbd_dev_snaps_update(rbd_dev);
3086
3087         up_write(&rbd_dev->header_rwsem);
3088
3089         return ret;
3090 }
3091
3092 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3093 {
3094         int ret;
3095
3096         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3097         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3098         if (rbd_dev->image_format == 1)
3099                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3100         else
3101                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3102         mutex_unlock(&ctl_mutex);
3103         revalidate_disk(rbd_dev->disk);
3104         if (ret)
3105                 rbd_warn(rbd_dev, "got notification but failed to "
3106                            " update snaps: %d\n", ret);
3107
3108         return ret;
3109 }
3110
3111 static int rbd_init_disk(struct rbd_device *rbd_dev)
3112 {
3113         struct gendisk *disk;
3114         struct request_queue *q;
3115         u64 segment_size;
3116
3117         /* create gendisk info */
3118         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3119         if (!disk)
3120                 return -ENOMEM;
3121
3122         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3123                  rbd_dev->dev_id);
3124         disk->major = rbd_dev->major;
3125         disk->first_minor = 0;
3126         disk->fops = &rbd_bd_ops;
3127         disk->private_data = rbd_dev;
3128
3129         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3130         if (!q)
3131                 goto out_disk;
3132
3133         /* We use the default size, but let's be explicit about it. */
3134         blk_queue_physical_block_size(q, SECTOR_SIZE);
3135
3136         /* set io sizes to object size */
3137         segment_size = rbd_obj_bytes(&rbd_dev->header);
3138         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3139         blk_queue_max_segment_size(q, segment_size);
3140         blk_queue_io_min(q, segment_size);
3141         blk_queue_io_opt(q, segment_size);
3142
3143         blk_queue_merge_bvec(q, rbd_merge_bvec);
3144         disk->queue = q;
3145
3146         q->queuedata = rbd_dev;
3147
3148         rbd_dev->disk = disk;
3149
3150         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3151
3152         return 0;
3153 out_disk:
3154         put_disk(disk);
3155
3156         return -ENOMEM;
3157 }
3158
3159 /*
3160   sysfs
3161 */
3162
3163 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3164 {
3165         return container_of(dev, struct rbd_device, dev);
3166 }
3167
3168 static ssize_t rbd_size_show(struct device *dev,
3169                              struct device_attribute *attr, char *buf)
3170 {
3171         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3172
3173         return sprintf(buf, "%llu\n",
3174                 (unsigned long long)rbd_dev->mapping.size);
3175 }
3176
3177 /*
3178  * Note this shows the features for whatever's mapped, which is not
3179  * necessarily the base image.
3180  */
3181 static ssize_t rbd_features_show(struct device *dev,
3182                              struct device_attribute *attr, char *buf)
3183 {
3184         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3185
3186         return sprintf(buf, "0x%016llx\n",
3187                         (unsigned long long)rbd_dev->mapping.features);
3188 }
3189
3190 static ssize_t rbd_major_show(struct device *dev,
3191                               struct device_attribute *attr, char *buf)
3192 {
3193         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3194
3195         if (rbd_dev->major)
3196                 return sprintf(buf, "%d\n", rbd_dev->major);
3197
3198         return sprintf(buf, "(none)\n");
3199
3200 }
3201
3202 static ssize_t rbd_client_id_show(struct device *dev,
3203                                   struct device_attribute *attr, char *buf)
3204 {
3205         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3206
3207         return sprintf(buf, "client%lld\n",
3208                         ceph_client_id(rbd_dev->rbd_client->client));
3209 }
3210
3211 static ssize_t rbd_pool_show(struct device *dev,
3212                              struct device_attribute *attr, char *buf)
3213 {
3214         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3215
3216         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3217 }
3218
3219 static ssize_t rbd_pool_id_show(struct device *dev,
3220                              struct device_attribute *attr, char *buf)
3221 {
3222         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3223
3224         return sprintf(buf, "%llu\n",
3225                         (unsigned long long) rbd_dev->spec->pool_id);
3226 }
3227
3228 static ssize_t rbd_name_show(struct device *dev,
3229                              struct device_attribute *attr, char *buf)
3230 {
3231         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3232
3233         if (rbd_dev->spec->image_name)
3234                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3235
3236         return sprintf(buf, "(unknown)\n");
3237 }
3238
3239 static ssize_t rbd_image_id_show(struct device *dev,
3240                              struct device_attribute *attr, char *buf)
3241 {
3242         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3243
3244         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3245 }
3246
3247 /*
3248  * Shows the name of the currently-mapped snapshot (or
3249  * RBD_SNAP_HEAD_NAME for the base image).
3250  */
3251 static ssize_t rbd_snap_show(struct device *dev,
3252                              struct device_attribute *attr,
3253                              char *buf)
3254 {
3255         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3256
3257         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3258 }
3259
3260 /*
3261  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3262  * for the parent image.  If there is no parent, simply shows
3263  * "(no parent image)".
3264  */
3265 static ssize_t rbd_parent_show(struct device *dev,
3266                              struct device_attribute *attr,
3267                              char *buf)
3268 {
3269         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270         struct rbd_spec *spec = rbd_dev->parent_spec;
3271         int count;
3272         char *bufp = buf;
3273
3274         if (!spec)
3275                 return sprintf(buf, "(no parent image)\n");
3276
3277         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3278                         (unsigned long long) spec->pool_id, spec->pool_name);
3279         if (count < 0)
3280                 return count;
3281         bufp += count;
3282
3283         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3284                         spec->image_name ? spec->image_name : "(unknown)");
3285         if (count < 0)
3286                 return count;
3287         bufp += count;
3288
3289         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3290                         (unsigned long long) spec->snap_id, spec->snap_name);
3291         if (count < 0)
3292                 return count;
3293         bufp += count;
3294
3295         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3296         if (count < 0)
3297                 return count;
3298         bufp += count;
3299
3300         return (ssize_t) (bufp - buf);
3301 }
3302
3303 static ssize_t rbd_image_refresh(struct device *dev,
3304                                  struct device_attribute *attr,
3305                                  const char *buf,
3306                                  size_t size)
3307 {
3308         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3309         int ret;
3310
3311         ret = rbd_dev_refresh(rbd_dev, NULL);
3312
3313         return ret < 0 ? ret : size;
3314 }
3315
/*
 * Device attribute definitions for an rbd device.  All are read-only
 * (S_IRUGO) with a show routine, except "refresh", which is
 * write-only for the owner (S_IWUSR) and has only a store routine.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3327
/*
 * NULL-terminated list of the attributes defined above; it is
 * referenced by rbd_attr_group.
 */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_parent.attr,
        &dev_attr_refresh.attr,
        NULL
};
3342
/* The single (unnamed) attribute group wrapping rbd_attrs */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};
3346
/* NULL-terminated group list referenced by rbd_device_type.groups */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
3351
/*
 * Release callback installed in rbd_device_type.  It does nothing.
 * Note that rbd_bus_add_dev() separately sets dev->release to
 * rbd_dev_release for each device it registers.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
3355
/*
 * Device type assigned to every rbd device by rbd_bus_add_dev();
 * it supplies the sysfs attribute groups and a release callback.
 */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
3361
3362 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3363 {
3364         kref_get(&spec->kref);
3365
3366         return spec;
3367 }
3368
3369 static void rbd_spec_free(struct kref *kref);
3370 static void rbd_spec_put(struct rbd_spec *spec)
3371 {
3372         if (spec)
3373                 kref_put(&spec->kref, rbd_spec_free);
3374 }
3375
3376 static struct rbd_spec *rbd_spec_alloc(void)
3377 {
3378         struct rbd_spec *spec;
3379
3380         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3381         if (!spec)
3382                 return NULL;
3383         kref_init(&spec->kref);
3384
3385         return spec;
3386 }
3387
3388 static void rbd_spec_free(struct kref *kref)
3389 {
3390         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3391
3392         kfree(spec->pool_name);
3393         kfree(spec->image_id);
3394         kfree(spec->image_name);
3395         kfree(spec->snap_name);
3396         kfree(spec);
3397 }
3398
3399 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3400                                 struct rbd_spec *spec)
3401 {
3402         struct rbd_device *rbd_dev;
3403
3404         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3405         if (!rbd_dev)
3406                 return NULL;
3407
3408         spin_lock_init(&rbd_dev->lock);
3409         rbd_dev->flags = 0;
3410         INIT_LIST_HEAD(&rbd_dev->node);
3411         INIT_LIST_HEAD(&rbd_dev->snaps);
3412         init_rwsem(&rbd_dev->header_rwsem);
3413
3414         rbd_dev->spec = spec;
3415         rbd_dev->rbd_client = rbdc;
3416
3417         /* Initialize the layout used for all rbd requests */
3418
3419         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3420         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3421         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3422         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3423
3424         return rbd_dev;
3425 }
3426
3427 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3428 {
3429         rbd_put_client(rbd_dev->rbd_client);
3430         rbd_spec_put(rbd_dev->spec);
3431         kfree(rbd_dev);
3432 }
3433
3434 static void rbd_snap_destroy(struct rbd_snap *snap)
3435 {
3436         kfree(snap->name);
3437         kfree(snap);
3438 }
3439
3440 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3441                                                 const char *snap_name,
3442                                                 u64 snap_id, u64 snap_size,
3443                                                 u64 snap_features)
3444 {
3445         struct rbd_snap *snap;
3446
3447         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3448         if (!snap)
3449                 return ERR_PTR(-ENOMEM);
3450
3451         snap->name = snap_name;
3452         snap->id = snap_id;
3453         snap->size = snap_size;
3454         snap->features = snap_features;
3455
3456         return snap;
3457 }
3458
3459 /*
3460  * Returns a dynamically-allocated snapshot name if successful, or a
3461  * pointer-coded error otherwise.
3462  */
3463 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3464                 u64 *snap_size, u64 *snap_features)
3465 {
3466         char *snap_name;
3467         int i;
3468
3469         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3470
3471         /* Skip over names until we find the one we are looking for */
3472
3473         snap_name = rbd_dev->header.snap_names;
3474         for (i = 0; i < which; i++)
3475                 snap_name += strlen(snap_name) + 1;
3476
3477         snap_name = kstrdup(snap_name, GFP_KERNEL);
3478         if (!snap_name)
3479                 return ERR_PTR(-ENOMEM);
3480
3481         *snap_size = rbd_dev->header.snap_sizes[which];
3482         *snap_features = 0;     /* No features for v1 */
3483
3484         return snap_name;
3485 }
3486
3487 /*
3488  * Get the size and object order for an image snapshot, or if
3489  * snap_id is CEPH_NOSNAP, gets this information for the base
3490  * image.
3491  */
3492 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3493                                 u8 *order, u64 *snap_size)
3494 {
3495         __le64 snapid = cpu_to_le64(snap_id);
3496         int ret;
3497         struct {
3498                 u8 order;
3499                 __le64 size;
3500         } __attribute__ ((packed)) size_buf = { 0 };
3501
3502         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3503                                 "rbd", "get_size",
3504                                 &snapid, sizeof (snapid),
3505                                 &size_buf, sizeof (size_buf), NULL);
3506         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3507         if (ret < 0)
3508                 return ret;
3509         if (ret < sizeof (size_buf))
3510                 return -ERANGE;
3511
3512         if (order)
3513                 *order = size_buf.order;
3514         *snap_size = le64_to_cpu(size_buf.size);
3515
3516         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3517                 (unsigned long long)snap_id, (unsigned int)*order,
3518                 (unsigned long long)*snap_size);
3519
3520         return 0;
3521 }
3522
3523 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3524 {
3525         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3526                                         &rbd_dev->header.obj_order,
3527                                         &rbd_dev->header.image_size);
3528 }
3529
3530 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3531 {
3532         void *reply_buf;
3533         int ret;
3534         void *p;
3535
3536         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3537         if (!reply_buf)
3538                 return -ENOMEM;
3539
3540         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3541                                 "rbd", "get_object_prefix", NULL, 0,
3542                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3543         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3544         if (ret < 0)
3545                 goto out;
3546
3547         p = reply_buf;
3548         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3549                                                 p + ret, NULL, GFP_NOIO);
3550         ret = 0;
3551
3552         if (IS_ERR(rbd_dev->header.object_prefix)) {
3553                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3554                 rbd_dev->header.object_prefix = NULL;
3555         } else {
3556                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3557         }
3558 out:
3559         kfree(reply_buf);
3560
3561         return ret;
3562 }
3563
3564 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3565                 u64 *snap_features)
3566 {
3567         __le64 snapid = cpu_to_le64(snap_id);
3568         struct {
3569                 __le64 features;
3570                 __le64 incompat;
3571         } __attribute__ ((packed)) features_buf = { 0 };
3572         u64 incompat;
3573         int ret;
3574
3575         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3576                                 "rbd", "get_features",
3577                                 &snapid, sizeof (snapid),
3578                                 &features_buf, sizeof (features_buf), NULL);
3579         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3580         if (ret < 0)
3581                 return ret;
3582         if (ret < sizeof (features_buf))
3583                 return -ERANGE;
3584
3585         incompat = le64_to_cpu(features_buf.incompat);
3586         if (incompat & ~RBD_FEATURES_SUPPORTED)
3587                 return -ENXIO;
3588
3589         *snap_features = le64_to_cpu(features_buf.features);
3590
3591         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3592                 (unsigned long long)snap_id,
3593                 (unsigned long long)*snap_features,
3594                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3595
3596         return 0;
3597 }
3598
3599 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3600 {
3601         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3602                                                 &rbd_dev->header.features);
3603 }
3604
3605 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3606 {
3607         struct rbd_spec *parent_spec;
3608         size_t size;
3609         void *reply_buf = NULL;
3610         __le64 snapid;
3611         void *p;
3612         void *end;
3613         char *image_id;
3614         u64 overlap;
3615         int ret;
3616
3617         parent_spec = rbd_spec_alloc();
3618         if (!parent_spec)
3619                 return -ENOMEM;
3620
3621         size = sizeof (__le64) +                                /* pool_id */
3622                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3623                 sizeof (__le64) +                               /* snap_id */
3624                 sizeof (__le64);                                /* overlap */
3625         reply_buf = kmalloc(size, GFP_KERNEL);
3626         if (!reply_buf) {
3627                 ret = -ENOMEM;
3628                 goto out_err;
3629         }
3630
3631         snapid = cpu_to_le64(CEPH_NOSNAP);
3632         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3633                                 "rbd", "get_parent",
3634                                 &snapid, sizeof (snapid),
3635                                 reply_buf, size, NULL);
3636         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3637         if (ret < 0)
3638                 goto out_err;
3639
3640         p = reply_buf;
3641         end = reply_buf + ret;
3642         ret = -ERANGE;
3643         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3644         if (parent_spec->pool_id == CEPH_NOPOOL)
3645                 goto out;       /* No parent?  No problem. */
3646
3647         /* The ceph file layout needs to fit pool id in 32 bits */
3648
3649         ret = -EIO;
3650         if (parent_spec->pool_id > (u64)U32_MAX) {
3651                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3652                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3653                 goto out_err;
3654         }
3655
3656         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3657         if (IS_ERR(image_id)) {
3658                 ret = PTR_ERR(image_id);
3659                 goto out_err;
3660         }
3661         parent_spec->image_id = image_id;
3662         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3663         ceph_decode_64_safe(&p, end, overlap, out_err);
3664
3665         rbd_dev->parent_overlap = overlap;
3666         rbd_dev->parent_spec = parent_spec;
3667         parent_spec = NULL;     /* rbd_dev now owns this */
3668 out:
3669         ret = 0;
3670 out_err:
3671         kfree(reply_buf);
3672         rbd_spec_put(parent_spec);
3673
3674         return ret;
3675 }
3676
3677 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3678 {
3679         struct {
3680                 __le64 stripe_unit;
3681                 __le64 stripe_count;
3682         } __attribute__ ((packed)) striping_info_buf = { 0 };
3683         size_t size = sizeof (striping_info_buf);
3684         void *p;
3685         u64 obj_size;
3686         u64 stripe_unit;
3687         u64 stripe_count;
3688         int ret;
3689
3690         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3691                                 "rbd", "get_stripe_unit_count", NULL, 0,
3692                                 (char *)&striping_info_buf, size, NULL);
3693         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3694         if (ret < 0)
3695                 return ret;
3696         if (ret < size)
3697                 return -ERANGE;
3698
3699         /*
3700          * We don't actually support the "fancy striping" feature
3701          * (STRIPINGV2) yet, but if the striping sizes are the
3702          * defaults the behavior is the same as before.  So find
3703          * out, and only fail if the image has non-default values.
3704          */
3705         ret = -EINVAL;
3706         obj_size = (u64)1 << rbd_dev->header.obj_order;
3707         p = &striping_info_buf;
3708         stripe_unit = ceph_decode_64(&p);
3709         if (stripe_unit != obj_size) {
3710                 rbd_warn(rbd_dev, "unsupported stripe unit "
3711                                 "(got %llu want %llu)",
3712                                 stripe_unit, obj_size);
3713                 return -EINVAL;
3714         }
3715         stripe_count = ceph_decode_64(&p);
3716         if (stripe_count != 1) {
3717                 rbd_warn(rbd_dev, "unsupported stripe count "
3718                                 "(got %llu want 1)", stripe_count);
3719                 return -EINVAL;
3720         }
3721         rbd_dev->header.stripe_unit = stripe_unit;
3722         rbd_dev->header.stripe_count = stripe_count;
3723
3724         return 0;
3725 }
3726
3727 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3728 {
3729         size_t image_id_size;
3730         char *image_id;
3731         void *p;
3732         void *end;
3733         size_t size;
3734         void *reply_buf = NULL;
3735         size_t len = 0;
3736         char *image_name = NULL;
3737         int ret;
3738
3739         rbd_assert(!rbd_dev->spec->image_name);
3740
3741         len = strlen(rbd_dev->spec->image_id);
3742         image_id_size = sizeof (__le32) + len;
3743         image_id = kmalloc(image_id_size, GFP_KERNEL);
3744         if (!image_id)
3745                 return NULL;
3746
3747         p = image_id;
3748         end = image_id + image_id_size;
3749         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3750
3751         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3752         reply_buf = kmalloc(size, GFP_KERNEL);
3753         if (!reply_buf)
3754                 goto out;
3755
3756         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3757                                 "rbd", "dir_get_name",
3758                                 image_id, image_id_size,
3759                                 reply_buf, size, NULL);
3760         if (ret < 0)
3761                 goto out;
3762         p = reply_buf;
3763         end = reply_buf + ret;
3764
3765         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3766         if (IS_ERR(image_name))
3767                 image_name = NULL;
3768         else
3769                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3770 out:
3771         kfree(reply_buf);
3772         kfree(image_id);
3773
3774         return image_name;
3775 }
3776
3777 /*
3778  * When an rbd image has a parent image, it is identified by the
3779  * pool, image, and snapshot ids (not names).  This function fills
3780  * in the names for those ids.  (It's OK if we can't figure out the
3781  * name for an image id, but the pool and snapshot ids should always
3782  * exist and have names.)  All names in an rbd spec are dynamically
3783  * allocated.
3784  *
3785  * When an image being mapped (not a parent) is probed, we have the
3786  * pool name and pool id, image name and image id, and the snapshot
3787  * name.  The only thing we're missing is the snapshot id.
3788  *
3789  * The set of snapshots for an image is not known until they have
3790  * been read by rbd_dev_snaps_update(), so we can't completely fill
3791  * in this information until after that has been called.
3792  */
3793 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3794 {
3795         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3796         struct rbd_spec *spec = rbd_dev->spec;
3797         const char *pool_name;
3798         const char *image_name;
3799         const char *snap_name;
3800         int ret;
3801
3802         /*
3803          * An image being mapped will have the pool name (etc.), but
3804          * we need to look up the snapshot id.
3805          */
3806         if (spec->pool_name) {
3807                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3808                         struct rbd_snap *snap;
3809
3810                         snap = snap_by_name(rbd_dev, spec->snap_name);
3811                         if (!snap)
3812                                 return -ENOENT;
3813                         spec->snap_id = snap->id;
3814                 } else {
3815                         spec->snap_id = CEPH_NOSNAP;
3816                 }
3817
3818                 return 0;
3819         }
3820
3821         /* Get the pool name; we have to make our own copy of this */
3822
3823         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3824         if (!pool_name) {
3825                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3826                 return -EIO;
3827         }
3828         pool_name = kstrdup(pool_name, GFP_KERNEL);
3829         if (!pool_name)
3830                 return -ENOMEM;
3831
3832         /* Fetch the image name; tolerate failure here */
3833
3834         image_name = rbd_dev_image_name(rbd_dev);
3835         if (!image_name)
3836                 rbd_warn(rbd_dev, "unable to get image name");
3837
3838         /* Look up the snapshot name, and make a copy */
3839
3840         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3841         if (!snap_name) {
3842                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3843                 ret = -EIO;
3844                 goto out_err;
3845         }
3846         snap_name = kstrdup(snap_name, GFP_KERNEL);
3847         if (!snap_name) {
3848                 ret = -ENOMEM;
3849                 goto out_err;
3850         }
3851
3852         spec->pool_name = pool_name;
3853         spec->image_name = image_name;
3854         spec->snap_name = snap_name;
3855
3856         return 0;
3857 out_err:
3858         kfree(image_name);
3859         kfree(pool_name);
3860
3861         return ret;
3862 }
3863
3864 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3865 {
3866         size_t size;
3867         int ret;
3868         void *reply_buf;
3869         void *p;
3870         void *end;
3871         u64 seq;
3872         u32 snap_count;
3873         struct ceph_snap_context *snapc;
3874         u32 i;
3875
3876         /*
3877          * We'll need room for the seq value (maximum snapshot id),
3878          * snapshot count, and array of that many snapshot ids.
3879          * For now we have a fixed upper limit on the number we're
3880          * prepared to receive.
3881          */
3882         size = sizeof (__le64) + sizeof (__le32) +
3883                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3884         reply_buf = kzalloc(size, GFP_KERNEL);
3885         if (!reply_buf)
3886                 return -ENOMEM;
3887
3888         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3889                                 "rbd", "get_snapcontext", NULL, 0,
3890                                 reply_buf, size, ver);
3891         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3892         if (ret < 0)
3893                 goto out;
3894
3895         p = reply_buf;
3896         end = reply_buf + ret;
3897         ret = -ERANGE;
3898         ceph_decode_64_safe(&p, end, seq, out);
3899         ceph_decode_32_safe(&p, end, snap_count, out);
3900
3901         /*
3902          * Make sure the reported number of snapshot ids wouldn't go
3903          * beyond the end of our buffer.  But before checking that,
3904          * make sure the computed size of the snapshot context we
3905          * allocate is representable in a size_t.
3906          */
3907         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3908                                  / sizeof (u64)) {
3909                 ret = -EINVAL;
3910                 goto out;
3911         }
3912         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3913                 goto out;
3914         ret = 0;
3915
3916         snapc = rbd_snap_context_create(snap_count);
3917         if (!snapc) {
3918                 ret = -ENOMEM;
3919                 goto out;
3920         }
3921         snapc->seq = seq;
3922         for (i = 0; i < snap_count; i++)
3923                 snapc->snaps[i] = ceph_decode_64(&p);
3924
3925         rbd_dev->header.snapc = snapc;
3926
3927         dout("  snap context seq = %llu, snap_count = %u\n",
3928                 (unsigned long long)seq, (unsigned int)snap_count);
3929 out:
3930         kfree(reply_buf);
3931
3932         return ret;
3933 }
3934
3935 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3936 {
3937         size_t size;
3938         void *reply_buf;
3939         __le64 snap_id;
3940         int ret;
3941         void *p;
3942         void *end;
3943         char *snap_name;
3944
3945         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3946         reply_buf = kmalloc(size, GFP_KERNEL);
3947         if (!reply_buf)
3948                 return ERR_PTR(-ENOMEM);
3949
3950         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3951         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3952         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3953                                 "rbd", "get_snapshot_name",
3954                                 &snap_id, sizeof (snap_id),
3955                                 reply_buf, size, NULL);
3956         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3957         if (ret < 0) {
3958                 snap_name = ERR_PTR(ret);
3959                 goto out;
3960         }
3961
3962         p = reply_buf;
3963         end = reply_buf + ret;
3964         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3965         if (IS_ERR(snap_name))
3966                 goto out;
3967
3968         dout("  snap_id 0x%016llx snap_name = %s\n",
3969                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3970 out:
3971         kfree(reply_buf);
3972
3973         return snap_name;
3974 }
3975
3976 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3977                 u64 *snap_size, u64 *snap_features)
3978 {
3979         u64 snap_id;
3980         u64 size;
3981         u64 features;
3982         char *snap_name;
3983         int ret;
3984
3985         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3986         snap_id = rbd_dev->header.snapc->snaps[which];
3987         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3988         if (ret)
3989                 goto out_err;
3990
3991         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3992         if (ret)
3993                 goto out_err;
3994
3995         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3996         if (!IS_ERR(snap_name)) {
3997                 *snap_size = size;
3998                 *snap_features = features;
3999         }
4000
4001         return snap_name;
4002 out_err:
4003         return ERR_PTR(ret);
4004 }
4005
4006 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4007                 u64 *snap_size, u64 *snap_features)
4008 {
4009         if (rbd_dev->image_format == 1)
4010                 return rbd_dev_v1_snap_info(rbd_dev, which,
4011                                         snap_size, snap_features);
4012         if (rbd_dev->image_format == 2)
4013                 return rbd_dev_v2_snap_info(rbd_dev, which,
4014                                         snap_size, snap_features);
4015         return ERR_PTR(-EINVAL);
4016 }
4017
4018 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4019 {
4020         int ret;
4021         __u8 obj_order;
4022
4023         down_write(&rbd_dev->header_rwsem);
4024
4025         /* Grab old order first, to see if it changes */
4026
4027         obj_order = rbd_dev->header.obj_order,
4028         ret = rbd_dev_v2_image_size(rbd_dev);
4029         if (ret)
4030                 goto out;
4031         if (rbd_dev->header.obj_order != obj_order) {
4032                 ret = -EIO;
4033                 goto out;
4034         }
4035         rbd_update_mapping_size(rbd_dev);
4036
4037         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4038         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4039         if (ret)
4040                 goto out;
4041         ret = rbd_dev_snaps_update(rbd_dev);
4042         dout("rbd_dev_snaps_update returned %d\n", ret);
4043         if (ret)
4044                 goto out;
4045 out:
4046         up_write(&rbd_dev->header_rwsem);
4047
4048         return ret;
4049 }
4050
4051 /*
4052  * Scan the rbd device's current snapshot list and compare it to the
4053  * newly-received snapshot context.  Remove any existing snapshots
4054  * not present in the new snapshot context.  Add a new snapshot for
4055  * any snaphots in the snapshot context not in the current list.
4056  * And verify there are no changes to snapshots we already know
4057  * about.
4058  *
4059  * Assumes the snapshots in the snapshot context are sorted by
4060  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4061  * are also maintained in that order.)
4062  *
4063  * Note that any error occurs while updating the snapshot list
4064  * aborts the update, and the entire list is cleared.  The snapshot
4065  * list becomes inconsistent at that point anyway, so it might as
4066  * well be empty.
4067  */
4068 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4069 {
4070         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4071         const u32 snap_count = snapc->num_snaps;
4072         struct list_head *head = &rbd_dev->snaps;
4073         struct list_head *links = head->next;
4074         u32 index = 0;
4075         int ret = 0;
4076
4077         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4078         while (index < snap_count || links != head) {
4079                 u64 snap_id;
4080                 struct rbd_snap *snap;
4081                 char *snap_name;
4082                 u64 snap_size = 0;
4083                 u64 snap_features = 0;
4084
4085                 snap_id = index < snap_count ? snapc->snaps[index]
4086                                              : CEPH_NOSNAP;
4087                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4088                                      : NULL;
4089                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4090
4091                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4092                         struct list_head *next = links->next;
4093
4094                         /*
4095                          * A previously-existing snapshot is not in
4096                          * the new snap context.
4097                          *
4098                          * If the now-missing snapshot is the one
4099                          * the image represents, clear its existence
4100                          * flag so we can avoid sending any more
4101                          * requests to it.
4102                          */
4103                         if (rbd_dev->spec->snap_id == snap->id)
4104                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4105                         dout("removing %ssnap id %llu\n",
4106                                 rbd_dev->spec->snap_id == snap->id ?
4107                                                         "mapped " : "",
4108                                 (unsigned long long)snap->id);
4109
4110                         list_del(&snap->node);
4111                         rbd_snap_destroy(snap);
4112
4113                         /* Done with this list entry; advance */
4114
4115                         links = next;
4116                         continue;
4117                 }
4118
4119                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4120                                         &snap_size, &snap_features);
4121                 if (IS_ERR(snap_name)) {
4122                         ret = PTR_ERR(snap_name);
4123                         dout("failed to get snap info, error %d\n", ret);
4124                         goto out_err;
4125                 }
4126
4127                 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4128                         (unsigned long long)snap_id);
4129                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4130                         struct rbd_snap *new_snap;
4131
4132                         /* We haven't seen this snapshot before */
4133
4134                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4135                                         snap_id, snap_size, snap_features);
4136                         if (IS_ERR(new_snap)) {
4137                                 ret = PTR_ERR(new_snap);
4138                                 dout("  failed to add dev, error %d\n", ret);
4139                                 goto out_err;
4140                         }
4141
4142                         /* New goes before existing, or at end of list */
4143
4144                         dout("  added dev%s\n", snap ? "" : " at end\n");
4145                         if (snap)
4146                                 list_add_tail(&new_snap->node, &snap->node);
4147                         else
4148                                 list_add_tail(&new_snap->node, head);
4149                 } else {
4150                         /* Already have this one */
4151
4152                         dout("  already present\n");
4153
4154                         rbd_assert(snap->size == snap_size);
4155                         rbd_assert(!strcmp(snap->name, snap_name));
4156                         rbd_assert(snap->features == snap_features);
4157
4158                         /* Done with this list entry; advance */
4159
4160                         links = links->next;
4161                 }
4162
4163                 /* Advance to the next entry in the snapshot context */
4164
4165                 index++;
4166         }
4167         dout("%s: done\n", __func__);
4168
4169         return 0;
4170 out_err:
4171         rbd_remove_all_snaps(rbd_dev);
4172
4173         return ret;
4174 }
4175
4176 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4177 {
4178         struct device *dev;
4179         int ret;
4180
4181         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4182
4183         dev = &rbd_dev->dev;
4184         dev->bus = &rbd_bus_type;
4185         dev->type = &rbd_device_type;
4186         dev->parent = &rbd_root_dev;
4187         dev->release = rbd_dev_release;
4188         dev_set_name(dev, "%d", rbd_dev->dev_id);
4189         ret = device_register(dev);
4190
4191         mutex_unlock(&ctl_mutex);
4192
4193         return ret;
4194 }
4195
4196 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4197 {
4198         device_unregister(&rbd_dev->dev);
4199 }
4200
4201 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4202
4203 /*
4204  * Get a unique rbd identifier for the given new rbd_dev, and add
4205  * the rbd_dev to the global list.  The minimum rbd id is 1.
4206  */
4207 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4208 {
4209         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4210
4211         spin_lock(&rbd_dev_list_lock);
4212         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4213         spin_unlock(&rbd_dev_list_lock);
4214         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4215                 (unsigned long long) rbd_dev->dev_id);
4216 }
4217
4218 /*
4219  * Remove an rbd_dev from the global list, and record that its
4220  * identifier is no longer in use.
4221  */
4222 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4223 {
4224         struct list_head *tmp;
4225         int rbd_id = rbd_dev->dev_id;
4226         int max_id;
4227
4228         rbd_assert(rbd_id > 0);
4229
4230         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4231                 (unsigned long long) rbd_dev->dev_id);
4232         spin_lock(&rbd_dev_list_lock);
4233         list_del_init(&rbd_dev->node);
4234
4235         /*
4236          * If the id being "put" is not the current maximum, there
4237          * is nothing special we need to do.
4238          */
4239         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4240                 spin_unlock(&rbd_dev_list_lock);
4241                 return;
4242         }
4243
4244         /*
4245          * We need to update the current maximum id.  Search the
4246          * list to find out what it is.  We're more likely to find
4247          * the maximum at the end, so search the list backward.
4248          */
4249         max_id = 0;
4250         list_for_each_prev(tmp, &rbd_dev_list) {
4251                 struct rbd_device *rbd_dev;
4252
4253                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4254                 if (rbd_dev->dev_id > max_id)
4255                         max_id = rbd_dev->dev_id;
4256         }
4257         spin_unlock(&rbd_dev_list_lock);
4258
4259         /*
4260          * The max id could have been updated by rbd_dev_id_get(), in
4261          * which case it now accurately reflects the new maximum.
4262          * Be careful not to overwrite the maximum value in that
4263          * case.
4264          */
4265         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4266         dout("  max dev id has been reset\n");
4267 }
4268
/*
 * Advance *buf past any leading white space so it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none).  Returns the length of the token that starts there, i.e.
 * the number of consecutive non-white-space characters.  The string
 * at *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start;

	start = *buf + strspn(*buf, whitespace);
	*buf = start;

	return strcspn(start, whitespace);
}
4287
/*
 * Locate the next token in *buf and, when it fits, copy it into the
 * caller-supplied token buffer as a '\0'-terminated string.  The
 * string at *buf must be '\0'-terminated on entry.
 *
 * Returns the token length (excluding the '\0').  A return of 0
 * means no token was found; a return >= token_size means the token
 * did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the token that was
 * found, including when the token buffer was too small.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *start = *buf;

	if (len < token_size) {
		memcpy(token, start, len);
		token[len] = '\0';
	}
	*buf = start + len;

	return len;
}
4317
4318 /*
4319  * Finds the next token in *buf, dynamically allocates a buffer big
4320  * enough to hold a copy of it, and copies the token into the new
4321  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4322  * that a duplicate buffer is created even for a zero-length token.
4323  *
4324  * Returns a pointer to the newly-allocated duplicate, or a null
4325  * pointer if memory for the duplicate was not available.  If
4326  * the lenp argument is a non-null pointer, the length of the token
4327  * (not including the '\0') is returned in *lenp.
4328  *
4329  * If successful, the *buf pointer will be updated to point beyond
4330  * the end of the found token.
4331  *
4332  * Note: uses GFP_KERNEL for allocation.
4333  */
4334 static inline char *dup_token(const char **buf, size_t *lenp)
4335 {
4336         char *dup;
4337         size_t len;
4338
4339         len = next_token(buf);
4340         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4341         if (!dup)
4342                 return NULL;
4343         *(dup + len) = '\0';
4344         *buf += len;
4345
4346         if (lenp)
4347                 *lenp = len;
4348
4349         return dup;
4350 }
4351
4352 /*
4353  * Parse the options provided for an "rbd add" (i.e., rbd image
4354  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4355  * and the data written is passed here via a NUL-terminated buffer.
4356  * Returns 0 if successful or an error code otherwise.
4357  *
4358  * The information extracted from these options is recorded in
4359  * the other parameters which return dynamically-allocated
4360  * structures:
4361  *  ceph_opts
4362  *      The address of a pointer that will refer to a ceph options
4363  *      structure.  Caller must release the returned pointer using
4364  *      ceph_destroy_options() when it is no longer needed.
4365  *  rbd_opts
4366  *      Address of an rbd options pointer.  Fully initialized by
4367  *      this function; caller must release with kfree().
4368  *  spec
4369  *      Address of an rbd image specification pointer.  Fully
4370  *      initialized by this function based on parsed options.
4371  *      Caller must release with rbd_spec_put().
4372  *
4373  * The options passed take this form:
4374  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4375  * where:
4376  *  <mon_addrs>
4377  *      A comma-separated list of one or more monitor addresses.
4378  *      A monitor address is an ip address, optionally followed
4379  *      by a port number (separated by a colon).
4380  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4381  *  <options>
4382  *      A comma-separated list of ceph and/or rbd options.
4383  *  <pool_name>
4384  *      The name of the rados pool containing the rbd image.
4385  *  <image_name>
4386  *      The name of the image in that pool to map.
4387  *  <snap_id>
4388  *      An optional snapshot id.  If provided, the mapping will
4389  *      present data from the image at the time that snapshot was
4390  *      created.  The image head is used if no snapshot id is
4391  *      provided.  Snapshot mappings are always read-only.
4392  */
4393 static int rbd_add_parse_args(const char *buf,
4394                                 struct ceph_options **ceph_opts,
4395                                 struct rbd_options **opts,
4396                                 struct rbd_spec **rbd_spec)
4397 {
4398         size_t len;
4399         char *options;
4400         const char *mon_addrs;
4401         char *snap_name;
4402         size_t mon_addrs_size;
4403         struct rbd_spec *spec = NULL;
4404         struct rbd_options *rbd_opts = NULL;
4405         struct ceph_options *copts;
4406         int ret;
4407
4408         /* The first four tokens are required */
4409
4410         len = next_token(&buf);
4411         if (!len) {
4412                 rbd_warn(NULL, "no monitor address(es) provided");
4413                 return -EINVAL;
4414         }
4415         mon_addrs = buf;
4416         mon_addrs_size = len + 1;
4417         buf += len;
4418
4419         ret = -EINVAL;
4420         options = dup_token(&buf, NULL);
4421         if (!options)
4422                 return -ENOMEM;
4423         if (!*options) {
4424                 rbd_warn(NULL, "no options provided");
4425                 goto out_err;
4426         }
4427
4428         spec = rbd_spec_alloc();
4429         if (!spec)
4430                 goto out_mem;
4431
4432         spec->pool_name = dup_token(&buf, NULL);
4433         if (!spec->pool_name)
4434                 goto out_mem;
4435         if (!*spec->pool_name) {
4436                 rbd_warn(NULL, "no pool name provided");
4437                 goto out_err;
4438         }
4439
4440         spec->image_name = dup_token(&buf, NULL);
4441         if (!spec->image_name)
4442                 goto out_mem;
4443         if (!*spec->image_name) {
4444                 rbd_warn(NULL, "no image name provided");
4445                 goto out_err;
4446         }
4447
4448         /*
4449          * Snapshot name is optional; default is to use "-"
4450          * (indicating the head/no snapshot).
4451          */
4452         len = next_token(&buf);
4453         if (!len) {
4454                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4455                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4456         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4457                 ret = -ENAMETOOLONG;
4458                 goto out_err;
4459         }
4460         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4461         if (!snap_name)
4462                 goto out_mem;
4463         *(snap_name + len) = '\0';
4464         spec->snap_name = snap_name;
4465
4466         /* Initialize all rbd options to the defaults */
4467
4468         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4469         if (!rbd_opts)
4470                 goto out_mem;
4471
4472         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4473
4474         copts = ceph_parse_options(options, mon_addrs,
4475                                         mon_addrs + mon_addrs_size - 1,
4476                                         parse_rbd_opts_token, rbd_opts);
4477         if (IS_ERR(copts)) {
4478                 ret = PTR_ERR(copts);
4479                 goto out_err;
4480         }
4481         kfree(options);
4482
4483         *ceph_opts = copts;
4484         *opts = rbd_opts;
4485         *rbd_spec = spec;
4486
4487         return 0;
4488 out_mem:
4489         ret = -ENOMEM;
4490 out_err:
4491         kfree(rbd_opts);
4492         rbd_spec_put(spec);
4493         kfree(options);
4494
4495         return ret;
4496 }
4497
4498 /*
4499  * An rbd format 2 image has a unique identifier, distinct from the
4500  * name given to it by the user.  Internally, that identifier is
4501  * what's used to specify the names of objects related to the image.
4502  *
4503  * A special "rbd id" object is used to map an rbd image name to its
4504  * id.  If that object doesn't exist, then there is no v2 rbd image
4505  * with the supplied name.
4506  *
4507  * This function will record the given rbd_dev's image_id field if
4508  * it can be determined, and in that case will return 0.  If any
4509  * errors occur a negative errno will be returned and the rbd_dev's
4510  * image_id field will be unchanged (and should be NULL).
4511  */
4512 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4513 {
4514         int ret;
4515         size_t size;
4516         char *object_name;
4517         void *response;
4518         char *image_id;
4519
4520         /*
4521          * When probing a parent image, the image id is already
4522          * known (and the image name likely is not).  There's no
4523          * need to fetch the image id again in this case.  We
4524          * do still need to set the image format though.
4525          */
4526         if (rbd_dev->spec->image_id) {
4527                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4528
4529                 return 0;
4530         }
4531
4532         /*
4533          * First, see if the format 2 image id file exists, and if
4534          * so, get the image's persistent id from it.
4535          */
4536         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4537         object_name = kmalloc(size, GFP_NOIO);
4538         if (!object_name)
4539                 return -ENOMEM;
4540         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4541         dout("rbd id object name is %s\n", object_name);
4542
4543         /* Response will be an encoded string, which includes a length */
4544
4545         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4546         response = kzalloc(size, GFP_NOIO);
4547         if (!response) {
4548                 ret = -ENOMEM;
4549                 goto out;
4550         }
4551
4552         /* If it doesn't exist we'll assume it's a format 1 image */
4553
4554         ret = rbd_obj_method_sync(rbd_dev, object_name,
4555                                 "rbd", "get_id", NULL, 0,
4556                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4557         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4558         if (ret == -ENOENT) {
4559                 image_id = kstrdup("", GFP_KERNEL);
4560                 ret = image_id ? 0 : -ENOMEM;
4561                 if (!ret)
4562                         rbd_dev->image_format = 1;
4563         } else if (ret > sizeof (__le32)) {
4564                 void *p = response;
4565
4566                 image_id = ceph_extract_encoded_string(&p, p + ret,
4567                                                 NULL, GFP_NOIO);
4568                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4569                 if (!ret)
4570                         rbd_dev->image_format = 2;
4571         } else {
4572                 ret = -EINVAL;
4573         }
4574
4575         if (!ret) {
4576                 rbd_dev->spec->image_id = image_id;
4577                 dout("image_id is %s\n", image_id);
4578         }
4579 out:
4580         kfree(response);
4581         kfree(object_name);
4582
4583         return ret;
4584 }
4585
4586 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4587 {
4588         int ret;
4589         size_t size;
4590
4591         /* Record the header object name for this rbd image. */
4592
4593         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4594         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4595         if (!rbd_dev->header_name) {
4596                 ret = -ENOMEM;
4597                 goto out_err;
4598         }
4599         sprintf(rbd_dev->header_name, "%s%s",
4600                 rbd_dev->spec->image_name, RBD_SUFFIX);
4601
4602         /* Populate rbd image metadata */
4603
4604         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4605         if (ret < 0)
4606                 goto out_err;
4607
4608         /* Version 1 images have no parent (no layering) */
4609
4610         rbd_dev->parent_spec = NULL;
4611         rbd_dev->parent_overlap = 0;
4612
4613         dout("discovered version 1 image, header name is %s\n",
4614                 rbd_dev->header_name);
4615
4616         return 0;
4617
4618 out_err:
4619         kfree(rbd_dev->header_name);
4620         rbd_dev->header_name = NULL;
4621         kfree(rbd_dev->spec->image_id);
4622         rbd_dev->spec->image_id = NULL;
4623
4624         return ret;
4625 }
4626
4627 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4628 {
4629         size_t size;
4630         int ret;
4631         u64 ver = 0;
4632
4633         /*
4634          * Image id was filled in by the caller.  Record the header
4635          * object name for this rbd image.
4636          */
4637         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4638         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4639         if (!rbd_dev->header_name)
4640                 return -ENOMEM;
4641         sprintf(rbd_dev->header_name, "%s%s",
4642                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4643
4644         /* Get the size and object order for the image */
4645         ret = rbd_dev_v2_image_size(rbd_dev);
4646         if (ret)
4647                 goto out_err;
4648
4649         /* Get the object prefix (a.k.a. block_name) for the image */
4650
4651         ret = rbd_dev_v2_object_prefix(rbd_dev);
4652         if (ret)
4653                 goto out_err;
4654
4655         /* Get the and check features for the image */
4656
4657         ret = rbd_dev_v2_features(rbd_dev);
4658         if (ret)
4659                 goto out_err;
4660
4661         /* If the image supports layering, get the parent info */
4662
4663         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4664                 ret = rbd_dev_v2_parent_info(rbd_dev);
4665                 if (ret)
4666                         goto out_err;
4667                 rbd_warn(rbd_dev, "WARNING: kernel support for "
4668                                         "layered rbd images is EXPERIMENTAL!");
4669         }
4670
4671         /* If the image supports fancy striping, get its parameters */
4672
4673         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4674                 ret = rbd_dev_v2_striping_info(rbd_dev);
4675                 if (ret < 0)
4676                         goto out_err;
4677         }
4678
4679         /* crypto and compression type aren't (yet) supported for v2 images */
4680
4681         rbd_dev->header.crypt_type = 0;
4682         rbd_dev->header.comp_type = 0;
4683
4684         /* Get the snapshot context, plus the header version */
4685
4686         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4687         if (ret)
4688                 goto out_err;
4689         rbd_dev->header.obj_version = ver;
4690
4691         dout("discovered version 2 image, header name is %s\n",
4692                 rbd_dev->header_name);
4693
4694         return 0;
4695 out_err:
4696         rbd_dev->parent_overlap = 0;
4697         rbd_spec_put(rbd_dev->parent_spec);
4698         rbd_dev->parent_spec = NULL;
4699         kfree(rbd_dev->header_name);
4700         rbd_dev->header_name = NULL;
4701         kfree(rbd_dev->header.object_prefix);
4702         rbd_dev->header.object_prefix = NULL;
4703
4704         return ret;
4705 }
4706
4707 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4708 {
4709         struct rbd_device *parent = NULL;
4710         struct rbd_spec *parent_spec = NULL;
4711         struct rbd_client *rbdc = NULL;
4712         int ret;
4713
4714         /* no need to lock here, as rbd_dev is not registered yet */
4715         ret = rbd_dev_snaps_update(rbd_dev);
4716         if (ret)
4717                 return ret;
4718
4719         ret = rbd_dev_spec_update(rbd_dev);
4720         if (ret)
4721                 goto err_out_snaps;
4722
4723         ret = rbd_dev_set_mapping(rbd_dev);
4724         if (ret)
4725                 goto err_out_snaps;
4726
4727         /* generate unique id: find highest unique id, add one */
4728         rbd_dev_id_get(rbd_dev);
4729
4730         /* Fill in the device name, now that we have its id. */
4731         BUILD_BUG_ON(DEV_NAME_LEN
4732                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4733         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4734
4735         /* Get our block major device number. */
4736
4737         ret = register_blkdev(0, rbd_dev->name);
4738         if (ret < 0)
4739                 goto err_out_id;
4740         rbd_dev->major = ret;
4741
4742         /* Set up the blkdev mapping. */
4743
4744         ret = rbd_init_disk(rbd_dev);
4745         if (ret)
4746                 goto err_out_blkdev;
4747
4748         ret = rbd_bus_add_dev(rbd_dev);
4749         if (ret)
4750                 goto err_out_disk;
4751
4752         /*
4753          * At this point cleanup in the event of an error is the job
4754          * of the sysfs code (initiated by rbd_bus_del_dev()).
4755          */
4756         /* Probe the parent if there is one */
4757
4758         if (rbd_dev->parent_spec) {
4759                 /*
4760                  * We need to pass a reference to the client and the
4761                  * parent spec when creating the parent rbd_dev.
4762                  * Images related by parent/child relationships
4763                  * always share both.
4764                  */
4765                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4766                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4767
4768                 parent = rbd_dev_create(rbdc, parent_spec);
4769                 if (!parent) {
4770                         ret = -ENOMEM;
4771                         goto err_out_spec;
4772                 }
4773                 rbdc = NULL;            /* parent now owns reference */
4774                 parent_spec = NULL;     /* parent now owns reference */
4775                 ret = rbd_dev_image_probe(parent);
4776                 if (ret < 0)
4777                         goto err_out_parent;
4778                 rbd_dev->parent = parent;
4779         }
4780
4781         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4782         if (ret)
4783                 goto err_out_bus;
4784
4785         /* Everything's ready.  Announce the disk to the world. */
4786
4787         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4788         add_disk(rbd_dev->disk);
4789
4790         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4791                 (unsigned long long) rbd_dev->mapping.size);
4792
4793         return ret;
4794
4795 err_out_parent:
4796         rbd_spec_put(rbd_dev->parent_spec);
4797         kfree(rbd_dev->header_name);
4798         rbd_dev_destroy(parent);
4799 err_out_spec:
4800         rbd_spec_put(parent_spec);
4801         rbd_put_client(rbdc);
4802 err_out_bus:
4803         /* this will also clean up rest of rbd_dev stuff */
4804
4805         rbd_bus_del_dev(rbd_dev);
4806
4807         return ret;
4808 err_out_disk:
4809         rbd_free_disk(rbd_dev);
4810 err_out_blkdev:
4811         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4812 err_out_id:
4813         rbd_dev_id_put(rbd_dev);
4814 err_out_snaps:
4815         rbd_remove_all_snaps(rbd_dev);
4816
4817         return ret;
4818 }
4819
4820 /*
4821  * Probe for the existence of the header object for the given rbd
4822  * device.  For format 2 images this includes determining the image
4823  * id.
4824  */
4825 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4826 {
4827         int ret;
4828
4829         /*
4830          * Get the id from the image id object.  If it's not a
4831          * format 2 image, we'll get ENOENT back, and we'll assume
4832          * it's a format 1 image.
4833          */
4834         ret = rbd_dev_image_id(rbd_dev);
4835         if (ret)
4836                 return ret;
4837         rbd_assert(rbd_dev->spec->image_id);
4838         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4839
4840         if (rbd_dev->image_format == 1)
4841                 ret = rbd_dev_v1_probe(rbd_dev);
4842         else
4843                 ret = rbd_dev_v2_probe(rbd_dev);
4844         if (ret)
4845                 goto out_err;
4846
4847         ret = rbd_dev_probe_finish(rbd_dev);
4848         if (ret)
4849                 rbd_header_free(&rbd_dev->header);
4850
4851         return ret;
4852 out_err:
4853         kfree(rbd_dev->spec->image_id);
4854         rbd_dev->spec->image_id = NULL;
4855
4856         dout("probe failed, returning %d\n", ret);
4857
4858         return ret;
4859 }
4860
4861 static ssize_t rbd_add(struct bus_type *bus,
4862                        const char *buf,
4863                        size_t count)
4864 {
4865         struct rbd_device *rbd_dev = NULL;
4866         struct ceph_options *ceph_opts = NULL;
4867         struct rbd_options *rbd_opts = NULL;
4868         struct rbd_spec *spec = NULL;
4869         struct rbd_client *rbdc;
4870         struct ceph_osd_client *osdc;
4871         int rc = -ENOMEM;
4872
4873         if (!try_module_get(THIS_MODULE))
4874                 return -ENODEV;
4875
4876         /* parse add command */
4877         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4878         if (rc < 0)
4879                 goto err_out_module;
4880
4881         rbdc = rbd_get_client(ceph_opts);
4882         if (IS_ERR(rbdc)) {
4883                 rc = PTR_ERR(rbdc);
4884                 goto err_out_args;
4885         }
4886         ceph_opts = NULL;       /* rbd_dev client now owns this */
4887
4888         /* pick the pool */
4889         osdc = &rbdc->client->osdc;
4890         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4891         if (rc < 0)
4892                 goto err_out_client;
4893         spec->pool_id = (u64)rc;
4894
4895         /* The ceph file layout needs to fit pool id in 32 bits */
4896
4897         if (spec->pool_id > (u64)U32_MAX) {
4898                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4899                                 (unsigned long long)spec->pool_id, U32_MAX);
4900                 rc = -EIO;
4901                 goto err_out_client;
4902         }
4903
4904         rbd_dev = rbd_dev_create(rbdc, spec);
4905         if (!rbd_dev)
4906                 goto err_out_client;
4907         rbdc = NULL;            /* rbd_dev now owns this */
4908         spec = NULL;            /* rbd_dev now owns this */
4909
4910         rbd_dev->mapping.read_only = rbd_opts->read_only;
4911         kfree(rbd_opts);
4912         rbd_opts = NULL;        /* done with this */
4913
4914         rc = rbd_dev_image_probe(rbd_dev);
4915         if (rc < 0)
4916                 goto err_out_rbd_dev;
4917
4918         return count;
4919 err_out_rbd_dev:
4920         rbd_spec_put(rbd_dev->parent_spec);
4921         kfree(rbd_dev->header_name);
4922         rbd_dev_destroy(rbd_dev);
4923 err_out_client:
4924         rbd_put_client(rbdc);
4925 err_out_args:
4926         if (ceph_opts)
4927                 ceph_destroy_options(ceph_opts);
4928         kfree(rbd_opts);
4929         rbd_spec_put(spec);
4930 err_out_module:
4931         module_put(THIS_MODULE);
4932
4933         dout("Error adding device %s\n", buf);
4934
4935         return (ssize_t)rc;
4936 }
4937
4938 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4939 {
4940         struct list_head *tmp;
4941         struct rbd_device *rbd_dev;
4942
4943         spin_lock(&rbd_dev_list_lock);
4944         list_for_each(tmp, &rbd_dev_list) {
4945                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4946                 if (rbd_dev->dev_id == dev_id) {
4947                         spin_unlock(&rbd_dev_list_lock);
4948                         return rbd_dev;
4949                 }
4950         }
4951         spin_unlock(&rbd_dev_list_lock);
4952         return NULL;
4953 }
4954
4955 static void rbd_dev_release(struct device *dev)
4956 {
4957         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4958
4959         if (rbd_dev->watch_event)
4960                 rbd_dev_header_watch_sync(rbd_dev, 0);
4961
4962         /* clean up and free blkdev */
4963         rbd_free_disk(rbd_dev);
4964         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4965
4966         /* release allocated disk header fields */
4967         rbd_header_free(&rbd_dev->header);
4968
4969         /* done with the id, and with the rbd_dev */
4970         rbd_dev_id_put(rbd_dev);
4971         rbd_assert(rbd_dev->rbd_client != NULL);
4972         rbd_spec_put(rbd_dev->parent_spec);
4973         kfree(rbd_dev->header_name);
4974         rbd_dev_destroy(rbd_dev);
4975
4976         /* release module ref */
4977         module_put(THIS_MODULE);
4978 }
4979
/*
 * Remove one rbd device: drop all of its snapshots first, then
 * unregister it from the bus.
 */
static void __rbd_remove(struct rbd_device *rbd_dev)
{
	rbd_remove_all_snaps(rbd_dev);

	rbd_bus_del_dev(rbd_dev);
}
4985
4986 static ssize_t rbd_remove(struct bus_type *bus,
4987                           const char *buf,
4988                           size_t count)
4989 {
4990         struct rbd_device *rbd_dev = NULL;
4991         int target_id, rc;
4992         unsigned long ul;
4993         int ret = count;
4994
4995         rc = strict_strtoul(buf, 10, &ul);
4996         if (rc)
4997                 return rc;
4998
4999         /* convert to int; abort if we lost anything in the conversion */
5000         target_id = (int) ul;
5001         if (target_id != ul)
5002                 return -EINVAL;
5003
5004         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5005
5006         rbd_dev = __rbd_get_dev(target_id);
5007         if (!rbd_dev) {
5008                 ret = -ENOENT;
5009                 goto done;
5010         }
5011
5012         spin_lock_irq(&rbd_dev->lock);
5013         if (rbd_dev->open_count)
5014                 ret = -EBUSY;
5015         else
5016                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5017         spin_unlock_irq(&rbd_dev->lock);
5018         if (ret < 0)
5019                 goto done;
5020
5021         while (rbd_dev->parent_spec) {
5022                 struct rbd_device *first = rbd_dev;
5023                 struct rbd_device *second = first->parent;
5024                 struct rbd_device *third;
5025
5026                 /*
5027                  * Follow to the parent with no grandparent and
5028                  * remove it.
5029                  */
5030                 while (second && (third = second->parent)) {
5031                         first = second;
5032                         second = third;
5033                 }
5034                 __rbd_remove(second);
5035                 rbd_spec_put(first->parent_spec);
5036                 first->parent_spec = NULL;
5037                 first->parent_overlap = 0;
5038                 first->parent = NULL;
5039         }
5040         __rbd_remove(rbd_dev);
5041
5042 done:
5043         mutex_unlock(&ctl_mutex);
5044
5045         return ret;
5046 }
5047
5048 /*
5049  * create control files in sysfs
5050  * /sys/bus/rbd/...
5051  */
5052 static int rbd_sysfs_init(void)
5053 {
5054         int ret;
5055
5056         ret = device_register(&rbd_root_dev);
5057         if (ret < 0)
5058                 return ret;
5059
5060         ret = bus_register(&rbd_bus_type);
5061         if (ret < 0)
5062                 device_unregister(&rbd_root_dev);
5063
5064         return ret;
5065 }
5066
5067 static void rbd_sysfs_cleanup(void)
5068 {
5069         bus_unregister(&rbd_bus_type);
5070         device_unregister(&rbd_root_dev);
5071 }
5072
5073 static int __init rbd_init(void)
5074 {
5075         int rc;
5076
5077         if (!libceph_compatible(NULL)) {
5078                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5079
5080                 return -EINVAL;
5081         }
5082         rc = rbd_sysfs_init();
5083         if (rc)
5084                 return rc;
5085         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5086         return 0;
5087 }
5088
5089 static void __exit rbd_exit(void)
5090 {
5091         rbd_sysfs_cleanup();
5092 }
5093
5094 module_init(rbd_init);
5095 module_exit(rbd_exit);
5096
5097 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5098 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5099 MODULE_DESCRIPTION("rados block device");
5100
5101 /* following authorship retained from original osdblk.c */
5102 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5103
5104 MODULE_LICENSE("GPL");