/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
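
/*
 * Worked example for the limit above: a snapshot context carries one
 * __le64 id per snapshot, so 510 ids occupy 510 * 8 = 4080 bytes,
 * which leaves just enough of a 4 KB page for the few header fields
 * (sequence number, snapshot count) that travel with them.  A 511th
 * id would push the whole context past a single page.
 */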

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
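
/*
 * For illustration: with a 4-byte int, MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 = 11 characters, exactly enough for the longest
 * decimal rendering of an int ("-2147483648" is 11 characters).
 * The 5/2 ratio is a safe overestimate of log10(256) ~= 2.41
 * decimal digits per byte, and the +1 covers the sign.
 */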

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
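
/*
 * Example (all values below are illustrative only): mapping snapshot
 * "snap1" of image "foo" in pool "rbd" might fill in a spec as
 *
 *      pool_id = 2             pool_name = "rbd"
 *      image_id = "1028ab4567" image_name = "foo"
 *      snap_id = 4             snap_name = "snap1"
 *
 * A mapping of the image head rather than a snapshot is represented
 * by snap_id == CEPH_NOSNAP and the name RBD_SNAP_HEAD_NAME ("-").
 */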

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* position in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
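
/*
 * Typical use of the iterators above -- rbd_img_request_complete()
 * below aggregates per-object transfer counts exactly this way:
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 xferred = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 *
 * The _safe variant walks the list in reverse and tolerates deletion
 * of the current entry, which is what teardown paths need.
 */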

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR "\nAssertion failure in %s() " \
                                        "at line %d:\n\n"               \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
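
/*
 * A sketch of how this is driven (see rbd_add(), declared above):
 * option tokens that the generic ceph option parser does not
 * recognize are handed to parse_rbd_opts_token() one at a time, so
 * "ro" or "read_only" makes the mapping read-only, while
 * "rw"/"read_write" restores the RBD_READ_ONLY_DEFAULT (writable)
 * behavior.
 */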

/*
 * Get a ceph client with specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the prefix allocated just above */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;

                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees the
                 * ondisk buffer we're working with has snap_names_len
                 * bytes beyond the end of the snapshot id array, so
                 * this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}
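
/*
 * Rough sketch of the format 1 on-disk header decoded above (the
 * authoritative layout is struct rbd_image_header_ondisk in
 * rbd_types.h; this only summarizes what the function relies on):
 *
 *      fixed fields:  magic text, object_prefix, options
 *                     (order/crypt_type/comp_type), image_size,
 *                     snap_seq, snap_count, snap_names_len
 *      snaps[]:       snap_count (__le64 id, __le64 image_size) pairs
 *      name blob:     snap_names_len bytes of NUL-terminated names,
 *                     one per snapshot, in snapshot order
 *
 * That trailing blob starting at &ondisk->snaps[snap_count] is what
 * the memcpy() above captures wholesale; _rbd_dev_v1_snap_name()
 * below walks it string by string.
 */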

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u32 which;

        for (which = 0; which < snapc->num_snaps; which++)
                if (snapc->snaps[which] == snap_id)
                        return which;

        return BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        const char *snap_name = rbd_dev->spec->snap_name;
        u64 snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
                snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
                if (snap_id == CEPH_NOSNAP)
                        return -ENOENT;
        } else {
                snap_id = CEPH_NOSNAP;
        }

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        /* If we are mapping a snapshot it must be marked read-only */

        if (snap_id != CEPH_NOSNAP)
                rbd_dev->mapping.read_only = true;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
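
/*
 * Worked example of the segment math above, assuming obj_order is 22
 * (4 MiB objects, the usual default): image byte offset 0xc12345
 * lands in segment 0xc12345 >> 22 = 3, at offset
 * 0xc12345 & 0x3fffff = 0x12345 within that object, and
 * rbd_segment_name() would render the object name as
 * "<object_prefix>.000000000003".  A request of 0x400000 bytes
 * starting there is clipped by rbd_segment_length() to
 * 0x400000 - 0x12345 = 0x3edcbb bytes so it never crosses into
 * segment 4.
 */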

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1ULL << header->obj_order;
}
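
/*
 * E.g. an obj_order of 22 yields 1ULL << 22 = 4 MiB backing objects.
 * The 1ULL (rather than a plain int 1) matters at the margin:
 * rbd_dev_ondisk_valid() accepts orders up to 8 * sizeof (int) - 1,
 * where a signed 32-bit shift would overflow before the widening
 * to u64.
 */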

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * Similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
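
/*
 * Worked example, assuming a 4096-byte PAGE_SIZE: zero_pages(pages,
 * 4095, 8193) touches three pages -- the final byte of pages[0], all
 * 4096 bytes of pages[1], and the first byte of pages[2].  Each pass
 * of the loop clamps length to whichever is smaller, the bytes left
 * in the range or the bytes left in the current page.
 */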

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bios */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
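
/*
 * Minimal usage sketch (values are hypothetical): a submitter that
 * must split a request at an object boundary can peel off the first
 * seg_len bytes and leave chain/offset pointing at the remainder:
 *
 *      struct bio *chain = ...;        incoming bio chain
 *      unsigned int offset = 0;
 *      struct bio *clone;
 *
 *      clone = bio_chain_clone_range(&chain, &offset, seg_len,
 *                                      GFP_NOIO);
 *      if (!clone)
 *              ...fail...      ENOMEM, or EINVAL if the chain ran dry
 *
 * A second call with the updated chain and offset then covers the
 * bytes belonging to the next object.
 */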

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
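
/*
 * The smp_mb() calls above pair up so that a reader who sees KNOWN
 * set also sees the EXISTS value that was published with it.  The
 * intended reading pattern is thus a sketch along these lines (this
 * mirrors the existence-check-before-write described for
 * rbd_obj_request above):
 *
 *      if (!obj_request_known_test(obj_request))
 *              ...issue a STAT to learn whether the object exists...
 *      else if (obj_request_exists_test(obj_request))
 *              ...target exists, write it in place...
 *      else
 *              ...known absent, copy up from the parent first...
 */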

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it is not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (obj_request_img_data_test(obj_request)) {
                struct rbd_img_request *img_request = obj_request->img_request;
1643
1644                 rbd_assert(write_request ==
1645                                 img_request_write_test(img_request));
1646                 if (write_request)
1647                         snapc = img_request->snapc;
1648         }
1649
1650         /* Allocate and initialize the request, for the single op */
1651
1652         osdc = &rbd_dev->rbd_client->client->osdc;
1653         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1654         if (!osd_req)
1655                 return NULL;    /* ENOMEM */
1656
1657         if (write_request)
1658                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1659         else
1660                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1661
1662         osd_req->r_callback = rbd_osd_req_callback;
1663         osd_req->r_priv = obj_request;
1664
1665         osd_req->r_oid_len = strlen(obj_request->object_name);
1666         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1667         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1668
1669         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1670
1671         return osd_req;
1672 }
1673
1674 /*
1675  * Create a copyup osd request based on the information in the
1676  * object request supplied.  A copyup request has two osd ops:
1677  * a copyup method call and a "normal" write request.
1678  */
1679 static struct ceph_osd_request *
1680 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1681 {
1682         struct rbd_img_request *img_request;
1683         struct ceph_snap_context *snapc;
1684         struct rbd_device *rbd_dev;
1685         struct ceph_osd_client *osdc;
1686         struct ceph_osd_request *osd_req;
1687
1688         rbd_assert(obj_request_img_data_test(obj_request));
1689         img_request = obj_request->img_request;
1690         rbd_assert(img_request);
1691         rbd_assert(img_request_write_test(img_request));
1692
1693         /* Allocate and initialize the request, for the two ops */
1694
1695         snapc = img_request->snapc;
1696         rbd_dev = img_request->rbd_dev;
1697         osdc = &rbd_dev->rbd_client->client->osdc;
1698         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1699         if (!osd_req)
1700                 return NULL;    /* ENOMEM */
1701
1702         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1703         osd_req->r_callback = rbd_osd_req_callback;
1704         osd_req->r_priv = obj_request;
1705
1706         osd_req->r_oid_len = strlen(obj_request->object_name);
1707         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1708         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1709
1710         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1711
1712         return osd_req;
1713 }
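
/*
 * A sketch of how the two ops of a copyup request are expected to be
 * filled in by the caller (the real sequence is in
 * rbd_img_obj_parent_read_full_callback() below); here "pages" holds
 * the data read from the parent, and obj_size/offset/length come from
 * the original write request:
 *
 *      osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
 *      osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size,
 *                                              0, false, false);
 *      osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
 *                                      offset, length, 0, 0);
 */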
1714
1716 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1717 {
1718         ceph_osdc_put_request(osd_req);
1719 }
1720
1721 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1722
1723 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1724                                                 u64 offset, u64 length,
1725                                                 enum obj_request_type type)
1726 {
1727         struct rbd_obj_request *obj_request;
1728         size_t size;
1729         char *name;
1730
1731         rbd_assert(obj_request_type_valid(type));
1732
1733         size = strlen(object_name) + 1;
1734         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1735         if (!obj_request)
1736                 return NULL;
1737
1738         name = (char *)(obj_request + 1);
1739         obj_request->object_name = memcpy(name, object_name, size);
1740         obj_request->offset = offset;
1741         obj_request->length = length;
1742         obj_request->flags = 0;
1743         obj_request->which = BAD_WHICH;
1744         obj_request->type = type;
1745         INIT_LIST_HEAD(&obj_request->links);
1746         init_completion(&obj_request->completion);
1747         kref_init(&obj_request->kref);
1748
1749         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1750                 offset, length, (int)type, obj_request);
1751
1752         return obj_request;
1753 }
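
/*
 * Illustrative lifecycle of an object request, following the pattern
 * of rbd_obj_read_sync() later in this file (page buffer setup and
 * error handling elided; "osdc" is the usual osd client pointer):
 *
 *      obj_request = rbd_obj_request_create(name, offset, length,
 *                                              OBJ_REQUEST_PAGES);
 *      obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
 *      osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
 *                                      offset, length, 0, 0);
 *      rbd_osd_req_format_read(obj_request);
 *      ret = rbd_obj_request_submit(osdc, obj_request);
 *      if (!ret)
 *              ret = rbd_obj_request_wait(obj_request);
 *      rbd_obj_request_put(obj_request);
 */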
1754
1755 static void rbd_obj_request_destroy(struct kref *kref)
1756 {
1757         struct rbd_obj_request *obj_request;
1758
1759         obj_request = container_of(kref, struct rbd_obj_request, kref);
1760
1761         dout("%s: obj %p\n", __func__, obj_request);
1762
1763         rbd_assert(obj_request->img_request == NULL);
1764         rbd_assert(obj_request->which == BAD_WHICH);
1765
1766         if (obj_request->osd_req)
1767                 rbd_osd_req_destroy(obj_request->osd_req);
1768
1769         rbd_assert(obj_request_type_valid(obj_request->type));
1770         switch (obj_request->type) {
1771         case OBJ_REQUEST_NODATA:
1772                 break;          /* Nothing to do */
1773         case OBJ_REQUEST_BIO:
1774                 if (obj_request->bio_list)
1775                         bio_chain_put(obj_request->bio_list);
1776                 break;
1777         case OBJ_REQUEST_PAGES:
1778                 if (obj_request->pages)
1779                         ceph_release_page_vector(obj_request->pages,
1780                                                 obj_request->page_count);
1781                 break;
1782         }
1783
1784         kfree(obj_request);
1785 }
1786
1787 /*
1788  * Caller is responsible for filling in the list of object requests
1789  * that comprises the image request, and the Linux request pointer
1790  * (if there is one).
1791  */
1792 static struct rbd_img_request *rbd_img_request_create(
1793                                         struct rbd_device *rbd_dev,
1794                                         u64 offset, u64 length,
1795                                         bool write_request,
1796                                         bool child_request)
1797 {
1798         struct rbd_img_request *img_request;
1799
1800         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1801         if (!img_request)
1802                 return NULL;
1803
1804         if (write_request) {
1805                 down_read(&rbd_dev->header_rwsem);
1806                 ceph_get_snap_context(rbd_dev->header.snapc);
1807                 up_read(&rbd_dev->header_rwsem);
1808         }
1809
1810         img_request->rq = NULL;
1811         img_request->rbd_dev = rbd_dev;
1812         img_request->offset = offset;
1813         img_request->length = length;
1814         img_request->flags = 0;
1815         if (write_request) {
1816                 img_request_write_set(img_request);
1817                 img_request->snapc = rbd_dev->header.snapc;
1818         } else {
1819                 img_request->snap_id = rbd_dev->spec->snap_id;
1820         }
1821         if (child_request)
1822                 img_request_child_set(img_request);
1823         if (rbd_dev->parent_spec)
1824                 img_request_layered_set(img_request);
1825         spin_lock_init(&img_request->completion_lock);
1826         img_request->next_completion = 0;
1827         img_request->callback = NULL;
1828         img_request->result = 0;
1829         img_request->obj_request_count = 0;
1830         INIT_LIST_HEAD(&img_request->obj_requests);
1831         kref_init(&img_request->kref);
1832
1833         rbd_img_request_get(img_request);       /* Avoid a warning */
1834         rbd_img_request_put(img_request);       /* TEMPORARY */
1835
1836         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1837                 write_request ? "write" : "read", offset, length,
1838                 img_request);
1839
1840         return img_request;
1841 }
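
/*
 * A sketch of the intended calling sequence (rbd_request_fn() below
 * is the real user; error paths abbreviated):
 *
 *      img_request = rbd_img_request_create(rbd_dev, offset, length,
 *                                              write_request, false);
 *      img_request->rq = rq;
 *      result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
 *      if (!result)
 *              result = rbd_img_request_submit(img_request);
 *      if (result)
 *              rbd_img_request_put(img_request);
 */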
1842
1843 static void rbd_img_request_destroy(struct kref *kref)
1844 {
1845         struct rbd_img_request *img_request;
1846         struct rbd_obj_request *obj_request;
1847         struct rbd_obj_request *next_obj_request;
1848
1849         img_request = container_of(kref, struct rbd_img_request, kref);
1850
1851         dout("%s: img %p\n", __func__, img_request);
1852
1853         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1854                 rbd_img_obj_request_del(img_request, obj_request);
1855         rbd_assert(img_request->obj_request_count == 0);
1856
1857         if (img_request_write_test(img_request))
1858                 ceph_put_snap_context(img_request->snapc);
1859
1860         if (img_request_child_test(img_request))
1861                 rbd_obj_request_put(img_request->obj_request);
1862
1863         kfree(img_request);
1864 }
1865
1866 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1867 {
1868         struct rbd_img_request *img_request;
1869         unsigned int xferred;
1870         int result;
1871         bool more;
1872
1873         rbd_assert(obj_request_img_data_test(obj_request));
1874         img_request = obj_request->img_request;
1875
1876         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1877         xferred = (unsigned int)obj_request->xferred;
1878         result = obj_request->result;
1879         if (result) {
1880                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1881
1882                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1883                         img_request_write_test(img_request) ? "write" : "read",
1884                         obj_request->length, obj_request->img_offset,
1885                         obj_request->offset);
1886                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1887                         result, xferred);
1888                 if (!img_request->result)
1889                         img_request->result = result;
1890         }
1891
1892         /* Image object requests don't own their page array */
1893
1894         if (obj_request->type == OBJ_REQUEST_PAGES) {
1895                 obj_request->pages = NULL;
1896                 obj_request->page_count = 0;
1897         }
1898
1899         if (img_request_child_test(img_request)) {
1900                 rbd_assert(img_request->obj_request != NULL);
1901                 more = obj_request->which < img_request->obj_request_count - 1;
1902         } else {
1903                 rbd_assert(img_request->rq != NULL);
1904                 more = blk_end_request(img_request->rq, result, xferred);
1905         }
1906
1907         return more;
1908 }
1909
1910 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1911 {
1912         struct rbd_img_request *img_request;
1913         u32 which = obj_request->which;
1914         bool more = true;
1915
1916         rbd_assert(obj_request_img_data_test(obj_request));
1917         img_request = obj_request->img_request;
1918
1919         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1920         rbd_assert(img_request != NULL);
1921         rbd_assert(img_request->obj_request_count > 0);
1922         rbd_assert(which != BAD_WHICH);
1923         rbd_assert(which < img_request->obj_request_count);
1924         rbd_assert(which >= img_request->next_completion);
1925
1926         spin_lock_irq(&img_request->completion_lock);
1927         if (which != img_request->next_completion)
1928                 goto out;
1929
1930         for_each_obj_request_from(img_request, obj_request) {
1931                 rbd_assert(more);
1932                 rbd_assert(which < img_request->obj_request_count);
1933
1934                 if (!obj_request_done_test(obj_request))
1935                         break;
1936                 more = rbd_img_obj_end_request(obj_request);
1937                 which++;
1938         }
1939
1940         rbd_assert(more ^ (which == img_request->obj_request_count));
1941         img_request->next_completion = which;
1942 out:
1943         spin_unlock_irq(&img_request->completion_lock);
1944
1945         if (!more)
1946                 rbd_img_request_complete(img_request);
1947 }
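
/*
 * Example of the in-order completion above: with three object requests
 * finishing in the order 2, 0, 1, the callback for 2 finds that 2 is
 * not next_completion (0) and returns; the callback for 0 ends request
 * 0, stops at the not-yet-done 1, and advances next_completion to 1;
 * the callback for 1 then ends both 1 and the already-done 2 and
 * completes the image request.
 */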
1948
1949 /*
1950  * Split up an image request into one or more object requests, each
1951  * to a different object.  The "type" parameter indicates whether
1952  * "data_desc" is the pointer to the head of a list of bio
1953  * structures, or the base of a page array.  In either case this
1954  * function assumes data_desc describes memory sufficient to hold
1955  * all data described by the image request.
1956  */
1957 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1958                                         enum obj_request_type type,
1959                                         void *data_desc)
1960 {
1961         struct rbd_device *rbd_dev = img_request->rbd_dev;
1962         struct rbd_obj_request *obj_request = NULL;
1963         struct rbd_obj_request *next_obj_request;
1964         bool write_request = img_request_write_test(img_request);
1965         struct bio *bio_list;
1966         unsigned int bio_offset = 0;
1967         struct page **pages;
1968         u64 img_offset;
1969         u64 resid;
1970         u16 opcode;
1971
1972         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1973                 (int)type, data_desc);
1974
1975         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1976         img_offset = img_request->offset;
1977         resid = img_request->length;
1978         rbd_assert(resid > 0);
1979
1980         if (type == OBJ_REQUEST_BIO) {
1981                 bio_list = data_desc;
1982                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1983         } else {
1984                 rbd_assert(type == OBJ_REQUEST_PAGES);
1985                 pages = data_desc;
1986         }
1987
1988         while (resid) {
1989                 struct ceph_osd_request *osd_req;
1990                 const char *object_name;
1991                 u64 offset;
1992                 u64 length;
1993
1994                 object_name = rbd_segment_name(rbd_dev, img_offset);
1995                 if (!object_name)
1996                         goto out_unwind;
1997                 offset = rbd_segment_offset(rbd_dev, img_offset);
1998                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1999                 obj_request = rbd_obj_request_create(object_name,
2000                                                 offset, length, type);
2001                 kfree(object_name);     /* object request has its own copy */
2002                 if (!obj_request)
2003                         goto out_unwind;
2004
2005                 if (type == OBJ_REQUEST_BIO) {
2006                         unsigned int clone_size;
2007
2008                         rbd_assert(length <= (u64)UINT_MAX);
2009                         clone_size = (unsigned int)length;
2010                         obj_request->bio_list =
2011                                         bio_chain_clone_range(&bio_list,
2012                                                                 &bio_offset,
2013                                                                 clone_size,
2014                                                                 GFP_ATOMIC);
2015                         if (!obj_request->bio_list)
2016                                 goto out_partial;
2017                 } else {
2018                         unsigned int page_count;
2019
2020                         obj_request->pages = pages;
2021                         page_count = (u32)calc_pages_for(offset, length);
2022                         obj_request->page_count = page_count;
2023                         if ((offset + length) & ~PAGE_MASK)
2024                                 page_count--;   /* more on last page */
2025                         pages += page_count;
2026                 }
2027
2028                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2029                                                 obj_request);
2030                 if (!osd_req)
2031                         goto out_partial;
2032                 obj_request->osd_req = osd_req;
2033                 obj_request->callback = rbd_img_obj_callback;
2034
2035                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2036                                                 0, 0);
2037                 if (type == OBJ_REQUEST_BIO)
2038                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2039                                         obj_request->bio_list, length);
2040                 else
2041                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2042                                         obj_request->pages, length,
2043                                         offset & ~PAGE_MASK, false, false);
2044
2045                 if (write_request)
2046                         rbd_osd_req_format_write(obj_request);
2047                 else
2048                         rbd_osd_req_format_read(obj_request);
2049
2050                 obj_request->img_offset = img_offset;
2051                 rbd_img_obj_request_add(img_request, obj_request);
2052
2053                 img_offset += length;
2054                 resid -= length;
2055         }
2056
2057         return 0;
2058
2059 out_partial:
2060         rbd_obj_request_put(obj_request);
2061 out_unwind:
2062         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2063                 rbd_obj_request_put(obj_request);
2064
2065         return -ENOMEM;
2066 }
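
/*
 * Segment arithmetic example, assuming the default object order of 22
 * (4 MiB objects): a 6 MiB request starting at image offset 3 MiB is
 * split by the loop above into three object requests:
 *
 *      object 0:  offset 3 MiB, length 1 MiB   (up to the object edge)
 *      object 1:  offset 0,     length 4 MiB
 *      object 2:  offset 0,     length 1 MiB
 *
 * rbd_segment_length() never lets a request cross an object boundary.
 */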
2067
2068 static void
2069 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2070 {
2071         struct rbd_img_request *img_request;
2072         struct rbd_device *rbd_dev;
2073         u64 length;
2074         u32 page_count;
2075
2076         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2077         rbd_assert(obj_request_img_data_test(obj_request));
2078         img_request = obj_request->img_request;
2079         rbd_assert(img_request);
2080
2081         rbd_dev = img_request->rbd_dev;
2082         rbd_assert(rbd_dev);
2083         length = (u64)1 << rbd_dev->header.obj_order;
2084         page_count = (u32)calc_pages_for(0, length);
2085
2086         rbd_assert(obj_request->copyup_pages);
2087         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2088         obj_request->copyup_pages = NULL;
2089
2090         /*
2091          * We want the transfer count to reflect the size of the
2092          * original write request.  There is no such thing as a
2093          * successful short write, so if the request was successful
2094          * we can just set it to the originally-requested length.
2095          */
2096         if (!obj_request->result)
2097                 obj_request->xferred = obj_request->length;
2098
2099         /* Finish up with the normal image object callback */
2100
2101         rbd_img_obj_callback(obj_request);
2102 }
2103
2104 static void
2105 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2106 {
2107         struct rbd_obj_request *orig_request;
2108         struct ceph_osd_request *osd_req;
2109         struct ceph_osd_client *osdc;
2110         struct rbd_device *rbd_dev;
2111         struct page **pages;
2112         int result;
2113         u64 obj_size;
2114         u64 xferred;
2115
2116         rbd_assert(img_request_child_test(img_request));
2117
2118         /* First get what we need from the image request */
2119
2120         pages = img_request->copyup_pages;
2121         rbd_assert(pages != NULL);
2122         img_request->copyup_pages = NULL;
2123
2124         orig_request = img_request->obj_request;
2125         rbd_assert(orig_request != NULL);
2126         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2127         result = img_request->result;
2128         obj_size = img_request->length;
2129         xferred = img_request->xferred;
2130
2131         rbd_dev = img_request->rbd_dev;
2132         rbd_assert(rbd_dev);
2133         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2134
2135         rbd_img_request_put(img_request);
2136
2137         if (result)
2138                 goto out_err;
2139
2140         /* Allocate the new copyup osd request for the original request */
2141
2142         result = -ENOMEM;
2143         rbd_assert(!orig_request->osd_req);
2144         osd_req = rbd_osd_req_create_copyup(orig_request);
2145         if (!osd_req)
2146                 goto out_err;
2147         orig_request->osd_req = osd_req;
2148         orig_request->copyup_pages = pages;
2149
2150         /* Initialize the copyup op */
2151
2152         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2153         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2154                                                 false, false);
2155
2156         /* Then the original write request op */
2157
2158         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2159                                         orig_request->offset,
2160                                         orig_request->length, 0, 0);
2161         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2162                                         orig_request->length);
2163
2164         rbd_osd_req_format_write(orig_request);
2165
2166         /* All set, send it off. */
2167
2168         orig_request->callback = rbd_img_obj_copyup_callback;
2169         osdc = &rbd_dev->rbd_client->client->osdc;
2170         result = rbd_obj_request_submit(osdc, orig_request);
2171         if (!result)
2172                 return;
2173 out_err:
2174         /* Record the error code and complete the request */
2175
2176         orig_request->result = result;
2177         orig_request->xferred = 0;
2178         obj_request_done_set(orig_request);
2179         rbd_obj_request_complete(orig_request);
2180 }
2181
2182 /*
2183  * Read from the parent image the range of data that covers the
2184  * entire target of the given object request.  This is used for
2185  * satisfying a layered image write request when the target of an
2186  * object request from the image request does not exist.
2187  *
2188  * A page array big enough to hold the returned data is allocated
2189  * and supplied to rbd_img_request_fill() as the "data descriptor."
2190  * When the read completes, this page array will be transferred to
2191  * the original object request for the copyup operation.
2192  *
2193  * If an error occurs, record it as the result of the original
2194  * object request and mark it done so it gets completed.
2195  */
2196 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2197 {
2198         struct rbd_img_request *img_request = NULL;
2199         struct rbd_img_request *parent_request = NULL;
2200         struct rbd_device *rbd_dev;
2201         u64 img_offset;
2202         u64 length;
2203         struct page **pages = NULL;
2204         u32 page_count;
2205         int result;
2206
2207         rbd_assert(obj_request_img_data_test(obj_request));
2208         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2209
2210         img_request = obj_request->img_request;
2211         rbd_assert(img_request != NULL);
2212         rbd_dev = img_request->rbd_dev;
2213         rbd_assert(rbd_dev->parent != NULL);
2214
2215         /*
2216          * First things first.  The original osd request is of no
2217          * use to us any more; we'll need a new one that can hold
2218          * the two ops in a copyup request.  We'll get that later,
2219          * but for now we can release the old one.
2220          */
2221         rbd_osd_req_destroy(obj_request->osd_req);
2222         obj_request->osd_req = NULL;
2223
2224         /*
2225          * Determine the byte range covered by the object in the
2226          * child image to which the original request was to be sent.
2227          */
2228         img_offset = obj_request->img_offset - obj_request->offset;
2229         length = (u64)1 << rbd_dev->header.obj_order;
2230
2231         /*
2232          * There is no defined parent data beyond the parent
2233          * overlap, so limit what we read at that boundary if
2234          * necessary.
2235          */
2236         if (img_offset + length > rbd_dev->parent_overlap) {
2237                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2238                 length = rbd_dev->parent_overlap - img_offset;
2239         }
2240
2241         /*
2242          * Allocate a page array big enough to receive the data read
2243          * from the parent.
2244          */
2245         page_count = (u32)calc_pages_for(0, length);
2246         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2247         if (IS_ERR(pages)) {
2248                 result = PTR_ERR(pages);
2249                 pages = NULL;
2250                 goto out_err;
2251         }
2252
2253         result = -ENOMEM;
2254         parent_request = rbd_img_request_create(rbd_dev->parent,
2255                                                 img_offset, length,
2256                                                 false, true);
2257         if (!parent_request)
2258                 goto out_err;
2259         rbd_obj_request_get(obj_request);
2260         parent_request->obj_request = obj_request;
2261
2262         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2263         if (result)
2264                 goto out_err;
2265         parent_request->copyup_pages = pages;
2266
2267         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2268         result = rbd_img_request_submit(parent_request);
2269         if (!result)
2270                 return 0;
2271
2272         parent_request->copyup_pages = NULL;
2273         parent_request->obj_request = NULL;
2274         rbd_obj_request_put(obj_request);
2275 out_err:
2276         if (pages)
2277                 ceph_release_page_vector(pages, page_count);
2278         if (parent_request)
2279                 rbd_img_request_put(parent_request);
2280         obj_request->result = result;
2281         obj_request->xferred = 0;
2282         obj_request_done_set(obj_request);
2283
2284         return result;
2285 }
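
/*
 * Example with illustrative numbers: given 4 MiB objects and a parent
 * overlap of 6 MiB, a copyup for the object covering image offsets
 * 4..8 MiB reads only 2 MiB (6 - 4) from the parent, since no parent
 * data is defined beyond the overlap.
 */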
2286
2287 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2288 {
2289         struct rbd_obj_request *orig_request;
2290         int result;
2291
2292         rbd_assert(!obj_request_img_data_test(obj_request));
2293
2294         /*
2295          * All we need from the object request is the original
2296          * request and the result of the STAT op.  Grab those, then
2297          * we're done with the request.
2298          */
2299         orig_request = obj_request->obj_request;
2300         obj_request->obj_request = NULL;
2301         rbd_assert(orig_request);
2302         rbd_assert(orig_request->img_request);
2303
2304         result = obj_request->result;
2305         obj_request->result = 0;
2306
2307         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2308                 obj_request, orig_request, result,
2309                 obj_request->xferred, obj_request->length);
2310         rbd_obj_request_put(obj_request);
2311
2315         /*
2316          * Our only purpose here is to determine whether the object
2317          * exists, and we don't want to treat the non-existence as
2318          * an error.  If something else comes back, transfer the
2319          * error to the original request and complete it now.
2320          */
2321         if (!result) {
2322                 obj_request_existence_set(orig_request, true);
2323         } else if (result == -ENOENT) {
2324                 obj_request_existence_set(orig_request, false);
2325         } else {
2326                 orig_request->result = result;
2327                 goto out;
2328         }
2329
2330         /*
2331          * Resubmit the original request now that we have recorded
2332          * whether the target object exists.
2333          */
2334         orig_request->result = rbd_img_obj_request_submit(orig_request);
2335 out:
2336         if (orig_request->result)
2337                 rbd_obj_request_complete(orig_request);
2338         rbd_obj_request_put(orig_request);
2339 }
2340
2341 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2342 {
2343         struct rbd_obj_request *stat_request;
2344         struct rbd_device *rbd_dev;
2345         struct ceph_osd_client *osdc;
2346         struct page **pages = NULL;
2347         u32 page_count;
2348         size_t size;
2349         int ret;
2350
2351         /*
2352          * The response data for a STAT call consists of:
2353          *     le64 length;
2354          *     struct {
2355          *         le32 tv_sec;
2356          *         le32 tv_nsec;
2357          *     } mtime;
2358          */
2359         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2360         page_count = (u32)calc_pages_for(0, size);
2361         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2362         if (IS_ERR(pages))
2363                 return PTR_ERR(pages);
2364
2365         ret = -ENOMEM;
2366         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2367                                                         OBJ_REQUEST_PAGES);
2368         if (!stat_request)
2369                 goto out;
2370
2371         rbd_obj_request_get(obj_request);
2372         stat_request->obj_request = obj_request;
2373         stat_request->pages = pages;
2374         stat_request->page_count = page_count;
2375
2376         rbd_assert(obj_request->img_request);
2377         rbd_dev = obj_request->img_request->rbd_dev;
2378         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2379                                                 stat_request);
2380         if (!stat_request->osd_req)
2381                 goto out;
2382         stat_request->callback = rbd_img_obj_exists_callback;
2383
2384         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2385         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2386                                         false, false);
2387         rbd_osd_req_format_read(stat_request);
2388
2389         osdc = &rbd_dev->rbd_client->client->osdc;
2390         ret = rbd_obj_request_submit(osdc, stat_request);
2391 out:
2392         if (ret)
2393                 rbd_obj_request_put(obj_request);
2394
2395         return ret;
2396 }
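
/*
 * For reference, the STAT reply described above could be overlaid
 * with a structure like the following (illustrative only; the code
 * uses just the op result and never decodes the reply itself):
 *
 *      struct rbd_obj_stat_reply {
 *              __le64  length;
 *              struct {
 *                      __le32  tv_sec;
 *                      __le32  tv_nsec;
 *              } mtime;
 *      } __attribute__ ((packed));
 */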
2397
2398 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2399 {
2400         struct rbd_img_request *img_request;
2401         struct rbd_device *rbd_dev;
2402         bool known;
2403
2404         rbd_assert(obj_request_img_data_test(obj_request));
2405
2406         img_request = obj_request->img_request;
2407         rbd_assert(img_request);
2408         rbd_dev = img_request->rbd_dev;
2409
2410         /*
2411          * Only writes to layered images need special handling.
2412          * Reads and non-layered writes are simple object requests.
2413          * Layered writes that start beyond the end of the overlap
2414          * with the parent have no parent data, so they too are
2415          * simple object requests.  Finally, if the target object is
2416          * known to already exist, its parent data has already been
2417          * copied, so a write to the object can also be handled as a
2418          * simple object request.
2419          */
2420         if (!img_request_write_test(img_request) ||
2421                 !img_request_layered_test(img_request) ||
2422                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2423                 ((known = obj_request_known_test(obj_request)) &&
2424                         obj_request_exists_test(obj_request))) {
2425
2426                 struct rbd_device *rbd_dev;
2427                 struct ceph_osd_client *osdc;
2428
2429                 rbd_dev = obj_request->img_request->rbd_dev;
2430                 osdc = &rbd_dev->rbd_client->client->osdc;
2431
2432                 return rbd_obj_request_submit(osdc, obj_request);
2433         }
2434
2435         /*
2436          * It's a layered write.  The target object might exist but
2437          * we may not know that yet.  If we know it doesn't exist,
2438          * start by reading the data for the full target object from
2439          * the parent so we can use it for a copyup to the target.
2440          */
2441         if (known)
2442                 return rbd_img_obj_parent_read_full(obj_request);
2443
2444         /* We don't know whether the target exists.  Go find out. */
2445
2446         return rbd_img_obj_exists_submit(obj_request);
2447 }
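
/*
 * In short, the dispatch above is:
 *
 *      read, non-layered write, write past the overlap,
 *      or target known to exist        -> submit the request directly
 *      target known not to exist       -> rbd_img_obj_parent_read_full()
 *      existence not yet known         -> rbd_img_obj_exists_submit()
 */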
2448
2449 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2450 {
2451         struct rbd_obj_request *obj_request;
2452         struct rbd_obj_request *next_obj_request;
2453
2454         dout("%s: img %p\n", __func__, img_request);
2455         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2456                 int ret;
2457
2458                 ret = rbd_img_obj_request_submit(obj_request);
2459                 if (ret)
2460                         return ret;
2461         }
2462
2463         return 0;
2464 }
2465
2466 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2467 {
2468         struct rbd_obj_request *obj_request;
2469         struct rbd_device *rbd_dev;
2470         u64 obj_end;
2471
2472         rbd_assert(img_request_child_test(img_request));
2473
2474         obj_request = img_request->obj_request;
2475         rbd_assert(obj_request);
2476         rbd_assert(obj_request->img_request);
2477
2478         obj_request->result = img_request->result;
2479         if (obj_request->result)
2480                 goto out;
2481
2482         /*
2483          * We need to zero anything beyond the parent overlap
2484          * boundary.  Since rbd_img_obj_request_read_callback()
2485          * will zero anything beyond the end of a short read, an
2486          * easy way to do this is to pretend the data from the
2487          * parent came up short--ending at the overlap boundary.
2488          */
2489         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2490         obj_end = obj_request->img_offset + obj_request->length;
2491         rbd_dev = obj_request->img_request->rbd_dev;
2492         if (obj_end > rbd_dev->parent_overlap) {
2493                 u64 xferred = 0;
2494
2495                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2496                         xferred = rbd_dev->parent_overlap -
2497                                         obj_request->img_offset;
2498
2499                 obj_request->xferred = min(img_request->xferred, xferred);
2500         } else {
2501                 obj_request->xferred = img_request->xferred;
2502         }
2503 out:
2504         rbd_img_obj_request_read_callback(obj_request);
2505         rbd_obj_request_complete(obj_request);
2506 }
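
/*
 * Example with illustrative numbers: if the object spans image offsets
 * 4..8 MiB but the parent overlap ends at 6 MiB, xferred is clamped to
 * 2 MiB and rbd_img_obj_request_read_callback() zero-fills the rest,
 * exactly as if the parent read had come up short.
 */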
2507
2508 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2509 {
2510         struct rbd_device *rbd_dev;
2511         struct rbd_img_request *img_request;
2512         int result;
2513
2514         rbd_assert(obj_request_img_data_test(obj_request));
2515         rbd_assert(obj_request->img_request != NULL);
2516         rbd_assert(obj_request->result == (s32) -ENOENT);
2517         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2518
2519         rbd_dev = obj_request->img_request->rbd_dev;
2520         rbd_assert(rbd_dev->parent != NULL);
2522         img_request = rbd_img_request_create(rbd_dev->parent,
2523                                                 obj_request->img_offset,
2524                                                 obj_request->length,
2525                                                 false, true);
2526         result = -ENOMEM;
2527         if (!img_request)
2528                 goto out_err;
2529
2530         rbd_obj_request_get(obj_request);
2531         img_request->obj_request = obj_request;
2532
2533         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2534                                         obj_request->bio_list);
2535         if (result)
2536                 goto out_err;
2537
2538         img_request->callback = rbd_img_parent_read_callback;
2539         result = rbd_img_request_submit(img_request);
2540         if (result)
2541                 goto out_err;
2542
2543         return;
2544 out_err:
2545         if (img_request)
2546                 rbd_img_request_put(img_request);
2547         obj_request->result = result;
2548         obj_request->xferred = 0;
2549         obj_request_done_set(obj_request);
2550 }
2551
2552 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2553 {
2554         struct rbd_obj_request *obj_request;
2555         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2556         int ret;
2557
2558         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2559                                                         OBJ_REQUEST_NODATA);
2560         if (!obj_request)
2561                 return -ENOMEM;
2562
2563         ret = -ENOMEM;
2564         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2565         if (!obj_request->osd_req)
2566                 goto out;
2567         obj_request->callback = rbd_obj_request_put;
2568
2569         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2570                                         notify_id, 0, 0);
2571         rbd_osd_req_format_read(obj_request);
2572
2573         ret = rbd_obj_request_submit(osdc, obj_request);
2574 out:
2575         if (ret)
2576                 rbd_obj_request_put(obj_request);
2577
2578         return ret;
2579 }
2580
2581 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2582 {
2583         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2584
2585         if (!rbd_dev)
2586                 return;
2587
2588         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2589                 rbd_dev->header_name, (unsigned long long)notify_id,
2590                 (unsigned int)opcode);
2591         (void)rbd_dev_refresh(rbd_dev);
2592
2593         rbd_obj_notify_ack(rbd_dev, notify_id);
2594 }
2595
2596 /*
2597  * Request sync osd watch/unwatch.  The value of "start" determines
2598  * whether a watch request is being initiated or torn down.
2599  */
2600 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2601 {
2602         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2603         struct rbd_obj_request *obj_request;
2604         int ret;
2605
2606         rbd_assert(start ^ !!rbd_dev->watch_event);
2607         rbd_assert(start ^ !!rbd_dev->watch_request);
2608
2609         if (start) {
2610                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2611                                                 &rbd_dev->watch_event);
2612                 if (ret < 0)
2613                         return ret;
2614                 rbd_assert(rbd_dev->watch_event != NULL);
2615         }
2616
2617         ret = -ENOMEM;
2618         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2619                                                         OBJ_REQUEST_NODATA);
2620         if (!obj_request)
2621                 goto out_cancel;
2622
2623         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2624         if (!obj_request->osd_req)
2625                 goto out_cancel;
2626
2627         if (start)
2628                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2629         else
2630                 ceph_osdc_unregister_linger_request(osdc,
2631                                         rbd_dev->watch_request->osd_req);
2632
2633         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2634                                 rbd_dev->watch_event->cookie, 0, start);
2635         rbd_osd_req_format_write(obj_request);
2636
2637         ret = rbd_obj_request_submit(osdc, obj_request);
2638         if (ret)
2639                 goto out_cancel;
2640         ret = rbd_obj_request_wait(obj_request);
2641         if (ret)
2642                 goto out_cancel;
2643         ret = obj_request->result;
2644         if (ret)
2645                 goto out_cancel;
2646
2647         /*
2648          * A watch request is set to linger, so the underlying osd
2649          * request won't go away until we unregister it.  We retain
2650          * a pointer to the object request during that time (in
2651          * rbd_dev->watch_request), so we'll keep a reference to
2652          * it.  We'll drop that reference (below) after we've
2653          * unregistered it.
2654          */
2655         if (start) {
2656                 rbd_dev->watch_request = obj_request;
2657
2658                 return 0;
2659         }
2660
2661         /* We have successfully torn down the watch request */
2662
2663         rbd_obj_request_put(rbd_dev->watch_request);
2664         rbd_dev->watch_request = NULL;
2665 out_cancel:
2666         /* Cancel the event if we're tearing down, or on error */
2667         ceph_osdc_cancel_event(rbd_dev->watch_event);
2668         rbd_dev->watch_event = NULL;
2669         if (obj_request)
2670                 rbd_obj_request_put(obj_request);
2671
2672         return ret;
2673 }
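
/*
 * Illustrative usage: a watch is registered with
 * rbd_dev_header_watch_sync(rbd_dev, 1) when an image is mapped, and
 * torn down with rbd_dev_header_watch_sync(rbd_dev, 0) when it is
 * unmapped.
 */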
2674
2675 /*
2676  * Synchronous osd object method call.  Returns the number of bytes
2677  * returned in the outbound buffer, or a negative error code.
2678  */
2679 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2680                              const char *object_name,
2681                              const char *class_name,
2682                              const char *method_name,
2683                              const void *outbound,
2684                              size_t outbound_size,
2685                              void *inbound,
2686                              size_t inbound_size)
2687 {
2688         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2689         struct rbd_obj_request *obj_request;
2690         struct page **pages;
2691         u32 page_count;
2692         int ret;
2693
2694         /*
2695          * Method calls are ultimately read operations.  The result
2696          * should be placed into the inbound buffer provided.  They
2697          * also supply outbound data--parameters for the object
2698          * method.  Currently if this is present it will be a
2699          * snapshot id.
2700          */
2701         page_count = (u32)calc_pages_for(0, inbound_size);
2702         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2703         if (IS_ERR(pages))
2704                 return PTR_ERR(pages);
2705
2706         ret = -ENOMEM;
2707         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2708                                                         OBJ_REQUEST_PAGES);
2709         if (!obj_request)
2710                 goto out;
2711
2712         obj_request->pages = pages;
2713         obj_request->page_count = page_count;
2714
2715         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2716         if (!obj_request->osd_req)
2717                 goto out;
2718
2719         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2720                                         class_name, method_name);
2721         if (outbound_size) {
2722                 struct ceph_pagelist *pagelist;
2723
2724                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2725                 if (!pagelist)
2726                         goto out;
2727
2728                 ceph_pagelist_init(pagelist);
2729                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2730                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2731                                                 pagelist);
2732         }
2733         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2734                                         obj_request->pages, inbound_size,
2735                                         0, false, false);
2736         rbd_osd_req_format_read(obj_request);
2737
2738         ret = rbd_obj_request_submit(osdc, obj_request);
2739         if (ret)
2740                 goto out;
2741         ret = rbd_obj_request_wait(obj_request);
2742         if (ret)
2743                 goto out;
2744
2745         ret = obj_request->result;
2746         if (ret < 0)
2747                 goto out;
2748
2749         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2750         ret = (int)obj_request->xferred;
2751         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2752 out:
2753         if (obj_request)
2754                 rbd_obj_request_put(obj_request);
2755         else
2756                 ceph_release_page_vector(pages, page_count);
2757
2758         return ret;
2759 }
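
/*
 * A typical call, sketched after the v2 image probing code elsewhere
 * in this driver (reply buffer setup elided):
 *
 *      ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                              "rbd", "get_object_prefix", NULL, 0,
 *                              reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
 */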
2760
2761 static void rbd_request_fn(struct request_queue *q)
2762                 __releases(q->queue_lock) __acquires(q->queue_lock)
2763 {
2764         struct rbd_device *rbd_dev = q->queuedata;
2765         bool read_only = rbd_dev->mapping.read_only;
2766         struct request *rq;
2767         int result;
2768
2769         while ((rq = blk_fetch_request(q))) {
2770                 bool write_request = rq_data_dir(rq) == WRITE;
2771                 struct rbd_img_request *img_request;
2772                 u64 offset;
2773                 u64 length;
2774
2775                 /* Ignore any non-FS requests that filter through. */
2776
2777                 if (rq->cmd_type != REQ_TYPE_FS) {
2778                         dout("%s: non-fs request type %d\n", __func__,
2779                                 (int) rq->cmd_type);
2780                         __blk_end_request_all(rq, 0);
2781                         continue;
2782                 }
2783
2784                 /* Ignore/skip any zero-length requests */
2785
2786                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2787                 length = (u64) blk_rq_bytes(rq);
2788
2789                 if (!length) {
2790                         dout("%s: zero-length request\n", __func__);
2791                         __blk_end_request_all(rq, 0);
2792                         continue;
2793                 }
2794
2795                 spin_unlock_irq(q->queue_lock);
2796
2797                 /* Disallow writes to a read-only device */
2798
2799                 if (write_request) {
2800                         result = -EROFS;
2801                         if (read_only)
2802                                 goto end_request;
2803                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2804                 }
2805
2806                 /*
2807                  * Quit early if the mapped snapshot no longer
2808                  * exists.  It's still possible the snapshot will
2809                  * have disappeared by the time our request arrives
2810                  * at the osd, but there's no sense in sending it if
2811                  * we already know.
2812                  */
2813                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2814                         dout("request for non-existent snapshot\n");
2815                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2816                         result = -ENXIO;
2817                         goto end_request;
2818                 }
2819
2820                 result = -EINVAL;
2821                 if (offset && length > U64_MAX - offset + 1) {
2822                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2823                                 offset, length);
2824                         goto end_request;       /* Shouldn't happen */
2825                 }
2826
2827                 result = -ENOMEM;
2828                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2829                                                         write_request, false);
2830                 if (!img_request)
2831                         goto end_request;
2832
2833                 img_request->rq = rq;
2834
2835                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2836                                                 rq->bio);
2837                 if (!result)
2838                         result = rbd_img_request_submit(img_request);
2839                 if (result)
2840                         rbd_img_request_put(img_request);
2841 end_request:
2842                 spin_lock_irq(q->queue_lock);
2843                 if (result < 0) {
2844                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2845                                 write_request ? "write" : "read",
2846                                 length, offset, result);
2847
2848                         __blk_end_request_all(rq, result);
2849                 }
2850         }
2851 }
2852
2853 /*
2854  * A queue callback.  Makes sure that we don't create a bio that spans
2855  * multiple osd objects.  One exception would be a single-page bio,
2856  * which we handle later in bio_chain_clone_range().
2857  */
2858 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2859                           struct bio_vec *bvec)
2860 {
2861         struct rbd_device *rbd_dev = q->queuedata;
2862         sector_t sector_offset;
2863         sector_t sectors_per_obj;
2864         sector_t obj_sector_offset;
2865         int ret;
2866
2867         /*
2868          * Find how far into its rbd object the bio's start sector
2869          * falls.  The bio sector is partition-relative, so first
2870          * make it relative to the enclosing device.
2871          */
2872         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2873         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2874         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2875
2876         /*
2877          * Compute the number of bytes from that offset to the end
2878          * of the object.  Account for what's already used by the bio.
2879          */
2880         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2881         if (ret > bmd->bi_size)
2882                 ret -= bmd->bi_size;
2883         else
2884                 ret = 0;
2885
2886         /*
2887          * Don't send back more than was asked for.  And if the bio
2888          * was empty, let the whole thing through because:  "Note
2889          * that a block device *must* allow a single page to be
2890          * added to an empty bio."
2891          */
2892         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2893         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2894                 ret = (int) bvec->bv_len;
2895
2896         return ret;
2897 }
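
/*
 * Worked example (illustrative numbers): with obj_order 22 (4 MiB
 * objects), sectors_per_obj is 8192.  A bio starting at device sector
 * 8000 sits 8000 sectors into its object, leaving 192 sectors (96 KiB)
 * before the boundary, so a bvec is accepted only to the extent that
 * it fits within those 96 KiB.
 */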
2898
2899 static void rbd_free_disk(struct rbd_device *rbd_dev)
2900 {
2901         struct gendisk *disk = rbd_dev->disk;
2902
2903         if (!disk)
2904                 return;
2905
2906         rbd_dev->disk = NULL;
2907         if (disk->flags & GENHD_FL_UP) {
2908                 del_gendisk(disk);
2909                 if (disk->queue)
2910                         blk_cleanup_queue(disk->queue);
2911         }
2912         put_disk(disk);
2913 }
2914
2915 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2916                                 const char *object_name,
2917                                 u64 offset, u64 length, void *buf)
2919 {
2920         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2921         struct rbd_obj_request *obj_request;
2922         struct page **pages = NULL;
2923         u32 page_count;
2924         size_t size;
2925         int ret;
2926
2927         page_count = (u32) calc_pages_for(offset, length);
2928         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2929         if (IS_ERR(pages))
2930                 return PTR_ERR(pages);
2931
2932         ret = -ENOMEM;
2933         obj_request = rbd_obj_request_create(object_name, offset, length,
2934                                                         OBJ_REQUEST_PAGES);
2935         if (!obj_request)
2936                 goto out;
2937
2938         obj_request->pages = pages;
2939         obj_request->page_count = page_count;
2940
2941         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2942         if (!obj_request->osd_req)
2943                 goto out;
2944
2945         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2946                                         offset, length, 0, 0);
2947         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2948                                         obj_request->pages,
2949                                         obj_request->length,
2950                                         obj_request->offset & ~PAGE_MASK,
2951                                         false, false);
2952         rbd_osd_req_format_read(obj_request);
2953
2954         ret = rbd_obj_request_submit(osdc, obj_request);
2955         if (ret)
2956                 goto out;
2957         ret = rbd_obj_request_wait(obj_request);
2958         if (ret)
2959                 goto out;
2960
2961         ret = obj_request->result;
2962         if (ret < 0)
2963                 goto out;
2964
2965         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2966         size = (size_t) obj_request->xferred;
2967         ceph_copy_from_page_vector(pages, buf, 0, size);
2968         rbd_assert(size <= (size_t)INT_MAX);
2969         ret = (int)size;
2970 out:
2971         if (obj_request)
2972                 rbd_obj_request_put(obj_request);
2973         else
2974                 ceph_release_page_vector(pages, page_count);
2975
2976         return ret;
2977 }
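
/*
 * Illustrative caller (editor's sketch, not part of the driver): a
 * synchronous read of the first bytes of the header object into a
 * caller-supplied buffer.  The on-stack buffer is purely for
 * illustration; the real callers below read into kmalloc()ed memory.
 *
 *      struct rbd_image_header_ondisk ondisk;
 *      int ret;
 *
 *      ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 *                              0, sizeof (ondisk), &ondisk);
 *      if (ret < (int) sizeof (ondisk))
 *              return ret < 0 ? ret : -ENXIO;
 *
 * A short read maps to -ENXIO here, mirroring what
 * rbd_dev_v1_header_read() does below.
 */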
2978
2979 /*
2980  * Read the complete header for the given rbd device.
2981  *
2982  * Returns a pointer to a dynamically-allocated buffer containing
2983  * the complete and validated header.  Caller can pass the address
2984  * of a variable that will be filled in with the version of the
2985  * header object at the time it was read.
2986  *
2987  * Returns a pointer-coded errno if a failure occurs.
2988  */
2989 static struct rbd_image_header_ondisk *
2990 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2991 {
2992         struct rbd_image_header_ondisk *ondisk = NULL;
2993         u32 snap_count = 0;
2994         u64 names_size = 0;
2995         u32 want_count;
2996         int ret;
2997
2998         /*
2999          * The complete header will include an array of its 64-bit
3000          * snapshot ids, followed by the names of those snapshots as
3001          * a contiguous block of NUL-terminated strings.  Note that
3002          * the number of snapshots could change by the time we read
3003          * it in, in which case we re-read it.
3004          */
3005         do {
3006                 size_t size;
3007
3008                 kfree(ondisk);
3009
3010                 size = sizeof (*ondisk);
3011                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3012                 size += names_size;
3013                 ondisk = kmalloc(size, GFP_KERNEL);
3014                 if (!ondisk)
3015                         return ERR_PTR(-ENOMEM);
3016
3017                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3018                                        0, size, ondisk);
3019                 if (ret < 0)
3020                         goto out_err;
3021                 if ((size_t)ret < size) {
3022                         ret = -ENXIO;
3023                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3024                                 size, ret);
3025                         goto out_err;
3026                 }
3027                 if (!rbd_dev_ondisk_valid(ondisk)) {
3028                         ret = -ENXIO;
3029                         rbd_warn(rbd_dev, "invalid header");
3030                         goto out_err;
3031                 }
3032
3033                 names_size = le64_to_cpu(ondisk->snap_names_len);
3034                 want_count = snap_count;
3035                 snap_count = le32_to_cpu(ondisk->snap_count);
3036         } while (snap_count != want_count);
3037
3038         return ondisk;
3039
3040 out_err:
3041         kfree(ondisk);
3042
3043         return ERR_PTR(ret);
3044 }
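
/*
 * Sizing example (editor's sketch): for an image with two snapshots
 * named "one" and "two", the second pass through the loop above
 * allocates sizeof (*ondisk) for the fixed fields, plus
 * 2 * sizeof (struct rbd_image_snap_ondisk) for the id/size pairs,
 * plus names_size == 8 for "one\0two\0".  If a third snapshot appears
 * between reads, snap_count (3) no longer matches want_count (2) and
 * the loop re-reads with the larger sizes.
 */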
3045
3046 /*
3047  * Read the on-disk header and convert it to the in-memory format
3048  */
3049 static int rbd_read_header(struct rbd_device *rbd_dev,
3050                            struct rbd_image_header *header)
3051 {
3052         struct rbd_image_header_ondisk *ondisk;
3053         int ret;
3054
3055         ondisk = rbd_dev_v1_header_read(rbd_dev);
3056         if (IS_ERR(ondisk))
3057                 return PTR_ERR(ondisk);
3058         ret = rbd_header_from_disk(header, ondisk);
3059         kfree(ondisk);
3060
3061         return ret;
3062 }
3063
3064 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3065 {
3066         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3067                 return;
3068
3069         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3070                 sector_t size;
3071
3072                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3073                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3074                 dout("setting size to %llu sectors", (unsigned long long)size);
3075                 set_capacity(rbd_dev->disk, size);
3076         }
3077 }
3078
3079 /*
3080  * Re-read the on-disk header and bring the in-memory copy up to date
3081  */
3082 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3083 {
3084         int ret;
3085         struct rbd_image_header h;
3086
3087         ret = rbd_read_header(rbd_dev, &h);
3088         if (ret < 0)
3089                 return ret;
3090
3091         down_write(&rbd_dev->header_rwsem);
3092
3093         /* Update image size, and check for resize of mapped image */
3094         rbd_dev->header.image_size = h.image_size;
3095         rbd_update_mapping_size(rbd_dev);
3096
3097         /* rbd_dev->header.object_prefix shouldn't change */
3098         kfree(rbd_dev->header.snap_sizes);
3099         kfree(rbd_dev->header.snap_names);
3100         /* osd requests may still refer to snapc */
3101         ceph_put_snap_context(rbd_dev->header.snapc);
3102
3104         rbd_dev->header.snapc = h.snapc;
3105         rbd_dev->header.snap_names = h.snap_names;
3106         rbd_dev->header.snap_sizes = h.snap_sizes;
3107         /* Free the extra copy of the object prefix */
3108         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3109                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3110         kfree(h.object_prefix);
3111
3112         up_write(&rbd_dev->header_rwsem);
3113
3114         return ret;
3115 }
3116
3117 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3118 {
3119         u64 image_size;
3120         int ret;
3121
3122         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3123         image_size = rbd_dev->header.image_size;
3124         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3125         if (rbd_dev->image_format == 1)
3126                 ret = rbd_dev_v1_refresh(rbd_dev);
3127         else
3128                 ret = rbd_dev_v2_refresh(rbd_dev);
3129         mutex_unlock(&ctl_mutex);
3130         if (ret)
3131                 rbd_warn(rbd_dev, "got notification but failed to "
3132                            "update snaps: %d\n", ret);
3133         if (image_size != rbd_dev->header.image_size)
3134                 revalidate_disk(rbd_dev->disk);
3135
3136         return ret;
3137 }
3138
3139 static int rbd_init_disk(struct rbd_device *rbd_dev)
3140 {
3141         struct gendisk *disk;
3142         struct request_queue *q;
3143         u64 segment_size;
3144
3145         /* create gendisk info */
3146         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3147         if (!disk)
3148                 return -ENOMEM;
3149
3150         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3151                  rbd_dev->dev_id);
3152         disk->major = rbd_dev->major;
3153         disk->first_minor = 0;
3154         disk->fops = &rbd_bd_ops;
3155         disk->private_data = rbd_dev;
3156
3157         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3158         if (!q)
3159                 goto out_disk;
3160
3161         /* We use the default size, but let's be explicit about it. */
3162         blk_queue_physical_block_size(q, SECTOR_SIZE);
3163
3164         /* set io sizes to object size */
3165         segment_size = rbd_obj_bytes(&rbd_dev->header);
3166         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3167         blk_queue_max_segment_size(q, segment_size);
3168         blk_queue_io_min(q, segment_size);
3169         blk_queue_io_opt(q, segment_size);
3170
3171         blk_queue_merge_bvec(q, rbd_merge_bvec);
3172         disk->queue = q;
3173
3174         q->queuedata = rbd_dev;
3175
3176         rbd_dev->disk = disk;
3177
3178         return 0;
3179 out_disk:
3180         put_disk(disk);
3181
3182         return -ENOMEM;
3183 }
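
/*
 * Limit example (editor's sketch): with the default object order of
 * 22, rbd_obj_bytes() is 4 MiB, so the queue above advertises
 * max_hw_sectors = 4 MiB / 512 = 8192, a 4 MiB maximum segment, and
 * 4 MiB minimum/optimal I/O sizes -- at most one rbd object per
 * request segment.
 */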
3184
3185 /*
3186   sysfs
3187 */
3188
3189 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3190 {
3191         return container_of(dev, struct rbd_device, dev);
3192 }
3193
3194 static ssize_t rbd_size_show(struct device *dev,
3195                              struct device_attribute *attr, char *buf)
3196 {
3197         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3198
3199         return sprintf(buf, "%llu\n",
3200                 (unsigned long long)rbd_dev->mapping.size);
3201 }
3202
3203 /*
3204  * Note this shows the features for whatever's mapped, which is not
3205  * necessarily the base image.
3206  */
3207 static ssize_t rbd_features_show(struct device *dev,
3208                              struct device_attribute *attr, char *buf)
3209 {
3210         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3211
3212         return sprintf(buf, "0x%016llx\n",
3213                         (unsigned long long)rbd_dev->mapping.features);
3214 }
3215
3216 static ssize_t rbd_major_show(struct device *dev,
3217                               struct device_attribute *attr, char *buf)
3218 {
3219         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3220
3221         if (rbd_dev->major)
3222                 return sprintf(buf, "%d\n", rbd_dev->major);
3223
3224         return sprintf(buf, "(none)\n");
3226 }
3227
3228 static ssize_t rbd_client_id_show(struct device *dev,
3229                                   struct device_attribute *attr, char *buf)
3230 {
3231         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3232
3233         return sprintf(buf, "client%lld\n",
3234                         ceph_client_id(rbd_dev->rbd_client->client));
3235 }
3236
3237 static ssize_t rbd_pool_show(struct device *dev,
3238                              struct device_attribute *attr, char *buf)
3239 {
3240         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3241
3242         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3243 }
3244
3245 static ssize_t rbd_pool_id_show(struct device *dev,
3246                              struct device_attribute *attr, char *buf)
3247 {
3248         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3249
3250         return sprintf(buf, "%llu\n",
3251                         (unsigned long long) rbd_dev->spec->pool_id);
3252 }
3253
3254 static ssize_t rbd_name_show(struct device *dev,
3255                              struct device_attribute *attr, char *buf)
3256 {
3257         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3258
3259         if (rbd_dev->spec->image_name)
3260                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3261
3262         return sprintf(buf, "(unknown)\n");
3263 }
3264
3265 static ssize_t rbd_image_id_show(struct device *dev,
3266                              struct device_attribute *attr, char *buf)
3267 {
3268         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3269
3270         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3271 }
3272
3273 /*
3274  * Shows the name of the currently-mapped snapshot (or
3275  * RBD_SNAP_HEAD_NAME for the base image).
3276  */
3277 static ssize_t rbd_snap_show(struct device *dev,
3278                              struct device_attribute *attr,
3279                              char *buf)
3280 {
3281         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3282
3283         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3284 }
3285
3286 /*
3287  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3288  * for the parent image.  If there is no parent, simply shows
3289  * "(no parent image)".
3290  */
3291 static ssize_t rbd_parent_show(struct device *dev,
3292                              struct device_attribute *attr,
3293                              char *buf)
3294 {
3295         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3296         struct rbd_spec *spec = rbd_dev->parent_spec;
3297         int count;
3298         char *bufp = buf;
3299
3300         if (!spec)
3301                 return sprintf(buf, "(no parent image)\n");
3302
3303         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3304                         (unsigned long long) spec->pool_id, spec->pool_name);
3305         if (count < 0)
3306                 return count;
3307         bufp += count;
3308
3309         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3310                         spec->image_name ? spec->image_name : "(unknown)");
3311         if (count < 0)
3312                 return count;
3313         bufp += count;
3314
3315         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3316                         (unsigned long long) spec->snap_id, spec->snap_name);
3317         if (count < 0)
3318                 return count;
3319         bufp += count;
3320
3321         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3322         if (count < 0)
3323                 return count;
3324         bufp += count;
3325
3326         return (ssize_t) (bufp - buf);
3327 }
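
/*
 * Sample output (editor's sketch) for a mapped clone, following the
 * sprintf formats above; the ids and names are invented:
 *
 *      pool_id 2
 *      pool_name rbd
 *      image_id 1014b76b8b4567
 *      image_name parent-img
 *      snap_id 4
 *      snap_name base
 *      overlap 10737418240
 */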
3328
3329 static ssize_t rbd_image_refresh(struct device *dev,
3330                                  struct device_attribute *attr,
3331                                  const char *buf,
3332                                  size_t size)
3333 {
3334         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3335         int ret;
3336
3337         ret = rbd_dev_refresh(rbd_dev);
3338
3339         return ret < 0 ? ret : size;
3340 }
3341
3342 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3343 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3344 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3345 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3346 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3347 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3348 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3349 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3350 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3351 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3352 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3353
3354 static struct attribute *rbd_attrs[] = {
3355         &dev_attr_size.attr,
3356         &dev_attr_features.attr,
3357         &dev_attr_major.attr,
3358         &dev_attr_client_id.attr,
3359         &dev_attr_pool.attr,
3360         &dev_attr_pool_id.attr,
3361         &dev_attr_name.attr,
3362         &dev_attr_image_id.attr,
3363         &dev_attr_current_snap.attr,
3364         &dev_attr_parent.attr,
3365         &dev_attr_refresh.attr,
3366         NULL
3367 };
3368
3369 static struct attribute_group rbd_attr_group = {
3370         .attrs = rbd_attrs,
3371 };
3372
3373 static const struct attribute_group *rbd_attr_groups[] = {
3374         &rbd_attr_group,
3375         NULL
3376 };
3377
3378 static void rbd_sysfs_dev_release(struct device *dev)
3379 {
3380 }
3381
3382 static struct device_type rbd_device_type = {
3383         .name           = "rbd",
3384         .groups         = rbd_attr_groups,
3385         .release        = rbd_sysfs_dev_release,
3386 };
3387
3388 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3389 {
3390         kref_get(&spec->kref);
3391
3392         return spec;
3393 }
3394
3395 static void rbd_spec_free(struct kref *kref);
3396 static void rbd_spec_put(struct rbd_spec *spec)
3397 {
3398         if (spec)
3399                 kref_put(&spec->kref, rbd_spec_free);
3400 }
3401
3402 static struct rbd_spec *rbd_spec_alloc(void)
3403 {
3404         struct rbd_spec *spec;
3405
3406         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3407         if (!spec)
3408                 return NULL;
3409         kref_init(&spec->kref);
3410
3411         return spec;
3412 }
3413
3414 static void rbd_spec_free(struct kref *kref)
3415 {
3416         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3417
3418         kfree(spec->pool_name);
3419         kfree(spec->image_id);
3420         kfree(spec->image_name);
3421         kfree(spec->snap_name);
3422         kfree(spec);
3423 }
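
/*
 * Lifetime sketch (editor's illustration, not driver code): a spec is
 * born with one reference from rbd_spec_alloc(); each additional
 * holder takes and drops its own.
 *
 *      struct rbd_spec *spec = rbd_spec_alloc();
 *      if (!spec)
 *              return -ENOMEM;
 *      child->spec = rbd_spec_get(spec);       (second reference)
 *      ...
 *      rbd_spec_put(spec);                     (drop the original)
 *      rbd_spec_put(child->spec);              (last put frees it)
 *
 * Here "child" is a hypothetical holder; rbd_dev_create() below
 * instead takes over the caller's reference directly.
 */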
3424
3425 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3426                                 struct rbd_spec *spec)
3427 {
3428         struct rbd_device *rbd_dev;
3429
3430         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3431         if (!rbd_dev)
3432                 return NULL;
3433
3434         spin_lock_init(&rbd_dev->lock);
3435         rbd_dev->flags = 0;
3436         INIT_LIST_HEAD(&rbd_dev->node);
3437         init_rwsem(&rbd_dev->header_rwsem);
3438
3439         rbd_dev->spec = spec;
3440         rbd_dev->rbd_client = rbdc;
3441
3442         /* Initialize the layout used for all rbd requests */
3443
3444         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3445         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3446         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3447         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3448
3449         return rbd_dev;
3450 }
3451
3452 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3453 {
3454         rbd_put_client(rbd_dev->rbd_client);
3455         rbd_spec_put(rbd_dev->spec);
3456         kfree(rbd_dev);
3457 }
3458
3459 /*
3460  * Get the size and object order for an image snapshot, or if
3461  * snap_id is CEPH_NOSNAP, gets this information for the base
3462  * image.
3463  */
3464 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3465                                 u8 *order, u64 *snap_size)
3466 {
3467         __le64 snapid = cpu_to_le64(snap_id);
3468         int ret;
3469         struct {
3470                 u8 order;
3471                 __le64 size;
3472         } __attribute__ ((packed)) size_buf = { 0 };
3473
3474         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3475                                 "rbd", "get_size",
3476                                 &snapid, sizeof (snapid),
3477                                 &size_buf, sizeof (size_buf));
3478         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3479         if (ret < 0)
3480                 return ret;
3481         if (ret < sizeof (size_buf))
3482                 return -ERANGE;
3483
3484         if (order)
3485                 *order = size_buf.order;
3486         *snap_size = le64_to_cpu(size_buf.size);
3487
3488         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3489                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3490                 (unsigned long long)*snap_size);
3491
3492         return 0;
3493 }
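
/*
 * Wire format note (editor's sketch): the "get_size" reply is exactly
 * the 9-byte packed struct above -- a one-byte object order followed
 * by a little-endian 64-bit byte count.  An order of 22 and a 1 GiB
 * image decode to *order == 22 and *snap_size == 1073741824.
 */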
3494
3495 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3496 {
3497         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3498                                         &rbd_dev->header.obj_order,
3499                                         &rbd_dev->header.image_size);
3500 }
3501
3502 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3503 {
3504         void *reply_buf;
3505         int ret;
3506         void *p;
3507
3508         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3509         if (!reply_buf)
3510                 return -ENOMEM;
3511
3512         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3513                                 "rbd", "get_object_prefix", NULL, 0,
3514                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3515         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3516         if (ret < 0)
3517                 goto out;
3518
3519         p = reply_buf;
3520         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3521                                                 p + ret, NULL, GFP_NOIO);
3522         ret = 0;
3523
3524         if (IS_ERR(rbd_dev->header.object_prefix)) {
3525                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3526                 rbd_dev->header.object_prefix = NULL;
3527         } else {
3528                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3529         }
3530 out:
3531         kfree(reply_buf);
3532
3533         return ret;
3534 }
3535
3536 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3537                 u64 *snap_features)
3538 {
3539         __le64 snapid = cpu_to_le64(snap_id);
3540         struct {
3541                 __le64 features;
3542                 __le64 incompat;
3543         } __attribute__ ((packed)) features_buf = { 0 };
3544         u64 incompat;
3545         int ret;
3546
3547         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3548                                 "rbd", "get_features",
3549                                 &snapid, sizeof (snapid),
3550                                 &features_buf, sizeof (features_buf));
3551         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3552         if (ret < 0)
3553                 return ret;
3554         if (ret < sizeof (features_buf))
3555                 return -ERANGE;
3556
3557         incompat = le64_to_cpu(features_buf.incompat);
3558         if (incompat & ~RBD_FEATURES_SUPPORTED)
3559                 return -ENXIO;
3560
3561         *snap_features = le64_to_cpu(features_buf.features);
3562
3563         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3564                 (unsigned long long)snap_id,
3565                 (unsigned long long)*snap_features,
3566                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3567
3568         return 0;
3569 }
3570
3571 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3572 {
3573         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3574                                                 &rbd_dev->header.features);
3575 }
3576
3577 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3578 {
3579         struct rbd_spec *parent_spec;
3580         size_t size;
3581         void *reply_buf = NULL;
3582         __le64 snapid;
3583         void *p;
3584         void *end;
3585         char *image_id;
3586         u64 overlap;
3587         int ret;
3588
3589         parent_spec = rbd_spec_alloc();
3590         if (!parent_spec)
3591                 return -ENOMEM;
3592
3593         size = sizeof (__le64) +                                /* pool_id */
3594                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3595                 sizeof (__le64) +                               /* snap_id */
3596                 sizeof (__le64);                                /* overlap */
3597         reply_buf = kmalloc(size, GFP_KERNEL);
3598         if (!reply_buf) {
3599                 ret = -ENOMEM;
3600                 goto out_err;
3601         }
3602
3603         snapid = cpu_to_le64(CEPH_NOSNAP);
3604         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3605                                 "rbd", "get_parent",
3606                                 &snapid, sizeof (snapid),
3607                                 reply_buf, size);
3608         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3609         if (ret < 0)
3610                 goto out_err;
3611
3612         p = reply_buf;
3613         end = reply_buf + ret;
3614         ret = -ERANGE;
3615         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3616         if (parent_spec->pool_id == CEPH_NOPOOL)
3617                 goto out;       /* No parent?  No problem. */
3618
3619         /* The ceph file layout needs to fit pool id in 32 bits */
3620
3621         ret = -EIO;
3622         if (parent_spec->pool_id > (u64)U32_MAX) {
3623                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3624                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3625                 goto out_err;
3626         }
3627
3628         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3629         if (IS_ERR(image_id)) {
3630                 ret = PTR_ERR(image_id);
3631                 goto out_err;
3632         }
3633         parent_spec->image_id = image_id;
3634         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3635         ceph_decode_64_safe(&p, end, overlap, out_err);
3636
3637         rbd_dev->parent_overlap = overlap;
3638         rbd_dev->parent_spec = parent_spec;
3639         parent_spec = NULL;     /* rbd_dev now owns this */
3640 out:
3641         ret = 0;
3642 out_err:
3643         kfree(reply_buf);
3644         rbd_spec_put(parent_spec);
3645
3646         return ret;
3647 }
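
/*
 * Decode order (editor's sketch of the "get_parent" reply consumed
 * above):
 *
 *      le64  pool_id    (CEPH_NOPOOL means the image has no parent)
 *      le32  image_id length, then that many bytes of image id
 *      le64  snap_id
 *      le64  overlap    (bytes of the child backed by the parent)
 */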
3648
3649 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3650 {
3651         struct {
3652                 __le64 stripe_unit;
3653                 __le64 stripe_count;
3654         } __attribute__ ((packed)) striping_info_buf = { 0 };
3655         size_t size = sizeof (striping_info_buf);
3656         void *p;
3657         u64 obj_size;
3658         u64 stripe_unit;
3659         u64 stripe_count;
3660         int ret;
3661
3662         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3663                                 "rbd", "get_stripe_unit_count", NULL, 0,
3664                                 (char *)&striping_info_buf, size);
3665         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3666         if (ret < 0)
3667                 return ret;
3668         if (ret < size)
3669                 return -ERANGE;
3670
3671         /*
3672          * We don't actually support the "fancy striping" feature
3673          * (STRIPINGV2) yet, but if the striping sizes are the
3674          * defaults the behavior is the same as before.  So find
3675          * out, and only fail if the image has non-default values.
3676          */
3678         obj_size = (u64)1 << rbd_dev->header.obj_order;
3679         p = &striping_info_buf;
3680         stripe_unit = ceph_decode_64(&p);
3681         if (stripe_unit != obj_size) {
3682                 rbd_warn(rbd_dev, "unsupported stripe unit "
3683                                 "(got %llu want %llu)",
3684                                 stripe_unit, obj_size);
3685                 return -EINVAL;
3686         }
3687         stripe_count = ceph_decode_64(&p);
3688         if (stripe_count != 1) {
3689                 rbd_warn(rbd_dev, "unsupported stripe count "
3690                                 "(got %llu want 1)", stripe_count);
3691                 return -EINVAL;
3692         }
3693         rbd_dev->header.stripe_unit = stripe_unit;
3694         rbd_dev->header.stripe_count = stripe_count;
3695
3696         return 0;
3697 }
3698
3699 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3700 {
3701         size_t image_id_size;
3702         char *image_id;
3703         void *p;
3704         void *end;
3705         size_t size;
3706         void *reply_buf = NULL;
3707         size_t len = 0;
3708         char *image_name = NULL;
3709         int ret;
3710
3711         rbd_assert(!rbd_dev->spec->image_name);
3712
3713         len = strlen(rbd_dev->spec->image_id);
3714         image_id_size = sizeof (__le32) + len;
3715         image_id = kmalloc(image_id_size, GFP_KERNEL);
3716         if (!image_id)
3717                 return NULL;
3718
3719         p = image_id;
3720         end = image_id + image_id_size;
3721         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3722
3723         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3724         reply_buf = kmalloc(size, GFP_KERNEL);
3725         if (!reply_buf)
3726                 goto out;
3727
3728         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3729                                 "rbd", "dir_get_name",
3730                                 image_id, image_id_size,
3731                                 reply_buf, size);
3732         if (ret < 0)
3733                 goto out;
3734         p = reply_buf;
3735         end = reply_buf + ret;
3736
3737         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3738         if (IS_ERR(image_name))
3739                 image_name = NULL;
3740         else
3741                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3742 out:
3743         kfree(reply_buf);
3744         kfree(image_id);
3745
3746         return image_name;
3747 }
3748
3749 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3750 {
3751         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3752         const char *snap_name;
3753         u32 which = 0;
3754
3755         /* Skip over names until we find the one we are looking for */
3756
3757         snap_name = rbd_dev->header.snap_names;
3758         while (which < snapc->num_snaps) {
3759                 if (!strcmp(name, snap_name))
3760                         return snapc->snaps[which];
3761                 snap_name += strlen(snap_name) + 1;
3762                 which++;
3763         }
3764         return CEPH_NOSNAP;
3765 }
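
/*
 * Worked example (editor's sketch): with snapc->snaps = { 8, 5 } and
 * header.snap_names = "newest\0oldest\0", a lookup of "oldest" fails
 * to match at which == 0, advances strlen("newest") + 1 bytes, then
 * matches at which == 1 and returns snaps[1] == 5.
 */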
3766
3767 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3768 {
3769         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3770         u32 which;
3771         bool found = false;
3772         u64 snap_id;
3773
3774         for (which = 0; !found && which < snapc->num_snaps; which++) {
3775                 const char *snap_name;
3776
3777                 snap_id = snapc->snaps[which];
3778                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3779                 if (IS_ERR(snap_name))
3780                         break;
3781                 found = !strcmp(name, snap_name);
3782                 kfree(snap_name);
3783         }
3784         return found ? snap_id : CEPH_NOSNAP;
3785 }
3786
3787 /*
3788  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3789  * no snapshot by that name is found, or if an error occurs.
3790  */
3791 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3792 {
3793         if (rbd_dev->image_format == 1)
3794                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3795
3796         return rbd_v2_snap_id_by_name(rbd_dev, name);
3797 }
3798
3799 /*
3800  * When an rbd image has a parent image, it is identified by the
3801  * pool, image, and snapshot ids (not names).  This function fills
3802  * in the names for those ids.  (It's OK if we can't figure out the
3803  * name for an image id, but the pool and snapshot ids should always
3804  * exist and have names.)  All names in an rbd spec are dynamically
3805  * allocated.
3806  *
3807  * When an image being mapped (not a parent) is probed, we have the
3808  * pool name and pool id, image name and image id, and the snapshot
3809  * name.  The only thing we're missing is the snapshot id.
3810  */
3811 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3812 {
3813         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3814         struct rbd_spec *spec = rbd_dev->spec;
3815         const char *pool_name;
3816         const char *image_name;
3817         const char *snap_name;
3818         int ret;
3819
3820         /*
3821          * An image being mapped will have the pool name (etc.), but
3822          * we need to look up the snapshot id.
3823          */
3824         if (spec->pool_name) {
3825                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3826                         u64 snap_id;
3827
3828                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3829                         if (snap_id == CEPH_NOSNAP)
3830                                 return -ENOENT;
3831                         spec->snap_id = snap_id;
3832                 } else {
3833                         spec->snap_id = CEPH_NOSNAP;
3834                 }
3835
3836                 return 0;
3837         }
3838
3839         /* Get the pool name; we have to make our own copy of this */
3840
3841         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3842         if (!pool_name) {
3843                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3844                 return -EIO;
3845         }
3846         pool_name = kstrdup(pool_name, GFP_KERNEL);
3847         if (!pool_name)
3848                 return -ENOMEM;
3849
3850         /* Fetch the image name; tolerate failure here */
3851
3852         image_name = rbd_dev_image_name(rbd_dev);
3853         if (!image_name)
3854                 rbd_warn(rbd_dev, "unable to get image name");
3855
3856         /* Look up the snapshot name, and make a copy */
3857
3858         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3859         if (!snap_name) {
3860                 ret = -ENOMEM;
3861                 goto out_err;
3862         }
3863
3864         spec->pool_name = pool_name;
3865         spec->image_name = image_name;
3866         spec->snap_name = snap_name;
3867
3868         return 0;
3869 out_err:
3870         kfree(image_name);
3871         kfree(pool_name);
3872
3873         return ret;
3874 }
3875
3876 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3877 {
3878         size_t size;
3879         int ret;
3880         void *reply_buf;
3881         void *p;
3882         void *end;
3883         u64 seq;
3884         u32 snap_count;
3885         struct ceph_snap_context *snapc;
3886         u32 i;
3887
3888         /*
3889          * We'll need room for the seq value (maximum snapshot id),
3890          * snapshot count, and array of that many snapshot ids.
3891          * For now we have a fixed upper limit on the number we're
3892          * prepared to receive.
3893          */
3894         size = sizeof (__le64) + sizeof (__le32) +
3895                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3896         reply_buf = kzalloc(size, GFP_KERNEL);
3897         if (!reply_buf)
3898                 return -ENOMEM;
3899
3900         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3901                                 "rbd", "get_snapcontext", NULL, 0,
3902                                 reply_buf, size);
3903         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3904         if (ret < 0)
3905                 goto out;
3906
3907         p = reply_buf;
3908         end = reply_buf + ret;
3909         ret = -ERANGE;
3910         ceph_decode_64_safe(&p, end, seq, out);
3911         ceph_decode_32_safe(&p, end, snap_count, out);
3912
3913         /*
3914          * Make sure the reported number of snapshot ids wouldn't go
3915          * beyond the end of our buffer.  But before checking that,
3916          * make sure the computed size of the snapshot context we
3917          * allocate is representable in a size_t.
3918          */
3919         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3920                                  / sizeof (u64)) {
3921                 ret = -EINVAL;
3922                 goto out;
3923         }
3924         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3925                 goto out;
3926         ret = 0;
3927
3928         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3929         if (!snapc) {
3930                 ret = -ENOMEM;
3931                 goto out;
3932         }
3933         snapc->seq = seq;
3934         for (i = 0; i < snap_count; i++)
3935                 snapc->snaps[i] = ceph_decode_64(&p);
3936
3937         rbd_dev->header.snapc = snapc;
3938
3939         dout("  snap context seq = %llu, snap_count = %u\n",
3940                 (unsigned long long)seq, (unsigned int)snap_count);
3941 out:
3942         kfree(reply_buf);
3943
3944         return ret;
3945 }
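
/*
 * Reply layout (editor's sketch of what the decode above consumes):
 *
 *      le64  seq                  highest snapshot id so far
 *      le32  snap_count
 *      le64  snaps[snap_count]    snapshot ids
 *
 * The reply_buf sizing above works out to 8 + 4 + 510 * 8 = 4092
 * bytes for the largest context this code is prepared to accept.
 */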
3946
3947 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3948                                         u64 snap_id)
3949 {
3950         size_t size;
3951         void *reply_buf;
3952         __le64 snapid;
3953         int ret;
3954         void *p;
3955         void *end;
3956         char *snap_name;
3957
3958         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3959         reply_buf = kmalloc(size, GFP_KERNEL);
3960         if (!reply_buf)
3961                 return ERR_PTR(-ENOMEM);
3962
3963         snapid = cpu_to_le64(snap_id);
3964         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3965                                 "rbd", "get_snapshot_name",
3966                                 &snapid, sizeof (snapid),
3967                                 reply_buf, size);
3968         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3969         if (ret < 0) {
3970                 snap_name = ERR_PTR(ret);
3971                 goto out;
3972         }
3973
3974         p = reply_buf;
3975         end = reply_buf + ret;
3976         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3977         if (IS_ERR(snap_name))
3978                 goto out;
3979
3980         dout("  snap_id 0x%016llx snap_name = %s\n",
3981                 (unsigned long long)snap_id, snap_name);
3982 out:
3983         kfree(reply_buf);
3984
3985         return snap_name;
3986 }
3987
3988 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
3989 {
3990         int ret;
3991
3992         down_write(&rbd_dev->header_rwsem);
3993
3994         ret = rbd_dev_v2_image_size(rbd_dev);
3995         if (ret)
3996                 goto out;
3997         rbd_update_mapping_size(rbd_dev);
3998
3999         ret = rbd_dev_v2_snap_context(rbd_dev);
4000         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4003 out:
4004         up_write(&rbd_dev->header_rwsem);
4005
4006         return ret;
4007 }
4008
4009 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4010 {
4011         struct device *dev;
4012         int ret;
4013
4014         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4015
4016         dev = &rbd_dev->dev;
4017         dev->bus = &rbd_bus_type;
4018         dev->type = &rbd_device_type;
4019         dev->parent = &rbd_root_dev;
4020         dev->release = rbd_dev_device_release;
4021         dev_set_name(dev, "%d", rbd_dev->dev_id);
4022         ret = device_register(dev);
4023
4024         mutex_unlock(&ctl_mutex);
4025
4026         return ret;
4027 }
4028
4029 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4030 {
4031         device_unregister(&rbd_dev->dev);
4032 }
4033
4034 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4035
4036 /*
4037  * Get a unique rbd identifier for the given new rbd_dev, and add
4038  * the rbd_dev to the global list.  The minimum rbd id is 1.
4039  */
4040 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4041 {
4042         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4043
4044         spin_lock(&rbd_dev_list_lock);
4045         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4046         spin_unlock(&rbd_dev_list_lock);
4047         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4048                 (unsigned long long) rbd_dev->dev_id);
4049 }
4050
4051 /*
4052  * Remove an rbd_dev from the global list, and record that its
4053  * identifier is no longer in use.
4054  */
4055 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4056 {
4057         struct list_head *tmp;
4058         int rbd_id = rbd_dev->dev_id;
4059         int max_id;
4060
4061         rbd_assert(rbd_id > 0);
4062
4063         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4064                 (unsigned long long) rbd_dev->dev_id);
4065         spin_lock(&rbd_dev_list_lock);
4066         list_del_init(&rbd_dev->node);
4067
4068         /*
4069          * If the id being "put" is not the current maximum, there
4070          * is nothing special we need to do.
4071          */
4072         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4073                 spin_unlock(&rbd_dev_list_lock);
4074                 return;
4075         }
4076
4077         /*
4078          * We need to update the current maximum id.  Search the
4079          * list to find out what it is.  We're more likely to find
4080          * the maximum at the end, so search the list backward.
4081          */
4082         max_id = 0;
4083         list_for_each_prev(tmp, &rbd_dev_list) {
4084                 struct rbd_device *rbd_dev;
4085
4086                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4087                 if (rbd_dev->dev_id > max_id)
4088                         max_id = rbd_dev->dev_id;
4089         }
4090         spin_unlock(&rbd_dev_list_lock);
4091
4092         /*
4093          * The max id could have been updated by rbd_dev_id_get(), in
4094          * which case it now accurately reflects the new maximum.
4095          * Be careful not to overwrite the maximum value in that
4096          * case.
4097          */
4098         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4099         dout("  max dev id has been reset\n");
4100 }
4101
4102 /*
4103  * Skips over white space at *buf, and updates *buf to point to the
4104  * first found non-space character (if any). Returns the length of
4105  * the token (string of non-white space characters) found.  Note
4106  * that *buf must be terminated with '\0'.
4107  */
4108 static inline size_t next_token(const char **buf)
4109 {
4110         /*
4111          * These are the characters that produce nonzero for
4112          * isspace() in the "C" and "POSIX" locales.
4113          */
4114         const char *spaces = " \f\n\r\t\v";
4115
4116         *buf += strspn(*buf, spaces);   /* Find start of token */
4117
4118         return strcspn(*buf, spaces);   /* Return token length */
4119 }
4120
4121 /*
4122  * Finds the next token in *buf, and if the provided token buffer is
4123  * big enough, copies the found token into it.  The result, if
4124  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4125  * must be terminated with '\0' on entry.
4126  *
4127  * Returns the length of the token found (not including the '\0').
4128  * Return value will be 0 if no token is found, and it will be >=
4129  * token_size if the token would not fit.
4130  *
4131  * The *buf pointer will be updated to point beyond the end of the
4132  * found token.  Note that this occurs even if the token buffer is
4133  * too small to hold it.
4134  */
4135 static inline size_t copy_token(const char **buf,
4136                                 char *token,
4137                                 size_t token_size)
4138 {
4139         size_t len;
4140
4141         len = next_token(buf);
4142         if (len < token_size) {
4143                 memcpy(token, *buf, len);
4144                 *(token + len) = '\0';
4145         }
4146         *buf += len;
4147
4148         return len;
4149 }
4150
4151 /*
4152  * Finds the next token in *buf, dynamically allocates a buffer big
4153  * enough to hold a copy of it, and copies the token into the new
4154  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4155  * that a duplicate buffer is created even for a zero-length token.
4156  *
4157  * Returns a pointer to the newly-allocated duplicate, or a null
4158  * pointer if memory for the duplicate was not available.  If
4159  * the lenp argument is a non-null pointer, the length of the token
4160  * (not including the '\0') is returned in *lenp.
4161  *
4162  * If successful, the *buf pointer will be updated to point beyond
4163  * the end of the found token.
4164  *
4165  * Note: uses GFP_KERNEL for allocation.
4166  */
4167 static inline char *dup_token(const char **buf, size_t *lenp)
4168 {
4169         char *dup;
4170         size_t len;
4171
4172         len = next_token(buf);
4173         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4174         if (!dup)
4175                 return NULL;
4176         *(dup + len) = '\0';
4177         *buf += len;
4178
4179         if (lenp)
4180                 *lenp = len;
4181
4182         return dup;
4183 }
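
/*
 * Worked example (editor's sketch): with *buf pointing at
 * "  1.2.3.4:6789 name=admin", next_token() skips the two leading
 * spaces and returns 12, leaving *buf at the '1'.  A following
 * dup_token(&buf, &len) returns a kmalloc()ed "1.2.3.4:6789" with
 * len == 12 and advances *buf to the space before "name=admin".
 */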
4184
4185 /*
4186  * Parse the options provided for an "rbd add" (i.e., rbd image
4187  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4188  * and the data written is passed here via a NUL-terminated buffer.
4189  * Returns 0 if successful or an error code otherwise.
4190  *
4191  * The information extracted from these options is recorded in
4192  * the other parameters which return dynamically-allocated
4193  * structures:
4194  *  ceph_opts
4195  *      The address of a pointer that will refer to a ceph options
4196  *      structure.  Caller must release the returned pointer using
4197  *      ceph_destroy_options() when it is no longer needed.
4198  *  rbd_opts
4199  *      Address of an rbd options pointer.  Fully initialized by
4200  *      this function; caller must release with kfree().
4201  *  spec
4202  *      Address of an rbd image specification pointer.  Fully
4203  *      initialized by this function based on parsed options.
4204  *      Caller must release with rbd_spec_put().
4205  *
4206  * The options passed take this form:
4207  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4208  * where:
4209  *  <mon_addrs>
4210  *      A comma-separated list of one or more monitor addresses.
4211  *      A monitor address is an ip address, optionally followed
4212  *      by a port number (separated by a colon).
4213  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4214  *  <options>
4215  *      A comma-separated list of ceph and/or rbd options.
4216  *  <pool_name>
4217  *      The name of the rados pool containing the rbd image.
4218  *  <image_name>
4219  *      The name of the image in that pool to map.
4220  *  <snap_name>
4221  *      An optional snapshot name.  If provided, the mapping will
4222  *      present data from the image at the time that snapshot was
4223  *      created.  The image head is used if no snapshot name is
4224  *      provided.  Snapshot mappings are always read-only.
4225  */
4226 static int rbd_add_parse_args(const char *buf,
4227                                 struct ceph_options **ceph_opts,
4228                                 struct rbd_options **opts,
4229                                 struct rbd_spec **rbd_spec)
4230 {
4231         size_t len;
4232         char *options;
4233         const char *mon_addrs;
4234         char *snap_name;
4235         size_t mon_addrs_size;
4236         struct rbd_spec *spec = NULL;
4237         struct rbd_options *rbd_opts = NULL;
4238         struct ceph_options *copts;
4239         int ret;
4240
4241         /* The first four tokens are required */
4242
4243         len = next_token(&buf);
4244         if (!len) {
4245                 rbd_warn(NULL, "no monitor address(es) provided");
4246                 return -EINVAL;
4247         }
4248         mon_addrs = buf;
4249         mon_addrs_size = len + 1;
4250         buf += len;
4251
4252         ret = -EINVAL;
4253         options = dup_token(&buf, NULL);
4254         if (!options)
4255                 return -ENOMEM;
4256         if (!*options) {
4257                 rbd_warn(NULL, "no options provided");
4258                 goto out_err;
4259         }
4260
4261         spec = rbd_spec_alloc();
4262         if (!spec)
4263                 goto out_mem;
4264
4265         spec->pool_name = dup_token(&buf, NULL);
4266         if (!spec->pool_name)
4267                 goto out_mem;
4268         if (!*spec->pool_name) {
4269                 rbd_warn(NULL, "no pool name provided");
4270                 goto out_err;
4271         }
4272
4273         spec->image_name = dup_token(&buf, NULL);
4274         if (!spec->image_name)
4275                 goto out_mem;
4276         if (!*spec->image_name) {
4277                 rbd_warn(NULL, "no image name provided");
4278                 goto out_err;
4279         }
4280
4281         /*
4282          * Snapshot name is optional; default is to use "-"
4283          * (indicating the head/no snapshot).
4284          */
4285         len = next_token(&buf);
4286         if (!len) {
4287                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4288                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4289         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4290                 ret = -ENAMETOOLONG;
4291                 goto out_err;
4292         }
4293         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4294         if (!snap_name)
4295                 goto out_mem;
4296         *(snap_name + len) = '\0';
4297         spec->snap_name = snap_name;
4298
4299         /* Initialize all rbd options to the defaults */
4300
4301         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4302         if (!rbd_opts)
4303                 goto out_mem;
4304
4305         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4306
4307         copts = ceph_parse_options(options, mon_addrs,
4308                                         mon_addrs + mon_addrs_size - 1,
4309                                         parse_rbd_opts_token, rbd_opts);
4310         if (IS_ERR(copts)) {
4311                 ret = PTR_ERR(copts);
4312                 goto out_err;
4313         }
4314         kfree(options);
4315
4316         *ceph_opts = copts;
4317         *opts = rbd_opts;
4318         *rbd_spec = spec;
4319
4320         return 0;
4321 out_mem:
4322         ret = -ENOMEM;
4323 out_err:
4324         kfree(rbd_opts);
4325         rbd_spec_put(spec);
4326         kfree(options);
4327
4328         return ret;
4329 }
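
/*
 * Example input (editor's sketch), as written to /sys/bus/rbd/add:
 *
 *      1.2.3.4:6789 name=admin rbd myimage mysnap
 *
 * This parses into mon_addrs "1.2.3.4:6789", ceph options
 * "name=admin", pool "rbd", image "myimage", and snapshot name
 * "mysnap".  Dropping the final token maps the image head ("-")
 * instead.
 */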
4330
4331 /*
4332  * An rbd format 2 image has a unique identifier, distinct from the
4333  * name given to it by the user.  Internally, that identifier is
4334  * what's used to specify the names of objects related to the image.
4335  *
4336  * A special "rbd id" object is used to map an rbd image name to its
4337  * id.  If that object doesn't exist, then there is no v2 rbd image
4338  * with the supplied name.
4339  *
4340  * This function will record the given rbd_dev's image_id field if
4341  * it can be determined, and in that case will return 0.  If any
4342  * errors occur a negative errno will be returned and the rbd_dev's
4343  * image_id field will be unchanged (and should be NULL).
4344  */
4345 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4346 {
4347         int ret;
4348         size_t size;
4349         char *object_name;
4350         void *response;
4351         char *image_id;
4352
4353         /*
4354          * When probing a parent image, the image id is already
4355          * known (and the image name likely is not).  There's no
4356          * need to fetch the image id again in this case.  We
4357          * do still need to set the image format though.
4358          */
4359         if (rbd_dev->spec->image_id) {
4360                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4361
4362                 return 0;
4363         }
4364
4365         /*
4366          * First, see if the format 2 image id file exists, and if
4367          * so, get the image's persistent id from it.
4368          */
4369         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4370         object_name = kmalloc(size, GFP_NOIO);
4371         if (!object_name)
4372                 return -ENOMEM;
4373         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4374         dout("rbd id object name is %s\n", object_name);
4375
4376         /* Response will be an encoded string, which includes a length */
4377
4378         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4379         response = kzalloc(size, GFP_NOIO);
4380         if (!response) {
4381                 ret = -ENOMEM;
4382                 goto out;
4383         }
4384
4385         /* If it doesn't exist we'll assume it's a format 1 image */
4386
4387         ret = rbd_obj_method_sync(rbd_dev, object_name,
4388                                 "rbd", "get_id", NULL, 0,
4389                                 response, RBD_IMAGE_ID_LEN_MAX);
4390         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4391         if (ret == -ENOENT) {
4392                 image_id = kstrdup("", GFP_KERNEL);
4393                 ret = image_id ? 0 : -ENOMEM;
4394                 if (!ret)
4395                         rbd_dev->image_format = 1;
4396         } else if (ret > sizeof (__le32)) {
4397                 void *p = response;
4398
4399                 image_id = ceph_extract_encoded_string(&p, p + ret,
4400                                                 NULL, GFP_NOIO);
4401                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4402                 if (!ret)
4403                         rbd_dev->image_format = 2;
4404         } else {
4405                 ret = -EINVAL;
4406         }
4407
4408         if (!ret) {
4409                 rbd_dev->spec->image_id = image_id;
4410                 dout("image_id is %s\n", image_id);
4411         }
4412 out:
4413         kfree(response);
4414         kfree(object_name);
4415
4416         return ret;
4417 }
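
/*
 * Naming example (editor's sketch, assuming RBD_ID_PREFIX is
 * "rbd_id."): probing image "myimage" invokes the "get_id" class
 * method on object "rbd_id.myimage".  A format 2 image stores its id
 * there, while -ENOENT means no such object exists and the image is
 * treated as format 1 with an empty image id.
 */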
4418
4419 /* Undo whatever state changes are made by v1 or v2 image probe */
4420
4421 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4422 {
4423         struct rbd_image_header *header;
4424
4425         rbd_dev_remove_parent(rbd_dev);
4426         rbd_spec_put(rbd_dev->parent_spec);
4427         rbd_dev->parent_spec = NULL;
4428         rbd_dev->parent_overlap = 0;
4429
4430         /* Free dynamic fields from the header, then zero it out */
4431
4432         header = &rbd_dev->header;
4433         ceph_put_snap_context(header->snapc);
4434         kfree(header->snap_sizes);
4435         kfree(header->snap_names);
4436         kfree(header->object_prefix);
4437         memset(header, 0, sizeof (*header));
4438 }
4439
4440 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4441 {
4442         int ret;
4443
4444         /* Populate rbd image metadata */
4445
4446         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4447         if (ret < 0)
4448                 goto out_err;
4449
4450         /* Version 1 images have no parent (no layering) */
4451
4452         rbd_dev->parent_spec = NULL;
4453         rbd_dev->parent_overlap = 0;
4454
4455         dout("discovered version 1 image, header name is %s\n",
4456                 rbd_dev->header_name);
4457
4458         return 0;
4459
4460 out_err:
4461         kfree(rbd_dev->header_name);
4462         rbd_dev->header_name = NULL;
4463         kfree(rbd_dev->spec->image_id);
4464         rbd_dev->spec->image_id = NULL;
4465
4466         return ret;
4467 }
4468
4469 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4470 {
4471         int ret;
4472
4473         ret = rbd_dev_v2_image_size(rbd_dev);
4474         if (ret)
4475                 goto out_err;
4476
4477         /* Get the object prefix (a.k.a. block_name) for the image */
4478
4479         ret = rbd_dev_v2_object_prefix(rbd_dev);
4480         if (ret)
4481                 goto out_err;
4482
4483         /* Get and check the features for the image */
4484
4485         ret = rbd_dev_v2_features(rbd_dev);
4486         if (ret)
4487                 goto out_err;
4488
4489         /* If the image supports layering, get the parent info */
4490
4491         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4492                 ret = rbd_dev_v2_parent_info(rbd_dev);
4493                 if (ret)
4494                         goto out_err;
4495
4496                 /*
4497                  * Don't print a warning for parent images.  We can
4498                  * tell at this point because we won't know its pool
4499                  * name yet (just its pool id).
4500                  */
4501                 if (rbd_dev->spec->pool_name)
4502                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4503                                         "is EXPERIMENTAL!");
4504         }
4505
4506         /* If the image supports fancy striping, get its parameters */
4507
4508         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4509                 ret = rbd_dev_v2_striping_info(rbd_dev);
4510                 if (ret < 0)
4511                         goto out_err;
4512         }
4513
4514         /* crypto and compression types aren't (yet) supported for v2 images */
4515
4516         rbd_dev->header.crypt_type = 0;
4517         rbd_dev->header.comp_type = 0;
4518
4519         /* Get the snapshot context for the image */
4520
4521         ret = rbd_dev_v2_snap_context(rbd_dev);
4522         if (ret)
4523                 goto out_err;
4524
4525         dout("discovered version 2 image, header name is %s\n",
4526                 rbd_dev->header_name);
4527
4528         return 0;
4529 out_err:
4530         rbd_dev->parent_overlap = 0;
4531         rbd_spec_put(rbd_dev->parent_spec);
4532         rbd_dev->parent_spec = NULL;
4533         kfree(rbd_dev->header_name);
4534         rbd_dev->header_name = NULL;
4535         kfree(rbd_dev->header.object_prefix);
4536         rbd_dev->header.object_prefix = NULL;
4537
4538         return ret;
4539 }
4540
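/*
 * If the probed image is layered, create and probe an rbd_device
 * for its parent as well.  rbd_dev_image_probe() ends up back
 * here for the parent, so an entire ancestor chain gets set up,
 * each level sharing the child's client and parent spec.
 */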
4541 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4542 {
4543         struct rbd_device *parent = NULL;
4544         struct rbd_spec *parent_spec;
4545         struct rbd_client *rbdc;
4546         int ret;
4547
4548         if (!rbd_dev->parent_spec)
4549                 return 0;
4550         /*
4551          * We need to pass a reference to the client and the parent
4552          * spec when creating the parent rbd_dev.  Images related by
4553          * parent/child relationships always share both.
4554          */
4555         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4556         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4557
4558         ret = -ENOMEM;
4559         parent = rbd_dev_create(rbdc, parent_spec);
4560         if (!parent)
4561                 goto out_err;
4562
4563         ret = rbd_dev_image_probe(parent);
4564         if (ret < 0)
4565                 goto out_err;
4566         rbd_dev->parent = parent;
4567
4568         return 0;
4569 out_err:
4570         if (parent) {
4571                 rbd_spec_put(rbd_dev->parent_spec);
4572                 kfree(rbd_dev->header_name);
4573                 rbd_dev_destroy(parent);
4574         } else {
4575                 rbd_put_client(rbdc);
4576                 rbd_spec_put(parent_spec);
4577         }
4578
4579         return ret;
4580 }
4581
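/*
 * Hook the probed image up to the block layer: assign it a dev_id
 * and name (the first mapped image typically becomes "rbd0", and
 * hence /dev/rbd0), register a block major, set up the gendisk,
 * and add the device to the rbd bus before announcing its size.
 */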
4582 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4583 {
4584         int ret;
4585
4586         ret = rbd_dev_mapping_set(rbd_dev);
4587         if (ret)
4588                 return ret;
4589
4590         /* generate unique id: find highest unique id, add one */
4591         rbd_dev_id_get(rbd_dev);
4592
4593         /* Fill in the device name, now that we have its id. */
4594         BUILD_BUG_ON(DEV_NAME_LEN
4595                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4596         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4597
4598         /* Get our block major device number. */
4599
4600         ret = register_blkdev(0, rbd_dev->name);
4601         if (ret < 0)
4602                 goto err_out_id;
4603         rbd_dev->major = ret;
4604
4605         /* Set up the blkdev mapping. */
4606
4607         ret = rbd_init_disk(rbd_dev);
4608         if (ret)
4609                 goto err_out_blkdev;
4610
4611         ret = rbd_bus_add_dev(rbd_dev);
4612         if (ret)
4613                 goto err_out_disk;
4614
4615         /* Everything's ready.  Announce the disk to the world. */
4616
4617         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4618         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4619         add_disk(rbd_dev->disk);
4620
4621         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4622                 (unsigned long long) rbd_dev->mapping.size);
4623
4624         return ret;
4625
4626 err_out_disk:
4627         rbd_free_disk(rbd_dev);
4628 err_out_blkdev:
4629         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4630 err_out_id:
4631         rbd_dev_id_put(rbd_dev);
4632         rbd_dev_mapping_clear(rbd_dev);
4633
4634         return ret;
4635 }
4636
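/*
 * The header object name depends on the image format.  Assuming
 * the usual rbd_types.h definitions of RBD_SUFFIX (".rbd") and
 * RBD_HEADER_PREFIX ("rbd_header."), a format 1 image named "foo"
 * would use header object "foo.rbd", while a format 2 image whose
 * id is (for example) "1012ae8944a" would use
 * "rbd_header.1012ae8944a".
 */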
4637 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4638 {
4639         struct rbd_spec *spec = rbd_dev->spec;
4640         size_t size;
4641
4642         /* Record the header object name for this rbd image. */
4643
4644         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4645
4646         if (rbd_dev->image_format == 1)
4647                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4648         else
4649                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4650
4651         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4652         if (!rbd_dev->header_name)
4653                 return -ENOMEM;
4654
4655         if (rbd_dev->image_format == 1)
4656                 sprintf(rbd_dev->header_name, "%s%s",
4657                         spec->image_name, RBD_SUFFIX);
4658         else
4659                 sprintf(rbd_dev->header_name, "%s%s",
4660                         RBD_HEADER_PREFIX, spec->image_id);
4661         return 0;
4662 }
4663
4664 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4665 {
4666         int ret;
4667
4668         rbd_dev_unprobe(rbd_dev);
4669         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4670         if (ret)
4671                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4672         kfree(rbd_dev->header_name);
4673         rbd_dev->header_name = NULL;
4674         rbd_dev->image_format = 0;
4675         kfree(rbd_dev->spec->image_id);
4676         rbd_dev->spec->image_id = NULL;
4677
4678         rbd_dev_destroy(rbd_dev);
4679 }
4680
4681 /*
4682  * Probe for the existence of the header object for the given rbd
4683  * device.  For format 2 images this includes determining the image
4684  * id.
4685  */
4686 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4687 {
4688         int ret;
4689         int tmp;
4690
4691         /*
4692          * Get the id from the image id object.  If it's not a
4693          * format 2 image, we'll get ENOENT back, and we'll assume
4694          * it's a format 1 image.
4695          */
4696         ret = rbd_dev_image_id(rbd_dev);
4697         if (ret)
4698                 return ret;
4699         rbd_assert(rbd_dev->spec->image_id);
4700         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4701
4702         ret = rbd_dev_header_name(rbd_dev);
4703         if (ret)
4704                 goto err_out_format;
4705
4706         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4707         if (ret)
4708                 goto out_header_name;
4709
4710         if (rbd_dev->image_format == 1)
4711                 ret = rbd_dev_v1_probe(rbd_dev);
4712         else
4713                 ret = rbd_dev_v2_probe(rbd_dev);
4714         if (ret)
4715                 goto err_out_watch;
4716
4717         ret = rbd_dev_spec_update(rbd_dev);
4718         if (ret)
4719                 goto err_out_probe;
4720
4721         ret = rbd_dev_probe_parent(rbd_dev);
4722         if (!ret)
4723                 return 0;
4724
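        /*
         * Unwind in the reverse order of setup: undo the v1/v2
         * probe, tear down the header watch, then release the
         * header name, image format, and image id.
         */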
4725 err_out_probe:
4726         rbd_dev_unprobe(rbd_dev);
4727 err_out_watch:
4728         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4729         if (tmp)
4730                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4731 out_header_name:
4732         kfree(rbd_dev->header_name);
4733         rbd_dev->header_name = NULL;
4734 err_out_format:
4735         rbd_dev->image_format = 0;
4736         kfree(rbd_dev->spec->image_id);
4737         rbd_dev->spec->image_id = NULL;
4738
4739         dout("probe failed, returning %d\n", ret);
4740
4741         return ret;
4742 }
4743
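/*
 * Map an image in response to a write to /sys/bus/rbd/add.  The
 * buffer supplies monitor addresses, options, a pool name, an
 * image name, and optionally a snapshot name (see
 * Documentation/ABI/testing/sysfs-bus-rbd), e.g.:
 *
 *   $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * A minimal user-space sketch of the same operation (hypothetical
 * snippet, error handling omitted):
 *
 *      const char spec[] = "192.168.0.1 name=admin rbd foo";
 *      int fd = open("/sys/bus/rbd/add", O_WRONLY);
 *
 *      write(fd, spec, sizeof(spec) - 1);
 *      close(fd);
 */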
4744 static ssize_t rbd_add(struct bus_type *bus,
4745                        const char *buf,
4746                        size_t count)
4747 {
4748         struct rbd_device *rbd_dev = NULL;
4749         struct ceph_options *ceph_opts = NULL;
4750         struct rbd_options *rbd_opts = NULL;
4751         struct rbd_spec *spec = NULL;
4752         struct rbd_client *rbdc;
4753         struct ceph_osd_client *osdc;
4754         int rc = -ENOMEM;
4755
4756         if (!try_module_get(THIS_MODULE))
4757                 return -ENODEV;
4758
4759         /* parse add command */
4760         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4761         if (rc < 0)
4762                 goto err_out_module;
4763
4764         rbdc = rbd_get_client(ceph_opts);
4765         if (IS_ERR(rbdc)) {
4766                 rc = PTR_ERR(rbdc);
4767                 goto err_out_args;
4768         }
4769         ceph_opts = NULL;       /* rbd_dev client now owns this */
4770
4771         /* pick the pool */
4772         osdc = &rbdc->client->osdc;
4773         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4774         if (rc < 0)
4775                 goto err_out_client;
4776         spec->pool_id = (u64)rc;
4777
4778         /* The ceph file layout needs the pool id to fit in 32 bits */
4779
4780         if (spec->pool_id > (u64)U32_MAX) {
4781                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4782                                 (unsigned long long)spec->pool_id, U32_MAX);
4783                 rc = -EIO;
4784                 goto err_out_client;
4785         }
4786
4787         rbd_dev = rbd_dev_create(rbdc, spec);
4788         if (!rbd_dev)
4789                 goto err_out_client;
4790         rbdc = NULL;            /* rbd_dev now owns this */
4791         spec = NULL;            /* rbd_dev now owns this */
4792
4793         rbd_dev->mapping.read_only = rbd_opts->read_only;
4794         kfree(rbd_opts);
4795         rbd_opts = NULL;        /* done with this */
4796
4797         rc = rbd_dev_image_probe(rbd_dev);
4798         if (rc < 0)
4799                 goto err_out_rbd_dev;
4800
4801         rc = rbd_dev_device_setup(rbd_dev);
4802         if (!rc)
4803                 return count;
4804
4805         rbd_dev_image_release(rbd_dev);
4806 err_out_rbd_dev:
4807         rbd_dev_destroy(rbd_dev);
4808 err_out_client:
4809         rbd_put_client(rbdc);
4810 err_out_args:
4811         if (ceph_opts)
4812                 ceph_destroy_options(ceph_opts);
4813         kfree(rbd_opts);
4814         rbd_spec_put(spec);
4815 err_out_module:
4816         module_put(THIS_MODULE);
4817
4818         dout("Error adding device %s\n", buf);
4819
4820         return (ssize_t)rc;
4821 }
4822
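/* Look up a mapped device by id; takes rbd_dev_list_lock internally. */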
4823 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4824 {
4825         struct list_head *tmp;
4826         struct rbd_device *rbd_dev;
4827
4828         spin_lock(&rbd_dev_list_lock);
4829         list_for_each(tmp, &rbd_dev_list) {
4830                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4831                 if (rbd_dev->dev_id == dev_id) {
4832                         spin_unlock(&rbd_dev_list_lock);
4833                         return rbd_dev;
4834                 }
4835         }
4836         spin_unlock(&rbd_dev_list_lock);
4837         return NULL;
4838 }
4839
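/* Release callback for an rbd device; undoes rbd_dev_device_setup(). */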
4840 static void rbd_dev_device_release(struct device *dev)
4841 {
4842         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4843
4844         rbd_free_disk(rbd_dev);
4845         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4846         rbd_dev_clear_mapping(rbd_dev);
4847         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4848         rbd_dev->major = 0;
4849         rbd_dev_id_put(rbd_dev);
4850         rbd_dev_mapping_clear(rbd_dev);
4851 }
4852
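/*
 * Tear down an image's layering chain, deepest ancestor first.
 * For a chain rbd_dev -> A -> B (B being A's parent), the inner
 * loop walks out to B, the ancestor with no parent of its own;
 * B is released and detached from A, and the outer loop repeats
 * until rbd_dev itself has no parent left.
 */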
4853 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4854 {
4855         while (rbd_dev->parent) {
4856                 struct rbd_device *first = rbd_dev;
4857                 struct rbd_device *second = first->parent;
4858                 struct rbd_device *third;
4859
4860                 /*
4861                  * Walk out to the last ancestor, the one with no
4862                  * parent of its own, and remove it.
4863                  */
4864                 while (second && (third = second->parent)) {
4865                         first = second;
4866                         second = third;
4867                 }
4868                 rbd_assert(second);
4869                 rbd_dev_image_release(second);
4870                 first->parent = NULL;
4871                 first->parent_overlap = 0;
4872
4873                 rbd_assert(first->parent_spec);
4874                 rbd_spec_put(first->parent_spec);
4875                 first->parent_spec = NULL;
4876         }
4877 }
4878
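/*
 * Unmap a device in response to a write of its id to
 * /sys/bus/rbd/remove, e.g.:
 *
 *   $ echo 0 > /sys/bus/rbd/remove
 *
 * Fails with -EBUSY while the device is still open.
 */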
4879 static ssize_t rbd_remove(struct bus_type *bus,
4880                           const char *buf,
4881                           size_t count)
4882 {
4883         struct rbd_device *rbd_dev = NULL;
4884         int target_id;
4885         unsigned long ul;
4886         int ret;
4887
4888         ret = strict_strtoul(buf, 10, &ul);
4889         if (ret)
4890                 return ret;
4891
4892         /* convert to int; abort if we lost anything in the conversion */
4893         target_id = (int) ul;
4894         if (target_id != ul)
4895                 return -EINVAL;
4896
4897         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4898
4899         rbd_dev = __rbd_get_dev(target_id);
4900         if (!rbd_dev) {
4901                 ret = -ENOENT;
4902                 goto done;
4903         }
4904
4905         spin_lock_irq(&rbd_dev->lock);
4906         if (rbd_dev->open_count)
4907                 ret = -EBUSY;
4908         else
4909                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4910         spin_unlock_irq(&rbd_dev->lock);
4911         if (ret < 0)
4912                 goto done;
4913         ret = count;
4914         rbd_bus_del_dev(rbd_dev);
4915         rbd_dev_image_release(rbd_dev);
4916         module_put(THIS_MODULE);
4917 done:
4918         mutex_unlock(&ctl_mutex);
4919
4920         return ret;
4921 }
4922
4923 /*
4924  * create control files in sysfs
4925  * /sys/bus/rbd/...
4926  */
4927 static int rbd_sysfs_init(void)
4928 {
4929         int ret;
4930
4931         ret = device_register(&rbd_root_dev);
4932         if (ret < 0)
4933                 return ret;
4934
4935         ret = bus_register(&rbd_bus_type);
4936         if (ret < 0)
4937                 device_unregister(&rbd_root_dev);
4938
4939         return ret;
4940 }
4941
4942 static void rbd_sysfs_cleanup(void)
4943 {
4944         bus_unregister(&rbd_bus_type);
4945         device_unregister(&rbd_root_dev);
4946 }
4947
4948 static int __init rbd_init(void)
4949 {
4950         int rc;
4951
4952         if (!libceph_compatible(NULL)) {
4953                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4954
4955                 return -EINVAL;
4956         }
4957         rc = rbd_sysfs_init();
4958         if (rc)
4959                 return rc;
4960         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4961         return 0;
4962 }
4963
4964 static void __exit rbd_exit(void)
4965 {
4966         rbd_sysfs_cleanup();
4967 }
4968
4969 module_init(rbd_init);
4970 module_exit(rbd_exit);
4971
4972 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4973 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4974 MODULE_DESCRIPTION("rados block device");
4975
4976 /* following authorship retained from original osdblk.c */
4977 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4978
4979 MODULE_LICENSE("GPL");