
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
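
/*
 * Worked example (illustrative, assuming the usual Linux NAME_MAX of
 * 255): sizeof (RBD_SNAP_DEV_NAME_PREFIX) is 6 ("snap_" plus its
 * trailing NUL), so the prefix consumes 6 - 1 = 5 characters and a
 * snapshot name may be at most 255 - 5 = 250 characters long.
 */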

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
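
/*
 * Worked example (illustrative, assuming the on-disk snapshot context
 * layout of an 8-byte seq, a 4-byte count, and one 8-byte id per
 * snapshot): 8 + 4 + 510 * 8 = 4092 bytes, which just fits in a
 * 4096-byte page; a 511th snapshot would push it to 4100 bytes.
 */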

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
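
/*
 * Worked example (illustrative): (5 * sizeof (int)) / 2 + 1 slightly
 * over-estimates the decimal digits an int can need.  For a 4-byte
 * int that gives (5 * 4) / 2 + 1 = 11 characters, enough even for
 * "-2147483648" (10 digits plus a sign).  Adding the 3 characters of
 * "rbd" still leaves ample room within DEV_NAME_LEN (32).
 */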

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
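
/*
 * Illustrative sketch (not part of the driver; all values here are
 * hypothetical): mapping the head of an image called "myimage" in
 * pool "rbd" would end up with a spec roughly like the following,
 * with the ids looked up from the supplied names:
 *
 *	struct rbd_spec spec = {
 *		.pool_id	= 2,
 *		.pool_name	= "rbd",
 *		.image_id	= "1014b76b8b4567",
 *		.image_name	= "myimage",
 *		.snap_id	= CEPH_NOSNAP,
 *		.snap_name	= RBD_SNAP_HEAD_NAME,
 *	};
 */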

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
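
/*
 * Illustrative example: this parser is handed one token at a time
 * from a comma-separated option string, so a map request carrying
 * "ro" or "read_only" arrives here as a single token and sets
 * rbd_opts->read_only to true, while "rw" or "read_write" clears it.
 * The exact user-space invocation is outside this file; see
 * Documentation/ABI/testing/sysfs-bus-rbd.
 */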

/*
 * Get a ceph client with specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the prefix allocated above */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u32 which;

        for (which = 0; which < snapc->num_snaps; which++)
                if (snapc->snaps[which] == snap_id)
                        return which;

        return BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        const char *snap_name = rbd_dev->spec->snap_name;
        u64 snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
                snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
                if (snap_id == CEPH_NOSNAP)
                        return -ENOENT;
        } else {
                snap_id = CEPH_NOSNAP;
        }

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        /* If we are mapping a snapshot it must be marked read-only */

        if (snap_id != CEPH_NOSNAP)
                rbd_dev->mapping.read_only = true;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
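
/*
 * Worked example (illustrative) for the three segment helpers above,
 * assuming obj_order is 22 (4 MiB objects) and a hypothetical object
 * prefix of "rb.0.1234":
 *
 *	offset = 0x500000 (5 MiB), length = 0x200000 (2 MiB)
 *	segment               = 0x500000 >> 22 = 1
 *	rbd_segment_name()   -> "rb.0.1234.000000000001"
 *	rbd_segment_offset() -> 0x500000 & 0x3fffff = 0x100000
 *	rbd_segment_length() -> min(2 MiB, 4 MiB - 1 MiB) = 2 MiB
 *
 * so this particular I/O fits entirely within the second object.
 */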

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
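
/*
 * Usage sketch (illustrative, hypothetical values): splitting one
 * request at an object boundary.  With 4 MiB objects, a bio chain
 * covering 6 MiB of image data would be consumed in two calls:
 *
 *	struct bio *bi = original_chain;
 *	unsigned int off = 0;
 *	struct bio *first, *second;
 *
 *	first = bio_chain_clone_range(&bi, &off, 4 << 20, GFP_NOIO);
 *	second = bio_chain_clone_range(&bi, &off, 2 << 20, GFP_NOIO);
 *
 * After the first call, bi and off refer to the first un-cloned
 * byte, so the second call picks up exactly where the first left
 * off.
 */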

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
1648                                         bool write_request,
1649                                         struct rbd_obj_request *obj_request)
1650 {
1651         struct ceph_snap_context *snapc = NULL;
1652         struct ceph_osd_client *osdc;
1653         struct ceph_osd_request *osd_req;
1654
1655         if (obj_request_img_data_test(obj_request)) {
1656                 struct rbd_img_request *img_request = obj_request->img_request;
1657
1658                 rbd_assert(write_request ==
1659                                 img_request_write_test(img_request));
1660                 if (write_request)
1661                         snapc = img_request->snapc;
1662         }
1663
1664         /* Allocate and initialize the request, for the single op */
1665
1666         osdc = &rbd_dev->rbd_client->client->osdc;
1667         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1668         if (!osd_req)
1669                 return NULL;    /* ENOMEM */
1670
1671         if (write_request)
1672                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1673         else
1674                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1675
1676         osd_req->r_callback = rbd_osd_req_callback;
1677         osd_req->r_priv = obj_request;
1678
1679         osd_req->r_oid_len = strlen(obj_request->object_name);
1680         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1681         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1682
1683         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1684
1685         return osd_req;
1686 }
1687
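/*
 * A typical single-op request is built in four steps: allocate the
 * object request, attach an osd request to it, initialize the op,
 * and format it for read or write.  A sketch, modeled on the callers
 * below (error handling omitted):
 *
 *      obj_request = rbd_obj_request_create(name, offset, length,
 *                                              OBJ_REQUEST_PAGES);
 *      obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
 *                                              obj_request);
 *      osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
 *                              offset, length, 0, 0);
 *      rbd_osd_req_format_read(obj_request);
 *      ret = rbd_obj_request_submit(osdc, obj_request);
 */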
1688 /*
1689  * Create a copyup osd request based on the information in the
1690  * object request supplied.  A copyup request has two osd ops,
1691  * a copyup method call, and a "normal" write request.
1692  */
1693 static struct ceph_osd_request *
1694 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1695 {
1696         struct rbd_img_request *img_request;
1697         struct ceph_snap_context *snapc;
1698         struct rbd_device *rbd_dev;
1699         struct ceph_osd_client *osdc;
1700         struct ceph_osd_request *osd_req;
1701
1702         rbd_assert(obj_request_img_data_test(obj_request));
1703         img_request = obj_request->img_request;
1704         rbd_assert(img_request);
1705         rbd_assert(img_request_write_test(img_request));
1706
1707         /* Allocate and initialize the request, for the two ops */
1708
1709         snapc = img_request->snapc;
1710         rbd_dev = img_request->rbd_dev;
1711         osdc = &rbd_dev->rbd_client->client->osdc;
1712         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1713         if (!osd_req)
1714                 return NULL;    /* ENOMEM */
1715
1716         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1717         osd_req->r_callback = rbd_osd_req_callback;
1718         osd_req->r_priv = obj_request;
1719
1720         osd_req->r_oid_len = strlen(obj_request->object_name);
1721         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1722         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1723
1724         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1725
1726         return osd_req;
1727 }
1728
1729
1730 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1731 {
1732         ceph_osdc_put_request(osd_req);
1733 }
1734
1735 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1736
1737 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1738                                                 u64 offset, u64 length,
1739                                                 enum obj_request_type type)
1740 {
1741         struct rbd_obj_request *obj_request;
1742         size_t size;
1743         char *name;
1744
1745         rbd_assert(obj_request_type_valid(type));
1746
1747         size = strlen(object_name) + 1;
1748         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1749         if (!obj_request)
1750                 return NULL;
1751
1752         name = (char *)(obj_request + 1);
1753         obj_request->object_name = memcpy(name, object_name, size);
1754         obj_request->offset = offset;
1755         obj_request->length = length;
1756         obj_request->flags = 0;
1757         obj_request->which = BAD_WHICH;
1758         obj_request->type = type;
1759         INIT_LIST_HEAD(&obj_request->links);
1760         init_completion(&obj_request->completion);
1761         kref_init(&obj_request->kref);
1762
1763         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1764                 offset, length, (int)type, obj_request);
1765
1766         return obj_request;
1767 }
1768
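/*
 * Note the allocation trick above: the object name is stored in the
 * same allocation as the request structure, immediately following it:
 *
 *      +------------------------+------------------------+
 *      | struct rbd_obj_request | name (size bytes, NUL) |
 *      +------------------------+------------------------+
 *      ^ obj_request            ^ obj_request->object_name
 *
 * so the single kfree() in rbd_obj_request_destroy() releases both.
 */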
1769 static void rbd_obj_request_destroy(struct kref *kref)
1770 {
1771         struct rbd_obj_request *obj_request;
1772
1773         obj_request = container_of(kref, struct rbd_obj_request, kref);
1774
1775         dout("%s: obj %p\n", __func__, obj_request);
1776
1777         rbd_assert(obj_request->img_request == NULL);
1778         rbd_assert(obj_request->which == BAD_WHICH);
1779
1780         if (obj_request->osd_req)
1781                 rbd_osd_req_destroy(obj_request->osd_req);
1782
1783         rbd_assert(obj_request_type_valid(obj_request->type));
1784         switch (obj_request->type) {
1785         case OBJ_REQUEST_NODATA:
1786                 break;          /* Nothing to do */
1787         case OBJ_REQUEST_BIO:
1788                 if (obj_request->bio_list)
1789                         bio_chain_put(obj_request->bio_list);
1790                 break;
1791         case OBJ_REQUEST_PAGES:
1792                 if (obj_request->pages)
1793                         ceph_release_page_vector(obj_request->pages,
1794                                                 obj_request->page_count);
1795                 break;
1796         }
1797
1798         kfree(obj_request);
1799 }
1800
1801 /*
1802  * Caller is responsible for filling in the list of object requests
1803  * that comprises the image request, and the Linux request pointer
1804  * (if there is one).
1805  */
1806 static struct rbd_img_request *rbd_img_request_create(
1807                                         struct rbd_device *rbd_dev,
1808                                         u64 offset, u64 length,
1809                                         bool write_request,
1810                                         bool child_request)
1811 {
1812         struct rbd_img_request *img_request;
1813
1814         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1815         if (!img_request)
1816                 return NULL;
1817
1818         if (write_request) {
1819                 down_read(&rbd_dev->header_rwsem);
1820                 ceph_get_snap_context(rbd_dev->header.snapc);
1821                 up_read(&rbd_dev->header_rwsem);
1822         }
1823
1824         img_request->rq = NULL;
1825         img_request->rbd_dev = rbd_dev;
1826         img_request->offset = offset;
1827         img_request->length = length;
1828         img_request->flags = 0;
1829         if (write_request) {
1830                 img_request_write_set(img_request);
1831                 img_request->snapc = rbd_dev->header.snapc;
1832         } else {
1833                 img_request->snap_id = rbd_dev->spec->snap_id;
1834         }
1835         if (child_request)
1836                 img_request_child_set(img_request);
1837         if (rbd_dev->parent_spec)
1838                 img_request_layered_set(img_request);
1839         spin_lock_init(&img_request->completion_lock);
1840         img_request->next_completion = 0;
1841         img_request->callback = NULL;
1842         img_request->result = 0;
1843         img_request->obj_request_count = 0;
1844         INIT_LIST_HEAD(&img_request->obj_requests);
1845         kref_init(&img_request->kref);
1846
1847         rbd_img_request_get(img_request);       /* Avoid a warning */
1848         rbd_img_request_put(img_request);       /* TEMPORARY */
1849
1850         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1851                 write_request ? "write" : "read", offset, length,
1852                 img_request);
1853
1854         return img_request;
1855 }
1856
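/*
 * The usual life cycle of an image request, as seen in
 * rbd_request_fn() below (a sketch, error handling abbreviated):
 *
 *      img_request = rbd_img_request_create(rbd_dev, offset, length,
 *                                              write_request, false);
 *      img_request->rq = rq;
 *      ret = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
 *      if (!ret)
 *              ret = rbd_img_request_submit(img_request);
 *      if (ret)
 *              rbd_img_request_put(img_request);
 */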
1857 static void rbd_img_request_destroy(struct kref *kref)
1858 {
1859         struct rbd_img_request *img_request;
1860         struct rbd_obj_request *obj_request;
1861         struct rbd_obj_request *next_obj_request;
1862
1863         img_request = container_of(kref, struct rbd_img_request, kref);
1864
1865         dout("%s: img %p\n", __func__, img_request);
1866
1867         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1868                 rbd_img_obj_request_del(img_request, obj_request);
1869         rbd_assert(img_request->obj_request_count == 0);
1870
1871         if (img_request_write_test(img_request))
1872                 ceph_put_snap_context(img_request->snapc);
1873
1874         if (img_request_child_test(img_request))
1875                 rbd_obj_request_put(img_request->obj_request);
1876
1877         kfree(img_request);
1878 }
1879
1880 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1881 {
1882         struct rbd_img_request *img_request;
1883         unsigned int xferred;
1884         int result;
1885         bool more;
1886
1887         rbd_assert(obj_request_img_data_test(obj_request));
1888         img_request = obj_request->img_request;
1889
1890         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1891         xferred = (unsigned int)obj_request->xferred;
1892         result = obj_request->result;
1893         if (result) {
1894                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1895
1896                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1897                         img_request_write_test(img_request) ? "write" : "read",
1898                         obj_request->length, obj_request->img_offset,
1899                         obj_request->offset);
1900                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1901                         result, xferred);
1902                 if (!img_request->result)
1903                         img_request->result = result;
1904         }
1905
1906         /* Image object requests don't own their page array */
1907
1908         if (obj_request->type == OBJ_REQUEST_PAGES) {
1909                 obj_request->pages = NULL;
1910                 obj_request->page_count = 0;
1911         }
1912
1913         if (img_request_child_test(img_request)) {
1914                 rbd_assert(img_request->obj_request != NULL);
1915                 more = obj_request->which < img_request->obj_request_count - 1;
1916         } else {
1917                 rbd_assert(img_request->rq != NULL);
1918                 more = blk_end_request(img_request->rq, result, xferred);
1919         }
1920
1921         return more;
1922 }
1923
1924 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1925 {
1926         struct rbd_img_request *img_request;
1927         u32 which = obj_request->which;
1928         bool more = true;
1929
1930         rbd_assert(obj_request_img_data_test(obj_request));
1931         img_request = obj_request->img_request;
1932
1933         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1934         rbd_assert(img_request != NULL);
1935         rbd_assert(img_request->obj_request_count > 0);
1936         rbd_assert(which != BAD_WHICH);
1937         rbd_assert(which < img_request->obj_request_count);
1938         rbd_assert(which >= img_request->next_completion);
1939
1940         spin_lock_irq(&img_request->completion_lock);
1941         if (which != img_request->next_completion)
1942                 goto out;
1943
1944         for_each_obj_request_from(img_request, obj_request) {
1945                 rbd_assert(more);
1946                 rbd_assert(which < img_request->obj_request_count);
1947
1948                 if (!obj_request_done_test(obj_request))
1949                         break;
1950                 more = rbd_img_obj_end_request(obj_request);
1951                 which++;
1952         }
1953
1954         rbd_assert(more ^ (which == img_request->obj_request_count));
1955         img_request->next_completion = which;
1956 out:
1957         spin_unlock_irq(&img_request->completion_lock);
1958
1959         if (!more)
1960                 rbd_img_request_complete(img_request);
1961 }
1962
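/*
 * Object requests within an image request can complete in any order,
 * but their results are consumed strictly in index order.  If, say,
 * object request 2 finishes before request 1, the callback for 2
 * records nothing beyond its done flag and returns; when 1 later
 * completes, the loop above ends both 1 and 2.
 */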
1963 /*
1964  * Split up an image request into one or more object requests, each
1965  * to a different object.  The "type" parameter indicates whether
1966  * "data_desc" is the pointer to the head of a list of bio
1967  * structures, or the base of a page array.  In either case this
1968  * function assumes data_desc describes memory sufficient to hold
1969  * all data described by the image request.
1970  */
1971 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1972                                         enum obj_request_type type,
1973                                         void *data_desc)
1974 {
1975         struct rbd_device *rbd_dev = img_request->rbd_dev;
1976         struct rbd_obj_request *obj_request = NULL;
1977         struct rbd_obj_request *next_obj_request;
1978         bool write_request = img_request_write_test(img_request);
1979         struct bio *bio_list;
1980         unsigned int bio_offset = 0;
1981         struct page **pages;
1982         u64 img_offset;
1983         u64 resid;
1984         u16 opcode;
1985
1986         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1987                 (int)type, data_desc);
1988
1989         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1990         img_offset = img_request->offset;
1991         resid = img_request->length;
1992         rbd_assert(resid > 0);
1993
1994         if (type == OBJ_REQUEST_BIO) {
1995                 bio_list = data_desc;
1996                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1997         } else {
1998                 rbd_assert(type == OBJ_REQUEST_PAGES);
1999                 pages = data_desc;
2000         }
2001
2002         while (resid) {
2003                 struct ceph_osd_request *osd_req;
2004                 const char *object_name;
2005                 u64 offset;
2006                 u64 length;
2007
2008                 object_name = rbd_segment_name(rbd_dev, img_offset);
2009                 if (!object_name)
2010                         goto out_unwind;
2011                 offset = rbd_segment_offset(rbd_dev, img_offset);
2012                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2013                 obj_request = rbd_obj_request_create(object_name,
2014                                                 offset, length, type);
2015                 kfree(object_name);     /* object request has its own copy */
2016                 if (!obj_request)
2017                         goto out_unwind;
2018
2019                 if (type == OBJ_REQUEST_BIO) {
2020                         unsigned int clone_size;
2021
2022                         rbd_assert(length <= (u64)UINT_MAX);
2023                         clone_size = (unsigned int)length;
2024                         obj_request->bio_list =
2025                                         bio_chain_clone_range(&bio_list,
2026                                                                 &bio_offset,
2027                                                                 clone_size,
2028                                                                 GFP_ATOMIC);
2029                         if (!obj_request->bio_list)
2030                                 goto out_partial;
2031                 } else {
2032                         unsigned int page_count;
2033
2034                         obj_request->pages = pages;
2035                         page_count = (u32)calc_pages_for(offset, length);
2036                         obj_request->page_count = page_count;
2037                         if ((offset + length) & ~PAGE_MASK)
2038                                 page_count--;   /* more on last page */
2039                         pages += page_count;
2040                 }
2041
2042                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2043                                                 obj_request);
2044                 if (!osd_req)
2045                         goto out_partial;
2046                 obj_request->osd_req = osd_req;
2047                 obj_request->callback = rbd_img_obj_callback;
2048
2049                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2050                                                 0, 0);
2051                 if (type == OBJ_REQUEST_BIO)
2052                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2053                                         obj_request->bio_list, length);
2054                 else
2055                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2056                                         obj_request->pages, length,
2057                                         offset & ~PAGE_MASK, false, false);
2058
2059                 if (write_request)
2060                         rbd_osd_req_format_write(obj_request);
2061                 else
2062                         rbd_osd_req_format_read(obj_request);
2063
2064                 obj_request->img_offset = img_offset;
2065                 rbd_img_obj_request_add(img_request, obj_request);
2066
2067                 img_offset += length;
2068                 resid -= length;
2069         }
2070
2071         return 0;
2072
2073 out_partial:
2074         rbd_obj_request_put(obj_request);
2075 out_unwind:
2076         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2077                 rbd_obj_request_put(obj_request);
2078
2079         return -ENOMEM;
2080 }
2081
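/*
 * The loop above walks the image extent one backing object at a time.
 * Assuming the rbd_segment_*() helpers defined earlier in this file
 * implement the usual power-of-two split, an image offset maps to a
 * segment roughly like this (a sketch):
 *
 *      u64 seg_size = 1ULL << rbd_dev->header.obj_order;
 *      u64 seg_off  = img_offset & (seg_size - 1);
 *      u64 seg_len  = min_t(u64, resid, seg_size - seg_off);
 *
 * With the default obj_order of 22 (4 MiB objects), a 10 MiB request
 * at image offset 0 becomes three object requests of 4, 4, and 2 MiB.
 */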
2082 static void
2083 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2084 {
2085         struct rbd_img_request *img_request;
2086         struct rbd_device *rbd_dev;
2087         u64 length;
2088         u32 page_count;
2089
2090         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2091         rbd_assert(obj_request_img_data_test(obj_request));
2092         img_request = obj_request->img_request;
2093         rbd_assert(img_request);
2094
2095         rbd_dev = img_request->rbd_dev;
2096         rbd_assert(rbd_dev);
2097         length = (u64)1 << rbd_dev->header.obj_order;
2098         page_count = (u32)calc_pages_for(0, length);
2099
2100         rbd_assert(obj_request->copyup_pages);
2101         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2102         obj_request->copyup_pages = NULL;
2103
2104         /*
2105          * We want the transfer count to reflect the size of the
2106          * original write request.  There is no such thing as a
2107          * successful short write, so if the request was successful
2108          * we can just set it to the originally-requested length.
2109          */
2110         if (!obj_request->result)
2111                 obj_request->xferred = obj_request->length;
2112
2113         /* Finish up with the normal image object callback */
2114
2115         rbd_img_obj_callback(obj_request);
2116 }
2117
2118 static void
2119 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2120 {
2121         struct rbd_obj_request *orig_request;
2122         struct ceph_osd_request *osd_req;
2123         struct ceph_osd_client *osdc;
2124         struct rbd_device *rbd_dev;
2125         struct page **pages;
2126         int result;
2127         u64 obj_size;
2128         u64 xferred;
2129
2130         rbd_assert(img_request_child_test(img_request));
2131
2132         /* First get what we need from the image request */
2133
2134         pages = img_request->copyup_pages;
2135         rbd_assert(pages != NULL);
2136         img_request->copyup_pages = NULL;
2137
2138         orig_request = img_request->obj_request;
2139         rbd_assert(orig_request != NULL);
2140         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2141         result = img_request->result;
2142         obj_size = img_request->length;
2143         xferred = img_request->xferred;
2144
2145         rbd_dev = img_request->rbd_dev;
2146         rbd_assert(rbd_dev);
2147         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2148
2149         rbd_img_request_put(img_request);
2150
2151         if (result)
2152                 goto out_err;
2153
2154         /* Allocate the new copyup osd request for the original request */
2155
2156         result = -ENOMEM;
2157         rbd_assert(!orig_request->osd_req);
2158         osd_req = rbd_osd_req_create_copyup(orig_request);
2159         if (!osd_req)
2160                 goto out_err;
2161         orig_request->osd_req = osd_req;
2162         orig_request->copyup_pages = pages;
2163
2164         /* Initialize the copyup op */
2165
2166         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2167         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2168                                                 false, false);
2169
2170         /* Then the original write request op */
2171
2172         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2173                                         orig_request->offset,
2174                                         orig_request->length, 0, 0);
2175         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2176                                         orig_request->length);
2177
2178         rbd_osd_req_format_write(orig_request);
2179
2180         /* All set, send it off. */
2181
2182         orig_request->callback = rbd_img_obj_copyup_callback;
2183         osdc = &rbd_dev->rbd_client->client->osdc;
2184         result = rbd_obj_request_submit(osdc, orig_request);
2185         if (!result)
2186                 return;
2187 out_err:
2188         /* Record the error code and complete the request */
2189
2190         orig_request->result = result;
2191         orig_request->xferred = 0;
2192         obj_request_done_set(orig_request);
2193         rbd_obj_request_complete(orig_request);
2194 }
2195
2196 /*
2197  * Read from the parent image the range of data that covers the
2198  * entire target of the given object request.  This is used for
2199  * satisfying a layered image write request when the target of an
2200  * object request from the image request does not exist.
2201  *
2202  * A page array big enough to hold the returned data is allocated
2203  * and supplied to rbd_img_request_fill() as the "data descriptor."
2204  * When the read completes, this page array will be transferred to
2205  * the original object request for the copyup operation.
2206  *
2207  * If an error occurs, record it as the result of the original
2208  * object request and mark it done so it gets completed.
2209  */
2210 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2211 {
2212         struct rbd_img_request *img_request = NULL;
2213         struct rbd_img_request *parent_request = NULL;
2214         struct rbd_device *rbd_dev;
2215         u64 img_offset;
2216         u64 length;
2217         struct page **pages = NULL;
2218         u32 page_count;
2219         int result;
2220
2221         rbd_assert(obj_request_img_data_test(obj_request));
2222         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2223
2224         img_request = obj_request->img_request;
2225         rbd_assert(img_request != NULL);
2226         rbd_dev = img_request->rbd_dev;
2227         rbd_assert(rbd_dev->parent != NULL);
2228
2229         /*
2230          * First things first.  The original osd request is of no
2231          * use to us any more; we'll need a new one that can hold
2232          * the two ops in a copyup request.  We'll get that later,
2233          * but for now we can release the old one.
2234          */
2235         rbd_osd_req_destroy(obj_request->osd_req);
2236         obj_request->osd_req = NULL;
2237
2238         /*
2239          * Determine the byte range covered by the object in the
2240          * child image to which the original request was to be sent.
2241          */
2242         img_offset = obj_request->img_offset - obj_request->offset;
2243         length = (u64)1 << rbd_dev->header.obj_order;
2244
2245         /*
2246          * There is no defined parent data beyond the parent
2247          * overlap, so limit what we read at that boundary if
2248          * necessary.
2249          */
2250         if (img_offset + length > rbd_dev->parent_overlap) {
2251                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2252                 length = rbd_dev->parent_overlap - img_offset;
2253         }
2254
2255         /*
2256          * Allocate a page array big enough to receive the data read
2257          * from the parent.
2258          */
2259         page_count = (u32)calc_pages_for(0, length);
2260         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2261         if (IS_ERR(pages)) {
2262                 result = PTR_ERR(pages);
2263                 pages = NULL;
2264                 goto out_err;
2265         }
2266
2267         result = -ENOMEM;
2268         parent_request = rbd_img_request_create(rbd_dev->parent,
2269                                                 img_offset, length,
2270                                                 false, true);
2271         if (!parent_request)
2272                 goto out_err;
2273         rbd_obj_request_get(obj_request);
2274         parent_request->obj_request = obj_request;
2275
2276         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2277         if (result)
2278                 goto out_err;
2279         parent_request->copyup_pages = pages;
2280
2281         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2282         result = rbd_img_request_submit(parent_request);
2283         if (!result)
2284                 return 0;
2285
2286         parent_request->copyup_pages = NULL;
2287         parent_request->obj_request = NULL;
2288         rbd_obj_request_put(obj_request);
2289 out_err:
2290         if (pages)
2291                 ceph_release_page_vector(pages, page_count);
2292         if (parent_request)
2293                 rbd_img_request_put(parent_request);
2294         obj_request->result = result;
2295         obj_request->xferred = 0;
2296         obj_request_done_set(obj_request);
2297
2298         return result;
2299 }
2300
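/*
 * Putting the copyup pieces together, a layered write to an object
 * known not to exist proceeds in three stages:
 *
 *      1. rbd_img_obj_parent_read_full() reads the covering range
 *         from the parent image into a page array;
 *      2. rbd_img_obj_parent_read_full_callback() wraps the original
 *         write in a two-op osd request: a "copyup" class method call
 *         carrying the parent data, then the write itself;
 *      3. rbd_img_obj_copyup_callback() releases the pages and
 *         completes the original object request.
 */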
2301 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2302 {
2303         struct rbd_obj_request *orig_request;
2304         int result;
2305
2306         rbd_assert(!obj_request_img_data_test(obj_request));
2307
2308         /*
2309          * All we need from the object request is the original
2310          * request and the result of the STAT op.  Grab those, then
2311          * we're done with the request.
2312          */
2313         orig_request = obj_request->obj_request;
2314         obj_request->obj_request = NULL;
2315         rbd_assert(orig_request);
2316         rbd_assert(orig_request->img_request);
2317
2318         result = obj_request->result;
2319         obj_request->result = 0;
2320
2321         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2322                 obj_request, orig_request, result,
2323                 obj_request->xferred, obj_request->length);
2324         rbd_obj_request_put(obj_request);
2325
2326         rbd_assert(orig_request);
2327         rbd_assert(orig_request->img_request);
2328
2329         /*
2330          * Our only purpose here is to determine whether the object
2331          * exists, and we don't want to treat the non-existence as
2332          * an error.  If something else comes back, transfer the
2333          * error to the original request and complete it now.
2334          */
2335         if (!result) {
2336                 obj_request_existence_set(orig_request, true);
2337         } else if (result == -ENOENT) {
2338                 obj_request_existence_set(orig_request, false);
2339         } else {
2340                 orig_request->result = result;
2341                 goto out;
2342         }
2343
2344         /*
2345          * Resubmit the original request now that we have recorded
2346          * whether the target object exists.
2347          */
2348         orig_request->result = rbd_img_obj_request_submit(orig_request);
2349 out:
2350         if (orig_request->result)
2351                 rbd_obj_request_complete(orig_request);
2352         rbd_obj_request_put(orig_request);
2353 }
2354
2355 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2356 {
2357         struct rbd_obj_request *stat_request;
2358         struct rbd_device *rbd_dev;
2359         struct ceph_osd_client *osdc;
2360         struct page **pages = NULL;
2361         u32 page_count;
2362         size_t size;
2363         int ret;
2364
2365         /*
2366          * The response data for a STAT call consists of:
2367          *     le64 length;
2368          *     struct {
2369          *         le32 tv_sec;
2370          *         le32 tv_nsec;
2371          *     } mtime;
2372          */
2373         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2374         page_count = (u32)calc_pages_for(0, size);
2375         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2376         if (IS_ERR(pages))
2377                 return PTR_ERR(pages);
2378
2379         ret = -ENOMEM;
2380         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2381                                                         OBJ_REQUEST_PAGES);
2382         if (!stat_request)
2383                 goto out;
2384
2385         rbd_obj_request_get(obj_request);
2386         stat_request->obj_request = obj_request;
2387         stat_request->pages = pages;
2388         stat_request->page_count = page_count;
2389
2390         rbd_assert(obj_request->img_request);
2391         rbd_dev = obj_request->img_request->rbd_dev;
2392         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2393                                                 stat_request);
2394         if (!stat_request->osd_req)
2395                 goto out;
2396         stat_request->callback = rbd_img_obj_exists_callback;
2397
2398         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2399         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2400                                         false, false);
2401         rbd_osd_req_format_read(stat_request);
2402
2403         osdc = &rbd_dev->rbd_client->client->osdc;
2404         ret = rbd_obj_request_submit(osdc, stat_request);
2405 out:
2406         if (ret)
2407                 rbd_obj_request_put(obj_request);
2408
2409         return ret;
2410 }
2411
2412 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2413 {
2414         struct rbd_img_request *img_request;
2415         struct rbd_device *rbd_dev;
2416         bool known;
2417
2418         rbd_assert(obj_request_img_data_test(obj_request));
2419
2420         img_request = obj_request->img_request;
2421         rbd_assert(img_request);
2422         rbd_dev = img_request->rbd_dev;
2423
2424         /*
2425          * Only writes to layered images need special handling.
2426          * Reads and non-layered writes are simple object requests.
2427          * Layered writes that start beyond the end of the overlap
2428          * with the parent have no parent data, so they too are
2429          * simple object requests.  Finally, if the target object is
2430          * known to already exist, its parent data has already been
2431          * copied, so a write to the object can also be handled as a
2432          * simple object request.
2433          */
2434         if (!img_request_write_test(img_request) ||
2435                 !img_request_layered_test(img_request) ||
2436                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2437                 ((known = obj_request_known_test(obj_request)) &&
2438                         obj_request_exists_test(obj_request))) {
2439
2440                 struct rbd_device *rbd_dev;
2441                 struct ceph_osd_client *osdc;
2442
2443                 rbd_dev = obj_request->img_request->rbd_dev;
2444                 osdc = &rbd_dev->rbd_client->client->osdc;
2445
2446                 return rbd_obj_request_submit(osdc, obj_request);
2447         }
2448
2449         /*
2450          * It's a layered write.  The target object might exist but
2451          * we may not know that yet.  If we know it doesn't exist,
2452          * start by reading the data for the full target object from
2453          * the parent so we can use it for a copyup to the target.
2454          */
2455         if (known)
2456                 return rbd_img_obj_parent_read_full(obj_request);
2457
2458         /* We don't know whether the target exists.  Go find out. */
2459
2460         return rbd_img_obj_exists_submit(obj_request);
2461 }
2462
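/*
 * To summarize the dispatch above for an object request belonging to
 * a layered write whose range overlaps the parent:
 *
 *      target object state     action taken
 *      -------------------     ------------------------------------
 *      unknown                 rbd_img_obj_exists_submit() (STAT)
 *      known, exists           plain rbd_obj_request_submit()
 *      known, absent           rbd_img_obj_parent_read_full(), then
 *                              a copyup write
 */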
2463 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2464 {
2465         struct rbd_obj_request *obj_request;
2466         struct rbd_obj_request *next_obj_request;
2467
2468         dout("%s: img %p\n", __func__, img_request);
2469         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2470                 int ret;
2471
2472                 ret = rbd_img_obj_request_submit(obj_request);
2473                 if (ret)
2474                         return ret;
2475         }
2476
2477         return 0;
2478 }
2479
2480 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2481 {
2482         struct rbd_obj_request *obj_request;
2483         struct rbd_device *rbd_dev;
2484         u64 obj_end;
2485
2486         rbd_assert(img_request_child_test(img_request));
2487
2488         obj_request = img_request->obj_request;
2489         rbd_assert(obj_request);
2490         rbd_assert(obj_request->img_request);
2491
2492         obj_request->result = img_request->result;
2493         if (obj_request->result)
2494                 goto out;
2495
2496         /*
2497          * We need to zero anything beyond the parent overlap
2498          * boundary.  Since rbd_img_obj_request_read_callback()
2499          * will zero anything beyond the end of a short read, an
2500          * easy way to do this is to pretend the data from the
2501          * parent came up short--ending at the overlap boundary.
2502          */
2503         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2504         obj_end = obj_request->img_offset + obj_request->length;
2505         rbd_dev = obj_request->img_request->rbd_dev;
2506         if (obj_end > rbd_dev->parent_overlap) {
2507                 u64 xferred = 0;
2508
2509                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2510                         xferred = rbd_dev->parent_overlap -
2511                                         obj_request->img_offset;
2512
2513                 obj_request->xferred = min(img_request->xferred, xferred);
2514         } else {
2515                 obj_request->xferred = img_request->xferred;
2516         }
2517 out:
2518         rbd_img_obj_request_read_callback(obj_request);
2519         rbd_obj_request_complete(obj_request);
2520 }
2521
2522 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2523 {
2524         struct rbd_device *rbd_dev;
2525         struct rbd_img_request *img_request;
2526         int result;
2527
2528         rbd_assert(obj_request_img_data_test(obj_request));
2529         rbd_assert(obj_request->img_request != NULL);
2530         rbd_assert(obj_request->result == (s32) -ENOENT);
2531         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2532
2533         rbd_dev = obj_request->img_request->rbd_dev;
2534         rbd_assert(rbd_dev->parent != NULL);
2535         /* rbd_read_finish(obj_request, obj_request->length); */
2536         img_request = rbd_img_request_create(rbd_dev->parent,
2537                                                 obj_request->img_offset,
2538                                                 obj_request->length,
2539                                                 false, true);
2540         result = -ENOMEM;
2541         if (!img_request)
2542                 goto out_err;
2543
2544         rbd_obj_request_get(obj_request);
2545         img_request->obj_request = obj_request;
2546
2547         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2548                                         obj_request->bio_list);
2549         if (result)
2550                 goto out_err;
2551
2552         img_request->callback = rbd_img_parent_read_callback;
2553         result = rbd_img_request_submit(img_request);
2554         if (result)
2555                 goto out_err;
2556
2557         return;
2558 out_err:
2559         if (img_request)
2560                 rbd_img_request_put(img_request);
2561         obj_request->result = result;
2562         obj_request->xferred = 0;
2563         obj_request_done_set(obj_request);
2564 }
2565
2566 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2567 {
2568         struct rbd_obj_request *obj_request;
2569         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2570         int ret;
2571
2572         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2573                                                         OBJ_REQUEST_NODATA);
2574         if (!obj_request)
2575                 return -ENOMEM;
2576
2577         ret = -ENOMEM;
2578         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2579         if (!obj_request->osd_req)
2580                 goto out;
2581         obj_request->callback = rbd_obj_request_put;
2582
2583         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2584                                         notify_id, 0, 0);
2585         rbd_osd_req_format_read(obj_request);
2586
2587         ret = rbd_obj_request_submit(osdc, obj_request);
2588 out:
2589         if (ret)
2590                 rbd_obj_request_put(obj_request);
2591
2592         return ret;
2593 }
2594
2595 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2596 {
2597         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2598
2599         if (!rbd_dev)
2600                 return;
2601
2602         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2603                 rbd_dev->header_name, (unsigned long long)notify_id,
2604                 (unsigned int)opcode);
2605         (void)rbd_dev_refresh(rbd_dev);
2606
2607         rbd_obj_notify_ack(rbd_dev, notify_id);
2608 }
2609
2610 /*
2611  * Request sync osd watch/unwatch.  The value of "start" determines
2612  * whether a watch request is being initiated or torn down.
2613  */
2614 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2615 {
2616         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2617         struct rbd_obj_request *obj_request;
2618         int ret;
2619
2620         rbd_assert(start ^ !!rbd_dev->watch_event);
2621         rbd_assert(start ^ !!rbd_dev->watch_request);
2622
2623         if (start) {
2624                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2625                                                 &rbd_dev->watch_event);
2626                 if (ret < 0)
2627                         return ret;
2628                 rbd_assert(rbd_dev->watch_event != NULL);
2629         }
2630
2631         ret = -ENOMEM;
2632         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2633                                                         OBJ_REQUEST_NODATA);
2634         if (!obj_request)
2635                 goto out_cancel;
2636
2637         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2638         if (!obj_request->osd_req)
2639                 goto out_cancel;
2640
2641         if (start)
2642                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2643         else
2644                 ceph_osdc_unregister_linger_request(osdc,
2645                                         rbd_dev->watch_request->osd_req);
2646
2647         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2648                                 rbd_dev->watch_event->cookie, 0, start);
2649         rbd_osd_req_format_write(obj_request);
2650
2651         ret = rbd_obj_request_submit(osdc, obj_request);
2652         if (ret)
2653                 goto out_cancel;
2654         ret = rbd_obj_request_wait(obj_request);
2655         if (ret)
2656                 goto out_cancel;
2657         ret = obj_request->result;
2658         if (ret)
2659                 goto out_cancel;
2660
2661         /*
2662          * A watch request is set to linger, so the underlying osd
2663          * request won't go away until we unregister it.  We retain
2664          * a pointer to the object request during that time (in
2665          * rbd_dev->watch_request), so we'll keep a reference to
2666          * it.  We'll drop that reference (below) after we've
2667          * unregistered it.
2668          */
2669         if (start) {
2670                 rbd_dev->watch_request = obj_request;
2671
2672                 return 0;
2673         }
2674
2675         /* We have successfully torn down the watch request */
2676
2677         rbd_obj_request_put(rbd_dev->watch_request);
2678         rbd_dev->watch_request = NULL;
2679 out_cancel:
2680         /* Cancel the event if we're tearing down, or on error */
2681         ceph_osdc_cancel_event(rbd_dev->watch_event);
2682         rbd_dev->watch_event = NULL;
2683         if (obj_request)
2684                 rbd_obj_request_put(obj_request);
2685
2686         return ret;
2687 }
2688
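/*
 * Callers use this in matched pairs: rbd_dev_header_watch_sync(rbd_dev, 1)
 * begins receiving notifications (delivered to rbd_watch_cb() above),
 * and rbd_dev_header_watch_sync(rbd_dev, 0) tears the watch down again.
 */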
2689 /*
2690  * Synchronous osd object method call.  Returns the number of bytes
2691  * returned in the outbound buffer, or a negative error code.
2692  */
2693 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2694                              const char *object_name,
2695                              const char *class_name,
2696                              const char *method_name,
2697                              const void *outbound,
2698                              size_t outbound_size,
2699                              void *inbound,
2700                              size_t inbound_size)
2701 {
2702         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2703         struct rbd_obj_request *obj_request;
2704         struct page **pages;
2705         u32 page_count;
2706         int ret;
2707
2708         /*
2709          * Method calls are ultimately read operations.  The result
2710          * should be placed into the inbound buffer provided.  They
2711          * also supply outbound data--parameters for the object
2712          * method.  Currently if this is present it will be a
2713          * snapshot id.
2714          */
2715         page_count = (u32)calc_pages_for(0, inbound_size);
2716         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2717         if (IS_ERR(pages))
2718                 return PTR_ERR(pages);
2719
2720         ret = -ENOMEM;
2721         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2722                                                         OBJ_REQUEST_PAGES);
2723         if (!obj_request)
2724                 goto out;
2725
2726         obj_request->pages = pages;
2727         obj_request->page_count = page_count;
2728
2729         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2730         if (!obj_request->osd_req)
2731                 goto out;
2732
2733         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2734                                         class_name, method_name);
2735         if (outbound_size) {
2736                 struct ceph_pagelist *pagelist;
2737
2738                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2739                 if (!pagelist)
2740                         goto out;
2741
2742                 ceph_pagelist_init(pagelist);
2743                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2744                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2745                                                 pagelist);
2746         }
2747         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2748                                         obj_request->pages, inbound_size,
2749                                         0, false, false);
2750         rbd_osd_req_format_read(obj_request);
2751
2752         ret = rbd_obj_request_submit(osdc, obj_request);
2753         if (ret)
2754                 goto out;
2755         ret = rbd_obj_request_wait(obj_request);
2756         if (ret)
2757                 goto out;
2758
2759         ret = obj_request->result;
2760         if (ret < 0)
2761                 goto out;
2762
2763         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2764         ret = (int)obj_request->xferred;
2765         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2766 out:
2767         if (obj_request)
2768                 rbd_obj_request_put(obj_request);
2769         else
2770                 ceph_release_page_vector(pages, page_count);
2771
2772         return ret;
2773 }
2774
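/*
 * For illustration, fetching a piece of image metadata through a class
 * method looks roughly like this (a sketch; "get_object_prefix" is one
 * of the rbd class methods this driver invokes elsewhere):
 *
 *      ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                              "rbd", "get_object_prefix",
 *                              NULL, 0, reply_buf, reply_len);
 *
 * On success ret is the number of bytes placed into reply_buf.
 */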
2775 static void rbd_request_fn(struct request_queue *q)
2776                 __releases(q->queue_lock) __acquires(q->queue_lock)
2777 {
2778         struct rbd_device *rbd_dev = q->queuedata;
2779         bool read_only = rbd_dev->mapping.read_only;
2780         struct request *rq;
2781         int result;
2782
2783         while ((rq = blk_fetch_request(q))) {
2784                 bool write_request = rq_data_dir(rq) == WRITE;
2785                 struct rbd_img_request *img_request;
2786                 u64 offset;
2787                 u64 length;
2788
2789                 /* Ignore any non-FS requests that filter through. */
2790
2791                 if (rq->cmd_type != REQ_TYPE_FS) {
2792                         dout("%s: non-fs request type %d\n", __func__,
2793                                 (int) rq->cmd_type);
2794                         __blk_end_request_all(rq, 0);
2795                         continue;
2796                 }
2797
2798                 /* Ignore/skip any zero-length requests */
2799
2800                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2801                 length = (u64) blk_rq_bytes(rq);
2802
2803                 if (!length) {
2804                         dout("%s: zero-length request\n", __func__);
2805                         __blk_end_request_all(rq, 0);
2806                         continue;
2807                 }
2808
2809                 spin_unlock_irq(q->queue_lock);
2810
2811                 /* Disallow writes to a read-only device */
2812
2813                 if (write_request) {
2814                         result = -EROFS;
2815                         if (read_only)
2816                                 goto end_request;
2817                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2818                 }
2819
2820                 /*
2821                  * Quit early if the mapped snapshot no longer
2822                  * exists.  It's still possible the snapshot will
2823                  * have disappeared by the time our request arrives
2824                  * at the osd, but there's no sense in sending it if
2825                  * we already know.
2826                  */
2827                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2828                         dout("request for non-existent snapshot");
2829                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2830                         result = -ENXIO;
2831                         goto end_request;
2832                 }
2833
2834                 result = -EINVAL;
2835                 if (offset && length > U64_MAX - offset + 1) {
2836                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2837                                 offset, length);
2838                         goto end_request;       /* Shouldn't happen */
2839                 }
2840
2841                 result = -ENOMEM;
2842                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2843                                                         write_request, false);
2844                 if (!img_request)
2845                         goto end_request;
2846
2847                 img_request->rq = rq;
2848
2849                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2850                                                 rq->bio);
2851                 if (!result)
2852                         result = rbd_img_request_submit(img_request);
2853                 if (result)
2854                         rbd_img_request_put(img_request);
2855 end_request:
2856                 spin_lock_irq(q->queue_lock);
2857                 if (result < 0) {
2858                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2859                                 write_request ? "write" : "read",
2860                                 length, offset, result);
2861
2862                         __blk_end_request_all(rq, result);
2863                 }
2864         }
2865 }
2866
2867 /*
2868  * A queue callback.  Makes sure that we don't create a bio that spans
2869  * multiple osd objects.  One exception would be single-page bios,
2870  * which we handle later in bio_chain_clone_range().
2871  */
2872 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2873                           struct bio_vec *bvec)
2874 {
2875         struct rbd_device *rbd_dev = q->queuedata;
2876         sector_t sector_offset;
2877         sector_t sectors_per_obj;
2878         sector_t obj_sector_offset;
2879         int ret;
2880
2881         /*
2882          * Find how far into its rbd object the bio's starting
2883          * sector falls.  The bio sector is partition-relative, so
2884          * first make it relative to the enclosing device.
2885          */
2886         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2887         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2888         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2889
2890         /*
2891          * Compute the number of bytes from that offset to the end
2892          * of the object.  Account for what's already used by the bio.
2893          */
2894         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2895         if (ret > bmd->bi_size)
2896                 ret -= bmd->bi_size;
2897         else
2898                 ret = 0;
2899
2900         /*
2901          * Don't send back more than was asked for.  And if the bio
2902          * was empty, let the whole thing through because:  "Note
2903          * that a block device *must* allow a single page to be
2904          * added to an empty bio."
2905          */
2906         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2907         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2908                 ret = (int) bvec->bv_len;
2909
2910         return ret;
2911 }
2912
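/*
 * A worked example of the arithmetic above: with obj_order 22 (4 MiB
 * objects) there are 8192 512-byte sectors per object.  A bio starting
 * at device sector 8000 is 192 sectors short of an object boundary, so
 * at most 192 << 9 = 98304 bytes (less what the bio already holds) may
 * be merged into it.
 */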
2913 static void rbd_free_disk(struct rbd_device *rbd_dev)
2914 {
2915         struct gendisk *disk = rbd_dev->disk;
2916
2917         if (!disk)
2918                 return;
2919
2920         rbd_dev->disk = NULL;
2921         if (disk->flags & GENHD_FL_UP) {
2922                 del_gendisk(disk);
2923                 if (disk->queue)
2924                         blk_cleanup_queue(disk->queue);
2925         }
2926         put_disk(disk);
2927 }
2928
2929 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2930                                 const char *object_name,
2931                                 u64 offset, u64 length, void *buf)
2932
2933 {
2934         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2935         struct rbd_obj_request *obj_request;
2936         struct page **pages = NULL;
2937         u32 page_count;
2938         size_t size;
2939         int ret;
2940
2941         page_count = (u32) calc_pages_for(offset, length);
2942         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2943         if (IS_ERR(pages))
2944                 return PTR_ERR(pages);
2945
2946         ret = -ENOMEM;
2947         obj_request = rbd_obj_request_create(object_name, offset, length,
2948                                                         OBJ_REQUEST_PAGES);
2949         if (!obj_request)
2950                 goto out;
2951
2952         obj_request->pages = pages;
2953         obj_request->page_count = page_count;
2954
2955         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2956         if (!obj_request->osd_req)
2957                 goto out;
2958
2959         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2960                                         offset, length, 0, 0);
2961         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2962                                         obj_request->pages,
2963                                         obj_request->length,
2964                                         obj_request->offset & ~PAGE_MASK,
2965                                         false, false);
2966         rbd_osd_req_format_read(obj_request);
2967
2968         ret = rbd_obj_request_submit(osdc, obj_request);
2969         if (ret)
2970                 goto out;
2971         ret = rbd_obj_request_wait(obj_request);
2972         if (ret)
2973                 goto out;
2974
2975         ret = obj_request->result;
2976         if (ret < 0)
2977                 goto out;
2978
2979         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2980         size = (size_t) obj_request->xferred;
2981         ceph_copy_from_page_vector(pages, buf, 0, size);
2982         rbd_assert(size <= (size_t)INT_MAX);
2983         ret = (int)size;
2984 out:
2985         if (obj_request)
2986                 rbd_obj_request_put(obj_request);
2987         else
2988                 ceph_release_page_vector(pages, page_count);
2989
2990         return ret;
2991 }
2992
2993 /*
2994  * Read the complete header for the given rbd device.
2995  *
2996  * Returns a pointer to a dynamically-allocated buffer containing
2997  * the complete and validated header.  The caller is responsible
2998  * for freeing the returned buffer with kfree().
3000  *
3001  * Returns a pointer-coded errno if a failure occurs.
3002  */
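/*
 * For reference, the complete v1 on-disk header read here is laid
 * out as:
 *
 *      struct rbd_image_header_ondisk
 *      struct rbd_image_snap_ondisk    [snap_count entries]
 *      snapshot names                  (snap_names_len bytes of
 *                                       consecutive NUL-terminated
 *                                       strings)
 */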
3003 static struct rbd_image_header_ondisk *
3004 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3005 {
3006         struct rbd_image_header_ondisk *ondisk = NULL;
3007         u32 snap_count = 0;
3008         u64 names_size = 0;
3009         u32 want_count;
3010         int ret;
3011
3012         /*
3013          * The complete header will include an array of its 64-bit
3014          * snapshot ids, followed by the names of those snapshots as
3015          * a contiguous block of NUL-terminated strings.  Note that
3016          * the number of snapshots could change by the time we read
3017          * it in, in which case we re-read it.
3018          */
3019         do {
3020                 size_t size;
3021
3022                 kfree(ondisk);
3023
3024                 size = sizeof (*ondisk);
3025                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3026                 size += names_size;
3027                 ondisk = kmalloc(size, GFP_KERNEL);
3028                 if (!ondisk)
3029                         return ERR_PTR(-ENOMEM);
3030
3031                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3032                                        0, size, ondisk);
3033                 if (ret < 0)
3034                         goto out_err;
3035                 if ((size_t)ret < size) {
3036                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3037                                 size, ret);
3038                         ret = -ENXIO;
3039                         goto out_err;
3040                 }
3041                 if (!rbd_dev_ondisk_valid(ondisk)) {
3042                         ret = -ENXIO;
3043                         rbd_warn(rbd_dev, "invalid header");
3044                         goto out_err;
3045                 }
3046
3047                 names_size = le64_to_cpu(ondisk->snap_names_len);
3048                 want_count = snap_count;
3049                 snap_count = le32_to_cpu(ondisk->snap_count);
3050         } while (snap_count != want_count);
3051
3052         return ondisk;
3053
3054 out_err:
3055         kfree(ondisk);
3056
3057         return ERR_PTR(ret);
3058 }
3059
3060 /*
3061  * Read the on-disk header and convert it to the in-memory format.
3062  */
3063 static int rbd_read_header(struct rbd_device *rbd_dev,
3064                            struct rbd_image_header *header)
3065 {
3066         struct rbd_image_header_ondisk *ondisk;
3067         int ret;
3068
3069         ondisk = rbd_dev_v1_header_read(rbd_dev);
3070         if (IS_ERR(ondisk))
3071                 return PTR_ERR(ondisk);
3072         ret = rbd_header_from_disk(header, ondisk);
3073         kfree(ondisk);
3074
3075         return ret;
3076 }
3077
3078 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3079 {
3080         struct rbd_snap *snap;
3081         struct rbd_snap *next;
3082
3083         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3084                 list_del(&snap->node);
3085                 rbd_snap_destroy(snap);
3086         }
3087 }
3088
3089 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3090 {
3091         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3092                 return;
3093
3094         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3095                 sector_t size;
3096
3097                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3098                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3099                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3100                 set_capacity(rbd_dev->disk, size);
3101         }
3102 }
3103
3104 /*
3105  * only read the first part of the ondisk header, without the snaps info
3106  */
3107 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3108 {
3109         int ret;
3110         struct rbd_image_header h;
3111
3112         ret = rbd_read_header(rbd_dev, &h);
3113         if (ret < 0)
3114                 return ret;
3115
3116         down_write(&rbd_dev->header_rwsem);
3117
3118         /* Update image size, and check for resize of mapped image */
3119         rbd_dev->header.image_size = h.image_size;
3120         rbd_update_mapping_size(rbd_dev);
3121
3122         /* rbd_dev->header.object_prefix shouldn't change */
3123         kfree(rbd_dev->header.snap_sizes);
3124         kfree(rbd_dev->header.snap_names);
3125         /* osd requests may still refer to snapc */
3126         ceph_put_snap_context(rbd_dev->header.snapc);
3127
3129         rbd_dev->header.snapc = h.snapc;
3130         rbd_dev->header.snap_names = h.snap_names;
3131         rbd_dev->header.snap_sizes = h.snap_sizes;
3132         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3133                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3134         /* Free the extra copy of the object prefix */
3135         kfree(h.object_prefix);
3136
3137         ret = rbd_dev_snaps_update(rbd_dev);
3138
3139         up_write(&rbd_dev->header_rwsem);
3140
3141         return ret;
3142 }
3143
3144 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3145 {
3146         u64 image_size;
3147         int ret;
3148
3149         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3150         image_size = rbd_dev->header.image_size;
3151         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3152         if (rbd_dev->image_format == 1)
3153                 ret = rbd_dev_v1_refresh(rbd_dev);
3154         else
3155                 ret = rbd_dev_v2_refresh(rbd_dev);
3156         mutex_unlock(&ctl_mutex);
3157         if (ret)
3158                 rbd_warn(rbd_dev, "got notification but failed to"
3159                            " update snaps: %d\n", ret);
3160         if (image_size != rbd_dev->header.image_size)
3161                 revalidate_disk(rbd_dev->disk);
3162
3163         return ret;
3164 }
3165
3166 static int rbd_init_disk(struct rbd_device *rbd_dev)
3167 {
3168         struct gendisk *disk;
3169         struct request_queue *q;
3170         u64 segment_size;
3171
3172         /* create gendisk info */
3173         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3174         if (!disk)
3175                 return -ENOMEM;
3176
3177         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3178                  rbd_dev->dev_id);
3179         disk->major = rbd_dev->major;
3180         disk->first_minor = 0;
3181         disk->fops = &rbd_bd_ops;
3182         disk->private_data = rbd_dev;
3183
3184         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3185         if (!q)
3186                 goto out_disk;
3187
3188         /* We use the default size, but let's be explicit about it. */
3189         blk_queue_physical_block_size(q, SECTOR_SIZE);
3190
3191         /* set io sizes to object size */
3192         segment_size = rbd_obj_bytes(&rbd_dev->header);
3193         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3194         blk_queue_max_segment_size(q, segment_size);
3195         blk_queue_io_min(q, segment_size);
3196         blk_queue_io_opt(q, segment_size);
3197
3198         blk_queue_merge_bvec(q, rbd_merge_bvec);
3199         disk->queue = q;
3200
3201         q->queuedata = rbd_dev;
3202
3203         rbd_dev->disk = disk;
3204
3205         return 0;
3206 out_disk:
3207         put_disk(disk);
3208
3209         return -ENOMEM;
3210 }
3211
3212 /*
3213   sysfs
3214 */
3215
3216 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3217 {
3218         return container_of(dev, struct rbd_device, dev);
3219 }
3220
3221 static ssize_t rbd_size_show(struct device *dev,
3222                              struct device_attribute *attr, char *buf)
3223 {
3224         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3225
3226         return sprintf(buf, "%llu\n",
3227                 (unsigned long long)rbd_dev->mapping.size);
3228 }
3229
3230 /*
3231  * Note this shows the features for whatever's mapped, which is not
3232  * necessarily the base image.
3233  */
3234 static ssize_t rbd_features_show(struct device *dev,
3235                              struct device_attribute *attr, char *buf)
3236 {
3237         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3238
3239         return sprintf(buf, "0x%016llx\n",
3240                         (unsigned long long)rbd_dev->mapping.features);
3241 }
3242
3243 static ssize_t rbd_major_show(struct device *dev,
3244                               struct device_attribute *attr, char *buf)
3245 {
3246         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3247
3248         if (rbd_dev->major)
3249                 return sprintf(buf, "%d\n", rbd_dev->major);
3250
3251         return sprintf(buf, "(none)\n");
3253 }
3254
3255 static ssize_t rbd_client_id_show(struct device *dev,
3256                                   struct device_attribute *attr, char *buf)
3257 {
3258         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3259
3260         return sprintf(buf, "client%lld\n",
3261                         ceph_client_id(rbd_dev->rbd_client->client));
3262 }
3263
3264 static ssize_t rbd_pool_show(struct device *dev,
3265                              struct device_attribute *attr, char *buf)
3266 {
3267         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3268
3269         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3270 }
3271
3272 static ssize_t rbd_pool_id_show(struct device *dev,
3273                              struct device_attribute *attr, char *buf)
3274 {
3275         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3276
3277         return sprintf(buf, "%llu\n",
3278                         (unsigned long long) rbd_dev->spec->pool_id);
3279 }
3280
3281 static ssize_t rbd_name_show(struct device *dev,
3282                              struct device_attribute *attr, char *buf)
3283 {
3284         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3285
3286         if (rbd_dev->spec->image_name)
3287                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3288
3289         return sprintf(buf, "(unknown)\n");
3290 }
3291
3292 static ssize_t rbd_image_id_show(struct device *dev,
3293                              struct device_attribute *attr, char *buf)
3294 {
3295         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3296
3297         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3298 }
3299
3300 /*
3301  * Shows the name of the currently-mapped snapshot (or
3302  * RBD_SNAP_HEAD_NAME for the base image).
3303  */
3304 static ssize_t rbd_snap_show(struct device *dev,
3305                              struct device_attribute *attr,
3306                              char *buf)
3307 {
3308         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3309
3310         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3311 }
3312
3313 /*
3314  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3315  * for the parent image.  If there is no parent, simply shows
3316  * "(no parent image)".
3317  */
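/*
 * Illustrative output (all values hypothetical):
 *
 *      pool_id 2
 *      pool_name rbd
 *      image_id 1032ab4567cd
 *      image_name parent-image
 *      snap_id 4
 *      snap_name parent-snap
 *      overlap 10737418240
 */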
3318 static ssize_t rbd_parent_show(struct device *dev,
3319                              struct device_attribute *attr,
3320                              char *buf)
3321 {
3322         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3323         struct rbd_spec *spec = rbd_dev->parent_spec;
3324         int count;
3325         char *bufp = buf;
3326
3327         if (!spec)
3328                 return sprintf(buf, "(no parent image)\n");
3329
3330         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3331                         (unsigned long long) spec->pool_id, spec->pool_name);
3332         if (count < 0)
3333                 return count;
3334         bufp += count;
3335
3336         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3337                         spec->image_name ? spec->image_name : "(unknown)");
3338         if (count < 0)
3339                 return count;
3340         bufp += count;
3341
3342         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3343                         (unsigned long long) spec->snap_id, spec->snap_name);
3344         if (count < 0)
3345                 return count;
3346         bufp += count;
3347
3348         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3349         if (count < 0)
3350                 return count;
3351         bufp += count;
3352
3353         return (ssize_t) (bufp - buf);
3354 }
3355
3356 static ssize_t rbd_image_refresh(struct device *dev,
3357                                  struct device_attribute *attr,
3358                                  const char *buf,
3359                                  size_t size)
3360 {
3361         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3362         int ret;
3363
3364         ret = rbd_dev_refresh(rbd_dev);
3365
3366         return ret < 0 ? ret : size;
3367 }
3368
3369 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3370 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3371 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3372 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3373 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3374 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3375 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3376 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3377 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3378 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3379 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3380
3381 static struct attribute *rbd_attrs[] = {
3382         &dev_attr_size.attr,
3383         &dev_attr_features.attr,
3384         &dev_attr_major.attr,
3385         &dev_attr_client_id.attr,
3386         &dev_attr_pool.attr,
3387         &dev_attr_pool_id.attr,
3388         &dev_attr_name.attr,
3389         &dev_attr_image_id.attr,
3390         &dev_attr_current_snap.attr,
3391         &dev_attr_parent.attr,
3392         &dev_attr_refresh.attr,
3393         NULL
3394 };
3395
3396 static struct attribute_group rbd_attr_group = {
3397         .attrs = rbd_attrs,
3398 };
3399
3400 static const struct attribute_group *rbd_attr_groups[] = {
3401         &rbd_attr_group,
3402         NULL
3403 };
3404
3405 static void rbd_sysfs_dev_release(struct device *dev)
3406 {
3407 }
3408
3409 static struct device_type rbd_device_type = {
3410         .name           = "rbd",
3411         .groups         = rbd_attr_groups,
3412         .release        = rbd_sysfs_dev_release,
3413 };
3414
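/*
 * An rbd_spec is reference counted; rbd_spec_get() and rbd_spec_put()
 * take and drop references, and the final put frees the spec (and
 * the names it holds) via rbd_spec_free().
 */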
3415 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3416 {
3417         kref_get(&spec->kref);
3418
3419         return spec;
3420 }
3421
3422 static void rbd_spec_free(struct kref *kref);
3423 static void rbd_spec_put(struct rbd_spec *spec)
3424 {
3425         if (spec)
3426                 kref_put(&spec->kref, rbd_spec_free);
3427 }
3428
3429 static struct rbd_spec *rbd_spec_alloc(void)
3430 {
3431         struct rbd_spec *spec;
3432
3433         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3434         if (!spec)
3435                 return NULL;
3436         kref_init(&spec->kref);
3437
3438         return spec;
3439 }
3440
3441 static void rbd_spec_free(struct kref *kref)
3442 {
3443         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3444
3445         kfree(spec->pool_name);
3446         kfree(spec->image_id);
3447         kfree(spec->image_name);
3448         kfree(spec->snap_name);
3449         kfree(spec);
3450 }
3451
3452 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3453                                 struct rbd_spec *spec)
3454 {
3455         struct rbd_device *rbd_dev;
3456
3457         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3458         if (!rbd_dev)
3459                 return NULL;
3460
3461         spin_lock_init(&rbd_dev->lock);
3462         rbd_dev->flags = 0;
3463         INIT_LIST_HEAD(&rbd_dev->node);
3464         INIT_LIST_HEAD(&rbd_dev->snaps);
3465         init_rwsem(&rbd_dev->header_rwsem);
3466
3467         rbd_dev->spec = spec;
3468         rbd_dev->rbd_client = rbdc;
3469
3470         /* Initialize the layout used for all rbd requests */
3471
3472         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3473         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3474         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3475         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3476
3477         return rbd_dev;
3478 }
3479
3480 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3481 {
3482         rbd_put_client(rbd_dev->rbd_client);
3483         rbd_spec_put(rbd_dev->spec);
3484         kfree(rbd_dev);
3485 }
3486
3487 static void rbd_snap_destroy(struct rbd_snap *snap)
3488 {
3489         kfree(snap->name);
3490         kfree(snap);
3491 }
3492
3493 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3494                                                 const char *snap_name,
3495                                                 u64 snap_id, u64 snap_size,
3496                                                 u64 snap_features)
3497 {
3498         struct rbd_snap *snap;
3499
3500         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3501         if (!snap)
3502                 return ERR_PTR(-ENOMEM);
3503
3504         snap->name = snap_name;
3505         snap->id = snap_id;
3506         snap->size = snap_size;
3507         snap->features = snap_features;
3508
3509         return snap;
3510 }
3511
3512 /*
3513  * Returns a dynamically-allocated snapshot name if successful, or a
3514  * pointer-coded error otherwise.
3515  */
3516 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev,
3517                         u64 snap_id, u64 *snap_size, u64 *snap_features)
3518 {
3519         const char *snap_name;
3520         u32 which;
3521
3522         which = rbd_dev_snap_index(rbd_dev, snap_id);
3523         if (which == BAD_SNAP_INDEX)
3524                 return ERR_PTR(-ENOENT);
3525         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
3526         if (!snap_name)
3527                 return ERR_PTR(-ENOMEM);
3528
3529         *snap_size = rbd_dev->header.snap_sizes[which];
3530         *snap_features = 0;     /* No features for v1 */
3531
3532         return snap_name;
3533 }
3534
3535 /*
3536  * Get the size and object order for an image snapshot, or if
3537  * snap_id is CEPH_NOSNAP, gets this information for the base
3538  * image.
3539  */
3540 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3541                                 u8 *order, u64 *snap_size)
3542 {
3543         __le64 snapid = cpu_to_le64(snap_id);
3544         int ret;
3545         struct {
3546                 u8 order;
3547                 __le64 size;
3548         } __attribute__ ((packed)) size_buf = { 0 };
3549
3550         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3551                                 "rbd", "get_size",
3552                                 &snapid, sizeof (snapid),
3553                                 &size_buf, sizeof (size_buf));
3554         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3555         if (ret < 0)
3556                 return ret;
3557         if (ret < sizeof (size_buf))
3558                 return -ERANGE;
3559
3560         if (order)
3561                 *order = size_buf.order;
3562         *snap_size = le64_to_cpu(size_buf.size);
3563
3564         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3565                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3566                 (unsigned long long)*snap_size);
3567
3568         return 0;
3569 }
3570
3571 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3572 {
3573         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3574                                         &rbd_dev->header.obj_order,
3575                                         &rbd_dev->header.image_size);
3576 }
3577
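/*
 * Fetch the prefix used to name the image's data objects, via the
 * "get_object_prefix" class method.  (For a format 2 image this is
 * normally "rbd_data." followed by the image id.)
 */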
3578 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3579 {
3580         void *reply_buf;
3581         int ret;
3582         void *p;
3583
3584         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3585         if (!reply_buf)
3586                 return -ENOMEM;
3587
3588         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3589                                 "rbd", "get_object_prefix", NULL, 0,
3590                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3591         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3592         if (ret < 0)
3593                 goto out;
3594
3595         p = reply_buf;
3596         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3597                                                 p + ret, NULL, GFP_NOIO);
3598         ret = 0;
3599
3600         if (IS_ERR(rbd_dev->header.object_prefix)) {
3601                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3602                 rbd_dev->header.object_prefix = NULL;
3603         } else {
3604                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3605         }
3606 out:
3607         kfree(reply_buf);
3608
3609         return ret;
3610 }
3611
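/*
 * Get the feature bits for an image snapshot, or for the base image
 * if snap_id is CEPH_NOSNAP.  Fails with -ENXIO if the image has
 * incompatible feature bits this client does not support.
 */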
3612 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3613                 u64 *snap_features)
3614 {
3615         __le64 snapid = cpu_to_le64(snap_id);
3616         struct {
3617                 __le64 features;
3618                 __le64 incompat;
3619         } __attribute__ ((packed)) features_buf = { 0 };
3620         u64 incompat;
3621         int ret;
3622
3623         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3624                                 "rbd", "get_features",
3625                                 &snapid, sizeof (snapid),
3626                                 &features_buf, sizeof (features_buf));
3627         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3628         if (ret < 0)
3629                 return ret;
3630         if (ret < sizeof (features_buf))
3631                 return -ERANGE;
3632
3633         incompat = le64_to_cpu(features_buf.incompat);
3634         if (incompat & ~RBD_FEATURES_SUPPORTED)
3635                 return -ENXIO;
3636
3637         *snap_features = le64_to_cpu(features_buf.features);
3638
3639         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3640                 (unsigned long long)snap_id,
3641                 (unsigned long long)*snap_features,
3642                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3643
3644         return 0;
3645 }
3646
3647 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3648 {
3649         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3650                                                 &rbd_dev->header.features);
3651 }
3652
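/*
 * Fetch the parent spec and overlap for a layered image.  The
 * "get_parent" reply decoded below is laid out as:
 *
 *      __le64  pool_id         (CEPH_NOPOOL means no parent)
 *      string  image_id        (__le32 length followed by the bytes)
 *      __le64  snap_id
 *      __le64  overlap
 */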
3653 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3654 {
3655         struct rbd_spec *parent_spec;
3656         size_t size;
3657         void *reply_buf = NULL;
3658         __le64 snapid;
3659         void *p;
3660         void *end;
3661         char *image_id;
3662         u64 overlap;
3663         int ret;
3664
3665         parent_spec = rbd_spec_alloc();
3666         if (!parent_spec)
3667                 return -ENOMEM;
3668
3669         size = sizeof (__le64) +                                /* pool_id */
3670                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3671                 sizeof (__le64) +                               /* snap_id */
3672                 sizeof (__le64);                                /* overlap */
3673         reply_buf = kmalloc(size, GFP_KERNEL);
3674         if (!reply_buf) {
3675                 ret = -ENOMEM;
3676                 goto out_err;
3677         }
3678
3679         snapid = cpu_to_le64(CEPH_NOSNAP);
3680         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3681                                 "rbd", "get_parent",
3682                                 &snapid, sizeof (snapid),
3683                                 reply_buf, size);
3684         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3685         if (ret < 0)
3686                 goto out_err;
3687
3688         p = reply_buf;
3689         end = reply_buf + ret;
3690         ret = -ERANGE;
3691         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3692         if (parent_spec->pool_id == CEPH_NOPOOL)
3693                 goto out;       /* No parent?  No problem. */
3694
3695         /* The ceph file layout needs to fit pool id in 32 bits */
3696
3697         ret = -EIO;
3698         if (parent_spec->pool_id > (u64)U32_MAX) {
3699                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3700                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3701                 goto out_err;
3702         }
3703
3704         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3705         if (IS_ERR(image_id)) {
3706                 ret = PTR_ERR(image_id);
3707                 goto out_err;
3708         }
3709         parent_spec->image_id = image_id;
3710         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3711         ceph_decode_64_safe(&p, end, overlap, out_err);
3712
3713         rbd_dev->parent_overlap = overlap;
3714         rbd_dev->parent_spec = parent_spec;
3715         parent_spec = NULL;     /* rbd_dev now owns this */
3716 out:
3717         ret = 0;
3718 out_err:
3719         kfree(reply_buf);
3720         rbd_spec_put(parent_spec);
3721
3722         return ret;
3723 }
3724
3725 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3726 {
3727         struct {
3728                 __le64 stripe_unit;
3729                 __le64 stripe_count;
3730         } __attribute__ ((packed)) striping_info_buf = { 0 };
3731         size_t size = sizeof (striping_info_buf);
3732         void *p;
3733         u64 obj_size;
3734         u64 stripe_unit;
3735         u64 stripe_count;
3736         int ret;
3737
3738         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3739                                 "rbd", "get_stripe_unit_count", NULL, 0,
3740                                 (char *)&striping_info_buf, size);
3741         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3742         if (ret < 0)
3743                 return ret;
3744         if (ret < size)
3745                 return -ERANGE;
3746
3747         /*
3748          * We don't actually support the "fancy striping" feature
3749          * (STRIPINGV2) yet, but if the striping sizes are the
3750          * defaults the behavior is the same as before.  So find
3751          * out, and only fail if the image has non-default values.
3752          */
3754         obj_size = (u64)1 << rbd_dev->header.obj_order;
3755         p = &striping_info_buf;
3756         stripe_unit = ceph_decode_64(&p);
3757         if (stripe_unit != obj_size) {
3758                 rbd_warn(rbd_dev, "unsupported stripe unit "
3759                                 "(got %llu want %llu)",
3760                                 stripe_unit, obj_size);
3761                 return -EINVAL;
3762         }
3763         stripe_count = ceph_decode_64(&p);
3764         if (stripe_count != 1) {
3765                 rbd_warn(rbd_dev, "unsupported stripe count "
3766                                 "(got %llu want 1)", stripe_count);
3767                 return -EINVAL;
3768         }
3769         rbd_dev->header.stripe_unit = stripe_unit;
3770         rbd_dev->header.stripe_count = stripe_count;
3771
3772         return 0;
3773 }
3774
3775 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3776 {
3777         size_t image_id_size;
3778         char *image_id;
3779         void *p;
3780         void *end;
3781         size_t size;
3782         void *reply_buf = NULL;
3783         size_t len = 0;
3784         char *image_name = NULL;
3785         int ret;
3786
3787         rbd_assert(!rbd_dev->spec->image_name);
3788
3789         len = strlen(rbd_dev->spec->image_id);
3790         image_id_size = sizeof (__le32) + len;
3791         image_id = kmalloc(image_id_size, GFP_KERNEL);
3792         if (!image_id)
3793                 return NULL;
3794
3795         p = image_id;
3796         end = image_id + image_id_size;
3797         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3798
3799         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3800         reply_buf = kmalloc(size, GFP_KERNEL);
3801         if (!reply_buf)
3802                 goto out;
3803
3804         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3805                                 "rbd", "dir_get_name",
3806                                 image_id, image_id_size,
3807                                 reply_buf, size);
3808         if (ret < 0)
3809                 goto out;
3810         p = reply_buf;
3811         end = reply_buf + ret;
3812
3813         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3814         if (IS_ERR(image_name))
3815                 image_name = NULL;
3816         else
3817                 dout("%s: name is %s len is %zu\n", __func__, image_name, len);
3818 out:
3819         kfree(reply_buf);
3820         kfree(image_id);
3821
3822         return image_name;
3823 }
3824
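/*
 * For a format 1 image, snapshot names are stored as consecutive
 * NUL-terminated strings (e.g. "snap1\0snap2\0..."), with the i-th
 * name corresponding to snapc->snaps[i].
 */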
3825 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3826 {
3827         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3828         const char *snap_name;
3829         u32 which = 0;
3830
3831         /* Skip over names until we find the one we are looking for */
3832
3833         snap_name = rbd_dev->header.snap_names;
3834         while (which < snapc->num_snaps) {
3835                 if (!strcmp(name, snap_name))
3836                         return snapc->snaps[which];
3837                 snap_name += strlen(snap_name) + 1;
3838                 which++;
3839         }
3840         return CEPH_NOSNAP;
3841 }
3842
3843 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3844 {
3845         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3846         u32 which;
3847         bool found = false;
3848         u64 snap_id;
3849
3850         for (which = 0; !found && which < snapc->num_snaps; which++) {
3851                 const char *snap_name;
3852
3853                 snap_id = snapc->snaps[which];
3854                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3855                 if (IS_ERR(snap_name))
3856                         break;
3857                 found = !strcmp(name, snap_name);
3858                 kfree(snap_name);
3859         }
3860         return found ? snap_id : CEPH_NOSNAP;
3861 }
3862
3863 /*
3864  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3865  * no snapshot by that name is found, or if an error occurs.
3866  */
3867 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3868 {
3869         if (rbd_dev->image_format == 1)
3870                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3871
3872         return rbd_v2_snap_id_by_name(rbd_dev, name);
3873 }
3874
3875 /*
3876  * When an rbd image has a parent image, it is identified by the
3877  * pool, image, and snapshot ids (not names).  This function fills
3878  * in the names for those ids.  (It's OK if we can't figure out the
3879  * name for an image id, but the pool and snapshot ids should always
3880  * exist and have names.)  All names in an rbd spec are dynamically
3881  * allocated.
3882  *
3883  * When an image being mapped (not a parent) is probed, we have the
3884  * pool name and pool id, image name and image id, and the snapshot
3885  * name.  The only thing we're missing is the snapshot id.
3886  *
3887  * The set of snapshots for an image is not known until they have
3888  * been read by rbd_dev_snaps_update(), so we can't completely fill
3889  * in this information until after that has been called.
3890  */
3891 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3892 {
3893         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3894         struct rbd_spec *spec = rbd_dev->spec;
3895         const char *pool_name;
3896         const char *image_name;
3897         const char *snap_name;
3898         int ret;
3899
3900         /*
3901          * An image being mapped will have the pool name (etc.), but
3902          * we need to look up the snapshot id.
3903          */
3904         if (spec->pool_name) {
3905                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3906                         u64 snap_id;
3907
3908                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3909                         if (snap_id == CEPH_NOSNAP)
3910                                 return -ENOENT;
3911                         spec->snap_id = snap_id;
3912                 } else {
3913                         spec->snap_id = CEPH_NOSNAP;
3914                 }
3915
3916                 return 0;
3917         }
3918
3919         /* Get the pool name; we have to make our own copy of this */
3920
3921         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3922         if (!pool_name) {
3923                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3924                 return -EIO;
3925         }
3926         pool_name = kstrdup(pool_name, GFP_KERNEL);
3927         if (!pool_name)
3928                 return -ENOMEM;
3929
3930         /* Fetch the image name; tolerate failure here */
3931
3932         image_name = rbd_dev_image_name(rbd_dev);
3933         if (!image_name)
3934                 rbd_warn(rbd_dev, "unable to get image name");
3935
3936         /* Look up the snapshot name, and make a copy */
3937
3938         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3939         if (!snap_name) {
3940                 ret = -ENOMEM;
3941                 goto out_err;
3942         }
3943
3944         spec->pool_name = pool_name;
3945         spec->image_name = image_name;
3946         spec->snap_name = snap_name;
3947
3948         return 0;
3949 out_err:
3950         kfree(image_name);
3951         kfree(pool_name);
3952
3953         return ret;
3954 }
3955
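/*
 * Fetch the image's snapshot context.  The "get_snapcontext" reply
 * decoded below is laid out as:
 *
 *      __le64  seq             (maximum snapshot id)
 *      __le32  snap_count
 *      __le64  snaps[snap_count]
 */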
3956 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3957 {
3958         size_t size;
3959         int ret;
3960         void *reply_buf;
3961         void *p;
3962         void *end;
3963         u64 seq;
3964         u32 snap_count;
3965         struct ceph_snap_context *snapc;
3966         u32 i;
3967
3968         /*
3969          * We'll need room for the seq value (maximum snapshot id),
3970          * snapshot count, and array of that many snapshot ids.
3971          * For now we have a fixed upper limit on the number we're
3972          * prepared to receive.
3973          */
3974         size = sizeof (__le64) + sizeof (__le32) +
3975                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3976         reply_buf = kzalloc(size, GFP_KERNEL);
3977         if (!reply_buf)
3978                 return -ENOMEM;
3979
3980         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3981                                 "rbd", "get_snapcontext", NULL, 0,
3982                                 reply_buf, size);
3983         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3984         if (ret < 0)
3985                 goto out;
3986
3987         p = reply_buf;
3988         end = reply_buf + ret;
3989         ret = -ERANGE;
3990         ceph_decode_64_safe(&p, end, seq, out);
3991         ceph_decode_32_safe(&p, end, snap_count, out);
3992
3993         /*
3994          * Make sure the reported number of snapshot ids wouldn't go
3995          * beyond the end of our buffer.  But before checking that,
3996          * make sure the computed size of the snapshot context we
3997          * allocate is representable in a size_t.
3998          */
3999         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4000                                  / sizeof (u64)) {
4001                 ret = -EINVAL;
4002                 goto out;
4003         }
4004         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4005                 goto out;
4006         ret = 0;
4007
4008         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4009         if (!snapc) {
4010                 ret = -ENOMEM;
4011                 goto out;
4012         }
4013         snapc->seq = seq;
4014         for (i = 0; i < snap_count; i++)
4015                 snapc->snaps[i] = ceph_decode_64(&p);
4016
4017         rbd_dev->header.snapc = snapc;
4018
4019         dout("  snap context seq = %llu, snap_count = %u\n",
4020                 (unsigned long long)seq, (unsigned int)snap_count);
4021 out:
4022         kfree(reply_buf);
4023
4024         return ret;
4025 }
4026
4027 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4028                                         u64 snap_id)
4029 {
4030         size_t size;
4031         void *reply_buf;
4032         __le64 snapid;
4033         int ret;
4034         void *p;
4035         void *end;
4036         char *snap_name;
4037
4038         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4039         reply_buf = kmalloc(size, GFP_KERNEL);
4040         if (!reply_buf)
4041                 return ERR_PTR(-ENOMEM);
4042
4043         snapid = cpu_to_le64(snap_id);
4044         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4045                                 "rbd", "get_snapshot_name",
4046                                 &snapid, sizeof (snapid),
4047                                 reply_buf, size);
4048         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4049         if (ret < 0) {
4050                 snap_name = ERR_PTR(ret);
4051                 goto out;
4052         }
4053
4054         p = reply_buf;
4055         end = reply_buf + ret;
4056         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4057         if (IS_ERR(snap_name))
4058                 goto out;
4059
4060         dout("  snap_id 0x%016llx snap_name = %s\n",
4061                 (unsigned long long)snap_id, snap_name);
4062 out:
4063         kfree(reply_buf);
4064
4065         return snap_name;
4066 }
4067
4068 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev,
4069                         u64 snap_id, u64 *snap_size, u64 *snap_features)
4070 {
4071         u64 size;
4072         u64 features;
4073         const char *snap_name;
4074         int ret;
4075
4076         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
4077         if (ret)
4078                 goto out_err;
4079
4080         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
4081         if (ret)
4082                 goto out_err;
4083
4084         snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4085         if (!IS_ERR(snap_name)) {
4086                 *snap_size = size;
4087                 *snap_features = features;
4088         }
4089
4090         return snap_name;
4091 out_err:
4092         return ERR_PTR(ret);
4093 }
4094
4095 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev,
4096                 u64 snap_id, u64 *snap_size, u64 *snap_features)
4097 {
4098         if (rbd_dev->image_format == 1)
4099                 return rbd_dev_v1_snap_info(rbd_dev, snap_id,
4100                                         snap_size, snap_features);
4101         if (rbd_dev->image_format == 2)
4102                 return rbd_dev_v2_snap_info(rbd_dev, snap_id,
4103                                         snap_size, snap_features);
4104         return ERR_PTR(-EINVAL);
4105 }
4106
4107 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4108 {
4109         int ret;
4110
4111         down_write(&rbd_dev->header_rwsem);
4112
4113         ret = rbd_dev_v2_image_size(rbd_dev);
4114         if (ret)
4115                 goto out;
4116         rbd_update_mapping_size(rbd_dev);
4117
4118         ret = rbd_dev_v2_snap_context(rbd_dev);
4119         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4120         if (ret)
4121                 goto out;
4122         ret = rbd_dev_snaps_update(rbd_dev);
4123         dout("rbd_dev_snaps_update returned %d\n", ret);
4126 out:
4127         up_write(&rbd_dev->header_rwsem);
4128
4129         return ret;
4130 }
4131
4132 /*
4133  * Scan the rbd device's current snapshot list and compare it to the
4134  * newly-received snapshot context.  Remove any existing snapshots
4135  * not present in the new snapshot context.  Add a new snapshot for
4136  * any snapshots in the snapshot context not in the current list.
4137  * And verify there are no changes to snapshots we already know
4138  * about.
4139  *
4140  * Assumes the snapshots in the snapshot context are sorted by
4141  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4142  * are also maintained in that order.)
4143  *
4144  * Note that any error that occurs while updating the snapshot list
4145  * aborts the update, and the entire list is cleared.  The snapshot
4146  * list becomes inconsistent at that point anyway, so it might as
4147  * well be empty.
4148  */
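/*
 * Illustrative example (hypothetical ids): if the current list is
 * { 12, 5, 3 } and the new context is { 12, 7, 3 }, then 12 is left
 * alone, 7 is inserted before 5, 5 is then removed, and 3 is left
 * alone, yielding { 12, 7, 3 }.
 */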
4149 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4150 {
4151         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4152         const u32 snap_count = snapc->num_snaps;
4153         struct list_head *head = &rbd_dev->snaps;
4154         struct list_head *links = head->next;
4155         u32 index = 0;
4156         int ret = 0;
4157
4158         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4159         while (index < snap_count || links != head) {
4160                 u64 snap_id;
4161                 struct rbd_snap *snap;
4162                 const char *snap_name;
4163                 u64 snap_size = 0;
4164                 u64 snap_features = 0;
4165
4166                 snap_id = index < snap_count ? snapc->snaps[index]
4167                                              : CEPH_NOSNAP;
4168                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4169                                      : NULL;
4170                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4171
4172                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4173                         struct list_head *next = links->next;
4174
4175                         /*
4176                          * A previously-existing snapshot is not in
4177                          * the new snap context.
4178                          *
4179                          * If the now-missing snapshot is the one
4180                          * the image represents, clear its existence
4181                          * flag so we can avoid sending any more
4182                          * requests to it.
4183                          */
4184                         if (rbd_dev->spec->snap_id == snap->id)
4185                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4186                         dout("removing %ssnap id %llu\n",
4187                                 rbd_dev->spec->snap_id == snap->id ?
4188                                                         "mapped " : "",
4189                                 (unsigned long long)snap->id);
4190
4191                         list_del(&snap->node);
4192                         rbd_snap_destroy(snap);
4193
4194                         /* Done with this list entry; advance */
4195
4196                         links = next;
4197                         continue;
4198                 }
4199
4200                 snap_name = rbd_dev_snap_info(rbd_dev, snap_id,
4201                                         &snap_size, &snap_features);
4202                 if (IS_ERR(snap_name)) {
4203                         ret = PTR_ERR(snap_name);
4204                         dout("failed to get snap info, error %d\n", ret);
4205                         goto out_err;
4206                 }
4207
4208                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4209                         (unsigned long long)snap_id);
4210                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4211                         struct rbd_snap *new_snap;
4212
4213                         /* We haven't seen this snapshot before */
4214
4215                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4216                                         snap_id, snap_size, snap_features);
4217                         if (IS_ERR(new_snap)) {
4218                                 ret = PTR_ERR(new_snap);
4219                                 dout("  failed to add dev, error %d\n", ret);
4220                                 goto out_err;
4221                         }
4222
4223                         /* New goes before existing, or at end of list */
4224
4225                         dout("  added dev%s\n", snap ? "" : " at end");
4226                         if (snap)
4227                                 list_add_tail(&new_snap->node, &snap->node);
4228                         else
4229                                 list_add_tail(&new_snap->node, head);
4230                 } else {
4231                         /* Already have this one */
4232
4233                         dout("  already present\n");
4234
4235                         rbd_assert(snap->size == snap_size);
4236                         rbd_assert(!strcmp(snap->name, snap_name));
4237                         rbd_assert(snap->features == snap_features);
4238
4239                         /* Done with this list entry; advance */
4240
4241                         links = links->next;
4242                 }
4243
4244                 /* Advance to the next entry in the snapshot context */
4245
4246                 index++;
4247         }
4248         dout("%s: done\n", __func__);
4249
4250         return 0;
4251 out_err:
4252         rbd_remove_all_snaps(rbd_dev);
4253
4254         return ret;
4255 }
4256
4257 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4258 {
4259         struct device *dev;
4260         int ret;
4261
4262         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4263
4264         dev = &rbd_dev->dev;
4265         dev->bus = &rbd_bus_type;
4266         dev->type = &rbd_device_type;
4267         dev->parent = &rbd_root_dev;
4268         dev->release = rbd_dev_device_release;
4269         dev_set_name(dev, "%d", rbd_dev->dev_id);
4270         ret = device_register(dev);
4271
4272         mutex_unlock(&ctl_mutex);
4273
4274         return ret;
4275 }
4276
4277 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4278 {
4279         device_unregister(&rbd_dev->dev);
4280 }
4281
4282 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4283
4284 /*
4285  * Get a unique rbd identifier for the given new rbd_dev, and add
4286  * the rbd_dev to the global list.  The minimum rbd id is 1.
4287  */
4288 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4289 {
4290         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4291
4292         spin_lock(&rbd_dev_list_lock);
4293         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4294         spin_unlock(&rbd_dev_list_lock);
4295         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4296                 (unsigned long long) rbd_dev->dev_id);
4297 }
4298
4299 /*
4300  * Remove an rbd_dev from the global list, and record that its
4301  * identifier is no longer in use.
4302  */
4303 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4304 {
4305         struct list_head *tmp;
4306         int rbd_id = rbd_dev->dev_id;
4307         int max_id;
4308
4309         rbd_assert(rbd_id > 0);
4310
4311         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4312                 (unsigned long long) rbd_dev->dev_id);
4313         spin_lock(&rbd_dev_list_lock);
4314         list_del_init(&rbd_dev->node);
4315
4316         /*
4317          * If the id being "put" is not the current maximum, there
4318          * is nothing special we need to do.
4319          */
4320         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4321                 spin_unlock(&rbd_dev_list_lock);
4322                 return;
4323         }
4324
4325         /*
4326          * We need to update the current maximum id.  Search the
4327          * list to find out what it is.  We're more likely to find
4328          * the maximum at the end, so search the list backward.
4329          */
4330         max_id = 0;
4331         list_for_each_prev(tmp, &rbd_dev_list) {
4332                 struct rbd_device *rbd_dev;
4333
4334                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4335                 if (rbd_dev->dev_id > max_id)
4336                         max_id = rbd_dev->dev_id;
4337         }
4338         spin_unlock(&rbd_dev_list_lock);
4339
4340         /*
4341          * The max id could have been updated by rbd_dev_id_get(), in
4342          * which case it now accurately reflects the new maximum.
4343          * Be careful not to overwrite the maximum value in that
4344          * case.
4345          */
4346         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4347         dout("  max dev id has been reset\n");
4348 }
4349
4350 /*
4351  * Skips over white space at *buf, and updates *buf to point to the
4352  * first found non-space character (if any). Returns the length of
4353  * the token (string of non-white space characters) found.  Note
4354  * that *buf must be terminated with '\0'.
4355  */
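/*
 * For example (hypothetical input): with *buf pointing at
 * "  rbd foo", next_token() advances *buf to "rbd foo" and
 * returns 3.
 */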
4356 static inline size_t next_token(const char **buf)
4357 {
4358         /*
4359          * These are the characters that produce nonzero for
4360          * isspace() in the "C" and "POSIX" locales.
4361          */
4362         const char *spaces = " \f\n\r\t\v";
4363
4364         *buf += strspn(*buf, spaces);   /* Find start of token */
4365
4366         return strcspn(*buf, spaces);   /* Return token length */
4367 }
4368
4369 /*
4370  * Finds the next token in *buf, and if the provided token buffer is
4371  * big enough, copies the found token into it.  The result, if
4372  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4373  * must be terminated with '\0' on entry.
4374  *
4375  * Returns the length of the token found (not including the '\0').
4376  * Return value will be 0 if no token is found, and it will be >=
4377  * token_size if the token would not fit.
4378  *
4379  * The *buf pointer will be updated to point beyond the end of the
4380  * found token.  Note that this occurs even if the token buffer is
4381  * too small to hold it.
4382  */
4383 static inline size_t copy_token(const char **buf,
4384                                 char *token,
4385                                 size_t token_size)
4386 {
4387         size_t len;
4388
4389         len = next_token(buf);
4390         if (len < token_size) {
4391                 memcpy(token, *buf, len);
4392                 *(token + len) = '\0';
4393         }
4394         *buf += len;
4395
4396         return len;
4397 }
4398
4399 /*
4400  * Finds the next token in *buf, dynamically allocates a buffer big
4401  * enough to hold a copy of it, and copies the token into the new
4402  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4403  * that a duplicate buffer is created even for a zero-length token.
4404  *
4405  * Returns a pointer to the newly-allocated duplicate, or a null
4406  * pointer if memory for the duplicate was not available.  If
4407  * the lenp argument is a non-null pointer, the length of the token
4408  * (not including the '\0') is returned in *lenp.
4409  *
4410  * If successful, the *buf pointer will be updated to point beyond
4411  * the end of the found token.
4412  *
4413  * Note: uses GFP_KERNEL for allocation.
4414  */
4415 static inline char *dup_token(const char **buf, size_t *lenp)
4416 {
4417         char *dup;
4418         size_t len;
4419
4420         len = next_token(buf);
4421         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4422         if (!dup)
4423                 return NULL;
4424         *(dup + len) = '\0';
4425         *buf += len;
4426
4427         if (lenp)
4428                 *lenp = len;
4429
4430         return dup;
4431 }
4432
4433 /*
4434  * Parse the options provided for an "rbd add" (i.e., rbd image
4435  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4436  * and the data written is passed here via a NUL-terminated buffer.
4437  * Returns 0 if successful or an error code otherwise.
4438  *
4439  * The information extracted from these options is recorded in
4440  * the other parameters which return dynamically-allocated
4441  * structures:
4442  *  ceph_opts
4443  *      The address of a pointer that will refer to a ceph options
4444  *      structure.  Caller must release the returned pointer using
4445  *      ceph_destroy_options() when it is no longer needed.
4446  *  rbd_opts
4447  *      Address of an rbd options pointer.  Fully initialized by
4448  *      this function; caller must release with kfree().
4449  *  spec
4450  *      Address of an rbd image specification pointer.  Fully
4451  *      initialized by this function based on parsed options.
4452  *      Caller must release with rbd_spec_put().
4453  *
4454  * The options passed take this form:
4455  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4456  * where:
4457  *  <mon_addrs>
4458  *      A comma-separated list of one or more monitor addresses.
4459  *      A monitor address is an ip address, optionally followed
4460  *      by a port number (separated by a colon).
4461  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4462  *  <options>
4463  *      A comma-separated list of ceph and/or rbd options.
4464  *  <pool_name>
4465  *      The name of the rados pool containing the rbd image.
4466  *  <image_name>
4467  *      The name of the image in that pool to map.
4468  *  <snap_name>
4469  *      An optional snapshot name.  If provided, the mapping will
4470  *      present data from the image at the time that snapshot was
4471  *      created.  The image head is used if no snapshot name is
4472  *      provided.  Snapshot mappings are always read-only.
4473  */
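/*
 * A mapping request might therefore look like this (monitor address,
 * credentials, and names are hypothetical):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage" \
 *              > /sys/bus/rbd/add
 */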
4474 static int rbd_add_parse_args(const char *buf,
4475                                 struct ceph_options **ceph_opts,
4476                                 struct rbd_options **opts,
4477                                 struct rbd_spec **rbd_spec)
4478 {
4479         size_t len;
4480         char *options;
4481         const char *mon_addrs;
4482         char *snap_name;
4483         size_t mon_addrs_size;
4484         struct rbd_spec *spec = NULL;
4485         struct rbd_options *rbd_opts = NULL;
4486         struct ceph_options *copts;
4487         int ret;
4488
4489         /* The first four tokens are required */
4490
4491         len = next_token(&buf);
4492         if (!len) {
4493                 rbd_warn(NULL, "no monitor address(es) provided");
4494                 return -EINVAL;
4495         }
4496         mon_addrs = buf;
4497         mon_addrs_size = len + 1;
4498         buf += len;
4499
4500         ret = -EINVAL;
4501         options = dup_token(&buf, NULL);
4502         if (!options)
4503                 return -ENOMEM;
4504         if (!*options) {
4505                 rbd_warn(NULL, "no options provided");
4506                 goto out_err;
4507         }
4508
4509         spec = rbd_spec_alloc();
4510         if (!spec)
4511                 goto out_mem;
4512
4513         spec->pool_name = dup_token(&buf, NULL);
4514         if (!spec->pool_name)
4515                 goto out_mem;
4516         if (!*spec->pool_name) {
4517                 rbd_warn(NULL, "no pool name provided");
4518                 goto out_err;
4519         }
4520
4521         spec->image_name = dup_token(&buf, NULL);
4522         if (!spec->image_name)
4523                 goto out_mem;
4524         if (!*spec->image_name) {
4525                 rbd_warn(NULL, "no image name provided");
4526                 goto out_err;
4527         }
4528
4529         /*
4530          * Snapshot name is optional; default is to use "-"
4531          * (indicating the head/no snapshot).
4532          */
4533         len = next_token(&buf);
4534         if (!len) {
4535                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4536                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4537         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4538                 ret = -ENAMETOOLONG;
4539                 goto out_err;
4540         }
4541         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4542         if (!snap_name)
4543                 goto out_mem;
4544         *(snap_name + len) = '\0';
4545         spec->snap_name = snap_name;
4546
4547         /* Initialize all rbd options to the defaults */
4548
4549         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4550         if (!rbd_opts)
4551                 goto out_mem;
4552
4553         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4554
4555         copts = ceph_parse_options(options, mon_addrs,
4556                                         mon_addrs + mon_addrs_size - 1,
4557                                         parse_rbd_opts_token, rbd_opts);
4558         if (IS_ERR(copts)) {
4559                 ret = PTR_ERR(copts);
4560                 goto out_err;
4561         }
4562         kfree(options);
4563
4564         *ceph_opts = copts;
4565         *opts = rbd_opts;
4566         *rbd_spec = spec;
4567
4568         return 0;
4569 out_mem:
4570         ret = -ENOMEM;
4571 out_err:
4572         kfree(rbd_opts);
4573         rbd_spec_put(spec);
4574         kfree(options);
4575
4576         return ret;
4577 }
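
/*
 * For illustration (all values are made up): a write such as
 *
 *   1.2.3.4:6789,1.2.3.5:6789 name=admin,read_only rbd myimage mysnap
 *
 * is parsed by rbd_add_parse_args() as two monitor addresses, a ceph
 * option (name=admin) plus an rbd option (read_only), the pool "rbd",
 * the image "myimage", and the optional snapshot name "mysnap".
 */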
4578
4579 /*
4580  * An rbd format 2 image has a unique identifier, distinct from the
4581  * name given to it by the user.  Internally, that identifier is
4582  * what's used to specify the names of objects related to the image.
4583  *
4584  * A special "rbd id" object is used to map an rbd image name to its
4585  * id.  If that object doesn't exist, then there is no v2 rbd image
4586  * with the supplied name.
4587  *
4588  * This function will record the given rbd_dev's image_id field if
4589  * it can be determined, and in that case will return 0.  If any
4590  * errors occur a negative errno will be returned and the rbd_dev's
4591  * image_id field will be unchanged (and should be NULL).
4592  */
4593 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4594 {
4595         int ret;
4596         size_t size;
4597         char *object_name;
4598         void *response;
4599         char *image_id;
4600
4601         /*
4602          * When probing a parent image, the image id is already
4603          * known (and the image name likely is not).  There's no
4604          * need to fetch the image id again in this case.  We
4605          * do still need to set the image format though.
4606          */
4607         if (rbd_dev->spec->image_id) {
4608                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4609
4610                 return 0;
4611         }
4612
4613         /*
4614          * First, see if the format 2 image id file exists, and if
4615          * so, get the image's persistent id from it.
4616          */
4617         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4618         object_name = kmalloc(size, GFP_NOIO);
4619         if (!object_name)
4620                 return -ENOMEM;
4621         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4622         dout("rbd id object name is %s\n", object_name);
4623
4624         /* Response will be an encoded string, which includes a length */
4625
4626         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4627         response = kzalloc(size, GFP_NOIO);
4628         if (!response) {
4629                 ret = -ENOMEM;
4630                 goto out;
4631         }
4632
4633         /* If it doesn't exist we'll assume it's a format 1 image */
4634
4635         ret = rbd_obj_method_sync(rbd_dev, object_name,
4636                                 "rbd", "get_id", NULL, 0,
4637                                 response, RBD_IMAGE_ID_LEN_MAX);
4638         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4639         if (ret == -ENOENT) {
4640                 image_id = kstrdup("", GFP_KERNEL);
4641                 ret = image_id ? 0 : -ENOMEM;
4642                 if (!ret)
4643                         rbd_dev->image_format = 1;
4644         } else if (ret > sizeof (__le32)) {
4645                 void *p = response;
4646
4647                 image_id = ceph_extract_encoded_string(&p, p + ret,
4648                                                 NULL, GFP_NOIO);
4649                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4650                 if (!ret)
4651                         rbd_dev->image_format = 2;
4652         } else {
4653                 ret = -EINVAL;
4654         }
4655
4656         if (!ret) {
4657                 rbd_dev->spec->image_id = image_id;
4658                 dout("image_id is %s\n", image_id);
4659         }
4660 out:
4661         kfree(response);
4662         kfree(object_name);
4663
4664         return ret;
4665 }
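
/*
 * Concretely (with RBD_ID_PREFIX as defined in rbd_types.h): for an
 * image named "myimage" the id object probed above is "rbd_id.myimage".
 * If the stored id were, say, "1a2b3c4d5e6f" (an invented value), the
 * image's data and header objects would subsequently be named from
 * that id rather than from the user-visible image name.
 */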
4666
4667 /* Undo whatever state changes are made by v1 or v2 image probe */
4668
4669 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4670 {
4671         struct rbd_image_header *header;
4672
4673         rbd_dev_remove_parent(rbd_dev);
4674         rbd_spec_put(rbd_dev->parent_spec);
4675         rbd_dev->parent_spec = NULL;
4676         rbd_dev->parent_overlap = 0;
4677
4678         /* Free dynamic fields from the header, then zero it out */
4679
4680         header = &rbd_dev->header;
4681         ceph_put_snap_context(header->snapc);
4682         kfree(header->snap_sizes);
4683         kfree(header->snap_names);
4684         kfree(header->object_prefix);
4685         memset(header, 0, sizeof (*header));
4686 }
4687
4688 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4689 {
4690         int ret;
4691
4692         /* Populate rbd image metadata */
4693
4694         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4695         if (ret < 0)
4696                 goto out_err;
4697
4698         /* Version 1 images have no parent (no layering) */
4699
4700         rbd_dev->parent_spec = NULL;
4701         rbd_dev->parent_overlap = 0;
4702
4703         dout("discovered version 1 image, header name is %s\n",
4704                 rbd_dev->header_name);
4705
4706         return 0;
4707
4708 out_err:
4709         kfree(rbd_dev->header_name);
4710         rbd_dev->header_name = NULL;
4711         kfree(rbd_dev->spec->image_id);
4712         rbd_dev->spec->image_id = NULL;
4713
4714         return ret;
4715 }
4716
4717 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4718 {
4719         int ret;
4720
4721         ret = rbd_dev_v2_image_size(rbd_dev);
4722         if (ret)
4723                 goto out_err;
4724
4725         /* Get the object prefix (a.k.a. block_name) for the image */
4726
4727         ret = rbd_dev_v2_object_prefix(rbd_dev);
4728         if (ret)
4729                 goto out_err;
4730
4731         /* Get and check the features for the image */
4732
4733         ret = rbd_dev_v2_features(rbd_dev);
4734         if (ret)
4735                 goto out_err;
4736
4737         /* If the image supports layering, get the parent info */
4738
4739         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4740                 ret = rbd_dev_v2_parent_info(rbd_dev);
4741                 if (ret)
4742                         goto out_err;
4743
4744                 /*
4745                  * Don't print a warning for parent images.  We can
4746                  * tell it's a parent image at this point because
4747                  * we won't know its pool name yet (just its pool id).
4748                  */
4749                 if (rbd_dev->spec->pool_name)
4750                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4751                                         "is EXPERIMENTAL!");
4752         }
4753
4754         /* If the image supports fancy striping, get its parameters */
4755
4756         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4757                 ret = rbd_dev_v2_striping_info(rbd_dev);
4758                 if (ret < 0)
4759                         goto out_err;
4760         }
4761
4762         /* crypto and compression type aren't (yet) supported for v2 images */
4763
4764         rbd_dev->header.crypt_type = 0;
4765         rbd_dev->header.comp_type = 0;
4766
4767         /* Get the snapshot context */
4768
4769         ret = rbd_dev_v2_snap_context(rbd_dev);
4770         if (ret)
4771                 goto out_err;
4772
4773         dout("discovered version 2 image, header name is %s\n",
4774                 rbd_dev->header_name);
4775
4776         return 0;
4777 out_err:
4778         rbd_dev->parent_overlap = 0;
4779         rbd_spec_put(rbd_dev->parent_spec);
4780         rbd_dev->parent_spec = NULL;
4781         kfree(rbd_dev->header_name);
4782         rbd_dev->header_name = NULL;
4783         kfree(rbd_dev->header.object_prefix);
4784         rbd_dev->header.object_prefix = NULL;
4785
4786         return ret;
4787 }
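
/*
 * For example, a header.features value of 0x3 has both
 * RBD_FEATURE_LAYERING (bit 0) and RBD_FEATURE_STRIPINGV2 (bit 1)
 * set, so both optional steps above (fetching parent info and
 * striping parameters) run during the probe.
 */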
4788
4789 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4790 {
4791         struct rbd_device *parent = NULL;
4792         struct rbd_spec *parent_spec;
4793         struct rbd_client *rbdc;
4794         int ret;
4795
4796         if (!rbd_dev->parent_spec)
4797                 return 0;
4798         /*
4799          * We need to pass a reference to the client and the parent
4800          * spec when creating the parent rbd_dev.  Images related by
4801          * parent/child relationships always share both.
4802          */
4803         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4804         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4805
4806         ret = -ENOMEM;
4807         parent = rbd_dev_create(rbdc, parent_spec);
4808         if (!parent)
4809                 goto out_err;
4810
4811         ret = rbd_dev_image_probe(parent);
4812         if (ret < 0)
4813                 goto out_err;
4814         rbd_dev->parent = parent;
4815
4816         return 0;
4817 out_err:
4818         if (parent) {
4819                 rbd_spec_put(rbd_dev->parent_spec);
4820                 kfree(rbd_dev->header_name);
                     rbd_dev->header_name = NULL;   /* caller frees it again on error */
4821                 rbd_dev_destroy(parent);
4822         } else {
4823                 rbd_put_client(rbdc);
4824                 rbd_spec_put(parent_spec);
4825         }
4826
4827         return ret;
4828 }
4829
4830 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4831 {
4832         int ret;
4833
4834         ret = rbd_dev_mapping_set(rbd_dev);
4835         if (ret)
4836                 return ret;
4837
4838         /* generate unique id: find highest unique id, add one */
4839         rbd_dev_id_get(rbd_dev);
4840
4841         /* Fill in the device name, now that we have its id. */
4842         BUILD_BUG_ON(DEV_NAME_LEN
4843                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4844         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4845
4846         /* Get our block major device number. */
4847
4848         ret = register_blkdev(0, rbd_dev->name);
4849         if (ret < 0)
4850                 goto err_out_id;
4851         rbd_dev->major = ret;
4852
4853         /* Set up the blkdev mapping. */
4854
4855         ret = rbd_init_disk(rbd_dev);
4856         if (ret)
4857                 goto err_out_blkdev;
4858
4859         ret = rbd_bus_add_dev(rbd_dev);
4860         if (ret)
4861                 goto err_out_disk;
4862
4863         /* Everything's ready.  Announce the disk to the world. */
4864
4865         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4866         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4867         add_disk(rbd_dev->disk);
4868
4869         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4870                 (unsigned long long) rbd_dev->mapping.size);
4871
4872         return ret;
4873
4874 err_out_disk:
4875         rbd_free_disk(rbd_dev);
4876 err_out_blkdev:
4877         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4878 err_out_id:
4879         rbd_dev_id_put(rbd_dev);
4880         rbd_dev_mapping_clear(rbd_dev);
4881
4882         return ret;
4883 }
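
/*
 * For example, a device assigned dev_id 1 is named "rbd1" by the
 * sprintf() above and, once add_disk() runs, typically appears as
 * /dev/rbd1 (subject to udev naming on the host).
 */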
4884
4885 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4886 {
4887         struct rbd_spec *spec = rbd_dev->spec;
4888         size_t size;
4889
4890         /* Record the header object name for this rbd image. */
4891
4892         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4893
4894         if (rbd_dev->image_format == 1)
4895                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4896         else
4897                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4898
4899         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4900         if (!rbd_dev->header_name)
4901                 return -ENOMEM;
4902
4903         if (rbd_dev->image_format == 1)
4904                 sprintf(rbd_dev->header_name, "%s%s",
4905                         spec->image_name, RBD_SUFFIX);
4906         else
4907                 sprintf(rbd_dev->header_name, "%s%s",
4908                         RBD_HEADER_PREFIX, spec->image_id);
4909         return 0;
4910 }
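
/*
 * With the prefixes/suffixes from rbd_types.h this means, for example:
 * a format 1 image "myimage" gets header object "myimage.rbd"
 * (RBD_SUFFIX), while a format 2 image whose id is "1a2b3c4d5e6f"
 * gets header object "rbd_header.1a2b3c4d5e6f" (RBD_HEADER_PREFIX).
 * The image name and id here are invented.
 */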
4911
4912 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4913 {
4914         int ret;
4915
4916         rbd_remove_all_snaps(rbd_dev);
4917         rbd_dev_unprobe(rbd_dev);
4918         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4919         if (ret)
4920                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4921         kfree(rbd_dev->header_name);
4922         rbd_dev->header_name = NULL;
4923         rbd_dev->image_format = 0;
4924         kfree(rbd_dev->spec->image_id);
4925         rbd_dev->spec->image_id = NULL;
4926
4927         rbd_dev_destroy(rbd_dev);
4928 }
4929
4930 /*
4931  * Probe for the existence of the header object for the given rbd
4932  * device.  For format 2 images this includes determining the image
4933  * id.
4934  */
4935 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4936 {
4937         int ret;
4938         int tmp;
4939
4940         /*
4941          * Get the id from the image id object.  If it's not a
4942          * format 2 image, we'll get ENOENT back, and we'll assume
4943          * it's a format 1 image.
4944          */
4945         ret = rbd_dev_image_id(rbd_dev);
4946         if (ret)
4947                 return ret;
4948         rbd_assert(rbd_dev->spec->image_id);
4949         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4950
4951         ret = rbd_dev_header_name(rbd_dev);
4952         if (ret)
4953                 goto err_out_format;
4954
4955         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4956         if (ret)
4957                 goto out_header_name;
4958
4959         if (rbd_dev->image_format == 1)
4960                 ret = rbd_dev_v1_probe(rbd_dev);
4961         else
4962                 ret = rbd_dev_v2_probe(rbd_dev);
4963         if (ret)
4964                 goto err_out_watch;
4965
4966         ret = rbd_dev_snaps_update(rbd_dev);
4967         if (ret)
4968                 goto err_out_probe;
4969
4970         ret = rbd_dev_spec_update(rbd_dev);
4971         if (ret)
4972                 goto err_out_snaps;
4973
4974         ret = rbd_dev_probe_parent(rbd_dev);
4975         if (!ret)
4976                 return 0;
4977
4978 err_out_snaps:
4979         rbd_remove_all_snaps(rbd_dev);
4980 err_out_probe:
4981         rbd_dev_unprobe(rbd_dev);
4982 err_out_watch:
4983         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4984         if (tmp)
4985                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4986 out_header_name:
4987         kfree(rbd_dev->header_name);
4988         rbd_dev->header_name = NULL;
4989 err_out_format:
4990         rbd_dev->image_format = 0;
4991         kfree(rbd_dev->spec->image_id);
4992         rbd_dev->spec->image_id = NULL;
4993
4994         dout("probe failed, returning %d\n", ret);
4995
4996         return ret;
4997 }
4998
4999 static ssize_t rbd_add(struct bus_type *bus,
5000                        const char *buf,
5001                        size_t count)
5002 {
5003         struct rbd_device *rbd_dev = NULL;
5004         struct ceph_options *ceph_opts = NULL;
5005         struct rbd_options *rbd_opts = NULL;
5006         struct rbd_spec *spec = NULL;
5007         struct rbd_client *rbdc;
5008         struct ceph_osd_client *osdc;
5009         int rc = -ENOMEM;
5010
5011         if (!try_module_get(THIS_MODULE))
5012                 return -ENODEV;
5013
5014         /* parse add command */
5015         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5016         if (rc < 0)
5017                 goto err_out_module;
5018
5019         rbdc = rbd_get_client(ceph_opts);
5020         if (IS_ERR(rbdc)) {
5021                 rc = PTR_ERR(rbdc);
5022                 goto err_out_args;
5023         }
5024         ceph_opts = NULL;       /* rbd_dev client now owns this */
5025
5026         /* pick the pool */
5027         osdc = &rbdc->client->osdc;
5028         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5029         if (rc < 0)
5030                 goto err_out_client;
5031         spec->pool_id = (u64)rc;
5032
5033         /* The ceph file layout needs to fit pool id in 32 bits */
5034
5035         if (spec->pool_id > (u64)U32_MAX) {
5036                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5037                                 (unsigned long long)spec->pool_id, U32_MAX);
5038                 rc = -EIO;
5039                 goto err_out_client;
5040         }
5041
5042         rbd_dev = rbd_dev_create(rbdc, spec);
5043         if (!rbd_dev)
5044                 goto err_out_client;
5045         rbdc = NULL;            /* rbd_dev now owns this */
5046         spec = NULL;            /* rbd_dev now owns this */
5047
5048         rbd_dev->mapping.read_only = rbd_opts->read_only;
5049         kfree(rbd_opts);
5050         rbd_opts = NULL;        /* done with this */
5051
5052         rc = rbd_dev_image_probe(rbd_dev);
5053         if (rc < 0)
5054                 goto err_out_rbd_dev;
5055
5056         rc = rbd_dev_device_setup(rbd_dev);
5057         if (!rc)
5058                 return count;
5059
5060         rbd_dev_image_release(rbd_dev);
             /* rbd_dev_image_release() already destroyed rbd_dev */
             goto err_out_module;
5061 err_out_rbd_dev:
5062         rbd_dev_destroy(rbd_dev);
5063 err_out_client:
5064         rbd_put_client(rbdc);
5065 err_out_args:
5066         if (ceph_opts)
5067                 ceph_destroy_options(ceph_opts);
5068         kfree(rbd_opts);
5069         rbd_spec_put(spec);
5070 err_out_module:
5071         module_put(THIS_MODULE);
5072
5073         dout("Error adding device %s\n", buf);
5074
5075         return (ssize_t)rc;
5076 }
5077
5078 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
5079 {
5080         struct list_head *tmp;
5081         struct rbd_device *rbd_dev;
5082
5083         spin_lock(&rbd_dev_list_lock);
5084         list_for_each(tmp, &rbd_dev_list) {
5085                 rbd_dev = list_entry(tmp, struct rbd_device, node);
5086                 if (rbd_dev->dev_id == dev_id) {
5087                         spin_unlock(&rbd_dev_list_lock);
5088                         return rbd_dev;
5089                 }
5090         }
5091         spin_unlock(&rbd_dev_list_lock);
5092         return NULL;
5093 }
5094
5095 static void rbd_dev_device_release(struct device *dev)
5096 {
5097         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5098
5099         rbd_free_disk(rbd_dev);
5100         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5101         rbd_dev_clear_mapping(rbd_dev);
5102         unregister_blkdev(rbd_dev->major, rbd_dev->name);
5103         rbd_dev->major = 0;
5104         rbd_dev_id_put(rbd_dev);
5105         rbd_dev_mapping_clear(rbd_dev);
5106 }
5107
5108 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5109 {
5110         while (rbd_dev->parent) {
5111                 struct rbd_device *first = rbd_dev;
5112                 struct rbd_device *second = first->parent;
5113                 struct rbd_device *third;
5114
5115                 /*
5116                  * Follow to the parent with no grandparent and
5117                  * remove it.
5118                  */
5119                 while (second && (third = second->parent)) {
5120                         first = second;
5121                         second = third;
5122                 }
5123                 rbd_assert(second);
5124                 rbd_dev_image_release(second);
5125                 first->parent = NULL;
5126                 first->parent_overlap = 0;
5127
5128                 rbd_assert(first->parent_spec);
5129                 rbd_spec_put(first->parent_spec);
5130                 first->parent_spec = NULL;
5131         }
5132 }
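
/*
 * To illustrate the loop above: for a chain dev -> A -> B -> C
 * (where C has no parent), the first pass releases C and clears B's
 * parent pointers, the next pass releases B, then A, until dev has
 * no parent left.
 */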
5133
5134 static ssize_t rbd_remove(struct bus_type *bus,
5135                           const char *buf,
5136                           size_t count)
5137 {
5138         struct rbd_device *rbd_dev = NULL;
5139         int target_id;
5140         unsigned long ul;
5141         int ret;
5142
5143         ret = strict_strtoul(buf, 10, &ul);
5144         if (ret)
5145                 return ret;
5146
5147         /* convert to int; abort if we lost anything in the conversion */
5148         target_id = (int) ul;
5149         if (target_id != ul)
5150                 return -EINVAL;
5151
5152         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5153
5154         rbd_dev = __rbd_get_dev(target_id);
5155         if (!rbd_dev) {
5156                 ret = -ENOENT;
5157                 goto done;
5158         }
5159
5160         spin_lock_irq(&rbd_dev->lock);
5161         if (rbd_dev->open_count)
5162                 ret = -EBUSY;
5163         else
5164                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5165         spin_unlock_irq(&rbd_dev->lock);
5166         if (ret < 0)
5167                 goto done;
5168         ret = count;
5169         rbd_bus_del_dev(rbd_dev);
5170         rbd_dev_image_release(rbd_dev);
5171         module_put(THIS_MODULE);
5172 done:
5173         mutex_unlock(&ctl_mutex);
5174
5175         return ret;
5176 }
5177
5178 /*
5179  * create control files in sysfs
5180  * /sys/bus/rbd/...
5181  */
5182 static int rbd_sysfs_init(void)
5183 {
5184         int ret;
5185
5186         ret = device_register(&rbd_root_dev);
5187         if (ret < 0)
5188                 return ret;
5189
5190         ret = bus_register(&rbd_bus_type);
5191         if (ret < 0)
5192                 device_unregister(&rbd_root_dev);
5193
5194         return ret;
5195 }
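
/*
 * The control files registered above are used as described in
 * Documentation/ABI/testing/sysfs-bus-rbd, e.g. (illustrative values):
 *
 *   # echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *   # echo 1 > /sys/bus/rbd/remove
 */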
5196
5197 static void rbd_sysfs_cleanup(void)
5198 {
5199         bus_unregister(&rbd_bus_type);
5200         device_unregister(&rbd_root_dev);
5201 }
5202
5203 static int __init rbd_init(void)
5204 {
5205         int rc;
5206
5207         if (!libceph_compatible(NULL)) {
5208                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5209
5210                 return -EINVAL;
5211         }
5212         rc = rbd_sysfs_init();
5213         if (rc)
5214                 return rc;
5215         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5216         return 0;
5217 }
5218
5219 static void __exit rbd_exit(void)
5220 {
5221         rbd_sysfs_cleanup();
5222 }
5223
5224 module_init(rbd_init);
5225 module_exit(rbd_exit);
5226
5227 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5228 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5229 MODULE_DESCRIPTION("rados block device");
5230
5231 /* following authorship retained from original osdblk.c */
5232 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5233
5234 MODULE_LICENSE("GPL");