rbd: use snap_id not index to look up snap info

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
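/*
 * (Illustrative arithmetic: 510 snapshot ids at 8 bytes each occupy
 * 4080 bytes, which together with the snap context header keeps the
 * whole snapshot context within a single 4 KB page.)
 */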

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
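/*
 * (Illustrative: for a 32-bit int this gives (5 * 4) / 2 + 1 = 11
 * characters, enough for "-2147483648"; the 5/2 factor overestimates
 * log10(2^8) ~= 2.41 decimal digits per byte.)
 */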

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
/* Wrapped in do/while (0) so it expands safely inside if/else. */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);

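/*
 * Block device open handler.  Refuses a write open of a read-only
 * mapping, and refuses any open that races with removal of the
 * device (the "removing" flag and open_count are both protected by
 * rbd_dev->lock).
 */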
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client)) {
                ret = PTR_ERR(rbdc->client);
                goto out_mutex;
        }
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
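/*
 * (Illustrative: a map request such as "1.2.3.4:6789 name=admin,ro
 * rbd foo" would hand the unrecognized option token "ro" to
 * parse_rbd_opts_token() below, which sets rbd_opts->read_only.)
 */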

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock to remove the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

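/*
 * Return the position of the given snapshot id in the header's
 * snapshot context, or BAD_SNAP_INDEX if it is not present.  The
 * lookup is a linear scan of snapc->snaps[].
 */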
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u32 which;

        for (which = 0; which < snapc->num_snaps; which++)
                if (snapc->snaps[which] == snap_id)
                        return which;

        return BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

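/*
 * Map a snapshot id to its name.  CEPH_NOSNAP denotes the head
 * (writable) image; otherwise the lookup is format-specific.
 * (Illustrative: on a v1 image where snapc->snaps[1] == 12 and the
 * second NUL-terminated string in snap_names is "two", calling
 * rbd_snap_name(rbd_dev, 12) returns a copy of "two".)
 */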
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

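/*
 * Find the rbd_snap with the given name on the device's snapshot
 * list, or NULL if there is no match.
 */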
static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

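/*
 * Record the size and features of the mapped image.  Mapping the
 * head uses the current header values; mapping a snapshot uses the
 * snapshot's values and forces the mapping read-only.
 */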
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

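/*
 * Return the name of the object backing the segment containing the
 * given image offset.  (Illustrative: with object_prefix "rb.0.1234"
 * and obj_order 22 (4 MiB objects), offset 5 MiB falls in segment 1
 * and yields "rb.0.1234.000000000001".)
 */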
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

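/*
 * rbd_segment_offset() reduces an image offset to its offset within
 * the containing segment; rbd_segment_length() clips a request
 * length so it does not cross a segment boundary.  (Illustrative,
 * again with 4 MiB segments: offset 5 MiB, length 4 MiB becomes
 * offset 1 MiB into segment 1 with length clipped to 3 MiB.)
 */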
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return (u64) 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

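/*
 * Add an object request to its image request's list.  The image
 * request takes over the object's initial reference, and "which"
 * records the object's position in the list (the reverse of this is
 * rbd_img_obj_request_del() below).
 */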
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
1378  */
1379 static void img_request_write_set(struct rbd_img_request *img_request)
1380 {
1381         set_bit(IMG_REQ_WRITE, &img_request->flags);
1382         smp_mb();
1383 }
1384
1385 static bool img_request_write_test(struct rbd_img_request *img_request)
1386 {
1387         smp_mb();
1388         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1389 }
1390
1391 static void img_request_child_set(struct rbd_img_request *img_request)
1392 {
1393         set_bit(IMG_REQ_CHILD, &img_request->flags);
1394         smp_mb();
1395 }
1396
1397 static bool img_request_child_test(struct rbd_img_request *img_request)
1398 {
1399         smp_mb();
1400         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1401 }
1402
1403 static void img_request_layered_set(struct rbd_img_request *img_request)
1404 {
1405         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1406         smp_mb();
1407 }
1408
1409 static bool img_request_layered_test(struct rbd_img_request *img_request)
1410 {
1411         smp_mb();
1412         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1413 }
1414
1415 static void
1416 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1417 {
1418         u64 xferred = obj_request->xferred;
1419         u64 length = obj_request->length;
1420
1421         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1422                 obj_request, obj_request->img_request, obj_request->result,
1423                 xferred, length);
1424         /*
1425          * ENOENT means a hole in the image.  We zero-fill the
1426          * entire length of the request.  A short read also implies
1427          * zero-fill to the end of the request.  Either way we
1428          * update the xferred count to indicate the whole request
1429          * was satisfied.
1430          */
1431         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1432         if (obj_request->result == -ENOENT) {
1433                 if (obj_request->type == OBJ_REQUEST_BIO)
1434                         zero_bio_chain(obj_request->bio_list, 0);
1435                 else
1436                         zero_pages(obj_request->pages, 0, length);
1437                 obj_request->result = 0;
1438                 obj_request->xferred = length;
1439         } else if (xferred < length && !obj_request->result) {
1440                 if (obj_request->type == OBJ_REQUEST_BIO)
1441                         zero_bio_chain(obj_request->bio_list, xferred);
1442                 else
1443                         zero_pages(obj_request->pages, xferred, length);
1444                 obj_request->xferred = length;
1445         }
1446         obj_request_done_set(obj_request);
1447 }
1448
1449 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1450 {
1451         dout("%s: obj %p cb %p\n", __func__, obj_request,
1452                 obj_request->callback);
1453         if (obj_request->callback)
1454                 obj_request->callback(obj_request);
1455         else
1456                 complete_all(&obj_request->completion);
1457 }
1458
1459 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1460 {
1461         dout("%s: obj %p\n", __func__, obj_request);
1462         obj_request_done_set(obj_request);
1463 }
1464
1465 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1466 {
1467         struct rbd_img_request *img_request = NULL;
1468         struct rbd_device *rbd_dev = NULL;
1469         bool layered = false;
1470
1471         if (obj_request_img_data_test(obj_request)) {
1472                 img_request = obj_request->img_request;
1473                 layered = img_request && img_request_layered_test(img_request);
1474                 rbd_dev = img_request->rbd_dev;
1475         }
1476
1477         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1478                 obj_request, img_request, obj_request->result,
1479                 obj_request->xferred, obj_request->length);
1480         if (layered && obj_request->result == -ENOENT &&
1481                         obj_request->img_offset < rbd_dev->parent_overlap)
1482                 rbd_img_parent_read(obj_request);
1483         else if (img_request)
1484                 rbd_img_obj_request_read_callback(obj_request);
1485         else
1486                 obj_request_done_set(obj_request);
1487 }
1488
1489 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1490 {
1491         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1492                 obj_request->result, obj_request->length);
1493         /*
1494          * There is no such thing as a successful short write.  Set
1495          * it to our originally-requested length.
1496          */
1497         obj_request->xferred = obj_request->length;
1498         obj_request_done_set(obj_request);
1499 }
1500
1501 /*
1502  * For a simple stat call there's nothing to do.  We'll do more if
1503  * this is part of a write sequence for a layered image.
1504  */
1505 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1506 {
1507         dout("%s: obj %p\n", __func__, obj_request);
1508         obj_request_done_set(obj_request);
1509 }
1510
1511 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1512                                 struct ceph_msg *msg)
1513 {
1514         struct rbd_obj_request *obj_request = osd_req->r_priv;
1515         u16 opcode;
1516
1517         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1518         rbd_assert(osd_req == obj_request->osd_req);
1519         if (obj_request_img_data_test(obj_request)) {
1520                 rbd_assert(obj_request->img_request);
1521                 rbd_assert(obj_request->which != BAD_WHICH);
1522         } else {
1523                 rbd_assert(obj_request->which == BAD_WHICH);
1524         }
1525
1526         if (osd_req->r_result < 0)
1527                 obj_request->result = osd_req->r_result;
1528
1529         BUG_ON(osd_req->r_num_ops > 2);
1530
1531         /*
1532          * We support a 64-bit length, but ultimately it has to be
1533          * passed to blk_end_request(), which takes an unsigned int.
1534          */
1535         obj_request->xferred = osd_req->r_reply_op_len[0];
1536         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1537         opcode = osd_req->r_ops[0].op;
1538         switch (opcode) {
1539         case CEPH_OSD_OP_READ:
1540                 rbd_osd_read_callback(obj_request);
1541                 break;
1542         case CEPH_OSD_OP_WRITE:
1543                 rbd_osd_write_callback(obj_request);
1544                 break;
1545         case CEPH_OSD_OP_STAT:
1546                 rbd_osd_stat_callback(obj_request);
1547                 break;
1548         case CEPH_OSD_OP_CALL:
1549         case CEPH_OSD_OP_NOTIFY_ACK:
1550         case CEPH_OSD_OP_WATCH:
1551                 rbd_osd_trivial_callback(obj_request);
1552                 break;
1553         default:
1554                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1555                         obj_request->object_name, (unsigned short) opcode);
1556                 break;
1557         }
1558
1559         if (obj_request_done_test(obj_request))
1560                 rbd_obj_request_complete(obj_request);
1561 }
1562
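/*
 * The two helpers below finalize an osd request before submission.
 * Reads are tagged with the snapshot id to read from (CEPH_NOSNAP
 * for the head); writes instead carry the full snapshot context,
 * which the osds use to preserve snapshotted data (copy-on-write)
 * before the write is applied.
 */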
1563 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1564 {
1565         struct rbd_img_request *img_request = obj_request->img_request;
1566         struct ceph_osd_request *osd_req = obj_request->osd_req;
1567         u64 snap_id;
1568
1569         rbd_assert(osd_req != NULL);
1570
1571         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1572         ceph_osdc_build_request(osd_req, obj_request->offset,
1573                         NULL, snap_id, NULL);
1574 }
1575
1576 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1577 {
1578         struct rbd_img_request *img_request = obj_request->img_request;
1579         struct ceph_osd_request *osd_req = obj_request->osd_req;
1580         struct ceph_snap_context *snapc;
1581         struct timespec mtime = CURRENT_TIME;
1582
1583         rbd_assert(osd_req != NULL);
1584
1585         snapc = img_request ? img_request->snapc : NULL;
1586         ceph_osdc_build_request(osd_req, obj_request->offset,
1587                         snapc, CEPH_NOSNAP, &mtime);
1588 }
1589
1590 static struct ceph_osd_request *rbd_osd_req_create(
1591                                         struct rbd_device *rbd_dev,
1592                                         bool write_request,
1593                                         struct rbd_obj_request *obj_request)
1594 {
1595         struct ceph_snap_context *snapc = NULL;
1596         struct ceph_osd_client *osdc;
1597         struct ceph_osd_request *osd_req;
1598
1599         if (obj_request_img_data_test(obj_request)) {
1600                 struct rbd_img_request *img_request = obj_request->img_request;
1601
1602                 rbd_assert(write_request ==
1603                                 img_request_write_test(img_request));
1604                 if (write_request)
1605                         snapc = img_request->snapc;
1606         }
1607
1608         /* Allocate and initialize the request for its single op */
1609
1610         osdc = &rbd_dev->rbd_client->client->osdc;
1611         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1612         if (!osd_req)
1613                 return NULL;    /* ENOMEM */
1614
1615         if (write_request)
1616                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1617         else
1618                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1619
1620         osd_req->r_callback = rbd_osd_req_callback;
1621         osd_req->r_priv = obj_request;
1622
1623         osd_req->r_oid_len = strlen(obj_request->object_name);
1624         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1625         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1626
1627         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1628
1629         return osd_req;
1630 }
1631
1632 /*
1633  * Create a copyup osd request based on the information in the
1634  * object request supplied.  A copyup request has two osd ops,
1635  * a copyup method call, and a "normal" write request.
1636  */
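/*
 * The resulting op layout, once filled in by the caller, is roughly:
 *
 *     op 0: CEPH_OSD_OP_CALL "rbd" "copyup"   (parent data as payload)
 *     op 1: CEPH_OSD_OP_WRITE                 (the original write)
 *
 * (A sketch; see rbd_img_obj_parent_read_full_callback() for the
 * actual op initialization.)
 */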
1637 static struct ceph_osd_request *
1638 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1639 {
1640         struct rbd_img_request *img_request;
1641         struct ceph_snap_context *snapc;
1642         struct rbd_device *rbd_dev;
1643         struct ceph_osd_client *osdc;
1644         struct ceph_osd_request *osd_req;
1645
1646         rbd_assert(obj_request_img_data_test(obj_request));
1647         img_request = obj_request->img_request;
1648         rbd_assert(img_request);
1649         rbd_assert(img_request_write_test(img_request));
1650
1651         /* Allocate and initialize the request for its two ops */
1652
1653         snapc = img_request->snapc;
1654         rbd_dev = img_request->rbd_dev;
1655         osdc = &rbd_dev->rbd_client->client->osdc;
1656         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1657         if (!osd_req)
1658                 return NULL;    /* ENOMEM */
1659
1660         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1661         osd_req->r_callback = rbd_osd_req_callback;
1662         osd_req->r_priv = obj_request;
1663
1664         osd_req->r_oid_len = strlen(obj_request->object_name);
1665         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1666         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1667
1668         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1669
1670         return osd_req;
1671 }
1672
1673
1674 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1675 {
1676         ceph_osdc_put_request(osd_req);
1677 }
1678
1679 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1680
1681 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1682                                                 u64 offset, u64 length,
1683                                                 enum obj_request_type type)
1684 {
1685         struct rbd_obj_request *obj_request;
1686         size_t size;
1687         char *name;
1688
1689         rbd_assert(obj_request_type_valid(type));
1690
1691         size = strlen(object_name) + 1;
1692         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1693         if (!obj_request)
1694                 return NULL;
1695
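        /* The name lives in the same allocation, just past the struct */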
1696         name = (char *)(obj_request + 1);
1697         obj_request->object_name = memcpy(name, object_name, size);
1698         obj_request->offset = offset;
1699         obj_request->length = length;
1700         obj_request->flags = 0;
1701         obj_request->which = BAD_WHICH;
1702         obj_request->type = type;
1703         INIT_LIST_HEAD(&obj_request->links);
1704         init_completion(&obj_request->completion);
1705         kref_init(&obj_request->kref);
1706
1707         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1708                 offset, length, (int)type, obj_request);
1709
1710         return obj_request;
1711 }
1712
1713 static void rbd_obj_request_destroy(struct kref *kref)
1714 {
1715         struct rbd_obj_request *obj_request;
1716
1717         obj_request = container_of(kref, struct rbd_obj_request, kref);
1718
1719         dout("%s: obj %p\n", __func__, obj_request);
1720
1721         rbd_assert(obj_request->img_request == NULL);
1722         rbd_assert(obj_request->which == BAD_WHICH);
1723
1724         if (obj_request->osd_req)
1725                 rbd_osd_req_destroy(obj_request->osd_req);
1726
1727         rbd_assert(obj_request_type_valid(obj_request->type));
1728         switch (obj_request->type) {
1729         case OBJ_REQUEST_NODATA:
1730                 break;          /* Nothing to do */
1731         case OBJ_REQUEST_BIO:
1732                 if (obj_request->bio_list)
1733                         bio_chain_put(obj_request->bio_list);
1734                 break;
1735         case OBJ_REQUEST_PAGES:
1736                 if (obj_request->pages)
1737                         ceph_release_page_vector(obj_request->pages,
1738                                                 obj_request->page_count);
1739                 break;
1740         }
1741
1742         kfree(obj_request);
1743 }
1744
1745 /*
1746  * Caller is responsible for filling in the list of object requests
1747  * that comprises the image request, and the Linux request pointer
1748  * (if there is one).
1749  */
1750 static struct rbd_img_request *rbd_img_request_create(
1751                                         struct rbd_device *rbd_dev,
1752                                         u64 offset, u64 length,
1753                                         bool write_request,
1754                                         bool child_request)
1755 {
1756         struct rbd_img_request *img_request;
1757
1758         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1759         if (!img_request)
1760                 return NULL;
1761
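        /*
         * A write needs the current snap context.  The reference
         * taken here is dropped in rbd_img_request_destroy().
         */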
1762         if (write_request) {
1763                 down_read(&rbd_dev->header_rwsem);
1764                 ceph_get_snap_context(rbd_dev->header.snapc);
1765                 up_read(&rbd_dev->header_rwsem);
1766         }
1767
1768         img_request->rq = NULL;
1769         img_request->rbd_dev = rbd_dev;
1770         img_request->offset = offset;
1771         img_request->length = length;
1772         img_request->flags = 0;
1773         if (write_request) {
1774                 img_request_write_set(img_request);
1775                 img_request->snapc = rbd_dev->header.snapc;
1776         } else {
1777                 img_request->snap_id = rbd_dev->spec->snap_id;
1778         }
1779         if (child_request)
1780                 img_request_child_set(img_request);
1781         if (rbd_dev->parent_spec)
1782                 img_request_layered_set(img_request);
1783         spin_lock_init(&img_request->completion_lock);
1784         img_request->next_completion = 0;
1785         img_request->callback = NULL;
1786         img_request->result = 0;
1787         img_request->obj_request_count = 0;
1788         INIT_LIST_HEAD(&img_request->obj_requests);
1789         kref_init(&img_request->kref);
1790
1791         rbd_img_request_get(img_request);       /* Avoid a warning */
1792         rbd_img_request_put(img_request);       /* TEMPORARY */
1793
1794         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1795                 write_request ? "write" : "read", offset, length,
1796                 img_request);
1797
1798         return img_request;
1799 }
1800
1801 static void rbd_img_request_destroy(struct kref *kref)
1802 {
1803         struct rbd_img_request *img_request;
1804         struct rbd_obj_request *obj_request;
1805         struct rbd_obj_request *next_obj_request;
1806
1807         img_request = container_of(kref, struct rbd_img_request, kref);
1808
1809         dout("%s: img %p\n", __func__, img_request);
1810
1811         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1812                 rbd_img_obj_request_del(img_request, obj_request);
1813         rbd_assert(img_request->obj_request_count == 0);
1814
1815         if (img_request_write_test(img_request))
1816                 ceph_put_snap_context(img_request->snapc);
1817
1818         if (img_request_child_test(img_request))
1819                 rbd_obj_request_put(img_request->obj_request);
1820
1821         kfree(img_request);
1822 }
1823
1824 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1825 {
1826         struct rbd_img_request *img_request;
1827         unsigned int xferred;
1828         int result;
1829         bool more;
1830
1831         rbd_assert(obj_request_img_data_test(obj_request));
1832         img_request = obj_request->img_request;
1833
1834         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1835         xferred = (unsigned int)obj_request->xferred;
1836         result = obj_request->result;
1837         if (result) {
1838                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1839
1840                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1841                         img_request_write_test(img_request) ? "write" : "read",
1842                         obj_request->length, obj_request->img_offset,
1843                         obj_request->offset);
1844                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1845                         result, xferred);
1846                 if (!img_request->result)
1847                         img_request->result = result;
1848         }
1849
1850         /* Image object requests don't own their page array */
1851
1852         if (obj_request->type == OBJ_REQUEST_PAGES) {
1853                 obj_request->pages = NULL;
1854                 obj_request->page_count = 0;
1855         }
1856
1857         if (img_request_child_test(img_request)) {
1858                 rbd_assert(img_request->obj_request != NULL);
1859                 more = obj_request->which < img_request->obj_request_count - 1;
1860         } else {
1861                 rbd_assert(img_request->rq != NULL);
1862                 more = blk_end_request(img_request->rq, result, xferred);
1863         }
1864
1865         return more;
1866 }
1867
1868 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1869 {
1870         struct rbd_img_request *img_request;
1871         u32 which = obj_request->which;
1872         bool more = true;
1873
1874         rbd_assert(obj_request_img_data_test(obj_request));
1875         img_request = obj_request->img_request;
1876
1877         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1878         rbd_assert(img_request != NULL);
1879         rbd_assert(img_request->obj_request_count > 0);
1880         rbd_assert(which != BAD_WHICH);
1881         rbd_assert(which < img_request->obj_request_count);
1882         rbd_assert(which >= img_request->next_completion);
1883
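        /*
         * Object requests can complete out of order.  Only the request
         * whose index matches next_completion advances the window; it
         * then sweeps forward over any consecutive run of requests
         * that have already completed.
         */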
1884         spin_lock_irq(&img_request->completion_lock);
1885         if (which != img_request->next_completion)
1886                 goto out;
1887
1888         for_each_obj_request_from(img_request, obj_request) {
1889                 rbd_assert(more);
1890                 rbd_assert(which < img_request->obj_request_count);
1891
1892                 if (!obj_request_done_test(obj_request))
1893                         break;
1894                 more = rbd_img_obj_end_request(obj_request);
1895                 which++;
1896         }
1897
1898         rbd_assert(more ^ (which == img_request->obj_request_count));
1899         img_request->next_completion = which;
1900 out:
1901         spin_unlock_irq(&img_request->completion_lock);
1902
1903         if (!more)
1904                 rbd_img_request_complete(img_request);
1905 }
1906
1907 /*
1908  * Split up an image request into one or more object requests, each
1909  * to a different object.  The "type" parameter indicates whether
1910  * "data_desc" is the pointer to the head of a list of bio
1911  * structures, or the base of a page array.  In either case this
1912  * function assumes data_desc describes memory sufficient to hold
1913  * all data described by the image request.
1914  */
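/*
 * For example (assuming default 4 MB objects, i.e. obj_order 22), a
 * 6 MB request at image offset 3 MB becomes three object requests:
 * 1 MB at offset 3 MB in the first object, all 4 MB of the second,
 * and 1 MB at offset 0 in the third.
 */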
1915 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1916                                         enum obj_request_type type,
1917                                         void *data_desc)
1918 {
1919         struct rbd_device *rbd_dev = img_request->rbd_dev;
1920         struct rbd_obj_request *obj_request = NULL;
1921         struct rbd_obj_request *next_obj_request;
1922         bool write_request = img_request_write_test(img_request);
1923         struct bio *bio_list;
1924         unsigned int bio_offset = 0;
1925         struct page **pages;
1926         u64 img_offset;
1927         u64 resid;
1928         u16 opcode;
1929
1930         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1931                 (int)type, data_desc);
1932
1933         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1934         img_offset = img_request->offset;
1935         resid = img_request->length;
1936         rbd_assert(resid > 0);
1937
1938         if (type == OBJ_REQUEST_BIO) {
1939                 bio_list = data_desc;
1940                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1941         } else {
1942                 rbd_assert(type == OBJ_REQUEST_PAGES);
1943                 pages = data_desc;
1944         }
1945
1946         while (resid) {
1947                 struct ceph_osd_request *osd_req;
1948                 const char *object_name;
1949                 u64 offset;
1950                 u64 length;
1951
1952                 object_name = rbd_segment_name(rbd_dev, img_offset);
1953                 if (!object_name)
1954                         goto out_unwind;
1955                 offset = rbd_segment_offset(rbd_dev, img_offset);
1956                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1957                 obj_request = rbd_obj_request_create(object_name,
1958                                                 offset, length, type);
1959                 kfree(object_name);     /* object request has its own copy */
1960                 if (!obj_request)
1961                         goto out_unwind;
1962
1963                 if (type == OBJ_REQUEST_BIO) {
1964                         unsigned int clone_size;
1965
1966                         rbd_assert(length <= (u64)UINT_MAX);
1967                         clone_size = (unsigned int)length;
1968                         obj_request->bio_list =
1969                                         bio_chain_clone_range(&bio_list,
1970                                                                 &bio_offset,
1971                                                                 clone_size,
1972                                                                 GFP_ATOMIC);
1973                         if (!obj_request->bio_list)
1974                                 goto out_partial;
1975                 } else {
1976                         unsigned int page_count;
1977
1978                         obj_request->pages = pages;
1979                         page_count = (u32)calc_pages_for(offset, length);
1980                         obj_request->page_count = page_count;
1981                         if ((offset + length) & ~PAGE_MASK)
1982                                 page_count--;   /* more on last page */
1983                         pages += page_count;
1984                 }
1985
1986                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1987                                                 obj_request);
1988                 if (!osd_req)
1989                         goto out_partial;
1990                 obj_request->osd_req = osd_req;
1991                 obj_request->callback = rbd_img_obj_callback;
1992
1993                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1994                                                 0, 0);
1995                 if (type == OBJ_REQUEST_BIO)
1996                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1997                                         obj_request->bio_list, length);
1998                 else
1999                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2000                                         obj_request->pages, length,
2001                                         offset & ~PAGE_MASK, false, false);
2002
2003                 if (write_request)
2004                         rbd_osd_req_format_write(obj_request);
2005                 else
2006                         rbd_osd_req_format_read(obj_request);
2007
2008                 obj_request->img_offset = img_offset;
2009                 rbd_img_obj_request_add(img_request, obj_request);
2010
2011                 img_offset += length;
2012                 resid -= length;
2013         }
2014
2015         return 0;
2016
2017 out_partial:
2018         rbd_obj_request_put(obj_request);
2019 out_unwind:
2020         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2021                 rbd_obj_request_put(obj_request);
2022
2023         return -ENOMEM;
2024 }
2025
2026 static void
2027 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2028 {
2029         struct rbd_img_request *img_request;
2030         struct rbd_device *rbd_dev;
2031         u64 length;
2032         u32 page_count;
2033
2034         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2035         rbd_assert(obj_request_img_data_test(obj_request));
2036         img_request = obj_request->img_request;
2037         rbd_assert(img_request);
2038
2039         rbd_dev = img_request->rbd_dev;
2040         rbd_assert(rbd_dev);
2041         length = (u64)1 << rbd_dev->header.obj_order;
2042         page_count = (u32)calc_pages_for(0, length);
2043
2044         rbd_assert(obj_request->copyup_pages);
2045         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2046         obj_request->copyup_pages = NULL;
2047
2048         /*
2049          * We want the transfer count to reflect the size of the
2050          * original write request.  There is no such thing as a
2051          * successful short write, so if the request was successful
2052          * we can just set it to the originally-requested length.
2053          */
2054         if (!obj_request->result)
2055                 obj_request->xferred = obj_request->length;
2056
2057         /* Finish up with the normal image object callback */
2058
2059         rbd_img_obj_callback(obj_request);
2060 }
2061
2062 static void
2063 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2064 {
2065         struct rbd_obj_request *orig_request;
2066         struct ceph_osd_request *osd_req;
2067         struct ceph_osd_client *osdc;
2068         struct rbd_device *rbd_dev;
2069         struct page **pages;
2070         int result;
2071         u64 obj_size;
2072         u64 xferred;
2073
2074         rbd_assert(img_request_child_test(img_request));
2075
2076         /* First get what we need from the image request */
2077
2078         pages = img_request->copyup_pages;
2079         rbd_assert(pages != NULL);
2080         img_request->copyup_pages = NULL;
2081
2082         orig_request = img_request->obj_request;
2083         rbd_assert(orig_request != NULL);
2084         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2085         result = img_request->result;
2086         obj_size = img_request->length;
2087         xferred = img_request->xferred;
2088
2089         rbd_dev = img_request->rbd_dev;
2090         rbd_assert(rbd_dev);
2091         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2092
2093         rbd_img_request_put(img_request);
2094
2095         if (result)
2096                 goto out_err;
2097
2098         /* Allocate the new copyup osd request for the original request */
2099
2100         result = -ENOMEM;
2101         rbd_assert(!orig_request->osd_req);
2102         osd_req = rbd_osd_req_create_copyup(orig_request);
2103         if (!osd_req)
2104                 goto out_err;
2105         orig_request->osd_req = osd_req;
2106         orig_request->copyup_pages = pages;
2107
2108         /* Initialize the copyup op */
2109
2110         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2111         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2112                                                 false, false);
2113
2114         /* Then the original write request op */
2115
2116         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2117                                         orig_request->offset,
2118                                         orig_request->length, 0, 0);
2119         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2120                                         orig_request->length);
2121
2122         rbd_osd_req_format_write(orig_request);
2123
2124         /* All set, send it off. */
2125
2126         orig_request->callback = rbd_img_obj_copyup_callback;
2127         osdc = &rbd_dev->rbd_client->client->osdc;
2128         result = rbd_obj_request_submit(osdc, orig_request);
2129         if (!result)
2130                 return;
2131 out_err:
2132         /* Record the error code and complete the request */
2133
2134         orig_request->result = result;
2135         orig_request->xferred = 0;
2136         obj_request_done_set(orig_request);
2137         rbd_obj_request_complete(orig_request);
2138 }
2139
2140 /*
2141  * Read from the parent image the range of data that covers the
2142  * entire target of the given object request.  This is used for
2143  * satisfying a layered image write request when the target of an
2144  * object request from the image request does not exist.
2145  *
2146  * A page array big enough to hold the returned data is allocated
2147  * and supplied to rbd_img_request_fill() as the "data descriptor."
2148  * When the read completes, this page array will be transferred to
2149  * the original object request for the copyup operation.
2150  *
2151  * If an error occurs, record it as the result of the original
2152  * object request and mark it done so it gets completed.
2153  */
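/*
 * The sequence is thus: (1) read the object's full range from the
 * parent into a page array; (2) in the completion callback, build a
 * two-op copyup request carrying that data plus the original write;
 * (3) submit it and finish through the normal image object callback.
 */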
2154 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2155 {
2156         struct rbd_img_request *img_request = NULL;
2157         struct rbd_img_request *parent_request = NULL;
2158         struct rbd_device *rbd_dev;
2159         u64 img_offset;
2160         u64 length;
2161         struct page **pages = NULL;
2162         u32 page_count;
2163         int result;
2164
2165         rbd_assert(obj_request_img_data_test(obj_request));
2166         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2167
2168         img_request = obj_request->img_request;
2169         rbd_assert(img_request != NULL);
2170         rbd_dev = img_request->rbd_dev;
2171         rbd_assert(rbd_dev->parent != NULL);
2172
2173         /*
2174          * First things first.  The original osd request is of no
2175          * use to us any more; we'll need a new one that can hold
2176          * the two ops in a copyup request.  We'll get that later,
2177          * but for now we can release the old one.
2178          */
2179         rbd_osd_req_destroy(obj_request->osd_req);
2180         obj_request->osd_req = NULL;
2181
2182         /*
2183          * Determine the byte range covered by the object in the
2184          * child image to which the original request was to be sent.
2185          */
2186         img_offset = obj_request->img_offset - obj_request->offset;
2187         length = (u64)1 << rbd_dev->header.obj_order;
2188
2189         /*
2190          * There is no defined parent data beyond the parent
2191          * overlap, so limit what we read at that boundary if
2192          * necessary.
2193          */
2194         if (img_offset + length > rbd_dev->parent_overlap) {
2195                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2196                 length = rbd_dev->parent_overlap - img_offset;
2197         }
2198
2199         /*
2200          * Allocate a page array big enough to receive the data read
2201          * from the parent.
2202          */
2203         page_count = (u32)calc_pages_for(0, length);
2204         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2205         if (IS_ERR(pages)) {
2206                 result = PTR_ERR(pages);
2207                 pages = NULL;
2208                 goto out_err;
2209         }
2210
2211         result = -ENOMEM;
2212         parent_request = rbd_img_request_create(rbd_dev->parent,
2213                                                 img_offset, length,
2214                                                 false, true);
2215         if (!parent_request)
2216                 goto out_err;
2217         rbd_obj_request_get(obj_request);
2218         parent_request->obj_request = obj_request;
2219
2220         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2221         if (result)
2222                 goto out_err;
2223         parent_request->copyup_pages = pages;
2224
2225         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2226         result = rbd_img_request_submit(parent_request);
2227         if (!result)
2228                 return 0;
2229
2230         parent_request->copyup_pages = NULL;
2231         parent_request->obj_request = NULL;
2232         rbd_obj_request_put(obj_request);
2233 out_err:
2234         if (pages)
2235                 ceph_release_page_vector(pages, page_count);
2236         if (parent_request)
2237                 rbd_img_request_put(parent_request);
2238         obj_request->result = result;
2239         obj_request->xferred = 0;
2240         obj_request_done_set(obj_request);
2241
2242         return result;
2243 }
2244
2245 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2246 {
2247         struct rbd_obj_request *orig_request;
2248         int result;
2249
2250         rbd_assert(!obj_request_img_data_test(obj_request));
2251
2252         /*
2253          * All we need from the object request is the original
2254          * request and the result of the STAT op.  Grab those, then
2255          * we're done with the request.
2256          */
2257         orig_request = obj_request->obj_request;
2258         obj_request->obj_request = NULL;
2259         rbd_assert(orig_request);
2260         rbd_assert(orig_request->img_request);
2261
2262         result = obj_request->result;
2263         obj_request->result = 0;
2264
2265         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2266                 obj_request, orig_request, result,
2267                 obj_request->xferred, obj_request->length);
2268         rbd_obj_request_put(obj_request);
2269
2273         /*
2274          * Our only purpose here is to determine whether the object
2275          * exists, and we don't want to treat the non-existence as
2276          * an error.  If something else comes back, transfer the
2277          * error to the original request and complete it now.
2278          */
2279         if (!result) {
2280                 obj_request_existence_set(orig_request, true);
2281         } else if (result == -ENOENT) {
2282                 obj_request_existence_set(orig_request, false);
2283         } else {
2284                 orig_request->result = result;
2285                 goto out;
2286         }
2287
2288         /*
2289          * Resubmit the original request now that we have recorded
2290          * whether the target object exists.
2291          */
2292         orig_request->result = rbd_img_obj_request_submit(orig_request);
2293 out:
2294         if (orig_request->result)
2295                 rbd_obj_request_complete(orig_request);
2296         rbd_obj_request_put(orig_request);
2297 }
2298
2299 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2300 {
2301         struct rbd_obj_request *stat_request;
2302         struct rbd_device *rbd_dev;
2303         struct ceph_osd_client *osdc;
2304         struct page **pages = NULL;
2305         u32 page_count;
2306         size_t size;
2307         int ret;
2308
2309         /*
2310          * The response data for a STAT call consists of:
2311          *     le64 length;
2312          *     struct {
2313          *         le32 tv_sec;
2314          *         le32 tv_nsec;
2315          *     } mtime;
2316          */
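        /* That is 16 bytes; a single page easily covers it */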
2317         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2318         page_count = (u32)calc_pages_for(0, size);
2319         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2320         if (IS_ERR(pages))
2321                 return PTR_ERR(pages);
2322
2323         ret = -ENOMEM;
2324         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2325                                                         OBJ_REQUEST_PAGES);
2326         if (!stat_request)
2327                 goto out;
2328
2329         rbd_obj_request_get(obj_request);
2330         stat_request->obj_request = obj_request;
2331         stat_request->pages = pages;
2332         stat_request->page_count = page_count;
2333
2334         rbd_assert(obj_request->img_request);
2335         rbd_dev = obj_request->img_request->rbd_dev;
2336         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2337                                                 stat_request);
2338         if (!stat_request->osd_req)
2339                 goto out;
2340         stat_request->callback = rbd_img_obj_exists_callback;
2341
2342         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2343         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2344                                         false, false);
2345         rbd_osd_req_format_read(stat_request);
2346
2347         osdc = &rbd_dev->rbd_client->client->osdc;
2348         ret = rbd_obj_request_submit(osdc, stat_request);
2349 out:
2350         if (ret)
2351                 rbd_obj_request_put(obj_request);
2352
2353         return ret;
2354 }
2355
2356 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2357 {
2358         struct rbd_img_request *img_request;
2359         struct rbd_device *rbd_dev;
2360         bool known;
2361
2362         rbd_assert(obj_request_img_data_test(obj_request));
2363
2364         img_request = obj_request->img_request;
2365         rbd_assert(img_request);
2366         rbd_dev = img_request->rbd_dev;
2367
2368         /*
2369          * Only writes to layered images need special handling.
2370          * Reads and non-layered writes are simple object requests.
2371          * Layered writes that start beyond the end of the overlap
2372          * with the parent have no parent data, so they too are
2373          * simple object requests.  Finally, if the target object is
2374          * known to already exist, its parent data has already been
2375          * copied, so a write to the object can also be handled as a
2376          * simple object request.
2377          */
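        /*
         * In short:
         *     read                                -> simple request
         *     write, not layered                  -> simple request
         *     write at/beyond the parent overlap  -> simple request
         *     write, target known to exist        -> simple request
         *     write, target known not to exist    -> copyup from parent
         *     write, target existence unknown     -> stat it first
         */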
2378         if (!img_request_write_test(img_request) ||
2379                 !img_request_layered_test(img_request) ||
2380                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2381                 ((known = obj_request_known_test(obj_request)) &&
2382                         obj_request_exists_test(obj_request))) {
2383
2384                 struct ceph_osd_client *osdc;
2385
2386                 osdc = &rbd_dev->rbd_client->client->osdc;
2389
2390                 return rbd_obj_request_submit(osdc, obj_request);
2391         }
2392
2393         /*
2394          * It's a layered write.  The target object might exist but
2395          * we may not know that yet.  If we know it doesn't exist,
2396          * start by reading the data for the full target object from
2397          * the parent so we can use it for a copyup to the target.
2398          */
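        /*
         * The "known && exists" case returned above, so at this
         * point "known" means known *not* to exist.
         */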
2399         if (known)
2400                 return rbd_img_obj_parent_read_full(obj_request);
2401
2402         /* We don't know whether the target exists.  Go find out. */
2403
2404         return rbd_img_obj_exists_submit(obj_request);
2405 }
2406
2407 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2408 {
2409         struct rbd_obj_request *obj_request;
2410         struct rbd_obj_request *next_obj_request;
2411
2412         dout("%s: img %p\n", __func__, img_request);
2413         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2414                 int ret;
2415
2416                 ret = rbd_img_obj_request_submit(obj_request);
2417                 if (ret)
2418                         return ret;
2419         }
2420
2421         return 0;
2422 }
2423
2424 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2425 {
2426         struct rbd_obj_request *obj_request;
2427         struct rbd_device *rbd_dev;
2428         u64 obj_end;
2429
2430         rbd_assert(img_request_child_test(img_request));
2431
2432         obj_request = img_request->obj_request;
2433         rbd_assert(obj_request);
2434         rbd_assert(obj_request->img_request);
2435
2436         obj_request->result = img_request->result;
2437         if (obj_request->result)
2438                 goto out;
2439
2440         /*
2441          * We need to zero anything beyond the parent overlap
2442          * boundary.  Since rbd_img_obj_request_read_callback()
2443          * will zero anything beyond the end of a short read, an
2444          * easy way to do this is to pretend the data from the
2445          * parent came up short--ending at the overlap boundary.
2446          */
2447         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2448         obj_end = obj_request->img_offset + obj_request->length;
2449         rbd_dev = obj_request->img_request->rbd_dev;
2450         if (obj_end > rbd_dev->parent_overlap) {
2451                 u64 xferred = 0;
2452
2453                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2454                         xferred = rbd_dev->parent_overlap -
2455                                         obj_request->img_offset;
2456
2457                 obj_request->xferred = min(img_request->xferred, xferred);
2458         } else {
2459                 obj_request->xferred = img_request->xferred;
2460         }
2461 out:
2462         rbd_img_obj_request_read_callback(obj_request);
2463         rbd_obj_request_complete(obj_request);
2464 }
2465
2466 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2467 {
2468         struct rbd_device *rbd_dev;
2469         struct rbd_img_request *img_request;
2470         int result;
2471
2472         rbd_assert(obj_request_img_data_test(obj_request));
2473         rbd_assert(obj_request->img_request != NULL);
2474         rbd_assert(obj_request->result == (s32) -ENOENT);
2475         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2476
2477         rbd_dev = obj_request->img_request->rbd_dev;
2478         rbd_assert(rbd_dev->parent != NULL);
2480         img_request = rbd_img_request_create(rbd_dev->parent,
2481                                                 obj_request->img_offset,
2482                                                 obj_request->length,
2483                                                 false, true);
2484         result = -ENOMEM;
2485         if (!img_request)
2486                 goto out_err;
2487
2488         rbd_obj_request_get(obj_request);
2489         img_request->obj_request = obj_request;
2490
2491         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2492                                         obj_request->bio_list);
2493         if (result)
2494                 goto out_err;
2495
2496         img_request->callback = rbd_img_parent_read_callback;
2497         result = rbd_img_request_submit(img_request);
2498         if (result)
2499                 goto out_err;
2500
2501         return;
2502 out_err:
2503         if (img_request)
2504                 rbd_img_request_put(img_request);
2505         obj_request->result = result;
2506         obj_request->xferred = 0;
2507         obj_request_done_set(obj_request);
2508 }
2509
2510 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2511 {
2512         struct rbd_obj_request *obj_request;
2513         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2514         int ret;
2515
2516         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2517                                                         OBJ_REQUEST_NODATA);
2518         if (!obj_request)
2519                 return -ENOMEM;
2520
2521         ret = -ENOMEM;
2522         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2523         if (!obj_request->osd_req)
2524                 goto out;
2525         obj_request->callback = rbd_obj_request_put;
2526
2527         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2528                                         notify_id, 0, 0);
2529         rbd_osd_req_format_read(obj_request);
2530
2531         ret = rbd_obj_request_submit(osdc, obj_request);
2532 out:
2533         if (ret)
2534                 rbd_obj_request_put(obj_request);
2535
2536         return ret;
2537 }
2538
2539 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2540 {
2541         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2542
2543         if (!rbd_dev)
2544                 return;
2545
2546         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2547                 rbd_dev->header_name, (unsigned long long)notify_id,
2548                 (unsigned int)opcode);
2549         (void)rbd_dev_refresh(rbd_dev);
2550
2551         rbd_obj_notify_ack(rbd_dev, notify_id);
2552 }
2553
2554 /*
2555  * Request sync osd watch/unwatch.  The value of "start" determines
2556  * whether a watch request is being initiated or torn down.
2557  */
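/*
 * Typical usage is thus (a sketch):
 *
 *     ret = rbd_dev_header_watch_sync(rbd_dev, 1);    <- start watching
 *     ...
 *     ret = rbd_dev_header_watch_sync(rbd_dev, 0);    <- tear it down
 */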
2558 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2559 {
2560         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2561         struct rbd_obj_request *obj_request;
2562         int ret;
2563
2564         rbd_assert(start ^ !!rbd_dev->watch_event);
2565         rbd_assert(start ^ !!rbd_dev->watch_request);
2566
2567         if (start) {
2568                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2569                                                 &rbd_dev->watch_event);
2570                 if (ret < 0)
2571                         return ret;
2572                 rbd_assert(rbd_dev->watch_event != NULL);
2573         }
2574
2575         ret = -ENOMEM;
2576         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2577                                                         OBJ_REQUEST_NODATA);
2578         if (!obj_request)
2579                 goto out_cancel;
2580
2581         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2582         if (!obj_request->osd_req)
2583                 goto out_cancel;
2584
2585         if (start)
2586                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2587         else
2588                 ceph_osdc_unregister_linger_request(osdc,
2589                                         rbd_dev->watch_request->osd_req);
2590
2591         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2592                                 rbd_dev->watch_event->cookie, 0, start);
2593         rbd_osd_req_format_write(obj_request);
2594
2595         ret = rbd_obj_request_submit(osdc, obj_request);
2596         if (ret)
2597                 goto out_cancel;
2598         ret = rbd_obj_request_wait(obj_request);
2599         if (ret)
2600                 goto out_cancel;
2601         ret = obj_request->result;
2602         if (ret)
2603                 goto out_cancel;
2604
2605         /*
2606          * A watch request is set to linger, so the underlying osd
2607          * request won't go away until we unregister it.  We retain
2608          * a pointer to the object request during that time (in
2609          * rbd_dev->watch_request), so we'll keep a reference to
2610          * it.  We'll drop that reference (below) after we've
2611          * unregistered it.
2612          */
2613         if (start) {
2614                 rbd_dev->watch_request = obj_request;
2615
2616                 return 0;
2617         }
2618
2619         /* We have successfully torn down the watch request */
2620
2621         rbd_obj_request_put(rbd_dev->watch_request);
2622         rbd_dev->watch_request = NULL;
2623 out_cancel:
2624         /* Cancel the event if we're tearing down, or on error */
2625         ceph_osdc_cancel_event(rbd_dev->watch_event);
2626         rbd_dev->watch_event = NULL;
2627         if (obj_request)
2628                 rbd_obj_request_put(obj_request);
2629
2630         return ret;
2631 }
2632
2633 /*
2634  * Synchronous osd object method call.  Returns the number of bytes
2635  * returned in the outbound buffer, or a negative error code.
2636  */
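/*
 * For example, a format 2 image size is fetched along these lines
 * (a sketch; the exact buffers depend on the caller):
 *
 *     __le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *     struct { u8 order; __le64 size; } __attribute__ ((packed)) size_buf;
 *
 *     ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                             "rbd", "get_size",
 *                             &snapid, sizeof (snapid),
 *                             &size_buf, sizeof (size_buf));
 */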
2637 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2638                              const char *object_name,
2639                              const char *class_name,
2640                              const char *method_name,
2641                              const void *outbound,
2642                              size_t outbound_size,
2643                              void *inbound,
2644                              size_t inbound_size)
2645 {
2646         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2647         struct rbd_obj_request *obj_request;
2648         struct page **pages;
2649         u32 page_count;
2650         int ret;
2651
2652         /*
2653          * Method calls are ultimately read operations.  The result
2654          * should be placed into the inbound buffer provided.  They
2655          * also supply outbound data--parameters for the object
2656          * method.  Currently if this is present it will be a
2657          * snapshot id.
2658          */
2659         page_count = (u32)calc_pages_for(0, inbound_size);
2660         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2661         if (IS_ERR(pages))
2662                 return PTR_ERR(pages);
2663
2664         ret = -ENOMEM;
2665         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2666                                                         OBJ_REQUEST_PAGES);
2667         if (!obj_request)
2668                 goto out;
2669
2670         obj_request->pages = pages;
2671         obj_request->page_count = page_count;
2672
2673         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2674         if (!obj_request->osd_req)
2675                 goto out;
2676
2677         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2678                                         class_name, method_name);
2679         if (outbound_size) {
2680                 struct ceph_pagelist *pagelist;
2681
2682                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2683                 if (!pagelist)
2684                         goto out;
2685
2686                 ceph_pagelist_init(pagelist);
2687                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2688                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2689                                                 pagelist);
2690         }
2691         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2692                                         obj_request->pages, inbound_size,
2693                                         0, false, false);
2694         rbd_osd_req_format_read(obj_request);
2695
2696         ret = rbd_obj_request_submit(osdc, obj_request);
2697         if (ret)
2698                 goto out;
2699         ret = rbd_obj_request_wait(obj_request);
2700         if (ret)
2701                 goto out;
2702
2703         ret = obj_request->result;
2704         if (ret < 0)
2705                 goto out;
2706
2707         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2708         ret = (int)obj_request->xferred;
2709         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2710 out:
2711         if (obj_request)
2712                 rbd_obj_request_put(obj_request);
2713         else
2714                 ceph_release_page_vector(pages, page_count);
2715
2716         return ret;
2717 }
2718
2719 static void rbd_request_fn(struct request_queue *q)
2720                 __releases(q->queue_lock) __acquires(q->queue_lock)
2721 {
2722         struct rbd_device *rbd_dev = q->queuedata;
2723         bool read_only = rbd_dev->mapping.read_only;
2724         struct request *rq;
2725         int result;
2726
2727         while ((rq = blk_fetch_request(q))) {
2728                 bool write_request = rq_data_dir(rq) == WRITE;
2729                 struct rbd_img_request *img_request;
2730                 u64 offset;
2731                 u64 length;
2732
2733                 /* Ignore any non-FS requests that filter through. */
2734
2735                 if (rq->cmd_type != REQ_TYPE_FS) {
2736                         dout("%s: non-fs request type %d\n", __func__,
2737                                 (int) rq->cmd_type);
2738                         __blk_end_request_all(rq, 0);
2739                         continue;
2740                 }
2741
2742                 /* Ignore/skip any zero-length requests */
2743
2744                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2745                 length = (u64) blk_rq_bytes(rq);
2746
2747                 if (!length) {
2748                         dout("%s: zero-length request\n", __func__);
2749                         __blk_end_request_all(rq, 0);
2750                         continue;
2751                 }
2752
2753                 spin_unlock_irq(q->queue_lock);
2754
2755                 /* Disallow writes to a read-only device */
2756
2757                 if (write_request) {
2758                         result = -EROFS;
2759                         if (read_only)
2760                                 goto end_request;
2761                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2762                 }
2763
2764                 /*
2765                  * Quit early if the mapped snapshot no longer
2766                  * exists.  It's still possible the snapshot will
2767                  * have disappeared by the time our request arrives
2768                  * at the osd, but there's no sense in sending it if
2769                  * we already know.
2770                  */
2771                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2772                         dout("request for non-existent snapshot");
2773                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2774                         result = -ENXIO;
2775                         goto end_request;
2776                 }
2777
2778                 result = -EINVAL;
2779                 if (offset && length > U64_MAX - offset + 1) {
2780                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2781                                 offset, length);
2782                         goto end_request;       /* Shouldn't happen */
2783                 }
2784
2785                 result = -ENOMEM;
2786                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2787                                                         write_request, false);
2788                 if (!img_request)
2789                         goto end_request;
2790
2791                 img_request->rq = rq;
2792
2793                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2794                                                 rq->bio);
2795                 if (!result)
2796                         result = rbd_img_request_submit(img_request);
2797                 if (result)
2798                         rbd_img_request_put(img_request);
2799 end_request:
2800                 spin_lock_irq(q->queue_lock);
2801                 if (result < 0) {
2802                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2803                                 write_request ? "write" : "read",
2804                                 length, offset, result);
2805
2806                         __blk_end_request_all(rq, result);
2807                 }
2808         }
2809 }
2810
2811 /*
2812  * A queue callback.  Makes sure that we don't create a bio that spans
2813  * multiple osd objects.  One exception is single-page bios, which we
2814  * handle later in bio_chain_clone_range().
2815  */
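/*
 * Worked example (assuming default 4 MB objects, obj_order 22):
 * sectors_per_obj = 1 << (22 - 9) = 8192.  A bio starting at device
 * sector 8190 lies 2 sectors short of its object boundary, so at most
 * (8192 - 8190) << 9 = 1024 bytes (less what the bio already holds)
 * may be added to it.
 */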
2816 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2817                           struct bio_vec *bvec)
2818 {
2819         struct rbd_device *rbd_dev = q->queuedata;
2820         sector_t sector_offset;
2821         sector_t sectors_per_obj;
2822         sector_t obj_sector_offset;
2823         int ret;
2824
2825         /*
2826          * Find how far into its rbd object the bio's start sector
2827          * falls.  The partition-relative sector must be offset by
2828          * the partition start to make it relative to the whole device.
2829          */
2830         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2831         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2832         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2833
2834         /*
2835          * Compute the number of bytes from that offset to the end
2836          * of the object.  Account for what's already used by the bio.
2837          */
2838         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2839         if (ret > bmd->bi_size)
2840                 ret -= bmd->bi_size;
2841         else
2842                 ret = 0;
2843
2844         /*
2845          * Don't send back more than was asked for.  And if the bio
2846          * was empty, let the whole thing through because:  "Note
2847          * that a block device *must* allow a single page to be
2848          * added to an empty bio."
2849          */
2850         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2851         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2852                 ret = (int) bvec->bv_len;
2853
2854         return ret;
2855 }
2856
2857 static void rbd_free_disk(struct rbd_device *rbd_dev)
2858 {
2859         struct gendisk *disk = rbd_dev->disk;
2860
2861         if (!disk)
2862                 return;
2863
2864         rbd_dev->disk = NULL;
2865         if (disk->flags & GENHD_FL_UP) {
2866                 del_gendisk(disk);
2867                 if (disk->queue)
2868                         blk_cleanup_queue(disk->queue);
2869         }
2870         put_disk(disk);
2871 }
2872
2873 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2874                                 const char *object_name,
2875                                 u64 offset, u64 length, void *buf)
2876
2877 {
2878         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2879         struct rbd_obj_request *obj_request;
2880         struct page **pages = NULL;
2881         u32 page_count;
2882         size_t size;
2883         int ret;
2884
2885         page_count = (u32) calc_pages_for(offset, length);
2886         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2887         if (IS_ERR(pages))
2888                 return PTR_ERR(pages);
2889
2890         ret = -ENOMEM;
2891         obj_request = rbd_obj_request_create(object_name, offset, length,
2892                                                         OBJ_REQUEST_PAGES);
2893         if (!obj_request)
2894                 goto out;
2895
2896         obj_request->pages = pages;
2897         obj_request->page_count = page_count;
2898
2899         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2900         if (!obj_request->osd_req)
2901                 goto out;
2902
2903         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2904                                         offset, length, 0, 0);
2905         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2906                                         obj_request->pages,
2907                                         obj_request->length,
2908                                         obj_request->offset & ~PAGE_MASK,
2909                                         false, false);
2910         rbd_osd_req_format_read(obj_request);
2911
2912         ret = rbd_obj_request_submit(osdc, obj_request);
2913         if (ret)
2914                 goto out;
2915         ret = rbd_obj_request_wait(obj_request);
2916         if (ret)
2917                 goto out;
2918
2919         ret = obj_request->result;
2920         if (ret < 0)
2921                 goto out;
2922
2923         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2924         size = (size_t) obj_request->xferred;
2925         ceph_copy_from_page_vector(pages, buf, 0, size);
2926         rbd_assert(size <= (size_t)INT_MAX);
2927         ret = (int)size;
2928 out:
2929         if (obj_request)
2930                 rbd_obj_request_put(obj_request);
2931         else
2932                 ceph_release_page_vector(pages, page_count);
2933
2934         return ret;
2935 }
2936
2937 /*
2938  * Read the complete header for the given rbd device.
2939  *
2940  * Returns a pointer to a dynamically-allocated buffer containing
2941  * the complete and validated header.
2944  *
2945  * Returns a pointer-coded errno if a failure occurs.
2946  */
2947 static struct rbd_image_header_ondisk *
2948 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2949 {
2950         struct rbd_image_header_ondisk *ondisk = NULL;
2951         u32 snap_count = 0;
2952         u64 names_size = 0;
2953         u32 want_count;
2954         int ret;
2955
2956         /*
2957          * The complete header will include an array of its 64-bit
2958          * snapshot ids, followed by the names of those snapshots as
2959          * a contiguous block of NUL-terminated strings.  Note that
2960          * the number of snapshots could change by the time we read
2961          * it in, in which case we re-read it.
2962          */
2963         do {
2964                 size_t size;
2965
2966                 kfree(ondisk);
2967
2968                 size = sizeof (*ondisk);
2969                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2970                 size += names_size;
2971                 ondisk = kmalloc(size, GFP_KERNEL);
2972                 if (!ondisk)
2973                         return ERR_PTR(-ENOMEM);
2974
2975                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2976                                        0, size, ondisk);
2977                 if (ret < 0)
2978                         goto out_err;
2979                 if ((size_t)ret < size) {
2980                         ret = -ENXIO;
2981                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2982                                 size, ret);
2983                         goto out_err;
2984                 }
2985                 if (!rbd_dev_ondisk_valid(ondisk)) {
2986                         ret = -ENXIO;
2987                         rbd_warn(rbd_dev, "invalid header");
2988                         goto out_err;
2989                 }
2990
2991                 names_size = le64_to_cpu(ondisk->snap_names_len);
2992                 want_count = snap_count;
2993                 snap_count = le32_to_cpu(ondisk->snap_count);
2994         } while (snap_count != want_count);
2995
2996         return ondisk;
2997
2998 out_err:
2999         kfree(ondisk);
3000
3001         return ERR_PTR(ret);
3002 }
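
     /*
      * For reference, the format 1 on-disk layout fetched above is:
      *
      *     struct rbd_image_header_ondisk     fixed-size header
      *     struct rbd_image_snap_ondisk[n]    one entry per snapshot
      *     char names[snap_names_len]         NUL-terminated snap names
      *
      * which is why the buffer is re-read until the snapshot count it
      * reports matches the count used to size it.
      */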
3003
3004 /*
3005  * Re-read the on-disk header and convert it to the in-memory format.
3006  */
3007 static int rbd_read_header(struct rbd_device *rbd_dev,
3008                            struct rbd_image_header *header)
3009 {
3010         struct rbd_image_header_ondisk *ondisk;
3011         int ret;
3012
3013         ondisk = rbd_dev_v1_header_read(rbd_dev);
3014         if (IS_ERR(ondisk))
3015                 return PTR_ERR(ondisk);
3016         ret = rbd_header_from_disk(header, ondisk);
3017         kfree(ondisk);
3018
3019         return ret;
3020 }
3021
3022 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3023 {
3024         struct rbd_snap *snap;
3025         struct rbd_snap *next;
3026
3027         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3028                 list_del(&snap->node);
3029                 rbd_snap_destroy(snap);
3030         }
3031 }
3032
3033 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3034 {
3035         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3036                 return;
3037
3038         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3039                 sector_t size;
3040
3041                 rbd_dev->mapping.size = rbd_dev->header.image_size;
                     /* do the 64-bit division before narrowing to sector_t */
3042                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3043                 dout("setting size to %llu sectors", (unsigned long long)size);
3044                 set_capacity(rbd_dev->disk, size);
3045         }
3046 }
3047
3048 /*
3049  * Refresh the format 1 in-memory header from a fresh on-disk read.
3050  */
3051 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3052 {
3053         int ret;
3054         struct rbd_image_header h;
3055
3056         ret = rbd_read_header(rbd_dev, &h);
3057         if (ret < 0)
3058                 return ret;
3059
3060         down_write(&rbd_dev->header_rwsem);
3061
3062         /* Update image size, and check for resize of mapped image */
3063         rbd_dev->header.image_size = h.image_size;
3064         rbd_update_mapping_size(rbd_dev);
3065
3066         /* rbd_dev->header.object_prefix shouldn't change */
3067         kfree(rbd_dev->header.snap_sizes);
3068         kfree(rbd_dev->header.snap_names);
3069         /* osd requests may still refer to snapc */
3070         ceph_put_snap_context(rbd_dev->header.snapc);
3071
3073         rbd_dev->header.snapc = h.snapc;
3074         rbd_dev->header.snap_names = h.snap_names;
3075         rbd_dev->header.snap_sizes = h.snap_sizes;
3076         /* Free the extra copy of the object prefix */
3077         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3078                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3079         kfree(h.object_prefix);
3080
3081         ret = rbd_dev_snaps_update(rbd_dev);
3082
3083         up_write(&rbd_dev->header_rwsem);
3084
3085         return ret;
3086 }
3087
3088 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3089 {
3090         u64 image_size;
3091         int ret;
3092
3093         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3094         image_size = rbd_dev->header.image_size;
3095         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3096         if (rbd_dev->image_format == 1)
3097                 ret = rbd_dev_v1_refresh(rbd_dev);
3098         else
3099                 ret = rbd_dev_v2_refresh(rbd_dev);
3100         mutex_unlock(&ctl_mutex);
3101         if (ret)
3102                 rbd_warn(rbd_dev, "got notification but failed to "
3103                            "update snaps: %d\n", ret);
3104         if (image_size != rbd_dev->header.image_size)
3105                 revalidate_disk(rbd_dev->disk);
3106
3107         return ret;
3108 }
3109
3110 static int rbd_init_disk(struct rbd_device *rbd_dev)
3111 {
3112         struct gendisk *disk;
3113         struct request_queue *q;
3114         u64 segment_size;
3115
3116         /* create gendisk info */
3117         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3118         if (!disk)
3119                 return -ENOMEM;
3120
3121         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3122                  rbd_dev->dev_id);
3123         disk->major = rbd_dev->major;
3124         disk->first_minor = 0;
3125         disk->fops = &rbd_bd_ops;
3126         disk->private_data = rbd_dev;
3127
3128         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3129         if (!q)
3130                 goto out_disk;
3131
3132         /* We use the default size, but let's be explicit about it. */
3133         blk_queue_physical_block_size(q, SECTOR_SIZE);
3134
3135         /* set io sizes to object size */
3136         segment_size = rbd_obj_bytes(&rbd_dev->header);
3137         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3138         blk_queue_max_segment_size(q, segment_size);
3139         blk_queue_io_min(q, segment_size);
3140         blk_queue_io_opt(q, segment_size);
3141
3142         blk_queue_merge_bvec(q, rbd_merge_bvec);
3143         disk->queue = q;
3144
3145         q->queuedata = rbd_dev;
3146
3147         rbd_dev->disk = disk;
3148
3149         return 0;
3150 out_disk:
3151         put_disk(disk);
3152
3153         return -ENOMEM;
3154 }
3155
3156 /*
3157  * sysfs
3158  */
3159
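     /*
      * Each mapped image gets a device directory under
      * /sys/bus/rbd/devices/<dev_id>/ exposing the read-only
      * attributes defined below (size, features, major, client_id,
      * pool, pool_id, name, image_id, current_snap, parent) plus the
      * write-only "refresh" trigger.
      */
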
3160 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3161 {
3162         return container_of(dev, struct rbd_device, dev);
3163 }
3164
3165 static ssize_t rbd_size_show(struct device *dev,
3166                              struct device_attribute *attr, char *buf)
3167 {
3168         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3169
3170         return sprintf(buf, "%llu\n",
3171                 (unsigned long long)rbd_dev->mapping.size);
3172 }
3173
3174 /*
3175  * Note this shows the features for whatever's mapped, which is not
3176  * necessarily the base image.
3177  */
3178 static ssize_t rbd_features_show(struct device *dev,
3179                              struct device_attribute *attr, char *buf)
3180 {
3181         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3182
3183         return sprintf(buf, "0x%016llx\n",
3184                         (unsigned long long)rbd_dev->mapping.features);
3185 }
3186
3187 static ssize_t rbd_major_show(struct device *dev,
3188                               struct device_attribute *attr, char *buf)
3189 {
3190         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3191
3192         if (rbd_dev->major)
3193                 return sprintf(buf, "%d\n", rbd_dev->major);
3194
3195         return sprintf(buf, "(none)\n");
3196
3197 }
3198
3199 static ssize_t rbd_client_id_show(struct device *dev,
3200                                   struct device_attribute *attr, char *buf)
3201 {
3202         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3203
3204         return sprintf(buf, "client%lld\n",
3205                         ceph_client_id(rbd_dev->rbd_client->client));
3206 }
3207
3208 static ssize_t rbd_pool_show(struct device *dev,
3209                              struct device_attribute *attr, char *buf)
3210 {
3211         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3212
3213         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3214 }
3215
3216 static ssize_t rbd_pool_id_show(struct device *dev,
3217                              struct device_attribute *attr, char *buf)
3218 {
3219         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3220
3221         return sprintf(buf, "%llu\n",
3222                         (unsigned long long) rbd_dev->spec->pool_id);
3223 }
3224
3225 static ssize_t rbd_name_show(struct device *dev,
3226                              struct device_attribute *attr, char *buf)
3227 {
3228         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3229
3230         if (rbd_dev->spec->image_name)
3231                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3232
3233         return sprintf(buf, "(unknown)\n");
3234 }
3235
3236 static ssize_t rbd_image_id_show(struct device *dev,
3237                              struct device_attribute *attr, char *buf)
3238 {
3239         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3240
3241         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3242 }
3243
3244 /*
3245  * Shows the name of the currently-mapped snapshot (or
3246  * RBD_SNAP_HEAD_NAME for the base image).
3247  */
3248 static ssize_t rbd_snap_show(struct device *dev,
3249                              struct device_attribute *attr,
3250                              char *buf)
3251 {
3252         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3253
3254         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3255 }
3256
3257 /*
3258  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3259  * for the parent image.  If there is no parent, simply shows
3260  * "(no parent image)".
3261  */
3262 static ssize_t rbd_parent_show(struct device *dev,
3263                              struct device_attribute *attr,
3264                              char *buf)
3265 {
3266         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3267         struct rbd_spec *spec = rbd_dev->parent_spec;
3268         int count;
3269         char *bufp = buf;
3270
3271         if (!spec)
3272                 return sprintf(buf, "(no parent image)\n");
3273
3274         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3275                         (unsigned long long) spec->pool_id, spec->pool_name);
3276         if (count < 0)
3277                 return count;
3278         bufp += count;
3279
3280         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3281                         spec->image_name ? spec->image_name : "(unknown)");
3282         if (count < 0)
3283                 return count;
3284         bufp += count;
3285
3286         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3287                         (unsigned long long) spec->snap_id, spec->snap_name);
3288         if (count < 0)
3289                 return count;
3290         bufp += count;
3291
3292         count = sprintf(bufp, "overlap %llu\n",
                             (unsigned long long) rbd_dev->parent_overlap);
3293         if (count < 0)
3294                 return count;
3295         bufp += count;
3296
3297         return (ssize_t) (bufp - buf);
3298 }
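
     /*
      * A hypothetical example of the resulting "parent" attribute
      * content for a mapped clone:
      *
      *     pool_id 2
      *     pool_name rbd
      *     image_id 1038e1f6e2ae
      *     image_name parent-image
      *     snap_id 4
      *     snap_name base
      *     overlap 10737418240
      */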
3299
3300 static ssize_t rbd_image_refresh(struct device *dev,
3301                                  struct device_attribute *attr,
3302                                  const char *buf,
3303                                  size_t size)
3304 {
3305         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3306         int ret;
3307
3308         ret = rbd_dev_refresh(rbd_dev);
3309
3310         return ret < 0 ? ret : size;
3311 }
3312
3313 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3314 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3315 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3316 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3317 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3318 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3319 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3320 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3321 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3322 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3323 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3324
3325 static struct attribute *rbd_attrs[] = {
3326         &dev_attr_size.attr,
3327         &dev_attr_features.attr,
3328         &dev_attr_major.attr,
3329         &dev_attr_client_id.attr,
3330         &dev_attr_pool.attr,
3331         &dev_attr_pool_id.attr,
3332         &dev_attr_name.attr,
3333         &dev_attr_image_id.attr,
3334         &dev_attr_current_snap.attr,
3335         &dev_attr_parent.attr,
3336         &dev_attr_refresh.attr,
3337         NULL
3338 };
3339
3340 static struct attribute_group rbd_attr_group = {
3341         .attrs = rbd_attrs,
3342 };
3343
3344 static const struct attribute_group *rbd_attr_groups[] = {
3345         &rbd_attr_group,
3346         NULL
3347 };
3348
3349 static void rbd_sysfs_dev_release(struct device *dev)
3350 {
3351 }
3352
3353 static struct device_type rbd_device_type = {
3354         .name           = "rbd",
3355         .groups         = rbd_attr_groups,
3356         .release        = rbd_sysfs_dev_release,
3357 };
3358
3359 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3360 {
3361         kref_get(&spec->kref);
3362
3363         return spec;
3364 }
3365
3366 static void rbd_spec_free(struct kref *kref);
3367 static void rbd_spec_put(struct rbd_spec *spec)
3368 {
3369         if (spec)
3370                 kref_put(&spec->kref, rbd_spec_free);
3371 }
3372
3373 static struct rbd_spec *rbd_spec_alloc(void)
3374 {
3375         struct rbd_spec *spec;
3376
3377         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3378         if (!spec)
3379                 return NULL;
3380         kref_init(&spec->kref);
3381
3382         return spec;
3383 }
3384
3385 static void rbd_spec_free(struct kref *kref)
3386 {
3387         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3388
3389         kfree(spec->pool_name);
3390         kfree(spec->image_id);
3391         kfree(spec->image_name);
3392         kfree(spec->snap_name);
3393         kfree(spec);
3394 }
3395
3396 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3397                                 struct rbd_spec *spec)
3398 {
3399         struct rbd_device *rbd_dev;
3400
3401         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3402         if (!rbd_dev)
3403                 return NULL;
3404
3405         spin_lock_init(&rbd_dev->lock);
3406         rbd_dev->flags = 0;
3407         INIT_LIST_HEAD(&rbd_dev->node);
3408         INIT_LIST_HEAD(&rbd_dev->snaps);
3409         init_rwsem(&rbd_dev->header_rwsem);
3410
3411         rbd_dev->spec = spec;
3412         rbd_dev->rbd_client = rbdc;
3413
3414         /* Initialize the layout used for all rbd requests */
3415
3416         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3417         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3418         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3419         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3420
3421         return rbd_dev;
3422 }
3423
3424 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3425 {
3426         rbd_put_client(rbd_dev->rbd_client);
3427         rbd_spec_put(rbd_dev->spec);
3428         kfree(rbd_dev);
3429 }
3430
3431 static void rbd_snap_destroy(struct rbd_snap *snap)
3432 {
3433         kfree(snap->name);
3434         kfree(snap);
3435 }
3436
3437 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3438                                                 const char *snap_name,
3439                                                 u64 snap_id, u64 snap_size,
3440                                                 u64 snap_features)
3441 {
3442         struct rbd_snap *snap;
3443
3444         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3445         if (!snap)
3446                 return ERR_PTR(-ENOMEM);
3447
3448         snap->name = snap_name;
3449         snap->id = snap_id;
3450         snap->size = snap_size;
3451         snap->features = snap_features;
3452
3453         return snap;
3454 }
3455
3456 /*
3457  * Returns a dynamically-allocated snapshot name if successful, or a
3458  * pointer-coded error otherwise.
3459  */
3460 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev,
3461                         u64 snap_id, u64 *snap_size, u64 *snap_features)
3462 {
3463         const char *snap_name;
3464         u32 which;
3465
3466         which = rbd_dev_snap_index(rbd_dev, snap_id);
3467         if (which == BAD_SNAP_INDEX)
3468                 return ERR_PTR(-ENOENT);
3469         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
3470         if (!snap_name)
3471                 return ERR_PTR(-ENOMEM);
3472
3473         *snap_size = rbd_dev->header.snap_sizes[which];
3474         *snap_features = 0;     /* No features for v1 */
3475
3476         return snap_name;
3477 }
3478
3479 /*
3480  * Get the size and object order for an image snapshot, or if
3481  * snap_id is CEPH_NOSNAP, gets this information for the base
3482  * image.
3483  */
3484 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3485                                 u8 *order, u64 *snap_size)
3486 {
3487         __le64 snapid = cpu_to_le64(snap_id);
3488         int ret;
3489         struct {
3490                 u8 order;
3491                 __le64 size;
3492         } __attribute__ ((packed)) size_buf = { 0 };
3493
3494         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3495                                 "rbd", "get_size",
3496                                 &snapid, sizeof (snapid),
3497                                 &size_buf, sizeof (size_buf));
3498         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3499         if (ret < 0)
3500                 return ret;
3501         if (ret < sizeof (size_buf))
3502                 return -ERANGE;
3503
3504         if (order)
3505                 *order = size_buf.order;
3506         *snap_size = le64_to_cpu(size_buf.size);
3507
3508         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3509                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3510                 (unsigned long long)*snap_size);
3511
3512         return 0;
3513 }
3514
3515 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3516 {
3517         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3518                                         &rbd_dev->header.obj_order,
3519                                         &rbd_dev->header.image_size);
3520 }
3521
3522 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3523 {
3524         void *reply_buf;
3525         int ret;
3526         void *p;
3527
3528         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3529         if (!reply_buf)
3530                 return -ENOMEM;
3531
3532         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3533                                 "rbd", "get_object_prefix", NULL, 0,
3534                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3535         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3536         if (ret < 0)
3537                 goto out;
3538
3539         p = reply_buf;
3540         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3541                                                 p + ret, NULL, GFP_NOIO);
3542         ret = 0;
3543
3544         if (IS_ERR(rbd_dev->header.object_prefix)) {
3545                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3546                 rbd_dev->header.object_prefix = NULL;
3547         } else {
3548                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3549         }
3550 out:
3551         kfree(reply_buf);
3552
3553         return ret;
3554 }
3555
3556 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3557                 u64 *snap_features)
3558 {
3559         __le64 snapid = cpu_to_le64(snap_id);
3560         struct {
3561                 __le64 features;
3562                 __le64 incompat;
3563         } __attribute__ ((packed)) features_buf = { 0 };
3564         u64 incompat;
3565         int ret;
3566
3567         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3568                                 "rbd", "get_features",
3569                                 &snapid, sizeof (snapid),
3570                                 &features_buf, sizeof (features_buf));
3571         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3572         if (ret < 0)
3573                 return ret;
3574         if (ret < sizeof (features_buf))
3575                 return -ERANGE;
3576
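             /*
              * "features" is everything the image has enabled;
              * "incompat" is the subset a client must understand to
              * use the image at all.  Refuse the image if it needs
              * anything beyond what this client implements.
              */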
3577         incompat = le64_to_cpu(features_buf.incompat);
3578         if (incompat & ~RBD_FEATURES_SUPPORTED)
3579                 return -ENXIO;
3580
3581         *snap_features = le64_to_cpu(features_buf.features);
3582
3583         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3584                 (unsigned long long)snap_id,
3585                 (unsigned long long)*snap_features,
3586                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3587
3588         return 0;
3589 }
3590
3591 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3592 {
3593         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3594                                                 &rbd_dev->header.features);
3595 }
3596
3597 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3598 {
3599         struct rbd_spec *parent_spec;
3600         size_t size;
3601         void *reply_buf = NULL;
3602         __le64 snapid;
3603         void *p;
3604         void *end;
3605         char *image_id;
3606         u64 overlap;
3607         int ret;
3608
3609         parent_spec = rbd_spec_alloc();
3610         if (!parent_spec)
3611                 return -ENOMEM;
3612
3613         size = sizeof (__le64) +                                /* pool_id */
3614                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3615                 sizeof (__le64) +                               /* snap_id */
3616                 sizeof (__le64);                                /* overlap */
3617         reply_buf = kmalloc(size, GFP_KERNEL);
3618         if (!reply_buf) {
3619                 ret = -ENOMEM;
3620                 goto out_err;
3621         }
3622
3623         snapid = cpu_to_le64(CEPH_NOSNAP);
3624         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3625                                 "rbd", "get_parent",
3626                                 &snapid, sizeof (snapid),
3627                                 reply_buf, size);
3628         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3629         if (ret < 0)
3630                 goto out_err;
3631
3632         p = reply_buf;
3633         end = reply_buf + ret;
3634         ret = -ERANGE;
3635         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3636         if (parent_spec->pool_id == CEPH_NOPOOL)
3637                 goto out;       /* No parent?  No problem. */
3638
3639         /* The ceph file layout needs to fit pool id in 32 bits */
3640
3641         ret = -EIO;
3642         if (parent_spec->pool_id > (u64)U32_MAX) {
3643                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3644                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3645                 goto out_err;
3646         }
3647
3648         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3649         if (IS_ERR(image_id)) {
3650                 ret = PTR_ERR(image_id);
3651                 goto out_err;
3652         }
3653         parent_spec->image_id = image_id;
3654         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3655         ceph_decode_64_safe(&p, end, overlap, out_err);
3656
3657         rbd_dev->parent_overlap = overlap;
3658         rbd_dev->parent_spec = parent_spec;
3659         parent_spec = NULL;     /* rbd_dev now owns this */
3660 out:
3661         ret = 0;
3662 out_err:
3663         kfree(reply_buf);
3664         rbd_spec_put(parent_spec);
3665
3666         return ret;
3667 }
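
     /*
      * For reference, the "get_parent" reply decoded above is:
      *
      *     __le64  pool_id     (CEPH_NOPOOL means no parent)
      *     string  image_id    (__le32 length, then the id bytes)
      *     __le64  snap_id
      *     __le64  overlap     (bytes of this image backed by the parent)
      */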
3668
3669 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3670 {
3671         struct {
3672                 __le64 stripe_unit;
3673                 __le64 stripe_count;
3674         } __attribute__ ((packed)) striping_info_buf = { 0 };
3675         size_t size = sizeof (striping_info_buf);
3676         void *p;
3677         u64 obj_size;
3678         u64 stripe_unit;
3679         u64 stripe_count;
3680         int ret;
3681
3682         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3683                                 "rbd", "get_stripe_unit_count", NULL, 0,
3684                                 (char *)&striping_info_buf, size);
3685         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3686         if (ret < 0)
3687                 return ret;
3688         if (ret < size)
3689                 return -ERANGE;
3690
3691         /*
3692          * We don't actually support the "fancy striping" feature
3693          * (STRIPINGV2) yet, but if the striping sizes are the
3694          * defaults the behavior is the same as before.  So find
3695          * out, and only fail if the image has non-default values.
3696          */
3698         obj_size = (u64)1 << rbd_dev->header.obj_order;
3699         p = &striping_info_buf;
3700         stripe_unit = ceph_decode_64(&p);
3701         if (stripe_unit != obj_size) {
3702                 rbd_warn(rbd_dev, "unsupported stripe unit "
3703                                 "(got %llu want %llu)",
3704                                 stripe_unit, obj_size);
3705                 return -EINVAL;
3706         }
3707         stripe_count = ceph_decode_64(&p);
3708         if (stripe_count != 1) {
3709                 rbd_warn(rbd_dev, "unsupported stripe count "
3710                                 "(got %llu want 1)", stripe_count);
3711                 return -EINVAL;
3712         }
3713         rbd_dev->header.stripe_unit = stripe_unit;
3714         rbd_dev->header.stripe_count = stripe_count;
3715
3716         return 0;
3717 }
3718
3719 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3720 {
3721         size_t image_id_size;
3722         char *image_id;
3723         void *p;
3724         void *end;
3725         size_t size;
3726         void *reply_buf = NULL;
3727         size_t len = 0;
3728         char *image_name = NULL;
3729         int ret;
3730
3731         rbd_assert(!rbd_dev->spec->image_name);
3732
3733         len = strlen(rbd_dev->spec->image_id);
3734         image_id_size = sizeof (__le32) + len;
3735         image_id = kmalloc(image_id_size, GFP_KERNEL);
3736         if (!image_id)
3737                 return NULL;
3738
3739         p = image_id;
3740         end = image_id + image_id_size;
3741         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3742
3743         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3744         reply_buf = kmalloc(size, GFP_KERNEL);
3745         if (!reply_buf)
3746                 goto out;
3747
3748         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3749                                 "rbd", "dir_get_name",
3750                                 image_id, image_id_size,
3751                                 reply_buf, size);
3752         if (ret < 0)
3753                 goto out;
3754         p = reply_buf;
3755         end = reply_buf + ret;
3756
3757         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3758         if (IS_ERR(image_name))
3759                 image_name = NULL;
3760         else
3761                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3762 out:
3763         kfree(reply_buf);
3764         kfree(image_id);
3765
3766         return image_name;
3767 }
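
     /*
      * Note: RBD_DIRECTORY names the per-pool directory object that
      * records the name <-> id mapping for format 2 images; the
      * "dir_get_name" method used above is its reverse (id-to-name)
      * lookup.
      */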
3768
3769 /*
3770  * When an rbd image has a parent image, it is identified by the
3771  * pool, image, and snapshot ids (not names).  This function fills
3772  * in the names for those ids.  (It's OK if we can't figure out the
3773  * name for an image id, but the pool and snapshot ids should always
3774  * exist and have names.)  All names in an rbd spec are dynamically
3775  * allocated.
3776  *
3777  * When an image being mapped (not a parent) is probed, we have the
3778  * pool name and pool id, image name and image id, and the snapshot
3779  * name.  The only thing we're missing is the snapshot id.
3780  *
3781  * The set of snapshots for an image is not known until they have
3782  * been read by rbd_dev_snaps_update(), so we can't completely fill
3783  * in this information until after that has been called.
3784  */
3785 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3786 {
3787         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3788         struct rbd_spec *spec = rbd_dev->spec;
3789         const char *pool_name;
3790         const char *image_name;
3791         const char *snap_name;
3792         int ret;
3793
3794         /*
3795          * An image being mapped will have the pool name (etc.), but
3796          * we need to look up the snapshot id.
3797          */
3798         if (spec->pool_name) {
3799                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3800                         struct rbd_snap *snap;
3801
3802                         snap = snap_by_name(rbd_dev, spec->snap_name);
3803                         if (!snap)
3804                                 return -ENOENT;
3805                         spec->snap_id = snap->id;
3806                 } else {
3807                         spec->snap_id = CEPH_NOSNAP;
3808                 }
3809
3810                 return 0;
3811         }
3812
3813         /* Get the pool name; we have to make our own copy of this */
3814
3815         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3816         if (!pool_name) {
3817                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3818                 return -EIO;
3819         }
3820         pool_name = kstrdup(pool_name, GFP_KERNEL);
3821         if (!pool_name)
3822                 return -ENOMEM;
3823
3824         /* Fetch the image name; tolerate failure here */
3825
3826         image_name = rbd_dev_image_name(rbd_dev);
3827         if (!image_name)
3828                 rbd_warn(rbd_dev, "unable to get image name");
3829
3830         /* Look up the snapshot name, and make a copy */
3831
3832         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3833         if (!snap_name) {
3834                 ret = -ENOMEM;
3835                 goto out_err;
3836         }
3837
3838         spec->pool_name = pool_name;
3839         spec->image_name = image_name;
3840         spec->snap_name = snap_name;
3841
3842         return 0;
3843 out_err:
3844         kfree(image_name);
3845         kfree(pool_name);
3846
3847         return ret;
3848 }
3849
3850 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3851 {
3852         size_t size;
3853         int ret;
3854         void *reply_buf;
3855         void *p;
3856         void *end;
3857         u64 seq;
3858         u32 snap_count;
3859         struct ceph_snap_context *snapc;
3860         u32 i;
3861
3862         /*
3863          * We'll need room for the seq value (maximum snapshot id),
3864          * snapshot count, and array of that many snapshot ids.
3865          * For now we have a fixed upper limit on the number we're
3866          * prepared to receive.
3867          */
3868         size = sizeof (__le64) + sizeof (__le32) +
3869                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3870         reply_buf = kzalloc(size, GFP_KERNEL);
3871         if (!reply_buf)
3872                 return -ENOMEM;
3873
3874         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3875                                 "rbd", "get_snapcontext", NULL, 0,
3876                                 reply_buf, size);
3877         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3878         if (ret < 0)
3879                 goto out;
3880
3881         p = reply_buf;
3882         end = reply_buf + ret;
3883         ret = -ERANGE;
3884         ceph_decode_64_safe(&p, end, seq, out);
3885         ceph_decode_32_safe(&p, end, snap_count, out);
3886
3887         /*
3888          * Make sure the reported number of snapshot ids wouldn't go
3889          * beyond the end of our buffer.  But before checking that,
3890          * make sure the computed size of the snapshot context we
3891          * allocate is representable in a size_t.
3892          */
3893         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3894                                  / sizeof (u64)) {
3895                 ret = -EINVAL;
3896                 goto out;
3897         }
3898         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3899                 goto out;
3900         ret = 0;
3901
3902         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3903         if (!snapc) {
3904                 ret = -ENOMEM;
3905                 goto out;
3906         }
3907         snapc->seq = seq;
3908         for (i = 0; i < snap_count; i++)
3909                 snapc->snaps[i] = ceph_decode_64(&p);
3910
             /* Release our reference on any previous snap context */
             ceph_put_snap_context(rbd_dev->header.snapc);
3911         rbd_dev->header.snapc = snapc;
3912
3913         dout("  snap context seq = %llu, snap_count = %u\n",
3914                 (unsigned long long)seq, (unsigned int)snap_count);
3915 out:
3916         kfree(reply_buf);
3917
3918         return ret;
3919 }
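
     /*
      * For reference, the "get_snapcontext" reply decoded above is:
      *
      *     __le64  seq                    highest snapshot id so far
      *     __le32  snap_count
      *     __le64  snaps[snap_count]      snapshot ids, highest first
      */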
3920
3921 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3922                                         u64 snap_id)
3923 {
3924         size_t size;
3925         void *reply_buf;
3926         __le64 snapid;
3927         int ret;
3928         void *p;
3929         void *end;
3930         char *snap_name;
3931
3932         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3933         reply_buf = kmalloc(size, GFP_KERNEL);
3934         if (!reply_buf)
3935                 return ERR_PTR(-ENOMEM);
3936
3937         snapid = cpu_to_le64(snap_id);
3938         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3939                                 "rbd", "get_snapshot_name",
3940                                 &snapid, sizeof (snapid),
3941                                 reply_buf, size);
3942         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3943         if (ret < 0) {
3944                 snap_name = ERR_PTR(ret);
3945                 goto out;
3946         }
3947
3948         p = reply_buf;
3949         end = reply_buf + ret;
3950         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3951         if (IS_ERR(snap_name))
3952                 goto out;
3953
3954         dout("  snap_id 0x%016llx snap_name = %s\n",
3955                 (unsigned long long)snap_id, snap_name);
3956 out:
3957         kfree(reply_buf);
3958
3959         return snap_name;
3960 }
3961
3962 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev,
3963                         u64 snap_id, u64 *snap_size, u64 *snap_features)
3964 {
3965         u64 size;
3966         u64 features;
3967         const char *snap_name;
3968         int ret;
3969
3970         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3971         if (ret)
3972                 goto out_err;
3973
3974         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3975         if (ret)
3976                 goto out_err;
3977
3978         snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3979         if (!IS_ERR(snap_name)) {
3980                 *snap_size = size;
3981                 *snap_features = features;
3982         }
3983
3984         return snap_name;
3985 out_err:
3986         return ERR_PTR(ret);
3987 }
3988
3989 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev,
3990                 u64 snap_id, u64 *snap_size, u64 *snap_features)
3991 {
3992         if (rbd_dev->image_format == 1)
3993                 return rbd_dev_v1_snap_info(rbd_dev, snap_id,
3994                                         snap_size, snap_features);
3995         if (rbd_dev->image_format == 2)
3996                 return rbd_dev_v2_snap_info(rbd_dev, snap_id,
3997                                         snap_size, snap_features);
3998         return ERR_PTR(-EINVAL);
3999 }
4000
4001 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4002 {
4003         int ret;
4004
4005         down_write(&rbd_dev->header_rwsem);
4006
4007         ret = rbd_dev_v2_image_size(rbd_dev);
4008         if (ret)
4009                 goto out;
4010         rbd_update_mapping_size(rbd_dev);
4011
4012         ret = rbd_dev_v2_snap_context(rbd_dev);
4013         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4014         if (ret)
4015                 goto out;
4016         ret = rbd_dev_snaps_update(rbd_dev);
4017         dout("rbd_dev_snaps_update returned %d\n", ret);
4018         if (ret)
4019                 goto out;
4020 out:
4021         up_write(&rbd_dev->header_rwsem);
4022
4023         return ret;
4024 }
4025
4026 /*
4027  * Scan the rbd device's current snapshot list and compare it to the
4028  * newly-received snapshot context.  Remove any existing snapshots
4029  * not present in the new snapshot context.  Add a new snapshot for
4030  * any snapshots in the snapshot context not in the current list.
4031  * And verify there are no changes to snapshots we already know
4032  * about.
4033  *
4034  * Assumes the snapshots in the snapshot context are sorted by
4035  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4036  * are also maintained in that order.)
4037  *
4038  * Note that any error that occurs while updating the snapshot list
4039  * aborts the update, and the entire list is cleared.  The snapshot
4040  * list becomes inconsistent at that point anyway, so it might as
4041  * well be empty.
4042  */
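     /*
      * For example (hypothetical ids): given a new context {12, 7, 3}
      * and an existing list {12, 5, 3}, the walk below keeps 12,
      * inserts 7, removes 5, and keeps 3, leaving {12, 7, 3}.
      */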
4043 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4044 {
4045         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4046         const u32 snap_count = snapc->num_snaps;
4047         struct list_head *head = &rbd_dev->snaps;
4048         struct list_head *links = head->next;
4049         u32 index = 0;
4050         int ret = 0;
4051
4052         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4053         while (index < snap_count || links != head) {
4054                 u64 snap_id;
4055                 struct rbd_snap *snap;
4056                 const char *snap_name;
4057                 u64 snap_size = 0;
4058                 u64 snap_features = 0;
4059
4060                 snap_id = index < snap_count ? snapc->snaps[index]
4061                                              : CEPH_NOSNAP;
4062                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4063                                      : NULL;
4064                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4065
4066                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4067                         struct list_head *next = links->next;
4068
4069                         /*
4070                          * A previously-existing snapshot is not in
4071                          * the new snap context.
4072                          *
4073                          * If the now-missing snapshot is the one
4074                          * the image represents, clear its existence
4075                          * flag so we can avoid sending any more
4076                          * requests to it.
4077                          */
4078                         if (rbd_dev->spec->snap_id == snap->id)
4079                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4080                         dout("removing %ssnap id %llu\n",
4081                                 rbd_dev->spec->snap_id == snap->id ?
4082                                                         "mapped " : "",
4083                                 (unsigned long long)snap->id);
4084
4085                         list_del(&snap->node);
4086                         rbd_snap_destroy(snap);
4087
4088                         /* Done with this list entry; advance */
4089
4090                         links = next;
4091                         continue;
4092                 }
4093
4094                 snap_name = rbd_dev_snap_info(rbd_dev, snap_id,
4095                                         &snap_size, &snap_features);
4096                 if (IS_ERR(snap_name)) {
4097                         ret = PTR_ERR(snap_name);
4098                         dout("failed to get snap info, error %d\n", ret);
4099                         goto out_err;
4100                 }
4101
4102                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4103                         (unsigned long long)snap_id);
4104                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4105                         struct rbd_snap *new_snap;
4106
4107                         /* We haven't seen this snapshot before */
4108
4109                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4110                                         snap_id, snap_size, snap_features);
4111                         if (IS_ERR(new_snap)) {
4112                                 ret = PTR_ERR(new_snap);
4113                                 dout("  failed to add snap, error %d\n", ret);
4114                                 goto out_err;
4115                         }
4116
4117                         /* New goes before existing, or at end of list */
4118
4119                         dout("  added snap%s\n", snap ? "" : " at end");
4120                         if (snap)
4121                                 list_add_tail(&new_snap->node, &snap->node);
4122                         else
4123                                 list_add_tail(&new_snap->node, head);
4124                 } else {
4125                         /* Already have this one */
4126
4127                         dout("  already present\n");
4128
4129                         rbd_assert(snap->size == snap_size);
4130                         rbd_assert(!strcmp(snap->name, snap_name));
4131                         rbd_assert(snap->features == snap_features);
4132
4133                         /* Done with this list entry; advance */
4134
4135                         links = links->next;
4136                 }
4137
4138                 /* Advance to the next entry in the snapshot context */
4139
4140                 index++;
4141         }
4142         dout("%s: done\n", __func__);
4143
4144         return 0;
4145 out_err:
4146         rbd_remove_all_snaps(rbd_dev);
4147
4148         return ret;
4149 }
4150
4151 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4152 {
4153         struct device *dev;
4154         int ret;
4155
4156         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4157
4158         dev = &rbd_dev->dev;
4159         dev->bus = &rbd_bus_type;
4160         dev->type = &rbd_device_type;
4161         dev->parent = &rbd_root_dev;
4162         dev->release = rbd_dev_device_release;
4163         dev_set_name(dev, "%d", rbd_dev->dev_id);
4164         ret = device_register(dev);
4165
4166         mutex_unlock(&ctl_mutex);
4167
4168         return ret;
4169 }
4170
4171 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4172 {
4173         device_unregister(&rbd_dev->dev);
4174 }
4175
4176 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4177
4178 /*
4179  * Get a unique rbd identifier for the given new rbd_dev, and add
4180  * the rbd_dev to the global list.  The minimum rbd id is 1.
4181  */
4182 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4183 {
4184         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4185
4186         spin_lock(&rbd_dev_list_lock);
4187         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4188         spin_unlock(&rbd_dev_list_lock);
4189         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4190                 (unsigned long long) rbd_dev->dev_id);
4191 }
4192
4193 /*
4194  * Remove an rbd_dev from the global list, and record that its
4195  * identifier is no longer in use.
4196  */
4197 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4198 {
4199         struct list_head *tmp;
4200         int rbd_id = rbd_dev->dev_id;
4201         int max_id;
4202
4203         rbd_assert(rbd_id > 0);
4204
4205         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4206                 (unsigned long long) rbd_dev->dev_id);
4207         spin_lock(&rbd_dev_list_lock);
4208         list_del_init(&rbd_dev->node);
4209
4210         /*
4211          * If the id being "put" is not the current maximum, there
4212          * is nothing special we need to do.
4213          */
4214         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4215                 spin_unlock(&rbd_dev_list_lock);
4216                 return;
4217         }
4218
4219         /*
4220          * We need to update the current maximum id.  Search the
4221          * list to find out what it is.  We're more likely to find
4222          * the maximum at the end, so search the list backward.
4223          */
4224         max_id = 0;
4225         list_for_each_prev(tmp, &rbd_dev_list) {
4226                 struct rbd_device *rbd_dev;
4227
4228                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4229                 if (rbd_dev->dev_id > max_id)
4230                         max_id = rbd_dev->dev_id;
4231         }
4232         spin_unlock(&rbd_dev_list_lock);
4233
4234         /*
4235          * The max id could have been updated by rbd_dev_id_get(), in
4236          * which case it now accurately reflects the new maximum.
4237          * Be careful not to overwrite the maximum value in that
4238          * case.
4239          */
4240         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4241         dout("  max dev id has been reset\n");
4242 }
4243
4244 /*
4245  * Skips over white space at *buf, and updates *buf to point to the
4246  * first found non-space character (if any). Returns the length of
4247  * the token (string of non-white space characters) found.  Note
4248  * that *buf must be terminated with '\0'.
4249  */
4250 static inline size_t next_token(const char **buf)
4251 {
4252         /*
4253          * These are the characters that produce nonzero for
4254          * isspace() in the "C" and "POSIX" locales.
4255          */
4256         const char *spaces = " \f\n\r\t\v";
4257
4258         *buf += strspn(*buf, spaces);   /* Find start of token */
4259
4260         return strcspn(*buf, spaces);   /* Return token length */
4261 }
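
     /*
      * Example: with *buf pointing at "  pool image", next_token()
      * leaves *buf pointing at "pool image" and returns 4
      * (strlen("pool")).
      */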
4262
4263 /*
4264  * Finds the next token in *buf, and if the provided token buffer is
4265  * big enough, copies the found token into it.  The result, if
4266  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4267  * must be terminated with '\0' on entry.
4268  *
4269  * Returns the length of the token found (not including the '\0').
4270  * Return value will be 0 if no token is found, and it will be >=
4271  * token_size if the token would not fit.
4272  *
4273  * The *buf pointer will be updated to point beyond the end of the
4274  * found token.  Note that this occurs even if the token buffer is
4275  * too small to hold it.
4276  */
4277 static inline size_t copy_token(const char **buf,
4278                                 char *token,
4279                                 size_t token_size)
4280 {
4281         size_t len;
4282
4283         len = next_token(buf);
4284         if (len < token_size) {
4285                 memcpy(token, *buf, len);
4286                 *(token + len) = '\0';
4287         }
4288         *buf += len;
4289
4290         return len;
4291 }
4292
4293 /*
4294  * Finds the next token in *buf, dynamically allocates a buffer big
4295  * enough to hold a copy of it, and copies the token into the new
4296  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4297  * that a duplicate buffer is created even for a zero-length token.
4298  *
4299  * Returns a pointer to the newly-allocated duplicate, or a null
4300  * pointer if memory for the duplicate was not available.  If
4301  * the lenp argument is a non-null pointer, the length of the token
4302  * (not including the '\0') is returned in *lenp.
4303  *
4304  * If successful, the *buf pointer will be updated to point beyond
4305  * the end of the found token.
4306  *
4307  * Note: uses GFP_KERNEL for allocation.
4308  */
4309 static inline char *dup_token(const char **buf, size_t *lenp)
4310 {
4311         char *dup;
4312         size_t len;
4313
4314         len = next_token(buf);
4315         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4316         if (!dup)
4317                 return NULL;
4318         *(dup + len) = '\0';
4319         *buf += len;
4320
4321         if (lenp)
4322                 *lenp = len;
4323
4324         return dup;
4325 }
4326
4327 /*
4328  * Parse the options provided for an "rbd add" (i.e., rbd image
4329  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4330  * and the data written is passed here via a NUL-terminated buffer.
4331  * Returns 0 if successful or an error code otherwise.
4332  *
4333  * The information extracted from these options is recorded in
4334  * the other parameters which return dynamically-allocated
4335  * structures:
4336  *  ceph_opts
4337  *      The address of a pointer that will refer to a ceph options
4338  *      structure.  Caller must release the returned pointer using
4339  *      ceph_destroy_options() when it is no longer needed.
4340  *  rbd_opts
4341  *      Address of an rbd options pointer.  Fully initialized by
4342  *      this function; caller must release with kfree().
4343  *  spec
4344  *      Address of an rbd image specification pointer.  Fully
4345  *      initialized by this function based on parsed options.
4346  *      Caller must release with rbd_spec_put().
4347  *
4348  * The options passed take this form:
4349  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4350  * where:
4351  *  <mon_addrs>
4352  *      A comma-separated list of one or more monitor addresses.
4353  *      A monitor address is an ip address, optionally followed
4354  *      by a port number (separated by a colon).
4355  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4356  *  <options>
4357  *      A comma-separated list of ceph and/or rbd options.
4358  *  <pool_name>
4359  *      The name of the rados pool containing the rbd image.
4360  *  <image_name>
4361  *      The name of the image in that pool to map.
4362  *  <snap_name>
4363  *      An optional snapshot name.  If provided, the mapping will
4364  *      present data from the image at the time that snapshot was
4365  *      created.  The image head is used if no snapshot name is
4366  *      provided.  Snapshot mappings are always read-only.
4367  */
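     /*
      * A hypothetical example of such a request:
      *
      *     $ echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
      *
      * maps the head of image "myimage" in pool "rbd", contacting the
      * monitor at 1.2.3.4 as the "admin" entity.
      */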
4368 static int rbd_add_parse_args(const char *buf,
4369                                 struct ceph_options **ceph_opts,
4370                                 struct rbd_options **opts,
4371                                 struct rbd_spec **rbd_spec)
4372 {
4373         size_t len;
4374         char *options;
4375         const char *mon_addrs;
4376         char *snap_name;
4377         size_t mon_addrs_size;
4378         struct rbd_spec *spec = NULL;
4379         struct rbd_options *rbd_opts = NULL;
4380         struct ceph_options *copts;
4381         int ret;
4382
4383         /* The first four tokens are required */
4384
4385         len = next_token(&buf);
4386         if (!len) {
4387                 rbd_warn(NULL, "no monitor address(es) provided");
4388                 return -EINVAL;
4389         }
4390         mon_addrs = buf;
4391         mon_addrs_size = len + 1;
4392         buf += len;
4393
4394         ret = -EINVAL;
4395         options = dup_token(&buf, NULL);
4396         if (!options)
4397                 return -ENOMEM;
4398         if (!*options) {
4399                 rbd_warn(NULL, "no options provided");
4400                 goto out_err;
4401         }
4402
4403         spec = rbd_spec_alloc();
4404         if (!spec)
4405                 goto out_mem;
4406
4407         spec->pool_name = dup_token(&buf, NULL);
4408         if (!spec->pool_name)
4409                 goto out_mem;
4410         if (!*spec->pool_name) {
4411                 rbd_warn(NULL, "no pool name provided");
4412                 goto out_err;
4413         }
4414
4415         spec->image_name = dup_token(&buf, NULL);
4416         if (!spec->image_name)
4417                 goto out_mem;
4418         if (!*spec->image_name) {
4419                 rbd_warn(NULL, "no image name provided");
4420                 goto out_err;
4421         }
4422
4423         /*
4424          * Snapshot name is optional; default is to use "-"
4425          * (indicating the head/no snapshot).
4426          */
4427         len = next_token(&buf);
4428         if (!len) {
4429                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4430                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4431         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4432                 ret = -ENAMETOOLONG;
4433                 goto out_err;
4434         }
4435         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4436         if (!snap_name)
4437                 goto out_mem;
4438         *(snap_name + len) = '\0';
4439         spec->snap_name = snap_name;
4440
4441         /* Initialize all rbd options to the defaults */
4442
4443         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4444         if (!rbd_opts)
4445                 goto out_mem;
4446
4447         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4448
4449         copts = ceph_parse_options(options, mon_addrs,
4450                                         mon_addrs + mon_addrs_size - 1,
4451                                         parse_rbd_opts_token, rbd_opts);
4452         if (IS_ERR(copts)) {
4453                 ret = PTR_ERR(copts);
4454                 goto out_err;
4455         }
4456         kfree(options);
4457
4458         *ceph_opts = copts;
4459         *opts = rbd_opts;
4460         *rbd_spec = spec;
4461
4462         return 0;
4463 out_mem:
4464         ret = -ENOMEM;
4465 out_err:
4466         kfree(rbd_opts);
4467         rbd_spec_put(spec);
4468         kfree(options);
4469
4470         return ret;
4471 }
4472
4473 /*
4474  * An rbd format 2 image has a unique identifier, distinct from the
4475  * name given to it by the user.  Internally, that identifier is
4476  * what's used to specify the names of objects related to the image.
4477  *
4478  * A special "rbd id" object is used to map an rbd image name to its
4479  * id.  If that object doesn't exist, then there is no v2 rbd image
4480  * with the supplied name.
4481  *
4482  * This function will fill in the given rbd_dev's image_id field
4483  * if the id can be determined, and in that case will return 0.
4484  * If any error occurs, a negative errno will be returned and the
4485  * rbd_dev's image_id field will be left unchanged (and should be NULL).
4486  */
4487 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4488 {
4489         int ret;
4490         size_t size;
4491         char *object_name;
4492         void *response;
4493         char *image_id;
4494
4495         /*
4496          * When probing a parent image, the image id is already
4497          * known (and the image name likely is not).  There's no
4498          * need to fetch the image id again in this case.  We
4499          * do still need to set the image format though.
4500          */
4501         if (rbd_dev->spec->image_id) {
4502                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4503
4504                 return 0;
4505         }
4506
4507         /*
4508          * First, see if the format 2 image id file exists, and if
4509          * so, get the image's persistent id from it.
4510          */
4511         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4512         object_name = kmalloc(size, GFP_NOIO);
4513         if (!object_name)
4514                 return -ENOMEM;
4515         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4516         dout("rbd id object name is %s\n", object_name);
4517
4518         /* Response will be an encoded string, which includes a length */
4519
4520         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4521         response = kzalloc(size, GFP_NOIO);
4522         if (!response) {
4523                 ret = -ENOMEM;
4524                 goto out;
4525         }
4526
4527         /* If it doesn't exist we'll assume it's a format 1 image */
4528
4529         ret = rbd_obj_method_sync(rbd_dev, object_name,
4530                                 "rbd", "get_id", NULL, 0,
4531                                 response, RBD_IMAGE_ID_LEN_MAX);
4532         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4533         if (ret == -ENOENT) {
4534                 image_id = kstrdup("", GFP_KERNEL);
4535                 ret = image_id ? 0 : -ENOMEM;
4536                 if (!ret)
4537                         rbd_dev->image_format = 1;
             } else if (ret < 0) {
                     /*
                      * Pass any other error straight through.  Without
                      * this check, the signed/unsigned comparison below
                      * would treat a negative return value as a huge
                      * positive byte count.
                      */
4538         } else if (ret > sizeof (__le32)) {
4539                 void *p = response;
4540
4541                 image_id = ceph_extract_encoded_string(&p, p + ret,
4542                                                 NULL, GFP_NOIO);
4543                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4544                 if (!ret)
4545                         rbd_dev->image_format = 2;
4546         } else {
4547                 ret = -EINVAL;
4548         }
4549
4550         if (!ret) {
4551                 rbd_dev->spec->image_id = image_id;
4552                 dout("image_id is %s\n", image_id);
4553         }
4554 out:
4555         kfree(response);
4556         kfree(object_name);
4557
4558         return ret;
4559 }
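
     /*
      * Example of the lookup above, assuming the rbd_types.h definition
      * of RBD_ID_PREFIX ("rbd_id.") and a format 2 image named "myimage":
      * the id is read from the object "rbd_id.myimage" via the "get_id"
      * class method.  The response is a ceph-encoded string, i.e. a
      * little-endian 32-bit length followed by that many bytes:
      *
      *      04 00 00 00 'a' 'b' 'c' '1'   =>   image_id "abc1"
      *
      * If "rbd_id.myimage" does not exist (-ENOENT), the image id is set
      * to "" and the image is treated as format 1.
      */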
4560
4561 /* Undo whatever state changes are made by v1 or v2 image probe */
4562
4563 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4564 {
4565         struct rbd_image_header *header;
4566
4567         rbd_dev_remove_parent(rbd_dev);
4568         rbd_spec_put(rbd_dev->parent_spec);
4569         rbd_dev->parent_spec = NULL;
4570         rbd_dev->parent_overlap = 0;
4571
4572         /* Free dynamic fields from the header, then zero it out */
4573
4574         header = &rbd_dev->header;
4575         ceph_put_snap_context(header->snapc);
4576         kfree(header->snap_sizes);
4577         kfree(header->snap_names);
4578         kfree(header->object_prefix);
4579         memset(header, 0, sizeof (*header));
4580 }
4581
4582 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4583 {
4584         int ret;
4585
4586         /* Populate rbd image metadata */
4587
4588         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4589         if (ret < 0)
4590                 goto out_err;
4591
4592         /* Version 1 images have no parent (no layering) */
4593
4594         rbd_dev->parent_spec = NULL;
4595         rbd_dev->parent_overlap = 0;
4596
4597         dout("discovered version 1 image, header name is %s\n",
4598                 rbd_dev->header_name);
4599
4600         return 0;
4601
4602 out_err:
4603         kfree(rbd_dev->header_name);
4604         rbd_dev->header_name = NULL;
4605         kfree(rbd_dev->spec->image_id);
4606         rbd_dev->spec->image_id = NULL;
4607
4608         return ret;
4609 }
4610
4611 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4612 {
4613         int ret;
4614
4615         ret = rbd_dev_v2_image_size(rbd_dev);
4616         if (ret)
4617                 goto out_err;
4618
4619         /* Get the object prefix (a.k.a. block_name) for the image */
4620
4621         ret = rbd_dev_v2_object_prefix(rbd_dev);
4622         if (ret)
4623                 goto out_err;
4624
4625         /* Get and check the features for the image */
4626
4627         ret = rbd_dev_v2_features(rbd_dev);
4628         if (ret)
4629                 goto out_err;
4630
4631         /* If the image supports layering, get the parent info */
4632
4633         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4634                 ret = rbd_dev_v2_parent_info(rbd_dev);
4635                 if (ret)
4636                         goto out_err;
4637
4638                 /*
4639                  * Don't print a warning for parent images.  We can
4640                  * tell at this point because we won't know the pool
4641                  * name yet (just the pool id).
4642                  */
4643                 if (rbd_dev->spec->pool_name)
4644                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4645                                         "is EXPERIMENTAL!");
4646         }
4647
4648         /* If the image supports fancy striping, get its parameters */
4649
4650         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4651                 ret = rbd_dev_v2_striping_info(rbd_dev);
4652                 if (ret < 0)
4653                         goto out_err;
4654         }
4655
4656         /* crypto and compression type aren't (yet) supported for v2 images */
4657
4658         rbd_dev->header.crypt_type = 0;
4659         rbd_dev->header.comp_type = 0;
4660
4661         /* Get the snapshot context, plus the header version */
4662
4663         ret = rbd_dev_v2_snap_context(rbd_dev);
4664         if (ret)
4665                 goto out_err;
4666
4667         dout("discovered version 2 image, header name is %s\n",
4668                 rbd_dev->header_name);
4669
4670         return 0;
4671 out_err:
4672         rbd_dev->parent_overlap = 0;
4673         rbd_spec_put(rbd_dev->parent_spec);
4674         rbd_dev->parent_spec = NULL;
4675         kfree(rbd_dev->header_name);
4676         rbd_dev->header_name = NULL;
4677         kfree(rbd_dev->header.object_prefix);
4678         rbd_dev->header.object_prefix = NULL;
4679
4680         return ret;
4681 }
4682
4683 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4684 {
4685         struct rbd_device *parent = NULL;
4686         struct rbd_spec *parent_spec;
4687         struct rbd_client *rbdc;
4688         int ret;
4689
4690         if (!rbd_dev->parent_spec)
4691                 return 0;
4692         /*
4693          * We need to pass a reference to the client and the parent
4694          * spec when creating the parent rbd_dev.  Images related by
4695          * parent/child relationships always share both.
4696          */
4697         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4698         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4699
4700         ret = -ENOMEM;
4701         parent = rbd_dev_create(rbdc, parent_spec);
4702         if (!parent)
4703                 goto out_err;
4704
4705         ret = rbd_dev_image_probe(parent);
4706         if (ret < 0)
4707                 goto out_err;
4708         rbd_dev->parent = parent;
4709
4710         return 0;
4711 out_err:
4712         if (parent) {
                     /*
                      * rbd_dev_destroy() drops the client and spec
                      * references taken for the parent above; the
                      * child's own parent_spec reference and
                      * header_name are freed by its normal teardown
                      * path, not here.
                      */
4715                 rbd_dev_destroy(parent);
4716         } else {
4717                 rbd_put_client(rbdc);
4718                 rbd_spec_put(parent_spec);
4719         }
4720
4721         return ret;
4722 }
4723
4724 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4725 {
4726         int ret;
4727
4728         ret = rbd_dev_mapping_set(rbd_dev);
4729         if (ret)
4730                 return ret;
4731
4732         /* generate unique id: find highest unique id, add one */
4733         rbd_dev_id_get(rbd_dev);
4734
4735         /* Fill in the device name, now that we have its id. */
4736         BUILD_BUG_ON(DEV_NAME_LEN
4737                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4738         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4739
4740         /* Get our block major device number. */
4741
4742         ret = register_blkdev(0, rbd_dev->name);
4743         if (ret < 0)
4744                 goto err_out_id;
4745         rbd_dev->major = ret;
4746
4747         /* Set up the blkdev mapping. */
4748
4749         ret = rbd_init_disk(rbd_dev);
4750         if (ret)
4751                 goto err_out_blkdev;
4752
4753         ret = rbd_bus_add_dev(rbd_dev);
4754         if (ret)
4755                 goto err_out_disk;
4756
4757         /* Everything's ready.  Announce the disk to the world. */
4758
4759         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4760         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4761         add_disk(rbd_dev->disk);
4762
4763         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4764                 (unsigned long long) rbd_dev->mapping.size);
4765
4766         return ret;
4767
4768 err_out_disk:
4769         rbd_free_disk(rbd_dev);
4770 err_out_blkdev:
4771         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4772 err_out_id:
4773         rbd_dev_id_put(rbd_dev);
4774         rbd_dev_mapping_clear(rbd_dev);
4775
4776         return ret;
4777 }
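
     /*
      * Note on the setup above: passing 0 as the major number asks
      * register_blkdev() to allocate an unused major, which it returns
      * and which is recorded in rbd_dev->major.  The disk itself is
      * named by appending the device id to RBD_DRV_NAME, so the first
      * mapped image typically appears as "rbd0".
      */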
4778
4779 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4780 {
4781         struct rbd_spec *spec = rbd_dev->spec;
4782         size_t size;
4783
4784         /* Record the header object name for this rbd image. */
4785
4786         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4787
4788         if (rbd_dev->image_format == 1)
4789                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4790         else
4791                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4792
4793         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4794         if (!rbd_dev->header_name)
4795                 return -ENOMEM;
4796
4797         if (rbd_dev->image_format == 1)
4798                 sprintf(rbd_dev->header_name, "%s%s",
4799                         spec->image_name, RBD_SUFFIX);
4800         else
4801                 sprintf(rbd_dev->header_name, "%s%s",
4802                         RBD_HEADER_PREFIX, spec->image_id);
4803         return 0;
4804 }
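
     /*
      * Example, assuming the usual rbd_types.h definitions of RBD_SUFFIX
      * (".rbd") and RBD_HEADER_PREFIX ("rbd_header."):
      *
      *      format 1, image name "myimage"  =>  header object "myimage.rbd"
      *      format 2, image id "abc123"     =>  header object "rbd_header.abc123"
      */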
4805
4806 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4807 {
4808         int ret;
4809
4810         rbd_remove_all_snaps(rbd_dev);
4811         rbd_dev_unprobe(rbd_dev);
4812         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4813         if (ret)
4814                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4815         kfree(rbd_dev->header_name);
4816         rbd_dev->header_name = NULL;
4817         rbd_dev->image_format = 0;
4818         kfree(rbd_dev->spec->image_id);
4819         rbd_dev->spec->image_id = NULL;
4820
4821         rbd_dev_destroy(rbd_dev);
4822 }
4823
4824 /*
4825  * Probe for the existence of the header object for the given rbd
4826  * device.  For format 2 images this includes determining the image
4827  * id.
4828  */
4829 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4830 {
4831         int ret;
4832         int tmp;
4833
4834         /*
4835          * Get the id from the image id object.  If it's not a
4836          * format 2 image, we'll get ENOENT back, and we'll assume
4837          * it's a format 1 image.
4838          */
4839         ret = rbd_dev_image_id(rbd_dev);
4840         if (ret)
4841                 return ret;
4842         rbd_assert(rbd_dev->spec->image_id);
4843         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4844
4845         ret = rbd_dev_header_name(rbd_dev);
4846         if (ret)
4847                 goto err_out_format;
4848
4849         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4850         if (ret)
4851                 goto out_header_name;
4852
4853         if (rbd_dev->image_format == 1)
4854                 ret = rbd_dev_v1_probe(rbd_dev);
4855         else
4856                 ret = rbd_dev_v2_probe(rbd_dev);
4857         if (ret)
4858                 goto err_out_watch;
4859
4860         ret = rbd_dev_snaps_update(rbd_dev);
4861         if (ret)
4862                 goto err_out_probe;
4863
4864         ret = rbd_dev_spec_update(rbd_dev);
4865         if (ret)
4866                 goto err_out_snaps;
4867
4868         ret = rbd_dev_probe_parent(rbd_dev);
4869         if (!ret)
4870                 return 0;
4871
4872 err_out_snaps:
4873         rbd_remove_all_snaps(rbd_dev);
4874 err_out_probe:
4875         rbd_dev_unprobe(rbd_dev);
4876 err_out_watch:
4877         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4878         if (tmp)
4879                 rbd_warn(rbd_dev, "unable to tear down watch request");
4880 out_header_name:
4881         kfree(rbd_dev->header_name);
4882         rbd_dev->header_name = NULL;
4883 err_out_format:
4884         rbd_dev->image_format = 0;
4885         kfree(rbd_dev->spec->image_id);
4886         rbd_dev->spec->image_id = NULL;
4887
4888         dout("probe failed, returning %d\n", ret);
4889
4890         return ret;
4891 }
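
     /*
      * Note on the error labels above: teardown mirrors setup in
      * reverse order.  A parent probe failure unwinds the snapshots,
      * then the probed header state, then the watch request, then the
      * header name, and finally the recorded image format and image id.
      */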
4892
4893 static ssize_t rbd_add(struct bus_type *bus,
4894                        const char *buf,
4895                        size_t count)
4896 {
4897         struct rbd_device *rbd_dev = NULL;
4898         struct ceph_options *ceph_opts = NULL;
4899         struct rbd_options *rbd_opts = NULL;
4900         struct rbd_spec *spec = NULL;
4901         struct rbd_client *rbdc;
4902         struct ceph_osd_client *osdc;
4903         int rc = -ENOMEM;
4904
4905         if (!try_module_get(THIS_MODULE))
4906                 return -ENODEV;
4907
4908         /* parse add command */
4909         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4910         if (rc < 0)
4911                 goto err_out_module;
4912
4913         rbdc = rbd_get_client(ceph_opts);
4914         if (IS_ERR(rbdc)) {
4915                 rc = PTR_ERR(rbdc);
4916                 goto err_out_args;
4917         }
4918         ceph_opts = NULL;       /* rbd_dev client now owns this */
4919
4920         /* pick the pool */
4921         osdc = &rbdc->client->osdc;
4922         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4923         if (rc < 0)
4924                 goto err_out_client;
4925         spec->pool_id = (u64)rc;
4926
4927         /* The ceph file layout needs to fit pool id in 32 bits */
4928
4929         if (spec->pool_id > (u64)U32_MAX) {
4930                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4931                                 (unsigned long long)spec->pool_id, U32_MAX);
4932                 rc = -EIO;
4933                 goto err_out_client;
4934         }
4935
4936         rbd_dev = rbd_dev_create(rbdc, spec);
4937         if (!rbd_dev)
4938                 goto err_out_client;
4939         rbdc = NULL;            /* rbd_dev now owns this */
4940         spec = NULL;            /* rbd_dev now owns this */
4941
4942         rbd_dev->mapping.read_only = rbd_opts->read_only;
4943         kfree(rbd_opts);
4944         rbd_opts = NULL;        /* done with this */
4945
4946         rc = rbd_dev_image_probe(rbd_dev);
4947         if (rc < 0)
4948                 goto err_out_rbd_dev;
4949
4950         rc = rbd_dev_device_setup(rbd_dev);
4951         if (!rc)
4952                 return count;
4953
4954         rbd_dev_image_release(rbd_dev);
4955 err_out_rbd_dev:
4956         rbd_dev_destroy(rbd_dev);
4957 err_out_client:
4958         rbd_put_client(rbdc);
4959 err_out_args:
4960         if (ceph_opts)
4961                 ceph_destroy_options(ceph_opts);
4962         kfree(rbd_opts);
4963         rbd_spec_put(spec);
4964 err_out_module:
4965         module_put(THIS_MODULE);
4966
4967         dout("Error adding device %s\n", buf);
4968
4969         return (ssize_t)rc;
4970 }
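
     /*
      * Example of the add interface (hypothetical monitor address and
      * key); see Documentation/ABI/testing/sysfs-bus-rbd for the
      * authoritative description:
      *
      *      $ echo "1.2.3.4:6789 name=admin,secret=AQB... rbd myimage" \
      *              > /sys/bus/rbd/add
      *
      * On success the full input count is returned and a new "rbdN"
      * disk is announced, where N is the id assigned by rbd_dev_id_get().
      */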
4971
4972 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4973 {
4974         struct list_head *tmp;
4975         struct rbd_device *rbd_dev;
4976
4977         spin_lock(&rbd_dev_list_lock);
4978         list_for_each(tmp, &rbd_dev_list) {
4979                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4980                 if (rbd_dev->dev_id == dev_id) {
4981                         spin_unlock(&rbd_dev_list_lock);
4982                         return rbd_dev;
4983                 }
4984         }
4985         spin_unlock(&rbd_dev_list_lock);
4986         return NULL;
4987 }
4988
4989 static void rbd_dev_device_release(struct device *dev)
4990 {
4991         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4992
4993         rbd_free_disk(rbd_dev);
4994         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4995         rbd_dev_clear_mapping(rbd_dev);
4996         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4997         rbd_dev->major = 0;
4998         rbd_dev_id_put(rbd_dev);
4999         rbd_dev_mapping_clear(rbd_dev);
5000 }
5001
5002 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5003 {
5004         while (rbd_dev->parent) {
5005                 struct rbd_device *first = rbd_dev;
5006                 struct rbd_device *second = first->parent;
5007                 struct rbd_device *third;
5008
5009                 /*
5010                  * Follow to the parent with no grandparent and
5011                  * remove it.
5012                  */
5013                 while (second && (third = second->parent)) {
5014                         first = second;
5015                         second = third;
5016                 }
5017                 rbd_assert(second);
5018                 rbd_dev_image_release(second);
5019                 first->parent = NULL;
5020                 first->parent_overlap = 0;
5021
5022                 rbd_assert(first->parent_spec);
5023                 rbd_spec_put(first->parent_spec);
5024                 first->parent_spec = NULL;
5025         }
5026 }
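
     /*
      * Illustration: for a chain  child -> parent -> base  the loop
      * above releases "base" first (the ancestor that has no parent of
      * its own), then "parent", clearing the ->parent pointer and the
      * parent spec at each step, so the chain is torn down from its far
      * end inward.
      */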
5027
5028 static ssize_t rbd_remove(struct bus_type *bus,
5029                           const char *buf,
5030                           size_t count)
5031 {
5032         struct rbd_device *rbd_dev = NULL;
5033         int target_id;
5034         unsigned long ul;
5035         int ret;
5036
5037         ret = strict_strtoul(buf, 10, &ul);
5038         if (ret)
5039                 return ret;
5040
5041         /* convert to int; abort if we lost anything in the conversion */
5042         target_id = (int) ul;
5043         if (target_id != ul)
5044                 return -EINVAL;
5045
5046         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5047
5048         rbd_dev = __rbd_get_dev(target_id);
5049         if (!rbd_dev) {
5050                 ret = -ENOENT;
5051                 goto done;
5052         }
5053
5054         spin_lock_irq(&rbd_dev->lock);
5055         if (rbd_dev->open_count)
5056                 ret = -EBUSY;
5057         else
5058                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5059         spin_unlock_irq(&rbd_dev->lock);
5060         if (ret < 0)
5061                 goto done;
5062         ret = count;
5063         rbd_bus_del_dev(rbd_dev);
5064         rbd_dev_image_release(rbd_dev);
5065         module_put(THIS_MODULE);
5066 done:
5067         mutex_unlock(&ctl_mutex);
5068
5069         return ret;
5070 }
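
     /*
      * Example of the remove interface: the value written is the rbd
      * device id (the N in "rbdN"), not a major/minor number:
      *
      *      $ echo 0 > /sys/bus/rbd/remove
      *
      * Removal fails with -ENOENT if no mapping has that id, and with
      * -EBUSY while the device is still open.
      */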
5071
5072 /*
5073  * create control files in sysfs
5074  * /sys/bus/rbd/...
5075  */
5076 static int rbd_sysfs_init(void)
5077 {
5078         int ret;
5079
5080         ret = device_register(&rbd_root_dev);
5081         if (ret < 0)
5082                 return ret;
5083
5084         ret = bus_register(&rbd_bus_type);
5085         if (ret < 0)
5086                 device_unregister(&rbd_root_dev);
5087
5088         return ret;
5089 }
5090
5091 static void rbd_sysfs_cleanup(void)
5092 {
5093         bus_unregister(&rbd_bus_type);
5094         device_unregister(&rbd_root_dev);
5095 }
5096
5097 static int __init rbd_init(void)
5098 {
5099         int rc;
5100
5101         if (!libceph_compatible(NULL)) {
5102                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5103
5104                 return -EINVAL;
5105         }
5106         rc = rbd_sysfs_init();
5107         if (rc)
5108                 return rc;
5109         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5110         return 0;
5111 }
5112
5113 static void __exit rbd_exit(void)
5114 {
5115         rbd_sysfs_cleanup();
5116 }
5117
5118 module_init(rbd_init);
5119 module_exit(rbd_exit);
5120
5121 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5122 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5123 MODULE_DESCRIPTION("rados block device");
5124
5125 /* following authorship retained from original osdblk.c */
5126 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5127
5128 MODULE_LICENSE("GPL");