/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
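
/* NAME_MAX is 255 and the "snap_" prefix above is 5 characters, so a
 * snapshot sysfs name can carry a snapshot name of up to 250 bytes. */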

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
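
/* Rough arithmetic behind the limit above: 510 snapshot ids occupy
 * 510 * 8 = 4080 bytes, leaving room for the fixed fields of a
 * struct ceph_snap_context within a single 4 KB page. */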

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by an OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

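/* The expression above budgets 2.5 decimal digits per byte plus a sign:
 * for a 4-byte int that is (5 * 4) / 2 + 1 = 11 characters, exactly
 * enough for "-2147483648". */
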
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
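
/*
 * Illustrative (hypothetical) uses of the iterators above:
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 *
 *	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
 *		rbd_img_obj_request_del(img_request, obj_request);
 *
 * The _safe variant walks the list in reverse and tolerates removal of
 * the current entry, per list_for_each_entry_safe_reverse().
 */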

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
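
/*
 * For example, with a mapped device whose gendisk is named "rbd3",
 * rbd_warn(rbd_dev, "bad option") would log "rbd: rbd3: bad option".
 * The fallbacks identify the device by image name, image id, or raw
 * pointer as progressively less is known about it.
 */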

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
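
/*
 * A tripped assertion, say rbd_assert(snapc != NULL) failing in a
 * hypothetical caller some_func() at line 1234, prints:
 *
 *	Assertion failure in some_func() at line 1234:
 *
 *		rbd_assert(snapc != NULL);
 *
 * and then brings the kernel down via BUG().
 */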

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client)) {
                ret = PTR_ERR(rbdc->client);
                goto out_mutex;
        }
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with a specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
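
/*
 * For example, an option string of "read_only" (or the alternate
 * spelling "ro") leaves rbd_opts->read_only true, while "read_write"
 * or "rw" restores the RBD_READ_ONLY_DEFAULT behavior.  The
 * Opt_last_* sentinels exist so a token's class (int, string, or
 * Boolean) can be determined with simple range comparisons.
 */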

/*
 * Get a ceph client with a specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to unlink
 * the client from the global list; the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */

static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
{
        struct ceph_snap_context *snapc;
        size_t size;

        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (snapc->snaps[0]);
        snapc = kzalloc(size, GFP_KERNEL);
        if (!snapc)
                return NULL;

        atomic_set(&snapc->nref, 1);
        snapc->num_snaps = snap_count;

        return snapc;
}

static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
{
        (void)ceph_get_snap_context(snapc);
}

static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
{
        ceph_put_snap_context(snapc);
}

/*
 * Drop a reference to a ceph client node.  If it's not referenced
 * anymore, release it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        ret = -EIO;
                        goto out_err;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = rbd_snap_context_create(snap_count);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return ret;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }

        return 0;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        rbd_snap_context_put(header->snapc);
        header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}
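
/*
 * For instance, with a (hypothetical) object_prefix of "rb.0.1234" and
 * the common object order of 22 (4 MiB objects), image offset
 * 0x1400000 falls in segment 0x1400000 >> 22 = 5, yielding the object
 * name "rb.0.1234.000000000005".
 */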

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
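
/*
 * Worked example: with an object order of 22, segment_size is
 * 0x400000.  A request at image offset 0x3ffe00 for 0x400 bytes
 * straddles a segment boundary, so rbd_segment_length() clips it to
 * 0x400000 - 0x3ffe00 = 0x200 bytes; the rest belongs to the next
 * object.
 */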

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
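
/*
 * Example: with 4 KiB pages, zero_pages(pages, 0x800, 0x1800) clears
 * the last 2 KiB of pages[0] and the first 2 KiB of pages[1], mapping
 * each page only for as long as it takes to memset it.
 */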

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
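
/*
 * Example of the in-out contract: cloning len == 3000 from a chain of
 * two 2048-byte bios (starting at *offset == 0) clones all of the
 * first bio plus 952 bytes of the second, and returns with *bio_src
 * pointing at the second bio and *offset == 952, so a subsequent call
 * picks up exactly where this one left off.
 */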
1167
1168 /*
1169  * The default/initial value for all object request flags is 0.  For
1170  * each flag, once its value is set to 1 it is never reset to 0
1171  * again.
1172  */
1173 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1174 {
1175         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1176                 struct rbd_device *rbd_dev;
1177
1178                 rbd_dev = obj_request->img_request->rbd_dev;
1179                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1180                         obj_request);
1181         }
1182 }
1183
1184 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1185 {
1186         smp_mb();
1187         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1188 }
1189
1190 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1191 {
1192         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1193                 struct rbd_device *rbd_dev = NULL;
1194
1195                 if (obj_request_img_data_test(obj_request))
1196                         rbd_dev = obj_request->img_request->rbd_dev;
1197                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1198                         obj_request);
1199         }
1200 }
1201
1202 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1203 {
1204         smp_mb();
1205         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1206 }
1207
1208 /*
1209  * This sets the KNOWN flag after (possibly) setting the EXISTS
1210  * flag.  The latter is set based on the "exists" value provided.
1211  *
1212  * Note that for our purposes once an object exists it never goes
1213  * away again.  It's possible that the response from two existence
1214  * checks are separated by the creation of the target object, and
1215  * the first ("doesn't exist") response arrives *after* the second
1216  * ("does exist").  In that case we ignore the second one.
1217  */
1218 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1219                                 bool exists)
1220 {
1221         if (exists)
1222                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1223         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1224         smp_mb();
1225 }
1226
1227 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1228 {
1229         smp_mb();
1230         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1231 }
1232
1233 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1234 {
1235         smp_mb();
1236         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1237 }
1238
1239 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1240 {
1241         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1242                 atomic_read(&obj_request->kref.refcount));
1243         kref_get(&obj_request->kref);
1244 }
1245
1246 static void rbd_obj_request_destroy(struct kref *kref);
1247 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1248 {
1249         rbd_assert(obj_request != NULL);
1250         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1251                 atomic_read(&obj_request->kref.refcount));
1252         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1253 }
1254
1255 static void rbd_img_request_get(struct rbd_img_request *img_request)
1256 {
1257         dout("%s: img %p (was %d)\n", __func__, img_request,
1258                 atomic_read(&img_request->kref.refcount));
1259         kref_get(&img_request->kref);
1260 }
1261
1262 static void rbd_img_request_destroy(struct kref *kref);
1263 static void rbd_img_request_put(struct rbd_img_request *img_request)
1264 {
1265         rbd_assert(img_request != NULL);
1266         dout("%s: img %p (was %d)\n", __func__, img_request,
1267                 atomic_read(&img_request->kref.refcount));
1268         kref_put(&img_request->kref, rbd_img_request_destroy);
1269 }
1270
1271 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1272                                         struct rbd_obj_request *obj_request)
1273 {
1274         rbd_assert(obj_request->img_request == NULL);
1275
1276         /* Image request now owns object's original reference */
1277         obj_request->img_request = img_request;
1278         obj_request->which = img_request->obj_request_count;
1279         rbd_assert(!obj_request_img_data_test(obj_request));
1280         obj_request_img_data_set(obj_request);
1281         rbd_assert(obj_request->which != BAD_WHICH);
1282         img_request->obj_request_count++;
1283         list_add_tail(&obj_request->links, &img_request->obj_requests);
1284         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1285                 obj_request->which);
1286 }
1287
1288 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1289                                         struct rbd_obj_request *obj_request)
1290 {
1291         rbd_assert(obj_request->which != BAD_WHICH);
1292
1293         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1294                 obj_request->which);
1295         list_del(&obj_request->links);
1296         rbd_assert(img_request->obj_request_count > 0);
1297         img_request->obj_request_count--;
1298         rbd_assert(obj_request->which == img_request->obj_request_count);
1299         obj_request->which = BAD_WHICH;
1300         rbd_assert(obj_request_img_data_test(obj_request));
1301         rbd_assert(obj_request->img_request == img_request);
1302         obj_request->img_request = NULL;
1303         obj_request->callback = NULL;
1304         rbd_obj_request_put(obj_request);
1305 }
1306
1307 static bool obj_request_type_valid(enum obj_request_type type)
1308 {
1309         switch (type) {
1310         case OBJ_REQUEST_NODATA:
1311         case OBJ_REQUEST_BIO:
1312         case OBJ_REQUEST_PAGES:
1313                 return true;
1314         default:
1315                 return false;
1316         }
1317 }
1318
1319 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1320                                 struct rbd_obj_request *obj_request)
1321 {
1322         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1323
1324         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1325 }
1326
1327 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1328 {
1329
1330         dout("%s: img %p\n", __func__, img_request);
1331
1332         /*
1333          * If no error occurred, compute the aggregate transfer
1334          * count for the image request.  We could instead use
1335          * atomic64_cmpxchg() to update it as each object request
1336          * completes; not clear which way is better off hand.
1337          */
1338         if (!img_request->result) {
1339                 struct rbd_obj_request *obj_request;
1340                 u64 xferred = 0;
1341
1342                 for_each_obj_request(img_request, obj_request)
1343                         xferred += obj_request->xferred;
1344                 img_request->xferred = xferred;
1345         }
1346
1347         if (img_request->callback)
1348                 img_request->callback(img_request);
1349         else
1350                 rbd_img_request_put(img_request);
1351 }
1352
1353 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1354
1355 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1356 {
1357         dout("%s: obj %p\n", __func__, obj_request);
1358
1359         return wait_for_completion_interruptible(&obj_request->completion);
1360 }
1361
1362 /*
1363  * The default/initial value for all image request flags is 0.  Each
1364  * is conditionally set to 1 at image request initialization time
1365  * and currently never change thereafter.
1366  */
1367 static void img_request_write_set(struct rbd_img_request *img_request)
1368 {
1369         set_bit(IMG_REQ_WRITE, &img_request->flags);
1370         smp_mb();
1371 }
1372
1373 static bool img_request_write_test(struct rbd_img_request *img_request)
1374 {
1375         smp_mb();
1376         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1377 }
1378
1379 static void img_request_child_set(struct rbd_img_request *img_request)
1380 {
1381         set_bit(IMG_REQ_CHILD, &img_request->flags);
1382         smp_mb();
1383 }
1384
1385 static bool img_request_child_test(struct rbd_img_request *img_request)
1386 {
1387         smp_mb();
1388         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1389 }
1390
1391 static void img_request_layered_set(struct rbd_img_request *img_request)
1392 {
1393         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1394         smp_mb();
1395 }
1396
1397 static bool img_request_layered_test(struct rbd_img_request *img_request)
1398 {
1399         smp_mb();
1400         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1401 }
1402
1403 static void
1404 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1405 {
1406         u64 xferred = obj_request->xferred;
1407         u64 length = obj_request->length;
1408
1409         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1410                 obj_request, obj_request->img_request, obj_request->result,
1411                 xferred, length);
1412         /*
1413          * ENOENT means a hole in the image.  We zero-fill the
1414          * entire length of the request.  A short read also implies
1415          * zero-fill to the end of the request.  Either way we
1416          * update the xferred count to indicate the whole request
1417          * was satisfied.
1418          */
1419         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1420         if (obj_request->result == -ENOENT) {
1421                 if (obj_request->type == OBJ_REQUEST_BIO)
1422                         zero_bio_chain(obj_request->bio_list, 0);
1423                 else
1424                         zero_pages(obj_request->pages, 0, length);
1425                 obj_request->result = 0;
1426                 obj_request->xferred = length;
1427         } else if (xferred < length && !obj_request->result) {
1428                 if (obj_request->type == OBJ_REQUEST_BIO)
1429                         zero_bio_chain(obj_request->bio_list, xferred);
1430                 else
1431                         zero_pages(obj_request->pages, xferred, length);
1432                 obj_request->xferred = length;
1433         }
1434         obj_request_done_set(obj_request);
1435 }
1436
1437 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1438 {
1439         dout("%s: obj %p cb %p\n", __func__, obj_request,
1440                 obj_request->callback);
1441         if (obj_request->callback)
1442                 obj_request->callback(obj_request);
1443         else
1444                 complete_all(&obj_request->completion);
1445 }
1446
1447 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1448 {
1449         dout("%s: obj %p\n", __func__, obj_request);
1450         obj_request_done_set(obj_request);
1451 }
1452
1453 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1454 {
1455         struct rbd_img_request *img_request = NULL;
1456         struct rbd_device *rbd_dev = NULL;
1457         bool layered = false;
1458
1459         if (obj_request_img_data_test(obj_request)) {
1460                 img_request = obj_request->img_request;
1461                 layered = img_request && img_request_layered_test(img_request);
1462                 rbd_dev = img_request->rbd_dev;
1463         }
1464
1465         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1466                 obj_request, img_request, obj_request->result,
1467                 obj_request->xferred, obj_request->length);
1468         if (layered && obj_request->result == -ENOENT &&
1469                         obj_request->img_offset < rbd_dev->parent_overlap)
1470                 rbd_img_parent_read(obj_request);
1471         else if (img_request)
1472                 rbd_img_obj_request_read_callback(obj_request);
1473         else
1474                 obj_request_done_set(obj_request);
1475 }
1476
1477 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1478 {
1479         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1480                 obj_request->result, obj_request->length);
1481         /*
1482          * There is no such thing as a successful short write.  Set
1483          * it to our originally-requested length.
1484          */
1485         obj_request->xferred = obj_request->length;
1486         obj_request_done_set(obj_request);
1487 }
1488
1489 /*
1490  * For a simple stat call there's nothing to do.  We'll do more if
1491  * this is part of a write sequence for a layered image.
1492  */
1493 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1494 {
1495         dout("%s: obj %p\n", __func__, obj_request);
1496         obj_request_done_set(obj_request);
1497 }
1498
1499 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1500                                 struct ceph_msg *msg)
1501 {
1502         struct rbd_obj_request *obj_request = osd_req->r_priv;
1503         u16 opcode;
1504
1505         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1506         rbd_assert(osd_req == obj_request->osd_req);
1507         if (obj_request_img_data_test(obj_request)) {
1508                 rbd_assert(obj_request->img_request);
1509                 rbd_assert(obj_request->which != BAD_WHICH);
1510         } else {
1511                 rbd_assert(obj_request->which == BAD_WHICH);
1512         }
1513
1514         if (osd_req->r_result < 0)
1515                 obj_request->result = osd_req->r_result;
1516         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1517
1518         BUG_ON(osd_req->r_num_ops > 2);
1519
1520         /*
1521          * We support a 64-bit length, but ultimately it has to be
1522          * passed to blk_end_request(), which takes an unsigned int.
1523          */
1524         obj_request->xferred = osd_req->r_reply_op_len[0];
1525         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1526         opcode = osd_req->r_ops[0].op;
1527         switch (opcode) {
1528         case CEPH_OSD_OP_READ:
1529                 rbd_osd_read_callback(obj_request);
1530                 break;
1531         case CEPH_OSD_OP_WRITE:
1532                 rbd_osd_write_callback(obj_request);
1533                 break;
1534         case CEPH_OSD_OP_STAT:
1535                 rbd_osd_stat_callback(obj_request);
1536                 break;
1537         case CEPH_OSD_OP_CALL:
1538         case CEPH_OSD_OP_NOTIFY_ACK:
1539         case CEPH_OSD_OP_WATCH:
1540                 rbd_osd_trivial_callback(obj_request);
1541                 break;
1542         default:
1543                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1544                         obj_request->object_name, (unsigned short) opcode);
1545                 break;
1546         }
1547
1548         if (obj_request_done_test(obj_request))
1549                 rbd_obj_request_complete(obj_request);
1550 }
1551
1552 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1553 {
1554         struct rbd_img_request *img_request = obj_request->img_request;
1555         struct ceph_osd_request *osd_req = obj_request->osd_req;
1556         u64 snap_id;
1557
1558         rbd_assert(osd_req != NULL);
1559
1560         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1561         ceph_osdc_build_request(osd_req, obj_request->offset,
1562                         NULL, snap_id, NULL);
1563 }
1564
1565 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1566 {
1567         struct rbd_img_request *img_request = obj_request->img_request;
1568         struct ceph_osd_request *osd_req = obj_request->osd_req;
1569         struct ceph_snap_context *snapc;
1570         struct timespec mtime = CURRENT_TIME;
1571
1572         rbd_assert(osd_req != NULL);
1573
1574         snapc = img_request ? img_request->snapc : NULL;
1575         ceph_osdc_build_request(osd_req, obj_request->offset,
1576                         snapc, CEPH_NOSNAP, &mtime);
1577 }
1578
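/*
 * Summary of the distinction above (illustrative only):
 *
 *     read:   snapc == NULL, snap_id names what to read
 *             (the mapped snapshot, or CEPH_NOSNAP for the head)
 *     write:  snapc == the image's snapshot context,
 *             snap_id == CEPH_NOSNAP
 *
 * Writes always target the head object but carry the snapshot
 * context so the OSD can preserve pre-write data for existing
 * snapshots; reads just name the single version they should see.
 */
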
1579 static struct ceph_osd_request *rbd_osd_req_create(
1580                                         struct rbd_device *rbd_dev,
1581                                         bool write_request,
1582                                         struct rbd_obj_request *obj_request)
1583 {
1584         struct ceph_snap_context *snapc = NULL;
1585         struct ceph_osd_client *osdc;
1586         struct ceph_osd_request *osd_req;
1587
1588         if (obj_request_img_data_test(obj_request)) {
1589                 struct rbd_img_request *img_request = obj_request->img_request;
1590
1591                 rbd_assert(write_request ==
1592                                 img_request_write_test(img_request));
1593                 if (write_request)
1594                         snapc = img_request->snapc;
1595         }
1596
1597         /* Allocate and initialize the request, for the single op */
1598
1599         osdc = &rbd_dev->rbd_client->client->osdc;
1600         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1601         if (!osd_req)
1602                 return NULL;    /* ENOMEM */
1603
1604         if (write_request)
1605                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1606         else
1607                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1608
1609         osd_req->r_callback = rbd_osd_req_callback;
1610         osd_req->r_priv = obj_request;
1611
1612         osd_req->r_oid_len = strlen(obj_request->object_name);
1613         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1614         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1615
1616         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1617
1618         return osd_req;
1619 }
1620
1621 /*
1622  * Create a copyup osd request based on the information in the
1623  * object request supplied.  A copyup request has two osd ops,
1624  * a copyup method call, and a "normal" write request.
1625  */
1626 static struct ceph_osd_request *
1627 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1628 {
1629         struct rbd_img_request *img_request;
1630         struct ceph_snap_context *snapc;
1631         struct rbd_device *rbd_dev;
1632         struct ceph_osd_client *osdc;
1633         struct ceph_osd_request *osd_req;
1634
1635         rbd_assert(obj_request_img_data_test(obj_request));
1636         img_request = obj_request->img_request;
1637         rbd_assert(img_request);
1638         rbd_assert(img_request_write_test(img_request));
1639
1640         /* Allocate and initialize the request, for the two ops */
1641
1642         snapc = img_request->snapc;
1643         rbd_dev = img_request->rbd_dev;
1644         osdc = &rbd_dev->rbd_client->client->osdc;
1645         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1646         if (!osd_req)
1647                 return NULL;    /* ENOMEM */
1648
1649         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1650         osd_req->r_callback = rbd_osd_req_callback;
1651         osd_req->r_priv = obj_request;
1652
1653         osd_req->r_oid_len = strlen(obj_request->object_name);
1654         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1655         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1656
1657         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1658
1659         return osd_req;
1660 }
1661
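/*
 * A sketch of the two-op request allocated above, as it gets
 * filled in by rbd_img_obj_parent_read_full_callback() below:
 *
 *     op 0: CEPH_OSD_OP_CALL   class "rbd", method "copyup",
 *                              request data = parent object data
 *     op 1: CEPH_OSD_OP_WRITE  original offset/length and bio data
 *
 * Both ops travel in a single osd request, so the copyup of the
 * parent data and the client's write are applied together.
 */
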
1662
1663 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1664 {
1665         ceph_osdc_put_request(osd_req);
1666 }
1667
1668 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1669
1670 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1671                                                 u64 offset, u64 length,
1672                                                 enum obj_request_type type)
1673 {
1674         struct rbd_obj_request *obj_request;
1675         size_t size;
1676         char *name;
1677
1678         rbd_assert(obj_request_type_valid(type));
1679
1680         size = strlen(object_name) + 1;
1681         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1682         if (!obj_request)
1683                 return NULL;
1684
1685         name = (char *)(obj_request + 1);
1686         obj_request->object_name = memcpy(name, object_name, size);
1687         obj_request->offset = offset;
1688         obj_request->length = length;
1689         obj_request->flags = 0;
1690         obj_request->which = BAD_WHICH;
1691         obj_request->type = type;
1692         INIT_LIST_HEAD(&obj_request->links);
1693         init_completion(&obj_request->completion);
1694         kref_init(&obj_request->kref);
1695
1696         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1697                 offset, length, (int)type, obj_request);
1698
1699         return obj_request;
1700 }
1701
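/*
 * Layout of the single allocation made above (a sketch):
 *
 *     +-------------------------+---------------------------+
 *     | struct rbd_obj_request  | copy of object name + NUL |
 *     +-------------------------+---------------------------+
 *     ^ obj_request             ^ obj_request->object_name
 *
 * Because the name lives in the same allocation, the kfree() in
 * rbd_obj_request_destroy() releases both at once.
 */
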
1702 static void rbd_obj_request_destroy(struct kref *kref)
1703 {
1704         struct rbd_obj_request *obj_request;
1705
1706         obj_request = container_of(kref, struct rbd_obj_request, kref);
1707
1708         dout("%s: obj %p\n", __func__, obj_request);
1709
1710         rbd_assert(obj_request->img_request == NULL);
1711         rbd_assert(obj_request->which == BAD_WHICH);
1712
1713         if (obj_request->osd_req)
1714                 rbd_osd_req_destroy(obj_request->osd_req);
1715
1716         rbd_assert(obj_request_type_valid(obj_request->type));
1717         switch (obj_request->type) {
1718         case OBJ_REQUEST_NODATA:
1719                 break;          /* Nothing to do */
1720         case OBJ_REQUEST_BIO:
1721                 if (obj_request->bio_list)
1722                         bio_chain_put(obj_request->bio_list);
1723                 break;
1724         case OBJ_REQUEST_PAGES:
1725                 if (obj_request->pages)
1726                         ceph_release_page_vector(obj_request->pages,
1727                                                 obj_request->page_count);
1728                 break;
1729         }
1730
1731         kfree(obj_request);
1732 }
1733
1734 /*
1735  * Caller is responsible for filling in the list of object requests
1736  * that comprises the image request, and the Linux request pointer
1737  * (if there is one).
1738  */
1739 static struct rbd_img_request *rbd_img_request_create(
1740                                         struct rbd_device *rbd_dev,
1741                                         u64 offset, u64 length,
1742                                         bool write_request,
1743                                         bool child_request)
1744 {
1745         struct rbd_img_request *img_request;
1746
1747         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1748         if (!img_request)
1749                 return NULL;
1750
1751         if (write_request) {
1752                 down_read(&rbd_dev->header_rwsem);
1753                 rbd_snap_context_get(rbd_dev->header.snapc);
1754                 up_read(&rbd_dev->header_rwsem);
1755         }
1756
1757         img_request->rq = NULL;
1758         img_request->rbd_dev = rbd_dev;
1759         img_request->offset = offset;
1760         img_request->length = length;
1761         img_request->flags = 0;
1762         if (write_request) {
1763                 img_request_write_set(img_request);
1764                 img_request->snapc = rbd_dev->header.snapc;
1765         } else {
1766                 img_request->snap_id = rbd_dev->spec->snap_id;
1767         }
1768         if (child_request)
1769                 img_request_child_set(img_request);
1770         if (rbd_dev->parent_spec)
1771                 img_request_layered_set(img_request);
1772         spin_lock_init(&img_request->completion_lock);
1773         img_request->next_completion = 0;
1774         img_request->callback = NULL;
1775         img_request->result = 0;
1776         img_request->obj_request_count = 0;
1777         INIT_LIST_HEAD(&img_request->obj_requests);
1778         kref_init(&img_request->kref);
1779
1780         rbd_img_request_get(img_request);       /* Avoid a warning */
1781         rbd_img_request_put(img_request);       /* TEMPORARY */
1782
1783         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1784                 write_request ? "write" : "read", offset, length,
1785                 img_request);
1786
1787         return img_request;
1788 }
1789
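/*
 * Typical use of the constructor above (a sketch; this is the
 * pattern rbd_request_fn() follows later in this file):
 *
 *     img_request = rbd_img_request_create(rbd_dev, offset, length,
 *                                             write_request, false);
 *     img_request->rq = rq;
 *     ret = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *                                     rq->bio);
 *     if (!ret)
 *             ret = rbd_img_request_submit(img_request);
 *     if (ret)
 *             rbd_img_request_put(img_request);
 */
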
1790 static void rbd_img_request_destroy(struct kref *kref)
1791 {
1792         struct rbd_img_request *img_request;
1793         struct rbd_obj_request *obj_request;
1794         struct rbd_obj_request *next_obj_request;
1795
1796         img_request = container_of(kref, struct rbd_img_request, kref);
1797
1798         dout("%s: img %p\n", __func__, img_request);
1799
1800         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1801                 rbd_img_obj_request_del(img_request, obj_request);
1802         rbd_assert(img_request->obj_request_count == 0);
1803
1804         if (img_request_write_test(img_request))
1805                 rbd_snap_context_put(img_request->snapc);
1806
1807         if (img_request_child_test(img_request))
1808                 rbd_obj_request_put(img_request->obj_request);
1809
1810         kfree(img_request);
1811 }
1812
1813 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1814 {
1815         struct rbd_img_request *img_request;
1816         unsigned int xferred;
1817         int result;
1818         bool more;
1819
1820         rbd_assert(obj_request_img_data_test(obj_request));
1821         img_request = obj_request->img_request;
1822
1823         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1824         xferred = (unsigned int)obj_request->xferred;
1825         result = obj_request->result;
1826         if (result) {
1827                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1828
1829                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1830                         img_request_write_test(img_request) ? "write" : "read",
1831                         obj_request->length, obj_request->img_offset,
1832                         obj_request->offset);
1833                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1834                         result, xferred);
1835                 if (!img_request->result)
1836                         img_request->result = result;
1837         }
1838
1839         /* Image object requests don't own their page array */
1840
1841         if (obj_request->type == OBJ_REQUEST_PAGES) {
1842                 obj_request->pages = NULL;
1843                 obj_request->page_count = 0;
1844         }
1845
1846         if (img_request_child_test(img_request)) {
1847                 rbd_assert(img_request->obj_request != NULL);
1848                 more = obj_request->which < img_request->obj_request_count - 1;
1849         } else {
1850                 rbd_assert(img_request->rq != NULL);
1851                 more = blk_end_request(img_request->rq, result, xferred);
1852         }
1853
1854         return more;
1855 }
1856
1857 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1858 {
1859         struct rbd_img_request *img_request;
1860         u32 which = obj_request->which;
1861         bool more = true;
1862
1863         rbd_assert(obj_request_img_data_test(obj_request));
1864         img_request = obj_request->img_request;
1865
1866         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1867         rbd_assert(img_request != NULL);
1868         rbd_assert(img_request->obj_request_count > 0);
1869         rbd_assert(which != BAD_WHICH);
1870         rbd_assert(which < img_request->obj_request_count);
1871         rbd_assert(which >= img_request->next_completion);
1872
1873         spin_lock_irq(&img_request->completion_lock);
1874         if (which != img_request->next_completion)
1875                 goto out;
1876
1877         for_each_obj_request_from(img_request, obj_request) {
1878                 rbd_assert(more);
1879                 rbd_assert(which < img_request->obj_request_count);
1880
1881                 if (!obj_request_done_test(obj_request))
1882                         break;
1883                 more = rbd_img_obj_end_request(obj_request);
1884                 which++;
1885         }
1886
1887         rbd_assert(more ^ (which == img_request->obj_request_count));
1888         img_request->next_completion = which;
1889 out:
1890         spin_unlock_irq(&img_request->completion_lock);
1891
1892         if (!more)
1893                 rbd_img_request_complete(img_request);
1894 }
1895
1896 /*
1897  * Split up an image request into one or more object requests, each
1898  * to a different object.  The "type" parameter indicates whether
1899  * "data_desc" is the pointer to the head of a list of bio
1900  * structures, or the base of a page array.  In either case this
1901  * function assumes data_desc describes memory sufficient to hold
1902  * all data described by the image request.
1903  */
1904 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1905                                         enum obj_request_type type,
1906                                         void *data_desc)
1907 {
1908         struct rbd_device *rbd_dev = img_request->rbd_dev;
1909         struct rbd_obj_request *obj_request = NULL;
1910         struct rbd_obj_request *next_obj_request;
1911         bool write_request = img_request_write_test(img_request);
1912         struct bio *bio_list;
1913         unsigned int bio_offset = 0;
1914         struct page **pages;
1915         u64 img_offset;
1916         u64 resid;
1917         u16 opcode;
1918
1919         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1920                 (int)type, data_desc);
1921
1922         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1923         img_offset = img_request->offset;
1924         resid = img_request->length;
1925         rbd_assert(resid > 0);
1926
1927         if (type == OBJ_REQUEST_BIO) {
1928                 bio_list = data_desc;
1929                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1930         } else {
1931                 rbd_assert(type == OBJ_REQUEST_PAGES);
1932                 pages = data_desc;
1933         }
1934
1935         while (resid) {
1936                 struct ceph_osd_request *osd_req;
1937                 const char *object_name;
1938                 u64 offset;
1939                 u64 length;
1940
1941                 object_name = rbd_segment_name(rbd_dev, img_offset);
1942                 if (!object_name)
1943                         goto out_unwind;
1944                 offset = rbd_segment_offset(rbd_dev, img_offset);
1945                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1946                 obj_request = rbd_obj_request_create(object_name,
1947                                                 offset, length, type);
1948                 kfree(object_name);     /* object request has its own copy */
1949                 if (!obj_request)
1950                         goto out_unwind;
1951
1952                 if (type == OBJ_REQUEST_BIO) {
1953                         unsigned int clone_size;
1954
1955                         rbd_assert(length <= (u64)UINT_MAX);
1956                         clone_size = (unsigned int)length;
1957                         obj_request->bio_list =
1958                                         bio_chain_clone_range(&bio_list,
1959                                                                 &bio_offset,
1960                                                                 clone_size,
1961                                                                 GFP_ATOMIC);
1962                         if (!obj_request->bio_list)
1963                                 goto out_partial;
1964                 } else {
1965                         unsigned int page_count;
1966
1967                         obj_request->pages = pages;
1968                         page_count = (u32)calc_pages_for(offset, length);
1969                         obj_request->page_count = page_count;
1970                         if ((offset + length) & ~PAGE_MASK)
1971                                 page_count--;   /* more on last page */
1972                         pages += page_count;
1973                 }
1974
1975                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1976                                                 obj_request);
1977                 if (!osd_req)
1978                         goto out_partial;
1979                 obj_request->osd_req = osd_req;
1980                 obj_request->callback = rbd_img_obj_callback;
1981
1982                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1983                                                 0, 0);
1984                 if (type == OBJ_REQUEST_BIO)
1985                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1986                                         obj_request->bio_list, length);
1987                 else
1988                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1989                                         obj_request->pages, length,
1990                                         offset & ~PAGE_MASK, false, false);
1991
1992                 if (write_request)
1993                         rbd_osd_req_format_write(obj_request);
1994                 else
1995                         rbd_osd_req_format_read(obj_request);
1996
1997                 obj_request->img_offset = img_offset;
1998                 rbd_img_obj_request_add(img_request, obj_request);
1999
2000                 img_offset += length;
2001                 resid -= length;
2002         }
2003
2004         return 0;
2005
2006 out_partial:
2007         rbd_obj_request_put(obj_request);
2008 out_unwind:
2009         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2010                 rbd_obj_request_put(obj_request);
2011
2012         return -ENOMEM;
2013 }
2014
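/*
 * Worked example of the split above (illustrative numbers,
 * assuming the common 4 MB objects, i.e. obj_order 22): an image
 * request for offset 0x3ff000, length 0x2000 straddles an object
 * boundary, so the loop creates two object requests:
 *
 *     object 0:  offset 0x3ff000, length 0x1000
 *     object 1:  offset 0x000000, length 0x1000
 *
 * rbd_segment_length() is what keeps any single object request
 * from crossing the boundary.
 */
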
2015 static void
2016 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2017 {
2018         struct rbd_img_request *img_request;
2019         struct rbd_device *rbd_dev;
2020         u64 length;
2021         u32 page_count;
2022
2023         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2024         rbd_assert(obj_request_img_data_test(obj_request));
2025         img_request = obj_request->img_request;
2026         rbd_assert(img_request);
2027
2028         rbd_dev = img_request->rbd_dev;
2029         rbd_assert(rbd_dev);
2030         length = (u64)1 << rbd_dev->header.obj_order;
2031         page_count = (u32)calc_pages_for(0, length);
2032
2033         rbd_assert(obj_request->copyup_pages);
2034         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2035         obj_request->copyup_pages = NULL;
2036
2037         /*
2038          * We want the transfer count to reflect the size of the
2039          * original write request.  There is no such thing as a
2040          * successful short write, so if the request was successful
2041          * we can just set it to the originally-requested length.
2042          */
2043         if (!obj_request->result)
2044                 obj_request->xferred = obj_request->length;
2045
2046         /* Finish up with the normal image object callback */
2047
2048         rbd_img_obj_callback(obj_request);
2049 }
2050
2051 static void
2052 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2053 {
2054         struct rbd_obj_request *orig_request;
2055         struct ceph_osd_request *osd_req;
2056         struct ceph_osd_client *osdc;
2057         struct rbd_device *rbd_dev;
2058         struct page **pages;
2059         int result;
2060         u64 obj_size;
2061         u64 xferred;
2062
2063         rbd_assert(img_request_child_test(img_request));
2064
2065         /* First get what we need from the image request */
2066
2067         pages = img_request->copyup_pages;
2068         rbd_assert(pages != NULL);
2069         img_request->copyup_pages = NULL;
2070
2071         orig_request = img_request->obj_request;
2072         rbd_assert(orig_request != NULL);
2073         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2074         result = img_request->result;
2075         obj_size = img_request->length;
2076         xferred = img_request->xferred;
2077
2078         rbd_dev = img_request->rbd_dev;
2079         rbd_assert(rbd_dev);
2080         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2081
2082         rbd_img_request_put(img_request);
2083
2084         if (result)
2085                 goto out_err;
2086
2087         /* Allocate the new copyup osd request for the original request */
2088
2089         result = -ENOMEM;
2090         rbd_assert(!orig_request->osd_req);
2091         osd_req = rbd_osd_req_create_copyup(orig_request);
2092         if (!osd_req)
2093                 goto out_err;
2094         orig_request->osd_req = osd_req;
2095         orig_request->copyup_pages = pages;
2096
2097         /* Initialize the copyup op */
2098
2099         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2100         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2101                                                 false, false);
2102
2103         /* Then the original write request op */
2104
2105         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2106                                         orig_request->offset,
2107                                         orig_request->length, 0, 0);
2108         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2109                                         orig_request->length);
2110
2111         rbd_osd_req_format_write(orig_request);
2112
2113         /* All set, send it off. */
2114
2115         orig_request->callback = rbd_img_obj_copyup_callback;
2116         osdc = &rbd_dev->rbd_client->client->osdc;
2117         result = rbd_obj_request_submit(osdc, orig_request);
2118         if (!result)
2119                 return;
2120 out_err:
2121         /* Record the error code and complete the request */
2122
2123         orig_request->result = result;
2124         orig_request->xferred = 0;
2125         obj_request_done_set(orig_request);
2126         rbd_obj_request_complete(orig_request);
2127 }
2128
2129 /*
2130  * Read from the parent image the range of data that covers the
2131  * entire target of the given object request.  This is used for
2132  * satisfying a layered image write request when the target of an
2133  * object request from the image request does not exist.
2134  *
2135  * A page array big enough to hold the returned data is allocated
2136  * and supplied to rbd_img_request_fill() as the "data descriptor."
2137  * When the read completes, this page array will be transferred to
2138  * the original object request for the copyup operation.
2139  *
2140  * If an error occurs, record it as the result of the original
2141  * object request and mark it done so it gets completed.
2142  */
2143 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2144 {
2145         struct rbd_img_request *img_request = NULL;
2146         struct rbd_img_request *parent_request = NULL;
2147         struct rbd_device *rbd_dev;
2148         u64 img_offset;
2149         u64 length;
2150         struct page **pages = NULL;
2151         u32 page_count;
2152         int result;
2153
2154         rbd_assert(obj_request_img_data_test(obj_request));
2155         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2156
2157         img_request = obj_request->img_request;
2158         rbd_assert(img_request != NULL);
2159         rbd_dev = img_request->rbd_dev;
2160         rbd_assert(rbd_dev->parent != NULL);
2161
2162         /*
2163          * First things first.  The original osd request is of no
2164          * use to us any more, we'll need a new one that can hold
2165          * the two ops in a copyup request.  We'll get that later,
2166          * but for now we can release the old one.
2167          */
2168         rbd_osd_req_destroy(obj_request->osd_req);
2169         obj_request->osd_req = NULL;
2170
2171         /*
2172          * Determine the byte range covered by the object in the
2173          * child image to which the original request was to be sent.
2174          */
2175         img_offset = obj_request->img_offset - obj_request->offset;
2176         length = (u64)1 << rbd_dev->header.obj_order;
2177
2178         /*
2179          * There is no defined parent data beyond the parent
2180          * overlap, so limit what we read at that boundary if
2181          * necessary.
2182          */
2183         if (img_offset + length > rbd_dev->parent_overlap) {
2184                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2185                 length = rbd_dev->parent_overlap - img_offset;
2186         }
2187
2188         /*
2189          * Allocate a page array big enough to receive the data read
2190          * from the parent.
2191          */
2192         page_count = (u32)calc_pages_for(0, length);
2193         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2194         if (IS_ERR(pages)) {
2195                 result = PTR_ERR(pages);
2196                 pages = NULL;
2197                 goto out_err;
2198         }
2199
2200         result = -ENOMEM;
2201         parent_request = rbd_img_request_create(rbd_dev->parent,
2202                                                 img_offset, length,
2203                                                 false, true);
2204         if (!parent_request)
2205                 goto out_err;
2206         rbd_obj_request_get(obj_request);
2207         parent_request->obj_request = obj_request;
2208
2209         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2210         if (result)
2211                 goto out_err;
2212         parent_request->copyup_pages = pages;
2213
2214         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2215         result = rbd_img_request_submit(parent_request);
2216         if (!result)
2217                 return 0;
2218
2219         parent_request->copyup_pages = NULL;
2220         parent_request->obj_request = NULL;
2221         rbd_obj_request_put(obj_request);
2222 out_err:
2223         if (pages)
2224                 ceph_release_page_vector(pages, page_count);
2225         if (parent_request)
2226                 rbd_img_request_put(parent_request);
2227         obj_request->result = result;
2228         obj_request->xferred = 0;
2229         obj_request_done_set(obj_request);
2230
2231         return result;
2232 }
2233
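/*
 * One path through the layered-write machinery above (a sketch of
 * the callback chain when the target object is known not to exist):
 *
 *     rbd_img_obj_request_submit()
 *      -> rbd_img_obj_parent_read_full()            read from parent
 *      -> rbd_img_obj_parent_read_full_callback()   build 2-op copyup
 *      -> rbd_img_obj_copyup_callback()             normal completion
 *
 * When existence is unknown, a STAT round trip through
 * rbd_img_obj_exists_submit()/_callback() happens first and then
 * re-enters rbd_img_obj_request_submit().
 */
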
2234 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2235 {
2236         struct rbd_obj_request *orig_request;
2237         int result;
2238
2239         rbd_assert(!obj_request_img_data_test(obj_request));
2240
2241         /*
2242          * All we need from the object request is the original
2243          * request and the result of the STAT op.  Grab those, then
2244          * we're done with the request.
2245          */
2246         orig_request = obj_request->obj_request;
2247         obj_request->obj_request = NULL;
2248         rbd_assert(orig_request);
2249         rbd_assert(orig_request->img_request);
2250
2251         result = obj_request->result;
2252         obj_request->result = 0;
2253
2254         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2255                 obj_request, orig_request, result,
2256                 obj_request->xferred, obj_request->length);
2257         rbd_obj_request_put(obj_request);
2258
2259         rbd_assert(orig_request);
2260         rbd_assert(orig_request->img_request);
2261
2262         /*
2263          * Our only purpose here is to determine whether the object
2264          * exists, and we don't want to treat the non-existence as
2265          * an error.  If something else comes back, transfer the
2266          * error to the original request and complete it now.
2267          */
2268         if (!result) {
2269                 obj_request_existence_set(orig_request, true);
2270         } else if (result == -ENOENT) {
2271                 obj_request_existence_set(orig_request, false);
2272         } else {
2273                 orig_request->result = result;
2274                 goto out;
2275         }
2276
2277         /*
2278          * Resubmit the original request now that we have recorded
2279          * whether the target object exists.
2280          */
2281         orig_request->result = rbd_img_obj_request_submit(orig_request);
2282 out:
2283         if (orig_request->result)
2284                 rbd_obj_request_complete(orig_request);
2285         rbd_obj_request_put(orig_request);
2286 }
2287
2288 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2289 {
2290         struct rbd_obj_request *stat_request;
2291         struct rbd_device *rbd_dev;
2292         struct ceph_osd_client *osdc;
2293         struct page **pages = NULL;
2294         u32 page_count;
2295         size_t size;
2296         int ret;
2297
2298         /*
2299          * The response data for a STAT call consists of:
2300          *     le64 length;
2301          *     struct {
2302          *         le32 tv_sec;
2303          *         le32 tv_nsec;
2304          *     } mtime;
2305          */
2306         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2307         page_count = (u32)calc_pages_for(0, size);
2308         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2309         if (IS_ERR(pages))
2310                 return PTR_ERR(pages);
2311
2312         ret = -ENOMEM;
2313         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2314                                                         OBJ_REQUEST_PAGES);
2315         if (!stat_request)
2316                 goto out;
2317
2318         rbd_obj_request_get(obj_request);
2319         stat_request->obj_request = obj_request;
2320         stat_request->pages = pages;
2321         stat_request->page_count = page_count;
2322
2323         rbd_assert(obj_request->img_request);
2324         rbd_dev = obj_request->img_request->rbd_dev;
2325         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2326                                                 stat_request);
2327         if (!stat_request->osd_req)
2328                 goto out;
2329         stat_request->callback = rbd_img_obj_exists_callback;
2330
2331         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2332         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2333                                         false, false);
2334         rbd_osd_req_format_read(stat_request);
2335
2336         osdc = &rbd_dev->rbd_client->client->osdc;
2337         ret = rbd_obj_request_submit(osdc, stat_request);
2338 out:
2339         if (ret)
2340                 rbd_obj_request_put(obj_request);
2341
2342         return ret;
2343 }
2344
2345 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2346 {
2347         struct rbd_img_request *img_request;
2348         struct rbd_device *rbd_dev;
2349         bool known;
2350
2351         rbd_assert(obj_request_img_data_test(obj_request));
2352
2353         img_request = obj_request->img_request;
2354         rbd_assert(img_request);
2355         rbd_dev = img_request->rbd_dev;
2356
2357         /*
2358          * Only writes to layered images need special handling.
2359          * Reads and non-layered writes are simple object requests.
2360          * Layered writes that start beyond the end of the overlap
2361          * with the parent have no parent data, so they too are
2362          * simple object requests.  Finally, if the target object is
2363          * known to already exist, its parent data has already been
2364          * copied, so a write to the object can also be handled as a
2365          * simple object request.
2366          */
2367         if (!img_request_write_test(img_request) ||
2368                 !img_request_layered_test(img_request) ||
2369                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2370                 ((known = obj_request_known_test(obj_request)) &&
2371                         obj_request_exists_test(obj_request))) {
2372
2373                 struct rbd_device *rbd_dev;
2374                 struct ceph_osd_client *osdc;
2375
2376                 rbd_dev = obj_request->img_request->rbd_dev;
2377                 osdc = &rbd_dev->rbd_client->client->osdc;
2378
2379                 return rbd_obj_request_submit(osdc, obj_request);
2380         }
2381
2382         /*
2383          * It's a layered write.  The target object might exist but
2384          * we may not know that yet.  If we know it doesn't exist,
2385          * start by reading the data for the full target object from
2386          * the parent so we can use it for a copyup to the target.
2387          */
2388         if (known)
2389                 return rbd_img_obj_parent_read_full(obj_request);
2390
2391         /* We don't know whether the target exists.  Go find out. */
2392
2393         return rbd_img_obj_exists_submit(obj_request);
2394 }
2395
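/*
 * Decision summary for the function above (illustrative):
 *
 *     read                            -> plain object request
 *     write, image not layered        -> plain object request
 *     write at/past parent overlap    -> plain object request
 *     write, target known to exist    -> plain object request
 *     write, target known absent      -> parent read, then copyup
 *     write, existence unknown        -> STAT, then resubmit
 */
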
2396 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2397 {
2398         struct rbd_obj_request *obj_request;
2399         struct rbd_obj_request *next_obj_request;
2400
2401         dout("%s: img %p\n", __func__, img_request);
2402         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2403                 int ret;
2404
2405                 ret = rbd_img_obj_request_submit(obj_request);
2406                 if (ret)
2407                         return ret;
2408         }
2409
2410         return 0;
2411 }
2412
2413 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2414 {
2415         struct rbd_obj_request *obj_request;
2416         struct rbd_device *rbd_dev;
2417         u64 obj_end;
2418
2419         rbd_assert(img_request_child_test(img_request));
2420
2421         obj_request = img_request->obj_request;
2422         rbd_assert(obj_request);
2423         rbd_assert(obj_request->img_request);
2424
2425         obj_request->result = img_request->result;
2426         if (obj_request->result)
2427                 goto out;
2428
2429         /*
2430          * We need to zero anything beyond the parent overlap
2431          * boundary.  Since rbd_img_obj_request_read_callback()
2432          * will zero anything beyond the end of a short read, an
2433          * easy way to do this is to pretend the data from the
2434          * parent came up short--ending at the overlap boundary.
2435          */
2436         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2437         obj_end = obj_request->img_offset + obj_request->length;
2438         rbd_dev = obj_request->img_request->rbd_dev;
2439         if (obj_end > rbd_dev->parent_overlap) {
2440                 u64 xferred = 0;
2441
2442                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2443                         xferred = rbd_dev->parent_overlap -
2444                                         obj_request->img_offset;
2445
2446                 obj_request->xferred = min(img_request->xferred, xferred);
2447         } else {
2448                 obj_request->xferred = img_request->xferred;
2449         }
2450 out:
2451         rbd_img_obj_request_read_callback(obj_request);
2452         rbd_obj_request_complete(obj_request);
2453 }
2454
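/*
 * Example of the clamping above (illustrative numbers): with
 * parent_overlap 0x500000, an object request at img_offset
 * 0x4ff000 for length 0x2000 ends at 0x501000.  Only 0x1000 bytes
 * of parent data are defined, so xferred is capped at 0x1000 and
 * rbd_img_obj_request_read_callback() zero-fills the rest.
 */
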
2455 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2456 {
2457         struct rbd_device *rbd_dev;
2458         struct rbd_img_request *img_request;
2459         int result;
2460
2461         rbd_assert(obj_request_img_data_test(obj_request));
2462         rbd_assert(obj_request->img_request != NULL);
2463         rbd_assert(obj_request->result == (s32) -ENOENT);
2464         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2465
2466         rbd_dev = obj_request->img_request->rbd_dev;
2467         rbd_assert(rbd_dev->parent != NULL);
2468         /* rbd_read_finish(obj_request, obj_request->length); */
2469         img_request = rbd_img_request_create(rbd_dev->parent,
2470                                                 obj_request->img_offset,
2471                                                 obj_request->length,
2472                                                 false, true);
2473         result = -ENOMEM;
2474         if (!img_request)
2475                 goto out_err;
2476
2477         rbd_obj_request_get(obj_request);
2478         img_request->obj_request = obj_request;
2479
2480         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2481                                         obj_request->bio_list);
2482         if (result)
2483                 goto out_err;
2484
2485         img_request->callback = rbd_img_parent_read_callback;
2486         result = rbd_img_request_submit(img_request);
2487         if (result)
2488                 goto out_err;
2489
2490         return;
2491 out_err:
2492         if (img_request)
2493                 rbd_img_request_put(img_request);
2494         obj_request->result = result;
2495         obj_request->xferred = 0;
2496         obj_request_done_set(obj_request);
2497 }
2498
2499 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2500                                    u64 ver, u64 notify_id)
2501 {
2502         struct rbd_obj_request *obj_request;
2503         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2504         int ret;
2505
2506         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2507                                                         OBJ_REQUEST_NODATA);
2508         if (!obj_request)
2509                 return -ENOMEM;
2510
2511         ret = -ENOMEM;
2512         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2513         if (!obj_request->osd_req)
2514                 goto out;
2515         obj_request->callback = rbd_obj_request_put;
2516
2517         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2518                                         notify_id, ver, 0);
2519         rbd_osd_req_format_read(obj_request);
2520
2521         ret = rbd_obj_request_submit(osdc, obj_request);
2522 out:
2523         if (ret)
2524                 rbd_obj_request_put(obj_request);
2525
2526         return ret;
2527 }
2528
2529 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2530 {
2531         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2532         u64 hver;
2533
2534         if (!rbd_dev)
2535                 return;
2536
2537         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2538                 rbd_dev->header_name, (unsigned long long) notify_id,
2539                 (unsigned int) opcode);
2540         (void)rbd_dev_refresh(rbd_dev, &hver);
2541
2542         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2543 }
2544
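/*
 * Notify flow (a sketch): when another client changes the image
 * (e.g. creates a snapshot), it notifies watchers of the header
 * object.  The osd client invokes rbd_watch_cb() on each watcher,
 * which refreshes the cached header and then acks via
 * rbd_obj_notify_ack() so the notifier is not left waiting.
 */
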
2545 /*
2546  * Request sync osd watch/unwatch.  The value of "start" determines
2547  * whether a watch request is being initiated or torn down.
2548  */
2549 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2550 {
2551         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2552         struct rbd_obj_request *obj_request;
2553         int ret;
2554
2555         rbd_assert(start ^ !!rbd_dev->watch_event);
2556         rbd_assert(start ^ !!rbd_dev->watch_request);
2557
2558         if (start) {
2559                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2560                                                 &rbd_dev->watch_event);
2561                 if (ret < 0)
2562                         return ret;
2563                 rbd_assert(rbd_dev->watch_event != NULL);
2564         }
2565
2566         ret = -ENOMEM;
2567         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2568                                                         OBJ_REQUEST_NODATA);
2569         if (!obj_request)
2570                 goto out_cancel;
2571
2572         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2573         if (!obj_request->osd_req)
2574                 goto out_cancel;
2575
2576         if (start)
2577                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2578         else
2579                 ceph_osdc_unregister_linger_request(osdc,
2580                                         rbd_dev->watch_request->osd_req);
2581
2582         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2583                                 rbd_dev->watch_event->cookie,
2584                                 rbd_dev->header.obj_version, start);
2585         rbd_osd_req_format_write(obj_request);
2586
2587         ret = rbd_obj_request_submit(osdc, obj_request);
2588         if (ret)
2589                 goto out_cancel;
2590         ret = rbd_obj_request_wait(obj_request);
2591         if (ret)
2592                 goto out_cancel;
2593         ret = obj_request->result;
2594         if (ret)
2595                 goto out_cancel;
2596
2597         /*
2598          * A watch request is set to linger, so the underlying osd
2599          * request won't go away until we unregister it.  We retain
2600          * a pointer to the object request during that time (in
2601          * rbd_dev->watch_request), so we'll keep a reference to
2602          * it.  We'll drop that reference (below) after we've
2603          * unregistered it.
2604          */
2605         if (start) {
2606                 rbd_dev->watch_request = obj_request;
2607
2608                 return 0;
2609         }
2610
2611         /* We have successfully torn down the watch request */
2612
2613         rbd_obj_request_put(rbd_dev->watch_request);
2614         rbd_dev->watch_request = NULL;
2615 out_cancel:
2616         /* Cancel the event if we're tearing down, or on error */
2617         ceph_osdc_cancel_event(rbd_dev->watch_event);
2618         rbd_dev->watch_event = NULL;
2619         if (obj_request)
2620                 rbd_obj_request_put(obj_request);
2621
2622         return ret;
2623 }
2624
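/*
 * Callers use the function above in matched pairs (a sketch):
 *
 *     ret = rbd_dev_header_watch_sync(rbd_dev, 1);    start watching
 *     ...
 *     ret = rbd_dev_header_watch_sync(rbd_dev, 0);    tear it down
 *
 * The asserts at the top enforce the pairing: starting requires
 * that no watch_event/watch_request exist yet, and stopping
 * requires that both do.
 */
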
2625 /*
2626  * Synchronous osd object method call.  Returns the number of bytes
2627  * returned in the outbound buffer, or a negative error code.
2628  */
2629 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2630                              const char *object_name,
2631                              const char *class_name,
2632                              const char *method_name,
2633                              const void *outbound,
2634                              size_t outbound_size,
2635                              void *inbound,
2636                              size_t inbound_size,
2637                              u64 *version)
2638 {
2639         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2640         struct rbd_obj_request *obj_request;
2641         struct page **pages;
2642         u32 page_count;
2643         int ret;
2644
2645         /*
2646          * Method calls are ultimately read operations.  The result
2647          * should be placed into the inbound buffer provided.  They
2648          * also supply outbound data--parameters for the object
2649          * method.  Currently if this is present it will be a
2650          * snapshot id.
2651          */
2652         page_count = (u32)calc_pages_for(0, inbound_size);
2653         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2654         if (IS_ERR(pages))
2655                 return PTR_ERR(pages);
2656
2657         ret = -ENOMEM;
2658         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2659                                                         OBJ_REQUEST_PAGES);
2660         if (!obj_request)
2661                 goto out;
2662
2663         obj_request->pages = pages;
2664         obj_request->page_count = page_count;
2665
2666         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2667         if (!obj_request->osd_req)
2668                 goto out;
2669
2670         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2671                                         class_name, method_name);
2672         if (outbound_size) {
2673                 struct ceph_pagelist *pagelist;
2674
2675                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2676                 if (!pagelist)
2677                         goto out;
2678
2679                 ceph_pagelist_init(pagelist);
2680                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2681                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2682                                                 pagelist);
2683         }
2684         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2685                                         obj_request->pages, inbound_size,
2686                                         0, false, false);
2687         rbd_osd_req_format_read(obj_request);
2688
2689         ret = rbd_obj_request_submit(osdc, obj_request);
2690         if (ret)
2691                 goto out;
2692         ret = rbd_obj_request_wait(obj_request);
2693         if (ret)
2694                 goto out;
2695
2696         ret = obj_request->result;
2697         if (ret < 0)
2698                 goto out;
2699
2700         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2701         ret = (int)obj_request->xferred;
2702         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2703         if (version)
2704                 *version = obj_request->version;
2705 out:
2706         if (obj_request)
2707                 rbd_obj_request_put(obj_request);
2708         else
2709                 ceph_release_page_vector(pages, page_count);
2710
2711         return ret;
2712 }
2713
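/*
 * Hypothetical invocation (a sketch only; the method name and
 * buffer shapes here are for illustration, not an actual class
 * method): fetching a 64-bit value for a given snapshot id:
 *
 *     __le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *     __le64 value;
 *     int ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                                     "rbd", "example_method",
 *                                     &snapid, sizeof (snapid),
 *                                     &value, sizeof (value), NULL);
 *     if (ret < (int)sizeof (value))
 *             return ret < 0 ? ret : -ERANGE;
 */
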
2714 static void rbd_request_fn(struct request_queue *q)
2715                 __releases(q->queue_lock) __acquires(q->queue_lock)
2716 {
2717         struct rbd_device *rbd_dev = q->queuedata;
2718         bool read_only = rbd_dev->mapping.read_only;
2719         struct request *rq;
2720         int result;
2721
2722         while ((rq = blk_fetch_request(q))) {
2723                 bool write_request = rq_data_dir(rq) == WRITE;
2724                 struct rbd_img_request *img_request;
2725                 u64 offset;
2726                 u64 length;
2727
2728                 /* Ignore any non-FS requests that filter through. */
2729
2730                 if (rq->cmd_type != REQ_TYPE_FS) {
2731                         dout("%s: non-fs request type %d\n", __func__,
2732                                 (int) rq->cmd_type);
2733                         __blk_end_request_all(rq, 0);
2734                         continue;
2735                 }
2736
2737                 /* Ignore/skip any zero-length requests */
2738
2739                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2740                 length = (u64) blk_rq_bytes(rq);
2741
2742                 if (!length) {
2743                         dout("%s: zero-length request\n", __func__);
2744                         __blk_end_request_all(rq, 0);
2745                         continue;
2746                 }
2747
2748                 spin_unlock_irq(q->queue_lock);
2749
2750                 /* Disallow writes to a read-only device */
2751
2752                 if (write_request) {
2753                         result = -EROFS;
2754                         if (read_only)
2755                                 goto end_request;
2756                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2757                 }
2758
2759                 /*
2760                  * Quit early if the mapped snapshot no longer
2761                  * exists.  It's still possible the snapshot will
2762                  * have disappeared by the time our request arrives
2763                  * at the osd, but there's no sense in sending it if
2764                  * we already know.
2765                  */
2766                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2767                         dout("request for non-existent snapshot\n");
2768                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2769                         result = -ENXIO;
2770                         goto end_request;
2771                 }
2772
2773                 result = -EINVAL;
2774                 if (offset && length > U64_MAX - offset + 1) {
2775                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2776                                 offset, length);
2777                         goto end_request;       /* Shouldn't happen */
2778                 }
2779
2780                 result = -ENOMEM;
2781                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2782                                                         write_request, false);
2783                 if (!img_request)
2784                         goto end_request;
2785
2786                 img_request->rq = rq;
2787
2788                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2789                                                 rq->bio);
2790                 if (!result)
2791                         result = rbd_img_request_submit(img_request);
2792                 if (result)
2793                         rbd_img_request_put(img_request);
2794 end_request:
2795                 spin_lock_irq(q->queue_lock);
2796                 if (result < 0) {
2797                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2798                                 write_request ? "write" : "read",
2799                                 length, offset, result);
2800
2801                         __blk_end_request_all(rq, result);
2802                 }
2803         }
2804 }
2805
2806 /*
2807  * a queue callback. Makes sure that we don't create a bio that spans across
2808  * multiple osd objects. One exception would be with a single page bios,
2809  * which we handle later at bio_chain_clone_range()
2810  */
2811 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2812                           struct bio_vec *bvec)
2813 {
2814         struct rbd_device *rbd_dev = q->queuedata;
2815         sector_t sector_offset;
2816         sector_t sectors_per_obj;
2817         sector_t obj_sector_offset;
2818         int ret;
2819
2820         /*
2821          * Convert the partition-relative bio start sector to one
2822          * relative to the enclosing (whole) device, then find how
2823          * far into its rbd object that sector falls.
2824          */
2825         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2826         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2827         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2828
2829         /*
2830          * Compute the number of bytes from that offset to the end
2831          * of the object.  Account for what's already used by the bio.
2832          */
2833         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2834         if (ret > bmd->bi_size)
2835                 ret -= bmd->bi_size;
2836         else
2837                 ret = 0;
2838
2839         /*
2840          * Don't send back more than was asked for.  And if the bio
2841          * was empty, let the whole thing through because:  "Note
2842          * that a block device *must* allow a single page to be
2843          * added to an empty bio."
2844          */
2845         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2846         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2847                 ret = (int) bvec->bv_len;
2848
2849         return ret;
2850 }
2851
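/*
 * Worked example for the arithmetic above (illustrative, assuming
 * the common 4 MB objects, obj_order 22): sectors_per_obj is
 * 1 << (22 - 9) = 8192.  For a bio starting 2 sectors before an
 * object boundary (obj_sector_offset 8190), at most
 * (8192 - 8190) << 9 = 1024 bytes fit before the boundary, less
 * whatever the bio already contains.
 */
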
2852 static void rbd_free_disk(struct rbd_device *rbd_dev)
2853 {
2854         struct gendisk *disk = rbd_dev->disk;
2855
2856         if (!disk)
2857                 return;
2858
2859         rbd_dev->disk = NULL;
2860         if (disk->flags & GENHD_FL_UP) {
2861                 del_gendisk(disk);
2862                 if (disk->queue)
2863                         blk_cleanup_queue(disk->queue);
2864         }
2865         put_disk(disk);
2866 }
2867
2868 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2869                                 const char *object_name,
2870                                 u64 offset, u64 length,
2871                                 void *buf, u64 *version)
2873 {
2874         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2875         struct rbd_obj_request *obj_request;
2876         struct page **pages = NULL;
2877         u32 page_count;
2878         size_t size;
2879         int ret;
2880
2881         page_count = (u32) calc_pages_for(offset, length);
2882         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2883         if (IS_ERR(pages))
2884                 return PTR_ERR(pages);
2885
2886         ret = -ENOMEM;
2887         obj_request = rbd_obj_request_create(object_name, offset, length,
2888                                                         OBJ_REQUEST_PAGES);
2889         if (!obj_request)
2890                 goto out;
2891
2892         obj_request->pages = pages;
2893         obj_request->page_count = page_count;
2894
2895         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2896         if (!obj_request->osd_req)
2897                 goto out;
2898
2899         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2900                                         offset, length, 0, 0);
2901         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2902                                         obj_request->pages,
2903                                         obj_request->length,
2904                                         obj_request->offset & ~PAGE_MASK,
2905                                         false, false);
2906         rbd_osd_req_format_read(obj_request);
2907
2908         ret = rbd_obj_request_submit(osdc, obj_request);
2909         if (ret)
2910                 goto out;
2911         ret = rbd_obj_request_wait(obj_request);
2912         if (ret)
2913                 goto out;
2914
2915         ret = obj_request->result;
2916         if (ret < 0)
2917                 goto out;
2918
2919         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2920         size = (size_t) obj_request->xferred;
2921         ceph_copy_from_page_vector(pages, buf, 0, size);
2922         rbd_assert(size <= (size_t) INT_MAX);
2923         ret = (int) size;
2924         if (version)
2925                 *version = obj_request->version;
2926 out:
2927         if (obj_request)
2928                 rbd_obj_request_put(obj_request);
2929         else
2930                 ceph_release_page_vector(pages, page_count);
2931
2932         return ret;
2933 }
2934
2935 /*
2936  * Read the complete header for the given rbd device.
2937  *
2938  * Returns a pointer to a dynamically-allocated buffer containing
2939  * the complete and validated header.  Caller can pass the address
2940  * of a variable that will be filled in with the version of the
2941  * header object at the time it was read.
2942  *
2943  * Returns a pointer-coded errno if a failure occurs.
2944  */
2945 static struct rbd_image_header_ondisk *
2946 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2947 {
2948         struct rbd_image_header_ondisk *ondisk = NULL;
2949         u32 snap_count = 0;
2950         u64 names_size = 0;
2951         u32 want_count;
2952         int ret;
2953
2954         /*
2955          * The complete header will include an array of its 64-bit
2956          * snapshot ids, followed by the names of those snapshots as
2957          * a contiguous block of NUL-terminated strings.  Note that
2958          * the number of snapshots could change by the time we read
2959          * it in, in which case we re-read it.
2960          */
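             /*
              * Illustrative sketch of that tail layout only (the
              * snapshot names "one" and "two" are hypothetical):
              *
              *   ...fixed fields... | snap_id[0] | snap_id[1] | "one\0two\0"
              */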
2961         do {
2962                 size_t size;
2963
2964                 kfree(ondisk);
2965
2966                 size = sizeof (*ondisk);
2967                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2968                 size += names_size;
2969                 ondisk = kmalloc(size, GFP_KERNEL);
2970                 if (!ondisk)
2971                         return ERR_PTR(-ENOMEM);
2972
2973                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2974                                        0, size, ondisk, version);
2975                 if (ret < 0)
2976                         goto out_err;
2977                 if ((size_t)ret < size) {
2978                         ret = -ENXIO;
2979                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2980                                 size, ret);
2981                         goto out_err;
2982                 }
2983                 if (!rbd_dev_ondisk_valid(ondisk)) {
2984                         ret = -ENXIO;
2985                         rbd_warn(rbd_dev, "invalid header");
2986                         goto out_err;
2987                 }
2988
2989                 names_size = le64_to_cpu(ondisk->snap_names_len);
2990                 want_count = snap_count;
2991                 snap_count = le32_to_cpu(ondisk->snap_count);
2992         } while (snap_count != want_count);
2993
2994         return ondisk;
2995
2996 out_err:
2997         kfree(ondisk);
2998
2999         return ERR_PTR(ret);
3000 }
3001
3002 /*
3003  * re-read the on-disk header
3004  */
3005 static int rbd_read_header(struct rbd_device *rbd_dev,
3006                            struct rbd_image_header *header)
3007 {
3008         struct rbd_image_header_ondisk *ondisk;
3009         u64 ver = 0;
3010         int ret;
3011
3012         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3013         if (IS_ERR(ondisk))
3014                 return PTR_ERR(ondisk);
3015         ret = rbd_header_from_disk(header, ondisk);
3016         if (ret >= 0)
3017                 header->obj_version = ver;
3018         kfree(ondisk);
3019
3020         return ret;
3021 }
3022
3023 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3024 {
3025         struct rbd_snap *snap;
3026         struct rbd_snap *next;
3027
3028         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3029                 list_del(&snap->node);
3030                 rbd_snap_destroy(snap);
3031         }
3032 }
3033
3034 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3035 {
3036         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3037                 return;
3038
3039         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3040                 sector_t size;
3041
3042                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3043                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3044                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3045                 set_capacity(rbd_dev->disk, size);
3046         }
3047 }
3048
3049 /*
3050  * Re-read the v1 on-disk header and update the in-core copy to match
3051  */
3052 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3053 {
3054         int ret;
3055         struct rbd_image_header h;
3056
3057         ret = rbd_read_header(rbd_dev, &h);
3058         if (ret < 0)
3059                 return ret;
3060
3061         down_write(&rbd_dev->header_rwsem);
3062
3063         /* Update image size, and check for resize of mapped image */
3064         rbd_dev->header.image_size = h.image_size;
3065         rbd_update_mapping_size(rbd_dev);
3066
3067         /* rbd_dev->header.object_prefix shouldn't change */
3068         kfree(rbd_dev->header.snap_sizes);
3069         kfree(rbd_dev->header.snap_names);
3070         /* osd requests may still refer to snapc */
3071         rbd_snap_context_put(rbd_dev->header.snapc);
3072
3073         if (hver)
3074                 *hver = h.obj_version;
3075         rbd_dev->header.obj_version = h.obj_version;
3076         rbd_dev->header.image_size = h.image_size;
3077         rbd_dev->header.snapc = h.snapc;
3078         rbd_dev->header.snap_names = h.snap_names;
3079         rbd_dev->header.snap_sizes = h.snap_sizes;
3080         /* Free the extra copy of the object prefix */
3081         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3082                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3083         kfree(h.object_prefix);
3084
3085         ret = rbd_dev_snaps_update(rbd_dev);
3086
3087         up_write(&rbd_dev->header_rwsem);
3088
3089         return ret;
3090 }
3091
3092 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3093 {
3094         int ret;
3095
3096         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3097         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3098         if (rbd_dev->image_format == 1)
3099                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3100         else
3101                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3102         mutex_unlock(&ctl_mutex);
3103         revalidate_disk(rbd_dev->disk);
3104         if (ret)
3105                 rbd_warn(rbd_dev, "got notification but failed to "
3106                                 "update snaps: %d", ret);
3107
3108         return ret;
3109 }
3110
3111 static int rbd_init_disk(struct rbd_device *rbd_dev)
3112 {
3113         struct gendisk *disk;
3114         struct request_queue *q;
3115         u64 segment_size;
3116
3117         /* create gendisk info */
3118         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3119         if (!disk)
3120                 return -ENOMEM;
3121
3122         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3123                  rbd_dev->dev_id);
3124         disk->major = rbd_dev->major;
3125         disk->first_minor = 0;
3126         disk->fops = &rbd_bd_ops;
3127         disk->private_data = rbd_dev;
3128
3129         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3130         if (!q)
3131                 goto out_disk;
3132
3133         /* We use the default size, but let's be explicit about it. */
3134         blk_queue_physical_block_size(q, SECTOR_SIZE);
3135
3136         /* set io sizes to object size */
3137         segment_size = rbd_obj_bytes(&rbd_dev->header);
3138         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3139         blk_queue_max_segment_size(q, segment_size);
3140         blk_queue_io_min(q, segment_size);
3141         blk_queue_io_opt(q, segment_size);
3142
3143         blk_queue_merge_bvec(q, rbd_merge_bvec);
3144         disk->queue = q;
3145
3146         q->queuedata = rbd_dev;
3147
3148         rbd_dev->disk = disk;
3149
3150         return 0;
3151 out_disk:
3152         put_disk(disk);
3153
3154         return -ENOMEM;
3155 }
3156
3157 /*
3158   sysfs
3159 */
3160
3161 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3162 {
3163         return container_of(dev, struct rbd_device, dev);
3164 }
3165
3166 static ssize_t rbd_size_show(struct device *dev,
3167                              struct device_attribute *attr, char *buf)
3168 {
3169         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3170
3171         return sprintf(buf, "%llu\n",
3172                 (unsigned long long)rbd_dev->mapping.size);
3173 }
3174
3175 /*
3176  * Note this shows the features for whatever's mapped, which is not
3177  * necessarily the base image.
3178  */
3179 static ssize_t rbd_features_show(struct device *dev,
3180                              struct device_attribute *attr, char *buf)
3181 {
3182         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3183
3184         return sprintf(buf, "0x%016llx\n",
3185                         (unsigned long long)rbd_dev->mapping.features);
3186 }
3187
3188 static ssize_t rbd_major_show(struct device *dev,
3189                               struct device_attribute *attr, char *buf)
3190 {
3191         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3192
3193         if (rbd_dev->major)
3194                 return sprintf(buf, "%d\n", rbd_dev->major);
3195
3196         return sprintf(buf, "(none)\n");
3197 }
3199
3200 static ssize_t rbd_client_id_show(struct device *dev,
3201                                   struct device_attribute *attr, char *buf)
3202 {
3203         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3204
3205         return sprintf(buf, "client%lld\n",
3206                         ceph_client_id(rbd_dev->rbd_client->client));
3207 }
3208
3209 static ssize_t rbd_pool_show(struct device *dev,
3210                              struct device_attribute *attr, char *buf)
3211 {
3212         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3213
3214         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3215 }
3216
3217 static ssize_t rbd_pool_id_show(struct device *dev,
3218                              struct device_attribute *attr, char *buf)
3219 {
3220         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3221
3222         return sprintf(buf, "%llu\n",
3223                         (unsigned long long) rbd_dev->spec->pool_id);
3224 }
3225
3226 static ssize_t rbd_name_show(struct device *dev,
3227                              struct device_attribute *attr, char *buf)
3228 {
3229         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3230
3231         if (rbd_dev->spec->image_name)
3232                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3233
3234         return sprintf(buf, "(unknown)\n");
3235 }
3236
3237 static ssize_t rbd_image_id_show(struct device *dev,
3238                              struct device_attribute *attr, char *buf)
3239 {
3240         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3241
3242         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3243 }
3244
3245 /*
3246  * Shows the name of the currently-mapped snapshot (or
3247  * RBD_SNAP_HEAD_NAME for the base image).
3248  */
3249 static ssize_t rbd_snap_show(struct device *dev,
3250                              struct device_attribute *attr,
3251                              char *buf)
3252 {
3253         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3254
3255         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3256 }
3257
3258 /*
3259  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3260  * for the parent image.  If there is no parent, simply shows
3261  * "(no parent image)".
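      *
      * Sample output for a clone (all values hypothetical):
      *
      *   pool_id 2
      *   pool_name rbd
      *   image_id 1014b76b8b4567
      *   image_name parent-image
      *   snap_id 4
      *   snap_name parent-snap
      *   overlap 1073741824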
3262  */
3263 static ssize_t rbd_parent_show(struct device *dev,
3264                              struct device_attribute *attr,
3265                              char *buf)
3266 {
3267         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3268         struct rbd_spec *spec = rbd_dev->parent_spec;
3269         int count;
3270         char *bufp = buf;
3271
3272         if (!spec)
3273                 return sprintf(buf, "(no parent image)\n");
3274
3275         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3276                         (unsigned long long) spec->pool_id, spec->pool_name);
3277         if (count < 0)
3278                 return count;
3279         bufp += count;
3280
3281         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3282                         spec->image_name ? spec->image_name : "(unknown)");
3283         if (count < 0)
3284                 return count;
3285         bufp += count;
3286
3287         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3288                         (unsigned long long) spec->snap_id, spec->snap_name);
3289         if (count < 0)
3290                 return count;
3291         bufp += count;
3292
3293         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3294         if (count < 0)
3295                 return count;
3296         bufp += count;
3297
3298         return (ssize_t) (bufp - buf);
3299 }
3300
3301 static ssize_t rbd_image_refresh(struct device *dev,
3302                                  struct device_attribute *attr,
3303                                  const char *buf,
3304                                  size_t size)
3305 {
3306         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3307         int ret;
3308
3309         ret = rbd_dev_refresh(rbd_dev, NULL);
3310
3311         return ret < 0 ? ret : size;
3312 }
3313
3314 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3315 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3316 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3317 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3318 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3319 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3320 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3321 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3322 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3323 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3324 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3325
3326 static struct attribute *rbd_attrs[] = {
3327         &dev_attr_size.attr,
3328         &dev_attr_features.attr,
3329         &dev_attr_major.attr,
3330         &dev_attr_client_id.attr,
3331         &dev_attr_pool.attr,
3332         &dev_attr_pool_id.attr,
3333         &dev_attr_name.attr,
3334         &dev_attr_image_id.attr,
3335         &dev_attr_current_snap.attr,
3336         &dev_attr_parent.attr,
3337         &dev_attr_refresh.attr,
3338         NULL
3339 };
3340
3341 static struct attribute_group rbd_attr_group = {
3342         .attrs = rbd_attrs,
3343 };
3344
3345 static const struct attribute_group *rbd_attr_groups[] = {
3346         &rbd_attr_group,
3347         NULL
3348 };
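
     /*
      * Sketch of how the attributes above surface through sysfs; the
      * device id and the values shown are hypothetical:
      *
      *   $ cat /sys/bus/rbd/devices/0/pool
      *   rbd
      *   $ cat /sys/bus/rbd/devices/0/size
      *   1073741824
      *   $ echo 1 > /sys/bus/rbd/devices/0/refresh
      */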
3349
3350 static void rbd_sysfs_dev_release(struct device *dev)
3351 {
3352 }
3353
3354 static struct device_type rbd_device_type = {
3355         .name           = "rbd",
3356         .groups         = rbd_attr_groups,
3357         .release        = rbd_sysfs_dev_release,
3358 };
3359
3360 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3361 {
3362         kref_get(&spec->kref);
3363
3364         return spec;
3365 }
3366
3367 static void rbd_spec_free(struct kref *kref);
3368 static void rbd_spec_put(struct rbd_spec *spec)
3369 {
3370         if (spec)
3371                 kref_put(&spec->kref, rbd_spec_free);
3372 }
3373
3374 static struct rbd_spec *rbd_spec_alloc(void)
3375 {
3376         struct rbd_spec *spec;
3377
3378         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3379         if (!spec)
3380                 return NULL;
3381         kref_init(&spec->kref);
3382
3383         return spec;
3384 }
3385
3386 static void rbd_spec_free(struct kref *kref)
3387 {
3388         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3389
3390         kfree(spec->pool_name);
3391         kfree(spec->image_id);
3392         kfree(spec->image_name);
3393         kfree(spec->snap_name);
3394         kfree(spec);
3395 }
3396
3397 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3398                                 struct rbd_spec *spec)
3399 {
3400         struct rbd_device *rbd_dev;
3401
3402         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3403         if (!rbd_dev)
3404                 return NULL;
3405
3406         spin_lock_init(&rbd_dev->lock);
3407         rbd_dev->flags = 0;
3408         INIT_LIST_HEAD(&rbd_dev->node);
3409         INIT_LIST_HEAD(&rbd_dev->snaps);
3410         init_rwsem(&rbd_dev->header_rwsem);
3411
3412         rbd_dev->spec = spec;
3413         rbd_dev->rbd_client = rbdc;
3414
3415         /* Initialize the layout used for all rbd requests */
3416
3417         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3418         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3419         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3420         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3421
3422         return rbd_dev;
3423 }
3424
3425 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3426 {
3427         rbd_put_client(rbd_dev->rbd_client);
3428         rbd_spec_put(rbd_dev->spec);
3429         kfree(rbd_dev);
3430 }
3431
3432 static void rbd_snap_destroy(struct rbd_snap *snap)
3433 {
3434         kfree(snap->name);
3435         kfree(snap);
3436 }
3437
3438 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3439                                                 const char *snap_name,
3440                                                 u64 snap_id, u64 snap_size,
3441                                                 u64 snap_features)
3442 {
3443         struct rbd_snap *snap;
3444
3445         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3446         if (!snap)
3447                 return ERR_PTR(-ENOMEM);
3448
3449         snap->name = snap_name;
3450         snap->id = snap_id;
3451         snap->size = snap_size;
3452         snap->features = snap_features;
3453
3454         return snap;
3455 }
3456
3457 /*
3458  * Returns a dynamically-allocated snapshot name if successful, or a
3459  * pointer-coded error otherwise.
3460  */
3461 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3462                 u64 *snap_size, u64 *snap_features)
3463 {
3464         char *snap_name;
3465         int i;
3466
3467         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3468
3469         /* Skip over names until we find the one we are looking for */
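             /*
              * The names sit back to back, e.g. "one\0two\0three\0";
              * with which == 1 the loop below leaves snap_name
              * pointing at "two".
              */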
3470
3471         snap_name = rbd_dev->header.snap_names;
3472         for (i = 0; i < which; i++)
3473                 snap_name += strlen(snap_name) + 1;
3474
3475         snap_name = kstrdup(snap_name, GFP_KERNEL);
3476         if (!snap_name)
3477                 return ERR_PTR(-ENOMEM);
3478
3479         *snap_size = rbd_dev->header.snap_sizes[which];
3480         *snap_features = 0;     /* No features for v1 */
3481
3482         return snap_name;
3483 }
3484
3485 /*
3486  * Get the size and object order for an image snapshot, or if
3487  * snap_id is CEPH_NOSNAP, get this information for the base
3488  * image.
3489  */
3490 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3491                                 u8 *order, u64 *snap_size)
3492 {
3493         __le64 snapid = cpu_to_le64(snap_id);
3494         int ret;
3495         struct {
3496                 u8 order;
3497                 __le64 size;
3498         } __attribute__ ((packed)) size_buf = { 0 };
3499
3500         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3501                                 "rbd", "get_size",
3502                                 &snapid, sizeof (snapid),
3503                                 &size_buf, sizeof (size_buf), NULL);
3504         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3505         if (ret < 0)
3506                 return ret;
3507         if (ret < sizeof (size_buf))
3508                 return -ERANGE;
3509
3510         if (order)
3511                 *order = size_buf.order;
3512         *snap_size = le64_to_cpu(size_buf.size);
3513
3514         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3515                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3516                 (unsigned long long)*snap_size);
3517
3518         return 0;
3519 }
3520
3521 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3522 {
3523         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3524                                         &rbd_dev->header.obj_order,
3525                                         &rbd_dev->header.image_size);
3526 }
3527
3528 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3529 {
3530         void *reply_buf;
3531         int ret;
3532         void *p;
3533
3534         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3535         if (!reply_buf)
3536                 return -ENOMEM;
3537
3538         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3539                                 "rbd", "get_object_prefix", NULL, 0,
3540                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3541         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3542         if (ret < 0)
3543                 goto out;
3544
3545         p = reply_buf;
3546         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3547                                                 p + ret, NULL, GFP_NOIO);
3548
3549         if (IS_ERR(rbd_dev->header.object_prefix)) {
3550                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3551                 rbd_dev->header.object_prefix = NULL;
3552         } else {
3553                 ret = 0;
3554                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3555         }
3556 out:
3557         kfree(reply_buf);
3558
3559         return ret;
3560 }
3561
3562 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3563                 u64 *snap_features)
3564 {
3565         __le64 snapid = cpu_to_le64(snap_id);
3566         struct {
3567                 __le64 features;
3568                 __le64 incompat;
3569         } __attribute__ ((packed)) features_buf = { 0 };
3570         u64 incompat;
3571         int ret;
3572
3573         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3574                                 "rbd", "get_features",
3575                                 &snapid, sizeof (snapid),
3576                                 &features_buf, sizeof (features_buf), NULL);
3577         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3578         if (ret < 0)
3579                 return ret;
3580         if (ret < sizeof (features_buf))
3581                 return -ERANGE;
3582
3583         incompat = le64_to_cpu(features_buf.incompat);
3584         if (incompat & ~RBD_FEATURES_SUPPORTED)
3585                 return -ENXIO;
3586
3587         *snap_features = le64_to_cpu(features_buf.features);
3588
3589         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3590                 (unsigned long long)snap_id,
3591                 (unsigned long long)*snap_features,
3592                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3593
3594         return 0;
3595 }
3596
3597 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3598 {
3599         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3600                                                 &rbd_dev->header.features);
3601 }
3602
3603 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3604 {
3605         struct rbd_spec *parent_spec;
3606         size_t size;
3607         void *reply_buf = NULL;
3608         __le64 snapid;
3609         void *p;
3610         void *end;
3611         char *image_id;
3612         u64 overlap;
3613         int ret;
3614
3615         parent_spec = rbd_spec_alloc();
3616         if (!parent_spec)
3617                 return -ENOMEM;
3618
3619         size = sizeof (__le64) +                                /* pool_id */
3620                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3621                 sizeof (__le64) +                               /* snap_id */
3622                 sizeof (__le64);                                /* overlap */
3623         reply_buf = kmalloc(size, GFP_KERNEL);
3624         if (!reply_buf) {
3625                 ret = -ENOMEM;
3626                 goto out_err;
3627         }
3628
3629         snapid = cpu_to_le64(CEPH_NOSNAP);
3630         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3631                                 "rbd", "get_parent",
3632                                 &snapid, sizeof (snapid),
3633                                 reply_buf, size, NULL);
3634         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3635         if (ret < 0)
3636                 goto out_err;
3637
3638         p = reply_buf;
3639         end = reply_buf + ret;
3640         ret = -ERANGE;
3641         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3642         if (parent_spec->pool_id == CEPH_NOPOOL)
3643                 goto out;       /* No parent?  No problem. */
3644
3645         /* The ceph file layout needs to fit pool id in 32 bits */
3646
3647         ret = -EIO;
3648         if (parent_spec->pool_id > (u64)U32_MAX) {
3649                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
3650                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3651                 goto out_err;
3652         }
3653
3654         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3655         if (IS_ERR(image_id)) {
3656                 ret = PTR_ERR(image_id);
3657                 goto out_err;
3658         }
3659         parent_spec->image_id = image_id;
3660         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3661         ceph_decode_64_safe(&p, end, overlap, out_err);
3662
3663         rbd_dev->parent_overlap = overlap;
3664         rbd_dev->parent_spec = parent_spec;
3665         parent_spec = NULL;     /* rbd_dev now owns this */
3666 out:
3667         ret = 0;
3668 out_err:
3669         kfree(reply_buf);
3670         rbd_spec_put(parent_spec);
3671
3672         return ret;
3673 }
3674
3675 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3676 {
3677         struct {
3678                 __le64 stripe_unit;
3679                 __le64 stripe_count;
3680         } __attribute__ ((packed)) striping_info_buf = { 0 };
3681         size_t size = sizeof (striping_info_buf);
3682         void *p;
3683         u64 obj_size;
3684         u64 stripe_unit;
3685         u64 stripe_count;
3686         int ret;
3687
3688         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3689                                 "rbd", "get_stripe_unit_count", NULL, 0,
3690                                 (char *)&striping_info_buf, size, NULL);
3691         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3692         if (ret < 0)
3693                 return ret;
3694         if (ret < size)
3695                 return -ERANGE;
3696
3697         /*
3698          * We don't actually support the "fancy striping" feature
3699          * (STRIPINGV2) yet, but if the striping sizes are the
3700          * defaults the behavior is the same as before.  So find
3701          * out, and only fail if the image has non-default values.
3702          */
3704         obj_size = (u64)1 << rbd_dev->header.obj_order;
3705         p = &striping_info_buf;
3706         stripe_unit = ceph_decode_64(&p);
3707         if (stripe_unit != obj_size) {
3708                 rbd_warn(rbd_dev, "unsupported stripe unit "
3709                                 "(got %llu want %llu)",
3710                                 stripe_unit, obj_size);
3711                 return -EINVAL;
3712         }
3713         stripe_count = ceph_decode_64(&p);
3714         if (stripe_count != 1) {
3715                 rbd_warn(rbd_dev, "unsupported stripe count "
3716                                 "(got %llu want 1)", stripe_count);
3717                 return -EINVAL;
3718         }
3719         rbd_dev->header.stripe_unit = stripe_unit;
3720         rbd_dev->header.stripe_count = stripe_count;
3721
3722         return 0;
3723 }
3724
3725 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3726 {
3727         size_t image_id_size;
3728         char *image_id;
3729         void *p;
3730         void *end;
3731         size_t size;
3732         void *reply_buf = NULL;
3733         size_t len = 0;
3734         char *image_name = NULL;
3735         int ret;
3736
3737         rbd_assert(!rbd_dev->spec->image_name);
3738
3739         len = strlen(rbd_dev->spec->image_id);
3740         image_id_size = sizeof (__le32) + len;
3741         image_id = kmalloc(image_id_size, GFP_KERNEL);
3742         if (!image_id)
3743                 return NULL;
3744
3745         p = image_id;
3746         end = image_id + image_id_size;
3747         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3748
3749         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3750         reply_buf = kmalloc(size, GFP_KERNEL);
3751         if (!reply_buf)
3752                 goto out;
3753
3754         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3755                                 "rbd", "dir_get_name",
3756                                 image_id, image_id_size,
3757                                 reply_buf, size, NULL);
3758         if (ret < 0)
3759                 goto out;
3760         p = reply_buf;
3761         end = reply_buf + ret;
3762
3763         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3764         if (IS_ERR(image_name))
3765                 image_name = NULL;
3766         else
3767                 dout("%s: name is %s len is %zu\n", __func__, image_name, len);
3768 out:
3769         kfree(reply_buf);
3770         kfree(image_id);
3771
3772         return image_name;
3773 }
3774
3775 /*
3776  * When an rbd image has a parent image, it is identified by the
3777  * pool, image, and snapshot ids (not names).  This function fills
3778  * in the names for those ids.  (It's OK if we can't figure out the
3779  * name for an image id, but the pool and snapshot ids should always
3780  * exist and have names.)  All names in an rbd spec are dynamically
3781  * allocated.
3782  *
3783  * When an image being mapped (not a parent) is probed, we have the
3784  * pool name and pool id, image name and image id, and the snapshot
3785  * name.  The only thing we're missing is the snapshot id.
3786  *
3787  * The set of snapshots for an image is not known until they have
3788  * been read by rbd_dev_snaps_update(), so we can't completely fill
3789  * in this information until after that has been called.
3790  */
3791 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3792 {
3793         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3794         struct rbd_spec *spec = rbd_dev->spec;
3795         const char *pool_name;
3796         const char *image_name;
3797         const char *snap_name;
3798         int ret;
3799
3800         /*
3801          * An image being mapped will have the pool name (etc.), but
3802          * we need to look up the snapshot id.
3803          */
3804         if (spec->pool_name) {
3805                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3806                         struct rbd_snap *snap;
3807
3808                         snap = snap_by_name(rbd_dev, spec->snap_name);
3809                         if (!snap)
3810                                 return -ENOENT;
3811                         spec->snap_id = snap->id;
3812                 } else {
3813                         spec->snap_id = CEPH_NOSNAP;
3814                 }
3815
3816                 return 0;
3817         }
3818
3819         /* Get the pool name; we have to make our own copy of this */
3820
3821         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3822         if (!pool_name) {
3823                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3824                 return -EIO;
3825         }
3826         pool_name = kstrdup(pool_name, GFP_KERNEL);
3827         if (!pool_name)
3828                 return -ENOMEM;
3829
3830         /* Fetch the image name; tolerate failure here */
3831
3832         image_name = rbd_dev_image_name(rbd_dev);
3833         if (!image_name)
3834                 rbd_warn(rbd_dev, "unable to get image name");
3835
3836         /* Look up the snapshot name, and make a copy */
3837
3838         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3839         if (!snap_name) {
3840                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3841                 ret = -EIO;
3842                 goto out_err;
3843         }
3844         snap_name = kstrdup(snap_name, GFP_KERNEL);
3845         if (!snap_name) {
3846                 ret = -ENOMEM;
3847                 goto out_err;
3848         }
3849
3850         spec->pool_name = pool_name;
3851         spec->image_name = image_name;
3852         spec->snap_name = snap_name;
3853
3854         return 0;
3855 out_err:
3856         kfree(image_name);
3857         kfree(pool_name);
3858
3859         return ret;
3860 }
3861
3862 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3863 {
3864         size_t size;
3865         int ret;
3866         void *reply_buf;
3867         void *p;
3868         void *end;
3869         u64 seq;
3870         u32 snap_count;
3871         struct ceph_snap_context *snapc;
3872         u32 i;
3873
3874         /*
3875          * We'll need room for the seq value (maximum snapshot id),
3876          * snapshot count, and array of that many snapshot ids.
3877          * For now we have a fixed upper limit on the number we're
3878          * prepared to receive.
3879          */
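             /*
              * With RBD_MAX_SNAP_COUNT == 510 this works out to
              * 8 + 4 + 510 * 8 = 4092 bytes, so the largest possible
              * reply still fits in a single 4KB buffer.
              */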
3880         size = sizeof (__le64) + sizeof (__le32) +
3881                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3882         reply_buf = kzalloc(size, GFP_KERNEL);
3883         if (!reply_buf)
3884                 return -ENOMEM;
3885
3886         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3887                                 "rbd", "get_snapcontext", NULL, 0,
3888                                 reply_buf, size, ver);
3889         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3890         if (ret < 0)
3891                 goto out;
3892
3893         p = reply_buf;
3894         end = reply_buf + ret;
3895         ret = -ERANGE;
3896         ceph_decode_64_safe(&p, end, seq, out);
3897         ceph_decode_32_safe(&p, end, snap_count, out);
3898
3899         /*
3900          * Make sure the reported number of snapshot ids wouldn't go
3901          * beyond the end of our buffer.  But before checking that,
3902          * make sure the computed size of the snapshot context we
3903          * allocate is representable in a size_t.
3904          */
3905         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3906                                  / sizeof (u64)) {
3907                 ret = -EINVAL;
3908                 goto out;
3909         }
3910         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3911                 goto out;
3912         ret = 0;
3913
3914         snapc = rbd_snap_context_create(snap_count);
3915         if (!snapc) {
3916                 ret = -ENOMEM;
3917                 goto out;
3918         }
3919         snapc->seq = seq;
3920         for (i = 0; i < snap_count; i++)
3921                 snapc->snaps[i] = ceph_decode_64(&p);
3922
3923         rbd_dev->header.snapc = snapc;
3924
3925         dout("  snap context seq = %llu, snap_count = %u\n",
3926                 (unsigned long long)seq, (unsigned int)snap_count);
3927 out:
3928         kfree(reply_buf);
3929
3930         return ret;
3931 }
3932
3933 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3934 {
3935         size_t size;
3936         void *reply_buf;
3937         __le64 snap_id;
3938         int ret;
3939         void *p;
3940         void *end;
3941         char *snap_name;
3942
3943         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3944         reply_buf = kmalloc(size, GFP_KERNEL);
3945         if (!reply_buf)
3946                 return ERR_PTR(-ENOMEM);
3947
3948         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3949         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3950         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3951                                 "rbd", "get_snapshot_name",
3952                                 &snap_id, sizeof (snap_id),
3953                                 reply_buf, size, NULL);
3954         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3955         if (ret < 0) {
3956                 snap_name = ERR_PTR(ret);
3957                 goto out;
3958         }
3959
3960         p = reply_buf;
3961         end = reply_buf + ret;
3962         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3963         if (IS_ERR(snap_name))
3964                 goto out;
3965
3966         dout("  snap_id 0x%016llx snap_name = %s\n",
3967                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3968 out:
3969         kfree(reply_buf);
3970
3971         return snap_name;
3972 }
3973
3974 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3975                 u64 *snap_size, u64 *snap_features)
3976 {
3977         u64 snap_id;
3978         u64 size;
3979         u64 features;
3980         char *snap_name;
3981         int ret;
3982
3983         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3984         snap_id = rbd_dev->header.snapc->snaps[which];
3985         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3986         if (ret)
3987                 goto out_err;
3988
3989         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3990         if (ret)
3991                 goto out_err;
3992
3993         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3994         if (!IS_ERR(snap_name)) {
3995                 *snap_size = size;
3996                 *snap_features = features;
3997         }
3998
3999         return snap_name;
4000 out_err:
4001         return ERR_PTR(ret);
4002 }
4003
4004 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4005                 u64 *snap_size, u64 *snap_features)
4006 {
4007         if (rbd_dev->image_format == 1)
4008                 return rbd_dev_v1_snap_info(rbd_dev, which,
4009                                         snap_size, snap_features);
4010         if (rbd_dev->image_format == 2)
4011                 return rbd_dev_v2_snap_info(rbd_dev, which,
4012                                         snap_size, snap_features);
4013         return ERR_PTR(-EINVAL);
4014 }
4015
4016 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4017 {
4018         int ret;
4019         __u8 obj_order;
4020
4021         down_write(&rbd_dev->header_rwsem);
4022
4023         /* Grab old order first, to see if it changes */
4024
4025         obj_order = rbd_dev->header.obj_order;
4026         ret = rbd_dev_v2_image_size(rbd_dev);
4027         if (ret)
4028                 goto out;
4029         if (rbd_dev->header.obj_order != obj_order) {
4030                 ret = -EIO;
4031                 goto out;
4032         }
4033         rbd_update_mapping_size(rbd_dev);
4034
4035         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4036         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4037         if (ret)
4038                 goto out;
4039         ret = rbd_dev_snaps_update(rbd_dev);
4040         dout("rbd_dev_snaps_update returned %d\n", ret);
4041         if (ret)
4042                 goto out;
4043 out:
4044         up_write(&rbd_dev->header_rwsem);
4045
4046         return ret;
4047 }
4048
4049 /*
4050  * Scan the rbd device's current snapshot list and compare it to the
4051  * newly-received snapshot context.  Remove any existing snapshots
4052  * not present in the new snapshot context.  Add a new snapshot for
4053  * any snapshots in the snapshot context not in the current list.
4054  * And verify there are no changes to snapshots we already know
4055  * about.
4056  *
4057  * Assumes the snapshots in the snapshot context are sorted by
4058  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4059  * are also maintained in that order.)
4060  *
4061  * Note that any error that occurs while updating the snapshot list
4062  * aborts the update, and the entire list is cleared.  The snapshot
4063  * list becomes inconsistent at that point anyway, so it might as
4064  * well be empty.
4065  */
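     /*
      * Illustrative walk-through (hypothetical ids): given an existing
      * list {12, 7, 3} and a new context {12, 3}, the loop below keeps
      * 12, removes 7 (absent from the new context), keeps 3, and would
      * insert any context id not yet on the list at the matching spot.
      */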
4066 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4067 {
4068         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4069         const u32 snap_count = snapc->num_snaps;
4070         struct list_head *head = &rbd_dev->snaps;
4071         struct list_head *links = head->next;
4072         u32 index = 0;
4073         int ret = 0;
4074
4075         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4076         while (index < snap_count || links != head) {
4077                 u64 snap_id;
4078                 struct rbd_snap *snap;
4079                 char *snap_name;
4080                 u64 snap_size = 0;
4081                 u64 snap_features = 0;
4082
4083                 snap_id = index < snap_count ? snapc->snaps[index]
4084                                              : CEPH_NOSNAP;
4085                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4086                                      : NULL;
4087                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4088
4089                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4090                         struct list_head *next = links->next;
4091
4092                         /*
4093                          * A previously-existing snapshot is not in
4094                          * the new snap context.
4095                          *
4096                          * If the now-missing snapshot is the one
4097                          * the image represents, clear its existence
4098                          * flag so we can avoid sending any more
4099                          * requests to it.
4100                          */
4101                         if (rbd_dev->spec->snap_id == snap->id)
4102                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4103                         dout("removing %ssnap id %llu\n",
4104                                 rbd_dev->spec->snap_id == snap->id ?
4105                                                         "mapped " : "",
4106                                 (unsigned long long)snap->id);
4107
4108                         list_del(&snap->node);
4109                         rbd_snap_destroy(snap);
4110
4111                         /* Done with this list entry; advance */
4112
4113                         links = next;
4114                         continue;
4115                 }
4116
4117                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4118                                         &snap_size, &snap_features);
4119                 if (IS_ERR(snap_name)) {
4120                         ret = PTR_ERR(snap_name);
4121                         dout("failed to get snap info, error %d\n", ret);
4122                         goto out_err;
4123                 }
4124
4125                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4126                         (unsigned long long)snap_id);
4127                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4128                         struct rbd_snap *new_snap;
4129
4130                         /* We haven't seen this snapshot before */
4131
4132                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4133                                         snap_id, snap_size, snap_features);
4134                         if (IS_ERR(new_snap)) {
4135                                 ret = PTR_ERR(new_snap);
                                     kfree(snap_name); /* not consumed on failure */
4136                                 dout("  failed to add dev, error %d\n", ret);
4137                                 goto out_err;
4138                         }
4139
4140                         /* New goes before existing, or at end of list */
4141
4142                         dout("  added dev%s\n", snap ? "" : " at end");
4143                         if (snap)
4144                                 list_add_tail(&new_snap->node, &snap->node);
4145                         else
4146                                 list_add_tail(&new_snap->node, head);
4147                 } else {
4148                         /* Already have this one */
4149
4150                         dout("  already present\n");
4151
4152                         rbd_assert(snap->size == snap_size);
4153                         rbd_assert(!strcmp(snap->name, snap_name));
4154                         rbd_assert(snap->features == snap_features);
                             kfree(snap_name); /* not stored; avoid leaking it */
4155
4156                         /* Done with this list entry; advance */
4157
4158                         links = links->next;
4159                 }
4160
4161                 /* Advance to the next entry in the snapshot context */
4162
4163                 index++;
4164         }
4165         dout("%s: done\n", __func__);
4166
4167         return 0;
4168 out_err:
4169         rbd_remove_all_snaps(rbd_dev);
4170
4171         return ret;
4172 }
4173
4174 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4175 {
4176         struct device *dev;
4177         int ret;
4178
4179         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4180
4181         dev = &rbd_dev->dev;
4182         dev->bus = &rbd_bus_type;
4183         dev->type = &rbd_device_type;
4184         dev->parent = &rbd_root_dev;
4185         dev->release = rbd_dev_release;
4186         dev_set_name(dev, "%d", rbd_dev->dev_id);
4187         ret = device_register(dev);
4188
4189         mutex_unlock(&ctl_mutex);
4190
4191         return ret;
4192 }
4193
4194 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4195 {
4196         device_unregister(&rbd_dev->dev);
4197 }
4198
4199 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4200
4201 /*
4202  * Get a unique rbd identifier for the given new rbd_dev, and add
4203  * the rbd_dev to the global list.  The minimum rbd id is 1.
4204  */
4205 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4206 {
4207         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4208
4209         spin_lock(&rbd_dev_list_lock);
4210         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4211         spin_unlock(&rbd_dev_list_lock);
4212         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4213                 (unsigned long long) rbd_dev->dev_id);
4214 }
4215
4216 /*
4217  * Remove an rbd_dev from the global list, and record that its
4218  * identifier is no longer in use.
4219  */
4220 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4221 {
4222         struct list_head *tmp;
4223         int rbd_id = rbd_dev->dev_id;
4224         int max_id;
4225
4226         rbd_assert(rbd_id > 0);
4227
4228         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4229                 (unsigned long long) rbd_dev->dev_id);
4230         spin_lock(&rbd_dev_list_lock);
4231         list_del_init(&rbd_dev->node);
4232
4233         /*
4234          * If the id being "put" is not the current maximum, there
4235          * is nothing special we need to do.
4236          */
4237         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4238                 spin_unlock(&rbd_dev_list_lock);
4239                 return;
4240         }
4241
4242         /*
4243          * We need to update the current maximum id.  Search the
4244          * list to find out what it is.  We're more likely to find
4245          * the maximum at the end, so search the list backward.
4246          */
4247         max_id = 0;
4248         list_for_each_prev(tmp, &rbd_dev_list) {
4249                 struct rbd_device *rbd_dev;
4250
4251                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4252                 if (rbd_dev->dev_id > max_id)
4253                         max_id = rbd_dev->dev_id;
4254         }
4255         spin_unlock(&rbd_dev_list_lock);
4256
4257         /*
4258          * The max id could have been updated by rbd_dev_id_get(), in
4259          * which case it now accurately reflects the new maximum.
4260          * Be careful not to overwrite the maximum value in that
4261          * case.
4262          */
4263         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4264         dout("  max dev id has been reset\n");
4265 }
4266
4267 /*
4268  * Skips over white space at *buf, and updates *buf to point to the
4269  * first found non-space character (if any). Returns the length of
4270  * the token (string of non-white space characters) found.  Note
4271  * that *buf must be terminated with '\0'.
4272  */
4273 static inline size_t next_token(const char **buf)
4274 {
4275         /*
4276          * These are the characters that produce nonzero for
4277          * isspace() in the "C" and "POSIX" locales.
4278          */
4279         const char *spaces = " \f\n\r\t\v";
4280
4281         *buf += strspn(*buf, spaces);   /* Find start of token */
4282
4283         return strcspn(*buf, spaces);   /* Return token length */
4284 }
4285
4286 /*
4287  * Finds the next token in *buf, and if the provided token buffer is
4288  * big enough, copies the found token into it.  The result, if
4289  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4290  * must be terminated with '\0' on entry.
4291  *
4292  * Returns the length of the token found (not including the '\0').
4293  * Return value will be 0 if no token is found, and it will be >=
4294  * token_size if the token would not fit.
4295  *
4296  * The *buf pointer will be updated to point beyond the end of the
4297  * found token.  Note that this occurs even if the token buffer is
4298  * too small to hold it.
4299  */
4300 static inline size_t copy_token(const char **buf,
4301                                 char *token,
4302                                 size_t token_size)
4303 {
4304         size_t len;
4305
4306         len = next_token(buf);
4307         if (len < token_size) {
4308                 memcpy(token, *buf, len);
4309                 *(token + len) = '\0';
4310         }
4311         *buf += len;
4312
4313         return len;
4314 }
4315
4316 /*
4317  * Finds the next token in *buf, dynamically allocates a buffer big
4318  * enough to hold a copy of it, and copies the token into the new
4319  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4320  * that a duplicate buffer is created even for a zero-length token.
4321  *
4322  * Returns a pointer to the newly-allocated duplicate, or a null
4323  * pointer if memory for the duplicate was not available.  If
4324  * the lenp argument is a non-null pointer, the length of the token
4325  * (not including the '\0') is returned in *lenp.
4326  *
4327  * If successful, the *buf pointer will be updated to point beyond
4328  * the end of the found token.
4329  *
4330  * Note: uses GFP_KERNEL for allocation.
4331  */
4332 static inline char *dup_token(const char **buf, size_t *lenp)
4333 {
4334         char *dup;
4335         size_t len;
4336
4337         len = next_token(buf);
4338         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4339         if (!dup)
4340                 return NULL;
4341         *(dup + len) = '\0';
4342         *buf += len;
4343
4344         if (lenp)
4345                 *lenp = len;
4346
4347         return dup;
4348 }
4349
4350 /*
4351  * Parse the options provided for an "rbd add" (i.e., rbd image
4352  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4353  * and the data written is passed here via a NUL-terminated buffer.
4354  * Returns 0 if successful or an error code otherwise.
4355  *
4356  * The information extracted from these options is recorded in
4357  * the other parameters which return dynamically-allocated
4358  * structures:
4359  *  ceph_opts
4360  *      The address of a pointer that will refer to a ceph options
4361  *      structure.  Caller must release the returned pointer using
4362  *      ceph_destroy_options() when it is no longer needed.
4363  *  rbd_opts
4364  *      Address of an rbd options pointer.  Fully initialized by
4365  *      this function; caller must release with kfree().
4366  *  spec
4367  *      Address of an rbd image specification pointer.  Fully
4368  *      initialized by this function based on parsed options.
4369  *      Caller must release with rbd_spec_put().
4370  *
4371  * The options passed take this form:
4372  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4373  * where:
4374  *  <mon_addrs>
4375  *      A comma-separated list of one or more monitor addresses.
4376  *      A monitor address is an ip address, optionally followed
4377  *      by a port number (separated by a colon).
4378  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4379  *  <options>
4380  *      A comma-separated list of ceph and/or rbd options.
4381  *  <pool_name>
4382  *      The name of the rados pool containing the rbd image.
4383  *  <image_name>
4384  *      The name of the image in that pool to map.
4385  *  <snap_name>
4386  *      An optional snapshot name.  If provided, the mapping will
4387  *      present data from the image at the time that snapshot was
4388  *      created.  The image head is used if no snapshot name is
4389  *      provided.  Snapshot mappings are always read-only.
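      *
      * For example, a write like the following (all values hypothetical)
      * would map the head of image "myimage" from pool "rbd":
      *
      *   $ echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add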
4390  */
4391 static int rbd_add_parse_args(const char *buf,
4392                                 struct ceph_options **ceph_opts,
4393                                 struct rbd_options **opts,
4394                                 struct rbd_spec **rbd_spec)
4395 {
4396         size_t len;
4397         char *options;
4398         const char *mon_addrs;
4399         char *snap_name;
4400         size_t mon_addrs_size;
4401         struct rbd_spec *spec = NULL;
4402         struct rbd_options *rbd_opts = NULL;
4403         struct ceph_options *copts;
4404         int ret;
4405
4406         /* The first four tokens are required */
4407
4408         len = next_token(&buf);
4409         if (!len) {
4410                 rbd_warn(NULL, "no monitor address(es) provided");
4411                 return -EINVAL;
4412         }
4413         mon_addrs = buf;
4414         mon_addrs_size = len + 1;
4415         buf += len;

        ret = -EINVAL;
        options = dup_token(&buf, NULL);
        if (!options)
                return -ENOMEM;
        if (!*options) {
                rbd_warn(NULL, "no options provided");
                goto out_err;
        }

        spec = rbd_spec_alloc();
        if (!spec)
                goto out_mem;

        spec->pool_name = dup_token(&buf, NULL);
        if (!spec->pool_name)
                goto out_mem;
        if (!*spec->pool_name) {
                rbd_warn(NULL, "no pool name provided");
                goto out_err;
        }

        spec->image_name = dup_token(&buf, NULL);
        if (!spec->image_name)
                goto out_mem;
        if (!*spec->image_name) {
                rbd_warn(NULL, "no image name provided");
                goto out_err;
        }

        /*
         * Snapshot name is optional; default is to use "-"
         * (indicating the head/no snapshot).
         */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
                ret = -ENAMETOOLONG;
                goto out_err;
        }
        snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
        if (!snap_name)
                goto out_mem;
        *(snap_name + len) = '\0';
        spec->snap_name = snap_name;

        /* Initialize all rbd options to the defaults */

        rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
        if (!rbd_opts)
                goto out_mem;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(copts)) {
                ret = PTR_ERR(copts);
                goto out_err;
        }
        kfree(options);

        *ceph_opts = copts;
        *opts = rbd_opts;
        *rbd_spec = spec;

        return 0;
out_mem:
        ret = -ENOMEM;
out_err:
        kfree(rbd_opts);
        rbd_spec_put(spec);
        kfree(options);

        return ret;
}

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur, a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
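/*
 * Concretely (a sketch; the exact prefix string is defined by
 * RBD_ID_PREFIX in rbd_types.h): for an image named "foo", the id
 * object name below is RBD_ID_PREFIX followed by "foo", and its
 * "get_id" class method replies with the image id as a ceph-encoded
 * string:
 *
 *      __le32 length            byte count of the id that follows
 *      char   id[length]        the image id, not NUL-terminated
 *
 * That is why the response buffer reserves sizeof (__le32) plus
 * RBD_IMAGE_ID_LEN_MAX bytes and why the reply is decoded with
 * ceph_extract_encoded_string().
 */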
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        char *image_id;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.  We
         * do still need to set the image format though.  An empty
         * image id is the convention for a format 1 image (see the
         * ENOENT case below), hence the test on its first byte.
         */
        if (rbd_dev->spec->image_id) {
                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

                return 0;
        }

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* If it doesn't exist we'll assume it's a format 1 image */

        ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id", NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX, NULL);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret == -ENOENT) {
                image_id = kstrdup("", GFP_KERNEL);
                ret = image_id ? 0 : -ENOMEM;
                if (!ret)
                        rbd_dev->image_format = 1;
        } else if (ret < 0) {
                /*
                 * Pass any other error through unchanged.  Without
                 * this check a negative ret would compare greater
                 * than sizeof (__le32) below (the comparison is
                 * done as size_t) and garbage would be decoded.
                 */
                goto out;
        } else if (ret > sizeof (__le32)) {
                void *p = response;

                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
                ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
                ret = -EINVAL;
        }

        if (!ret) {
                rbd_dev->spec->image_id = image_id;
                dout("image_id is %s\n", image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}

static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;

        /* Record the header object name for this rbd image. */

        size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name) {
                ret = -ENOMEM;
                goto out_err;
        }
        sprintf(rbd_dev->header_name, "%s%s",
                rbd_dev->spec->image_name, RBD_SUFFIX);
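        /*
         * E.g. for a (hypothetical) format 1 image named "foo",
         * header_name is now "foo" with RBD_SUFFIX (as defined in
         * rbd_types.h) appended.
         */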

        /* Populate rbd image metadata */

        ret = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (ret < 0)
                goto out_err;

        /* Version 1 images have no parent (no layering) */

        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;

        dout("discovered version 1 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;

out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        return ret;
}

static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
        size_t size;
        int ret;
        u64 ver = 0;

        /*
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
        size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
        sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
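        /*
         * Note the contrast with format 1: the v2 header object is
         * named after the immutable image id (RBD_HEADER_PREFIX,
         * from rbd_types.h, followed by the id) rather than the
         * user-visible image name, so renaming an image does not
         * move its header object.
         */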

        /* Get the size and object order for the image */
        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
                goto out_err;

        /* Get the object prefix (a.k.a. block_name) for the image */

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;

        /* Get and check the features for the image */

        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;

        /* If the image supports layering, get the parent info */

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto out_err;
                rbd_warn(rbd_dev, "WARNING: kernel support for "
                                        "layered rbd images is EXPERIMENTAL!");
        }

        /* If the image supports fancy striping, get its parameters */

        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
                ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }

        /* crypto and compression type aren't (yet) supported for v2 images */

        rbd_dev->header.crypt_type = 0;
        rbd_dev->header.comp_type = 0;

        /* Get the snapshot context, plus the header version */

        ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
        if (ret)
                goto out_err;
        rbd_dev->header.obj_version = ver;

        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;
out_err:
        rbd_dev->parent_overlap = 0;
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}

static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        struct rbd_spec *parent_spec;
        struct rbd_client *rbdc;
        int ret;

        if (!rbd_dev->parent_spec)
                return 0;
        /*
         * We need to pass a reference to the client and the parent
         * spec when creating the parent rbd_dev.  Images related by
         * parent/child relationships always share both.
         */
        parent_spec = rbd_spec_get(rbd_dev->parent_spec);
        rbdc = __rbd_get_client(rbd_dev->rbd_client);

        ret = -ENOMEM;
        parent = rbd_dev_create(rbdc, parent_spec);
        if (!parent)
                goto out_err;

        ret = rbd_dev_image_probe(parent);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;

        return 0;
out_err:
        if (parent) {
                rbd_spec_put(rbd_dev->parent_spec);
                kfree(rbd_dev->header_name);
                rbd_dev_destroy(parent);
        } else {
                rbd_put_client(rbdc);
                rbd_spec_put(parent_spec);
        }

        return ret;
}

static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
        int ret;

        /* no need to lock here, as rbd_dev is not registered yet */
        ret = rbd_dev_snaps_update(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_snaps;

        ret = rbd_dev_set_mapping(rbd_dev);
        if (ret)
                goto err_out_snaps;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_disk;

        ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_bus;

        ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto err_out_bus;

        /* Everything's ready.  Announce the disk to the world. */

        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_bus:
        /* this will also clean up the rest of the rbd_dev state */

        rbd_bus_del_dev(rbd_dev);

        return ret;
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
err_out_snaps:
        rbd_remove_all_snaps(rbd_dev);

        return ret;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
{
        int ret;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;
        rbd_assert(rbd_dev->spec->image_id);
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_probe(rbd_dev);
        else
                ret = rbd_dev_v2_probe(rbd_dev);
        if (ret)
                goto out_err;

        ret = rbd_dev_probe_finish(rbd_dev);
        if (ret)
                rbd_header_free(&rbd_dev->header);

        return ret;
out_err:
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        dout("probe failed, returning %d\n", ret);

        return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;

        /* The ceph file layout needs to fit pool id in 32 bits */

        if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)",
                                (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rbd_dev->mapping.read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rc = rbd_dev_image_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;

        return count;
err_out_rbd_dev:
        rbd_spec_put(rbd_dev->parent_spec);
        kfree(rbd_dev->header_name);
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t)rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}

static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        if (rbd_dev->watch_event)
                rbd_dev_header_watch_sync(rbd_dev, 0);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
        rbd_spec_put(rbd_dev->parent_spec);
        kfree(rbd_dev->header_name);
        rbd_dev_destroy(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}

static void __rbd_remove(struct rbd_device *rbd_dev)
{
        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
}

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id, rc;
        unsigned long ul;
        int ret = count;

        rc = strict_strtoul(buf, 10, &ul);
        if (rc)
                return rc;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;

        while (rbd_dev->parent_spec) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
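                /*
                 * E.g. for a (hypothetical) chain A (base) <- B <- C,
                 * with C being the device named in this request: the
                 * walk below ends with second == A and first == B, so
                 * A is removed and B's parent linkage is cleared; the
                 * next pass removes B, and C itself is removed after
                 * the loop.
                 */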
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                __rbd_remove(second);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
                first->parent_overlap = 0;
                first->parent = NULL;
        }
        __rbd_remove(rbd_dev);

done:
        mutex_unlock(&ctl_mutex);

        return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
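/*
 * (Those control files include "add" and "remove", which are backed
 * by rbd_add() and rbd_remove() above.)
 */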
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");

                return -EINVAL;
        }
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");