
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
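
/*
 * (Rough arithmetic behind the 510 limit, for illustration: 510
 * snapshot ids at 8 bytes each take 4080 bytes, leaving room in a
 * 4KB page for the ceph_snap_context fields that precede the id
 * array.  The exact header layout is defined in libceph.)
 */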

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
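
/*
 * (A sketch of the arithmetic: each byte of an int needs a bit over
 * two decimal digits, so (5 * sizeof (int)) / 2 over-estimates the
 * digit count and the + 1 covers a sign; with 4-byte ints that is
 * 11 characters, enough for "-2147483648".)
 */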

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
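
/*
 * (Usage sketch: images are mapped and unmapped by writing to these
 * bus attributes, as described in the sysfs-bus-rbd document named
 * at the top of this file.  Roughly, with a made-up monitor address
 * and names:
 *
 *      # echo "1.2.3.4:6789 name=admin rbd foo" > /sys/bus/rbd/add
 *      # echo 0 > /sys/bus/rbd/remove
 *
 * where the value written to "remove" is the device's dev_id.)
 */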

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
/* Wrapped in do/while (0) so the macro is safe in if/else bodies */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with a specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
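
/*
 * (Example, with made-up values: these tokens ride in the options
 * field of the string written to /sys/bus/rbd/add, so an add string
 * such as "1.2.3.4:6789 name=admin,ro rbd foo" hands "ro" to
 * parse_rbd_opts_token(), which leaves rbd_opts->read_only true.)
 */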

/*
 * Get a ceph client with a specific addr and configuration; create
 * one if it does not exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.
 *
 * Takes rbd_client_list_lock to unlink the client from the client
 * list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* don't leak object_prefix on this error path */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
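
/*
 * (Worked example, with made-up ids: if snapc->snaps holds the
 * descending array { 12, 7, 3 }, bsearch() with the reversed
 * comparator above finds id 7 at &snaps[1] and this returns 1,
 * while a lookup of id 5 finds nothing and returns BAD_SNAP_INDEX.)
 */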

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        const char *snap_name = rbd_dev->spec->snap_name;
        u64 snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
                snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
                if (snap_id == CEPH_NOSNAP)
                        return -ENOENT;
        } else {
                snap_id = CEPH_NOSNAP;
        }

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        /* If we are mapping a snapshot it must be marked read-only */

        if (snap_id != CEPH_NOSNAP)
                rbd_dev->mapping.read_only = true;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
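
/*
 * (Worked example with obj_order 22, i.e. 4 MiB objects, numbers
 * illustrative only: image offset 0x500000 (5 MiB) maps to segment 1
 * at segment offset 0x100000, and a 5 MiB request starting at image
 * offset 3 MiB is clipped by rbd_segment_length() to the 1 MiB that
 * remains in segment 0.)
 */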

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
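
/*
 * (For illustration, with made-up sizes: cloning len 6144 at
 * *offset 2048 from a chain of two 4 KiB bios yields a two-clone
 * chain covering the last 2 KiB of the first bio and all of the
 * second; on return *bio_src points past the second bio and
 * *offset is 0, ready for the next clone to continue from there.)
 */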

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
1445 static void img_request_write_set(struct rbd_img_request *img_request)
1446 {
1447         set_bit(IMG_REQ_WRITE, &img_request->flags);
1448         smp_mb();
1449 }
1450
1451 static bool img_request_write_test(struct rbd_img_request *img_request)
1452 {
1453         smp_mb();
1454         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1455 }
1456
1457 static void img_request_child_set(struct rbd_img_request *img_request)
1458 {
1459         set_bit(IMG_REQ_CHILD, &img_request->flags);
1460         smp_mb();
1461 }
1462
1463 static bool img_request_child_test(struct rbd_img_request *img_request)
1464 {
1465         smp_mb();
1466         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1467 }
1468
1469 static void img_request_layered_set(struct rbd_img_request *img_request)
1470 {
1471         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1472         smp_mb();
1473 }
1474
1475 static bool img_request_layered_test(struct rbd_img_request *img_request)
1476 {
1477         smp_mb();
1478         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1479 }
1480
1481 static void
1482 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1483 {
1484         u64 xferred = obj_request->xferred;
1485         u64 length = obj_request->length;
1486
1487         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1488                 obj_request, obj_request->img_request, obj_request->result,
1489                 xferred, length);
1490         /*
1491          * ENOENT means a hole in the image.  We zero-fill the
1492          * entire length of the request.  A short read also implies
1493          * zero-fill to the end of the request.  Either way we
1494          * update the xferred count to indicate the whole request
1495          * was satisfied.
1496          */
1497         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1498         if (obj_request->result == -ENOENT) {
1499                 if (obj_request->type == OBJ_REQUEST_BIO)
1500                         zero_bio_chain(obj_request->bio_list, 0);
1501                 else
1502                         zero_pages(obj_request->pages, 0, length);
1503                 obj_request->result = 0;
1504                 obj_request->xferred = length;
1505         } else if (xferred < length && !obj_request->result) {
1506                 if (obj_request->type == OBJ_REQUEST_BIO)
1507                         zero_bio_chain(obj_request->bio_list, xferred);
1508                 else
1509                         zero_pages(obj_request->pages, xferred, length);
1510                 obj_request->xferred = length;
1511         }
1512         obj_request_done_set(obj_request);
1513 }
1514
1515 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1516 {
1517         dout("%s: obj %p cb %p\n", __func__, obj_request,
1518                 obj_request->callback);
1519         if (obj_request->callback)
1520                 obj_request->callback(obj_request);
1521         else
1522                 complete_all(&obj_request->completion);
1523 }
1524
1525 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1526 {
1527         dout("%s: obj %p\n", __func__, obj_request);
1528         obj_request_done_set(obj_request);
1529 }
1530
1531 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1532 {
1533         struct rbd_img_request *img_request = NULL;
1534         struct rbd_device *rbd_dev = NULL;
1535         bool layered = false;
1536
1537         if (obj_request_img_data_test(obj_request)) {
1538                 img_request = obj_request->img_request;
1539                 layered = img_request && img_request_layered_test(img_request);
1540                 rbd_dev = img_request->rbd_dev;
1541         }
1542
1543         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1544                 obj_request, img_request, obj_request->result,
1545                 obj_request->xferred, obj_request->length);
1546         if (layered && obj_request->result == -ENOENT &&
1547                         obj_request->img_offset < rbd_dev->parent_overlap)
1548                 rbd_img_parent_read(obj_request);
1549         else if (img_request)
1550                 rbd_img_obj_request_read_callback(obj_request);
1551         else
1552                 obj_request_done_set(obj_request);
1553 }
1554
1555 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1556 {
1557         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1558                 obj_request->result, obj_request->length);
1559         /*
1560          * There is no such thing as a successful short write.  Set
1561          * the transfer count to our originally-requested length.
1562          */
1563         obj_request->xferred = obj_request->length;
1564         obj_request_done_set(obj_request);
1565 }
1566
1567 /*
1568  * For a simple stat call there's nothing to do.  We'll do more if
1569  * this is part of a write sequence for a layered image.
1570  */
1571 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1572 {
1573         dout("%s: obj %p\n", __func__, obj_request);
1574         obj_request_done_set(obj_request);
1575 }
1576
1577 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1578                                 struct ceph_msg *msg)
1579 {
1580         struct rbd_obj_request *obj_request = osd_req->r_priv;
1581         u16 opcode;
1582
1583         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1584         rbd_assert(osd_req == obj_request->osd_req);
1585         if (obj_request_img_data_test(obj_request)) {
1586                 rbd_assert(obj_request->img_request);
1587                 rbd_assert(obj_request->which != BAD_WHICH);
1588         } else {
1589                 rbd_assert(obj_request->which == BAD_WHICH);
1590         }
1591
1592         if (osd_req->r_result < 0)
1593                 obj_request->result = osd_req->r_result;
1594
1595         BUG_ON(osd_req->r_num_ops > 2);
1596
1597         /*
1598          * We support a 64-bit length, but ultimately it has to be
1599          * passed to blk_end_request(), which takes an unsigned int.
1600          */
1601         obj_request->xferred = osd_req->r_reply_op_len[0];
1602         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1603         opcode = osd_req->r_ops[0].op;
1604         switch (opcode) {
1605         case CEPH_OSD_OP_READ:
1606                 rbd_osd_read_callback(obj_request);
1607                 break;
1608         case CEPH_OSD_OP_WRITE:
1609                 rbd_osd_write_callback(obj_request);
1610                 break;
1611         case CEPH_OSD_OP_STAT:
1612                 rbd_osd_stat_callback(obj_request);
1613                 break;
1614         case CEPH_OSD_OP_CALL:
1615         case CEPH_OSD_OP_NOTIFY_ACK:
1616         case CEPH_OSD_OP_WATCH:
1617                 rbd_osd_trivial_callback(obj_request);
1618                 break;
1619         default:
1620                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1621                         obj_request->object_name, (unsigned short) opcode);
1622                 break;
1623         }
1624
1625         if (obj_request_done_test(obj_request))
1626                 rbd_obj_request_complete(obj_request);
1627 }
1628
1629 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1630 {
1631         struct rbd_img_request *img_request = obj_request->img_request;
1632         struct ceph_osd_request *osd_req = obj_request->osd_req;
1633         u64 snap_id;
1634
1635         rbd_assert(osd_req != NULL);
1636
1637         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1638         ceph_osdc_build_request(osd_req, obj_request->offset,
1639                         NULL, snap_id, NULL);
1640 }
1641
1642 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1643 {
1644         struct rbd_img_request *img_request = obj_request->img_request;
1645         struct ceph_osd_request *osd_req = obj_request->osd_req;
1646         struct ceph_snap_context *snapc;
1647         struct timespec mtime = CURRENT_TIME;
1648
1649         rbd_assert(osd_req != NULL);
1650
1651         snapc = img_request ? img_request->snapc : NULL;
1652         ceph_osdc_build_request(osd_req, obj_request->offset,
1653                         snapc, CEPH_NOSNAP, &mtime);
1654 }
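/*
 * The asymmetry between the two helpers above reflects rados
 * semantics: a read is resolved against a single snapshot id, while
 * a write must carry the full snapshot context so the osd can
 * preserve existing snapshots.  Sketch (arguments as used above):
 *
 *	read:	ceph_osdc_build_request(osd_req, offset,
 *					NULL, snap_id, NULL);
 *	write:	ceph_osdc_build_request(osd_req, offset,
 *					snapc, CEPH_NOSNAP, &mtime);
 */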
1655
1656 static struct ceph_osd_request *rbd_osd_req_create(
1657                                         struct rbd_device *rbd_dev,
1658                                         bool write_request,
1659                                         struct rbd_obj_request *obj_request)
1660 {
1661         struct ceph_snap_context *snapc = NULL;
1662         struct ceph_osd_client *osdc;
1663         struct ceph_osd_request *osd_req;
1664
1665         if (obj_request_img_data_test(obj_request)) {
1666                 struct rbd_img_request *img_request = obj_request->img_request;
1667
1668                 rbd_assert(write_request ==
1669                                 img_request_write_test(img_request));
1670                 if (write_request)
1671                         snapc = img_request->snapc;
1672         }
1673
1674         /* Allocate and initialize the request, for the single op */
1675
1676         osdc = &rbd_dev->rbd_client->client->osdc;
1677         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1678         if (!osd_req)
1679                 return NULL;    /* ENOMEM */
1680
1681         if (write_request)
1682                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1683         else
1684                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1685
1686         osd_req->r_callback = rbd_osd_req_callback;
1687         osd_req->r_priv = obj_request;
1688
1689         osd_req->r_oid_len = strlen(obj_request->object_name);
1690         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1691         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1692
1693         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1694
1695         return osd_req;
1696 }
1697
1698 /*
1699  * Create a copyup osd request based on the information in the
1700  * object request supplied.  A copyup request has two osd ops,
1701  * a copyup method call, and a "normal" write request.
1702  */
1703 static struct ceph_osd_request *
1704 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1705 {
1706         struct rbd_img_request *img_request;
1707         struct ceph_snap_context *snapc;
1708         struct rbd_device *rbd_dev;
1709         struct ceph_osd_client *osdc;
1710         struct ceph_osd_request *osd_req;
1711
1712         rbd_assert(obj_request_img_data_test(obj_request));
1713         img_request = obj_request->img_request;
1714         rbd_assert(img_request);
1715         rbd_assert(img_request_write_test(img_request));
1716
1717         /* Allocate and initialize the request, for the two ops */
1718
1719         snapc = img_request->snapc;
1720         rbd_dev = img_request->rbd_dev;
1721         osdc = &rbd_dev->rbd_client->client->osdc;
1722         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1723         if (!osd_req)
1724                 return NULL;    /* ENOMEM */
1725
1726         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1727         osd_req->r_callback = rbd_osd_req_callback;
1728         osd_req->r_priv = obj_request;
1729
1730         osd_req->r_oid_len = strlen(obj_request->object_name);
1731         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1732         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1733
1734         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1735
1736         return osd_req;
1737 }
1738
1739
1740 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1741 {
1742         ceph_osdc_put_request(osd_req);
1743 }
1744
1745 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1746
1747 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1748                                                 u64 offset, u64 length,
1749                                                 enum obj_request_type type)
1750 {
1751         struct rbd_obj_request *obj_request;
1752         size_t size;
1753         char *name;
1754
1755         rbd_assert(obj_request_type_valid(type));
1756
1757         size = strlen(object_name) + 1;
1758         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1759         if (!obj_request)
1760                 return NULL;
1761
1762         name = (char *)(obj_request + 1);
1763         obj_request->object_name = memcpy(name, object_name, size);
1764         obj_request->offset = offset;
1765         obj_request->length = length;
1766         obj_request->flags = 0;
1767         obj_request->which = BAD_WHICH;
1768         obj_request->type = type;
1769         INIT_LIST_HEAD(&obj_request->links);
1770         init_completion(&obj_request->completion);
1771         kref_init(&obj_request->kref);
1772
1773         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1774                 offset, length, (int)type, obj_request);
1775
1776         return obj_request;
1777 }
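/*
 * Illustrative sketch (not built; names and sizes assumed): the
 * typical lifecycle of a standalone object request, mirroring what
 * rbd_obj_read_sync() later in this file does.  Attaching the data
 * pages is omitted for brevity.
 *
 *	obj_request = rbd_obj_request_create(object_name, 0, 4096,
 *						OBJ_REQUEST_PAGES);
 *	if (!obj_request)
 *		return -ENOMEM;
 *	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
 *						obj_request);
 *	if (!obj_request->osd_req)
 *		goto out;	drop the object request and fail
 *	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
 *				0, 4096, 0, 0);
 *	rbd_osd_req_format_read(obj_request);
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 *	if (!ret)
 *		ret = rbd_obj_request_wait(obj_request);
 *	rbd_obj_request_put(obj_request);	also destroys the osd_req
 */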
1778
1779 static void rbd_obj_request_destroy(struct kref *kref)
1780 {
1781         struct rbd_obj_request *obj_request;
1782
1783         obj_request = container_of(kref, struct rbd_obj_request, kref);
1784
1785         dout("%s: obj %p\n", __func__, obj_request);
1786
1787         rbd_assert(obj_request->img_request == NULL);
1788         rbd_assert(obj_request->which == BAD_WHICH);
1789
1790         if (obj_request->osd_req)
1791                 rbd_osd_req_destroy(obj_request->osd_req);
1792
1793         rbd_assert(obj_request_type_valid(obj_request->type));
1794         switch (obj_request->type) {
1795         case OBJ_REQUEST_NODATA:
1796                 break;          /* Nothing to do */
1797         case OBJ_REQUEST_BIO:
1798                 if (obj_request->bio_list)
1799                         bio_chain_put(obj_request->bio_list);
1800                 break;
1801         case OBJ_REQUEST_PAGES:
1802                 if (obj_request->pages)
1803                         ceph_release_page_vector(obj_request->pages,
1804                                                 obj_request->page_count);
1805                 break;
1806         }
1807
1808         kfree(obj_request);
1809 }
1810
1811 /*
1812  * Caller is responsible for filling in the list of object requests
1813  * that comprises the image request, and the Linux request pointer
1814  * (if there is one).
1815  */
1816 static struct rbd_img_request *rbd_img_request_create(
1817                                         struct rbd_device *rbd_dev,
1818                                         u64 offset, u64 length,
1819                                         bool write_request,
1820                                         bool child_request)
1821 {
1822         struct rbd_img_request *img_request;
1823
1824         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1825         if (!img_request)
1826                 return NULL;
1827
1828         if (write_request) {
1829                 down_read(&rbd_dev->header_rwsem);
1830                 ceph_get_snap_context(rbd_dev->header.snapc);
1831                 up_read(&rbd_dev->header_rwsem);
1832         }
1833
1834         img_request->rq = NULL;
1835         img_request->rbd_dev = rbd_dev;
1836         img_request->offset = offset;
1837         img_request->length = length;
1838         img_request->flags = 0;
1839         if (write_request) {
1840                 img_request_write_set(img_request);
1841                 img_request->snapc = rbd_dev->header.snapc;
1842         } else {
1843                 img_request->snap_id = rbd_dev->spec->snap_id;
1844         }
1845         if (child_request)
1846                 img_request_child_set(img_request);
1847         if (rbd_dev->parent_spec)
1848                 img_request_layered_set(img_request);
1849         spin_lock_init(&img_request->completion_lock);
1850         img_request->next_completion = 0;
1851         img_request->callback = NULL;
1852         img_request->result = 0;
1853         img_request->obj_request_count = 0;
1854         INIT_LIST_HEAD(&img_request->obj_requests);
1855         kref_init(&img_request->kref);
1856
1857         rbd_img_request_get(img_request);       /* Avoid a warning */
1858         rbd_img_request_put(img_request);       /* TEMPORARY */
1859
1860         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1861                 write_request ? "write" : "read", offset, length,
1862                 img_request);
1863
1864         return img_request;
1865 }
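/*
 * Illustrative sketch (not built): how an image request created by
 * the function above is driven, exactly as rbd_request_fn() later in
 * this file does for a block-layer request "rq".
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request, false);
 *	if (!img_request)
 *		return -ENOMEM;
 *	img_request->rq = rq;
 *	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *					rq->bio);
 *	if (!result)
 *		result = rbd_img_request_submit(img_request);
 *	if (result)
 *		rbd_img_request_put(img_request);
 */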
1866
1867 static void rbd_img_request_destroy(struct kref *kref)
1868 {
1869         struct rbd_img_request *img_request;
1870         struct rbd_obj_request *obj_request;
1871         struct rbd_obj_request *next_obj_request;
1872
1873         img_request = container_of(kref, struct rbd_img_request, kref);
1874
1875         dout("%s: img %p\n", __func__, img_request);
1876
1877         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1878                 rbd_img_obj_request_del(img_request, obj_request);
1879         rbd_assert(img_request->obj_request_count == 0);
1880
1881         if (img_request_write_test(img_request))
1882                 ceph_put_snap_context(img_request->snapc);
1883
1884         if (img_request_child_test(img_request))
1885                 rbd_obj_request_put(img_request->obj_request);
1886
1887         kfree(img_request);
1888 }
1889
1890 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1891 {
1892         struct rbd_img_request *img_request;
1893         unsigned int xferred;
1894         int result;
1895         bool more;
1896
1897         rbd_assert(obj_request_img_data_test(obj_request));
1898         img_request = obj_request->img_request;
1899
1900         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1901         xferred = (unsigned int)obj_request->xferred;
1902         result = obj_request->result;
1903         if (result) {
1904                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1905
1906                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1907                         img_request_write_test(img_request) ? "write" : "read",
1908                         obj_request->length, obj_request->img_offset,
1909                         obj_request->offset);
1910                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1911                         result, xferred);
1912                 if (!img_request->result)
1913                         img_request->result = result;
1914         }
1915
1916         /* Image object requests don't own their page array */
1917
1918         if (obj_request->type == OBJ_REQUEST_PAGES) {
1919                 obj_request->pages = NULL;
1920                 obj_request->page_count = 0;
1921         }
1922
1923         if (img_request_child_test(img_request)) {
1924                 rbd_assert(img_request->obj_request != NULL);
1925                 more = obj_request->which < img_request->obj_request_count - 1;
1926         } else {
1927                 rbd_assert(img_request->rq != NULL);
1928                 more = blk_end_request(img_request->rq, result, xferred);
1929         }
1930
1931         return more;
1932 }
1933
1934 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1935 {
1936         struct rbd_img_request *img_request;
1937         u32 which = obj_request->which;
1938         bool more = true;
1939
1940         rbd_assert(obj_request_img_data_test(obj_request));
1941         img_request = obj_request->img_request;
1942
1943         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1944         rbd_assert(img_request != NULL);
1945         rbd_assert(img_request->obj_request_count > 0);
1946         rbd_assert(which != BAD_WHICH);
1947         rbd_assert(which < img_request->obj_request_count);
1948         rbd_assert(which >= img_request->next_completion);
1949
1950         spin_lock_irq(&img_request->completion_lock);
1951         if (which != img_request->next_completion)
1952                 goto out;
1953
1954         for_each_obj_request_from(img_request, obj_request) {
1955                 rbd_assert(more);
1956                 rbd_assert(which < img_request->obj_request_count);
1957
1958                 if (!obj_request_done_test(obj_request))
1959                         break;
1960                 more = rbd_img_obj_end_request(obj_request);
1961                 which++;
1962         }
1963
1964         rbd_assert(more ^ (which == img_request->obj_request_count));
1965         img_request->next_completion = which;
1966 out:
1967         spin_unlock_irq(&img_request->completion_lock);
1968
1969         if (!more)
1970                 rbd_img_request_complete(img_request);
1971 }
1972
1973 /*
1974  * Split up an image request into one or more object requests, each
1975  * to a different object.  The "type" parameter indicates whether
1976  * "data_desc" is the pointer to the head of a list of bio
1977  * structures, or the base of a page array.  In either case this
1978  * function assumes data_desc describes memory sufficient to hold
1979  * all data described by the image request.
1980  */
1981 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1982                                         enum obj_request_type type,
1983                                         void *data_desc)
1984 {
1985         struct rbd_device *rbd_dev = img_request->rbd_dev;
1986         struct rbd_obj_request *obj_request = NULL;
1987         struct rbd_obj_request *next_obj_request;
1988         bool write_request = img_request_write_test(img_request);
1989         struct bio *bio_list;
1990         unsigned int bio_offset = 0;
1991         struct page **pages;
1992         u64 img_offset;
1993         u64 resid;
1994         u16 opcode;
1995
1996         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1997                 (int)type, data_desc);
1998
1999         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2000         img_offset = img_request->offset;
2001         resid = img_request->length;
2002         rbd_assert(resid > 0);
2003
2004         if (type == OBJ_REQUEST_BIO) {
2005                 bio_list = data_desc;
2006                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2007         } else {
2008                 rbd_assert(type == OBJ_REQUEST_PAGES);
2009                 pages = data_desc;
2010         }
2011
2012         while (resid) {
2013                 struct ceph_osd_request *osd_req;
2014                 const char *object_name;
2015                 u64 offset;
2016                 u64 length;
2017
2018                 object_name = rbd_segment_name(rbd_dev, img_offset);
2019                 if (!object_name)
2020                         goto out_unwind;
2021                 offset = rbd_segment_offset(rbd_dev, img_offset);
2022                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2023                 obj_request = rbd_obj_request_create(object_name,
2024                                                 offset, length, type);
2025                 kfree(object_name);     /* object request has its own copy */
2026                 if (!obj_request)
2027                         goto out_unwind;
2028
2029                 if (type == OBJ_REQUEST_BIO) {
2030                         unsigned int clone_size;
2031
2032                         rbd_assert(length <= (u64)UINT_MAX);
2033                         clone_size = (unsigned int)length;
2034                         obj_request->bio_list =
2035                                         bio_chain_clone_range(&bio_list,
2036                                                                 &bio_offset,
2037                                                                 clone_size,
2038                                                                 GFP_ATOMIC);
2039                         if (!obj_request->bio_list)
2040                                 goto out_partial;
2041                 } else {
2042                         unsigned int page_count;
2043
2044                         obj_request->pages = pages;
2045                         page_count = (u32)calc_pages_for(offset, length);
2046                         obj_request->page_count = page_count;
2047                         if ((offset + length) & ~PAGE_MASK)
2048                                 page_count--;   /* more on last page */
2049                         pages += page_count;
2050                 }
2051
2052                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2053                                                 obj_request);
2054                 if (!osd_req)
2055                         goto out_partial;
2056                 obj_request->osd_req = osd_req;
2057                 obj_request->callback = rbd_img_obj_callback;
2058
2059                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2060                                                 0, 0);
2061                 if (type == OBJ_REQUEST_BIO)
2062                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2063                                         obj_request->bio_list, length);
2064                 else
2065                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2066                                         obj_request->pages, length,
2067                                         offset & ~PAGE_MASK, false, false);
2068
2069                 if (write_request)
2070                         rbd_osd_req_format_write(obj_request);
2071                 else
2072                         rbd_osd_req_format_read(obj_request);
2073
2074                 obj_request->img_offset = img_offset;
2075                 rbd_img_obj_request_add(img_request, obj_request);
2076
2077                 img_offset += length;
2078                 resid -= length;
2079         }
2080
2081         return 0;
2082
2083 out_partial:
2084         rbd_obj_request_put(obj_request);
2085 out_unwind:
2086         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2087                 rbd_obj_request_put(obj_request);
2088
2089         return -ENOMEM;
2090 }
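/*
 * Worked example (hypothetical values): with the default 4 MiB
 * objects (obj_order == 22), a 4 MiB request at image offset 6 MiB
 * is split by the loop above into two object requests:
 *
 *	pass 1: object index 1, offset = 2 MiB, length = 2 MiB
 *	pass 2: object index 2, offset = 0,     length = 2 MiB
 *
 * rbd_segment_name()/_offset()/_length() derive these values from
 * img_offset and resid on each pass.
 */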
2091
2092 static void
2093 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2094 {
2095         struct rbd_img_request *img_request;
2096         struct rbd_device *rbd_dev;
2097         u64 length;
2098         u32 page_count;
2099
2100         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2101         rbd_assert(obj_request_img_data_test(obj_request));
2102         img_request = obj_request->img_request;
2103         rbd_assert(img_request);
2104
2105         rbd_dev = img_request->rbd_dev;
2106         rbd_assert(rbd_dev);
2107         length = (u64)1 << rbd_dev->header.obj_order;
2108         page_count = (u32)calc_pages_for(0, length);
2109
2110         rbd_assert(obj_request->copyup_pages);
2111         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2112         obj_request->copyup_pages = NULL;
2113
2114         /*
2115          * We want the transfer count to reflect the size of the
2116          * original write request.  There is no such thing as a
2117          * successful short write, so if the request was successful
2118          * we can just set it to the originally-requested length.
2119          */
2120         if (!obj_request->result)
2121                 obj_request->xferred = obj_request->length;
2122
2123         /* Finish up with the normal image object callback */
2124
2125         rbd_img_obj_callback(obj_request);
2126 }
2127
2128 static void
2129 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2130 {
2131         struct rbd_obj_request *orig_request;
2132         struct ceph_osd_request *osd_req;
2133         struct ceph_osd_client *osdc;
2134         struct rbd_device *rbd_dev;
2135         struct page **pages;
2136         int result;
2137         u64 obj_size;
2138         u64 xferred;
2139
2140         rbd_assert(img_request_child_test(img_request));
2141
2142         /* First get what we need from the image request */
2143
2144         pages = img_request->copyup_pages;
2145         rbd_assert(pages != NULL);
2146         img_request->copyup_pages = NULL;
2147
2148         orig_request = img_request->obj_request;
2149         rbd_assert(orig_request != NULL);
2150         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2151         result = img_request->result;
2152         obj_size = img_request->length;
2153         xferred = img_request->xferred;
2154
2155         rbd_dev = img_request->rbd_dev;
2156         rbd_assert(rbd_dev);
2157         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2158
2159         rbd_img_request_put(img_request);
2160
2161         if (result)
2162                 goto out_err;
2163
2164         /* Allocate the new copyup osd request for the original request */
2165
2166         result = -ENOMEM;
2167         rbd_assert(!orig_request->osd_req);
2168         osd_req = rbd_osd_req_create_copyup(orig_request);
2169         if (!osd_req)
2170                 goto out_err;
2171         orig_request->osd_req = osd_req;
2172         orig_request->copyup_pages = pages;
2173
2174         /* Initialize the copyup op */
2175
2176         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2177         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2178                                                 false, false);
2179
2180         /* Then the original write request op */
2181
2182         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2183                                         orig_request->offset,
2184                                         orig_request->length, 0, 0);
2185         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2186                                         orig_request->length);
2187
2188         rbd_osd_req_format_write(orig_request);
2189
2190         /* All set, send it off. */
2191
2192         orig_request->callback = rbd_img_obj_copyup_callback;
2193         osdc = &rbd_dev->rbd_client->client->osdc;
2194         result = rbd_obj_request_submit(osdc, orig_request);
2195         if (!result)
2196                 return;
2197 out_err:
2198         /* Record the error code and complete the request */
2199
2200         orig_request->result = result;
2201         orig_request->xferred = 0;
2202         obj_request_done_set(orig_request);
2203         rbd_obj_request_complete(orig_request);
2204 }
2205
2206 /*
2207  * Read from the parent image the range of data that covers the
2208  * entire target of the given object request.  This is used for
2209  * satisfying a layered image write request when the target of an
2210  * object request from the image request does not exist.
2211  *
2212  * A page array big enough to hold the returned data is allocated
2213  * and supplied to rbd_img_request_fill() as the "data descriptor."
2214  * When the read completes, this page array will be transferred to
2215  * the original object request for the copyup operation.
2216  *
2217  * If an error occurs, record it as the result of the original
2218  * object request and mark it done so it gets completed.
2219  */
2220 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2221 {
2222         struct rbd_img_request *img_request = NULL;
2223         struct rbd_img_request *parent_request = NULL;
2224         struct rbd_device *rbd_dev;
2225         u64 img_offset;
2226         u64 length;
2227         struct page **pages = NULL;
2228         u32 page_count;
2229         int result;
2230
2231         rbd_assert(obj_request_img_data_test(obj_request));
2232         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2233
2234         img_request = obj_request->img_request;
2235         rbd_assert(img_request != NULL);
2236         rbd_dev = img_request->rbd_dev;
2237         rbd_assert(rbd_dev->parent != NULL);
2238
2239         /*
2240          * First things first.  The original osd request is of no
2241          * use to us any more; we'll need a new one that can hold
2242          * the two ops in a copyup request.  We'll get that later,
2243          * but for now we can release the old one.
2244          */
2245         rbd_osd_req_destroy(obj_request->osd_req);
2246         obj_request->osd_req = NULL;
2247
2248         /*
2249          * Determine the byte range covered by the object in the
2250          * child image to which the original request was to be sent.
2251          */
2252         img_offset = obj_request->img_offset - obj_request->offset;
2253         length = (u64)1 << rbd_dev->header.obj_order;
2254
2255         /*
2256          * There is no defined parent data beyond the parent
2257          * overlap, so limit what we read at that boundary if
2258          * necessary.
2259          */
2260         if (img_offset + length > rbd_dev->parent_overlap) {
2261                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2262                 length = rbd_dev->parent_overlap - img_offset;
2263         }
2264
2265         /*
2266          * Allocate a page array big enough to receive the data read
2267          * from the parent.
2268          */
2269         page_count = (u32)calc_pages_for(0, length);
2270         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2271         if (IS_ERR(pages)) {
2272                 result = PTR_ERR(pages);
2273                 pages = NULL;
2274                 goto out_err;
2275         }
2276
2277         result = -ENOMEM;
2278         parent_request = rbd_img_request_create(rbd_dev->parent,
2279                                                 img_offset, length,
2280                                                 false, true);
2281         if (!parent_request)
2282                 goto out_err;
2283         rbd_obj_request_get(obj_request);
2284         parent_request->obj_request = obj_request;
2285
2286         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2287         if (result)
2288                 goto out_err;
2289         parent_request->copyup_pages = pages;
2290
2291         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2292         result = rbd_img_request_submit(parent_request);
2293         if (!result)
2294                 return 0;
2295
2296         parent_request->copyup_pages = NULL;
2297         parent_request->obj_request = NULL;
2298         rbd_obj_request_put(obj_request);
2299 out_err:
2300         if (pages)
2301                 ceph_release_page_vector(pages, page_count);
2302         if (parent_request)
2303                 rbd_img_request_put(parent_request);
2304         obj_request->result = result;
2305         obj_request->xferred = 0;
2306         obj_request_done_set(obj_request);
2307
2308         return result;
2309 }
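/*
 * Copyup flow in summary, as implemented above and in the two
 * callbacks preceding it (op indexes as used in this file):
 *
 *	1. Read the full backing object from the parent image into a
 *	   page vector (this function).
 *	2. Build a two-op osd request for the original target object
 *	   (rbd_img_obj_parent_read_full_callback()):
 *		op 0: CEPH_OSD_OP_CALL "rbd" "copyup" with those pages
 *		op 1: CEPH_OSD_OP_WRITE with the original bio data
 *	3. Complete the original write normally
 *	   (rbd_img_obj_copyup_callback()).
 */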
2310
2311 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2312 {
2313         struct rbd_obj_request *orig_request;
2314         int result;
2315
2316         rbd_assert(!obj_request_img_data_test(obj_request));
2317
2318         /*
2319          * All we need from the object request is the original
2320          * request and the result of the STAT op.  Grab those, then
2321          * we're done with the request.
2322          */
2323         orig_request = obj_request->obj_request;
2324         obj_request->obj_request = NULL;
2325         rbd_assert(orig_request);
2326         rbd_assert(orig_request->img_request);
2327
2328         result = obj_request->result;
2329         obj_request->result = 0;
2330
2331         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2332                 obj_request, orig_request, result,
2333                 obj_request->xferred, obj_request->length);
2334         rbd_obj_request_put(obj_request);
2335
2339         /*
2340          * Our only purpose here is to determine whether the object
2341          * exists, and we don't want to treat the non-existence as
2342          * an error.  If something else comes back, transfer the
2343          * error to the original request and complete it now.
2344          */
2345         if (!result) {
2346                 obj_request_existence_set(orig_request, true);
2347         } else if (result == -ENOENT) {
2348                 obj_request_existence_set(orig_request, false);
2349         } else if (result) {
2350                 orig_request->result = result;
2351                 goto out;
2352         }
2353
2354         /*
2355          * Resubmit the original request now that we have recorded
2356          * whether the target object exists.
2357          */
2358         orig_request->result = rbd_img_obj_request_submit(orig_request);
2359 out:
2360         if (orig_request->result)
2361                 rbd_obj_request_complete(orig_request);
2362         rbd_obj_request_put(orig_request);
2363 }
2364
2365 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2366 {
2367         struct rbd_obj_request *stat_request;
2368         struct rbd_device *rbd_dev;
2369         struct ceph_osd_client *osdc;
2370         struct page **pages = NULL;
2371         u32 page_count;
2372         size_t size;
2373         int ret;
2374
2375         /*
2376          * The response data for a STAT call consists of:
2377          *     le64 length;
2378          *     struct {
2379          *         le32 tv_sec;
2380          *         le32 tv_nsec;
2381          *     } mtime;
2382          */
2383         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2384         page_count = (u32)calc_pages_for(0, size);
2385         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2386         if (IS_ERR(pages))
2387                 return PTR_ERR(pages);
2388
2389         ret = -ENOMEM;
2390         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2391                                                         OBJ_REQUEST_PAGES);
2392         if (!stat_request)
2393                 goto out;
2394
2395         rbd_obj_request_get(obj_request);
2396         stat_request->obj_request = obj_request;
2397         stat_request->pages = pages;
2398         stat_request->page_count = page_count;
2399
2400         rbd_assert(obj_request->img_request);
2401         rbd_dev = obj_request->img_request->rbd_dev;
2402         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2403                                                 stat_request);
2404         if (!stat_request->osd_req)
2405                 goto out;
2406         stat_request->callback = rbd_img_obj_exists_callback;
2407
2408         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2409         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2410                                         false, false);
2411         rbd_osd_req_format_read(stat_request);
2412
2413         osdc = &rbd_dev->rbd_client->client->osdc;
2414         ret = rbd_obj_request_submit(osdc, stat_request);
2415 out:
2416         if (ret)
2417                 rbd_obj_request_put(obj_request);
2418
2419         return ret;
2420 }
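/*
 * Hypothetical sketch: if the STAT payload received above were ever
 * decoded, it would map onto a structure like the one below.  The
 * driver only inspects the osd result code, so this type does not
 * actually exist in the code; it is shown for illustration only.
 *
 *	struct rbd_stat_reply {
 *		__le64 length;
 *		struct {
 *			__le32 tv_sec;
 *			__le32 tv_nsec;
 *		} mtime;
 *	} __attribute__ ((packed));
 */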
2421
2422 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2423 {
2424         struct rbd_img_request *img_request;
2425         struct rbd_device *rbd_dev;
2426         bool known;
2427
2428         rbd_assert(obj_request_img_data_test(obj_request));
2429
2430         img_request = obj_request->img_request;
2431         rbd_assert(img_request);
2432         rbd_dev = img_request->rbd_dev;
2433
2434         /*
2435          * Only writes to layered images need special handling.
2436          * Reads and non-layered writes are simple object requests.
2437          * Layered writes that start beyond the end of the overlap
2438          * with the parent have no parent data, so they too are
2439          * simple object requests.  Finally, if the target object is
2440          * known to already exist, its parent data has already been
2441          * copied, so a write to the object can also be handled as a
2442          * simple object request.
2443          */
2444         if (!img_request_write_test(img_request) ||
2445                 !img_request_layered_test(img_request) ||
2446                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2447                 ((known = obj_request_known_test(obj_request)) &&
2448                         obj_request_exists_test(obj_request))) {
2449
2450                 struct ceph_osd_client *osdc;
2451
2452                 /* The outer rbd_dev, set from img_request above,
2453                  * already refers to this image's device. */
2454                 osdc = &rbd_dev->rbd_client->client->osdc;
2455
2456                 return rbd_obj_request_submit(osdc, obj_request);
2457         }
2458
2459         /*
2460          * It's a layered write.  The target object might exist but
2461          * we may not know that yet.  If we know it doesn't exist,
2462          * start by reading the data for the full target object from
2463          * the parent so we can use it for a copyup to the target.
2464          */
2465         if (known)
2466                 return rbd_img_obj_parent_read_full(obj_request);
2467
2468         /* We don't know whether the target exists.  Go find out. */
2469
2470         return rbd_img_obj_exists_submit(obj_request);
2471 }
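/*
 * Decision summary for the function above, derived from the logic
 * it implements:
 *
 *	read, or non-layered write         -> plain osd submit
 *	layered write beyond the overlap   -> plain osd submit
 *	layered write, target known+exists -> plain osd submit
 *	layered write, known to not exist  -> parent read, then copyup
 *	layered write, existence unknown   -> STAT first, then resubmit
 */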
2472
2473 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2474 {
2475         struct rbd_obj_request *obj_request;
2476         struct rbd_obj_request *next_obj_request;
2477
2478         dout("%s: img %p\n", __func__, img_request);
2479         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2480                 int ret;
2481
2482                 ret = rbd_img_obj_request_submit(obj_request);
2483                 if (ret)
2484                         return ret;
2485         }
2486
2487         return 0;
2488 }
2489
2490 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2491 {
2492         struct rbd_obj_request *obj_request;
2493         struct rbd_device *rbd_dev;
2494         u64 obj_end;
2495
2496         rbd_assert(img_request_child_test(img_request));
2497
2498         obj_request = img_request->obj_request;
2499         rbd_assert(obj_request);
2500         rbd_assert(obj_request->img_request);
2501
2502         obj_request->result = img_request->result;
2503         if (obj_request->result)
2504                 goto out;
2505
2506         /*
2507          * We need to zero anything beyond the parent overlap
2508          * boundary.  Since rbd_img_obj_request_read_callback()
2509          * will zero anything beyond the end of a short read, an
2510          * easy way to do this is to pretend the data from the
2511          * parent came up short--ending at the overlap boundary.
2512          */
2513         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2514         obj_end = obj_request->img_offset + obj_request->length;
2515         rbd_dev = obj_request->img_request->rbd_dev;
2516         if (obj_end > rbd_dev->parent_overlap) {
2517                 u64 xferred = 0;
2518
2519                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2520                         xferred = rbd_dev->parent_overlap -
2521                                         obj_request->img_offset;
2522
2523                 obj_request->xferred = min(img_request->xferred, xferred);
2524         } else {
2525                 obj_request->xferred = img_request->xferred;
2526         }
2527 out:
2528         rbd_img_obj_request_read_callback(obj_request);
2529         rbd_obj_request_complete(obj_request);
2530 }
2531
2532 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2533 {
2534         struct rbd_device *rbd_dev;
2535         struct rbd_img_request *img_request;
2536         int result;
2537
2538         rbd_assert(obj_request_img_data_test(obj_request));
2539         rbd_assert(obj_request->img_request != NULL);
2540         rbd_assert(obj_request->result == (s32) -ENOENT);
2541         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2542
2543         rbd_dev = obj_request->img_request->rbd_dev;
2544         rbd_assert(rbd_dev->parent != NULL);
2545         /* rbd_read_finish(obj_request, obj_request->length); */
2546         img_request = rbd_img_request_create(rbd_dev->parent,
2547                                                 obj_request->img_offset,
2548                                                 obj_request->length,
2549                                                 false, true);
2550         result = -ENOMEM;
2551         if (!img_request)
2552                 goto out_err;
2553
2554         rbd_obj_request_get(obj_request);
2555         img_request->obj_request = obj_request;
2556
2557         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2558                                         obj_request->bio_list);
2559         if (result)
2560                 goto out_err;
2561
2562         img_request->callback = rbd_img_parent_read_callback;
2563         result = rbd_img_request_submit(img_request);
2564         if (result)
2565                 goto out_err;
2566
2567         return;
2568 out_err:
2569         if (img_request)
2570                 rbd_img_request_put(img_request);
2571         obj_request->result = result;
2572         obj_request->xferred = 0;
2573         obj_request_done_set(obj_request);
2574 }
2575
2576 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2577 {
2578         struct rbd_obj_request *obj_request;
2579         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2580         int ret;
2581
2582         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2583                                                         OBJ_REQUEST_NODATA);
2584         if (!obj_request)
2585                 return -ENOMEM;
2586
2587         ret = -ENOMEM;
2588         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2589         if (!obj_request->osd_req)
2590                 goto out;
2591         obj_request->callback = rbd_obj_request_put;
2592
2593         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2594                                         notify_id, 0, 0);
2595         rbd_osd_req_format_read(obj_request);
2596
2597         ret = rbd_obj_request_submit(osdc, obj_request);
2598 out:
2599         if (ret)
2600                 rbd_obj_request_put(obj_request);
2601
2602         return ret;
2603 }
2604
2605 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2606 {
2607         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2608
2609         if (!rbd_dev)
2610                 return;
2611
2612         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2613                 rbd_dev->header_name, (unsigned long long)notify_id,
2614                 (unsigned int)opcode);
2615         (void)rbd_dev_refresh(rbd_dev);
2616
2617         rbd_obj_notify_ack(rbd_dev, notify_id);
2618 }
2619
2620 /*
2621  * Request sync osd watch/unwatch.  The value of "start" determines
2622  * whether a watch request is being initiated or torn down.
2623  */
2624 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2625 {
2626         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2627         struct rbd_obj_request *obj_request;
2628         int ret;
2629
2630         rbd_assert(start ^ !!rbd_dev->watch_event);
2631         rbd_assert(start ^ !!rbd_dev->watch_request);
2632
2633         if (start) {
2634                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2635                                                 &rbd_dev->watch_event);
2636                 if (ret < 0)
2637                         return ret;
2638                 rbd_assert(rbd_dev->watch_event != NULL);
2639         }
2640
2641         ret = -ENOMEM;
2642         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2643                                                         OBJ_REQUEST_NODATA);
2644         if (!obj_request)
2645                 goto out_cancel;
2646
2647         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2648         if (!obj_request->osd_req)
2649                 goto out_cancel;
2650
2651         if (start)
2652                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2653         else
2654                 ceph_osdc_unregister_linger_request(osdc,
2655                                         rbd_dev->watch_request->osd_req);
2656
2657         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2658                                 rbd_dev->watch_event->cookie, 0, start);
2659         rbd_osd_req_format_write(obj_request);
2660
2661         ret = rbd_obj_request_submit(osdc, obj_request);
2662         if (ret)
2663                 goto out_cancel;
2664         ret = rbd_obj_request_wait(obj_request);
2665         if (ret)
2666                 goto out_cancel;
2667         ret = obj_request->result;
2668         if (ret)
2669                 goto out_cancel;
2670
2671         /*
2672          * A watch request is set to linger, so the underlying osd
2673          * request won't go away until we unregister it.  We retain
2674          * a pointer to the object request during that time (in
2675          * rbd_dev->watch_request), so we'll keep a reference to
2676          * it.  We'll drop that reference (below) after we've
2677          * unregistered it.
2678          */
2679         if (start) {
2680                 rbd_dev->watch_request = obj_request;
2681
2682                 return 0;
2683         }
2684
2685         /* We have successfully torn down the watch request */
2686
2687         rbd_obj_request_put(rbd_dev->watch_request);
2688         rbd_dev->watch_request = NULL;
2689 out_cancel:
2690         /* Cancel the event if we're tearing down, or on error */
2691         ceph_osdc_cancel_event(rbd_dev->watch_event);
2692         rbd_dev->watch_event = NULL;
2693         if (obj_request)
2694                 rbd_obj_request_put(obj_request);
2695
2696         return ret;
2697 }
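/*
 * Illustrative usage (a sketch): callers set up and tear down the
 * header watch like so.
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	start watching
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	tear it down
 *
 * While the watch is armed, a header change on the osd triggers
 * rbd_watch_cb() above, which refreshes the device and then acks
 * the notification.
 */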
2698
2699 /*
2700  * Synchronous osd object method call.  Returns the number of bytes
2701  * returned in the outbound buffer, or a negative error code.
2702  */
2703 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2704                              const char *object_name,
2705                              const char *class_name,
2706                              const char *method_name,
2707                              const void *outbound,
2708                              size_t outbound_size,
2709                              void *inbound,
2710                              size_t inbound_size)
2711 {
2712         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2713         struct rbd_obj_request *obj_request;
2714         struct page **pages;
2715         u32 page_count;
2716         int ret;
2717
2718         /*
2719          * Method calls are ultimately read operations.  The result
2720          * should be placed into the inbound buffer provided.  They
2721          * also supply outbound data--parameters for the object
2722          * method.  Currently if this is present it will be a
2723          * snapshot id.
2724          */
2725         page_count = (u32)calc_pages_for(0, inbound_size);
2726         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2727         if (IS_ERR(pages))
2728                 return PTR_ERR(pages);
2729
2730         ret = -ENOMEM;
2731         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2732                                                         OBJ_REQUEST_PAGES);
2733         if (!obj_request)
2734                 goto out;
2735
2736         obj_request->pages = pages;
2737         obj_request->page_count = page_count;
2738
2739         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2740         if (!obj_request->osd_req)
2741                 goto out;
2742
2743         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2744                                         class_name, method_name);
2745         if (outbound_size) {
2746                 struct ceph_pagelist *pagelist;
2747
2748                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2749                 if (!pagelist)
2750                         goto out;
2751
2752                 ceph_pagelist_init(pagelist);
2753                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2754                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2755                                                 pagelist);
2756         }
2757         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2758                                         obj_request->pages, inbound_size,
2759                                         0, false, false);
2760         rbd_osd_req_format_read(obj_request);
2761
2762         ret = rbd_obj_request_submit(osdc, obj_request);
2763         if (ret)
2764                 goto out;
2765         ret = rbd_obj_request_wait(obj_request);
2766         if (ret)
2767                 goto out;
2768
2769         ret = obj_request->result;
2770         if (ret < 0)
2771                 goto out;
2772
2773         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2774         ret = (int)obj_request->xferred;
2775         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2776 out:
2777         if (obj_request)
2778                 rbd_obj_request_put(obj_request);
2779         else
2780                 ceph_release_page_vector(pages, page_count);
2781
2782         return ret;
2783 }
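/*
 * Illustrative sketch (values assumed): fetching an image's order
 * and size for a given snapshot via the "rbd" object class, in the
 * style of the format 2 image code elsewhere in this driver.
 *
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 *	if (ret < (int)sizeof (size_buf))
 *		return ret < 0 ? ret : -ERANGE;
 */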
2784
2785 static void rbd_request_fn(struct request_queue *q)
2786                 __releases(q->queue_lock) __acquires(q->queue_lock)
2787 {
2788         struct rbd_device *rbd_dev = q->queuedata;
2789         bool read_only = rbd_dev->mapping.read_only;
2790         struct request *rq;
2791         int result;
2792
2793         while ((rq = blk_fetch_request(q))) {
2794                 bool write_request = rq_data_dir(rq) == WRITE;
2795                 struct rbd_img_request *img_request;
2796                 u64 offset;
2797                 u64 length;
2798
2799                 /* Ignore any non-FS requests that filter through. */
2800
2801                 if (rq->cmd_type != REQ_TYPE_FS) {
2802                         dout("%s: non-fs request type %d\n", __func__,
2803                                 (int) rq->cmd_type);
2804                         __blk_end_request_all(rq, 0);
2805                         continue;
2806                 }
2807
2808                 /* Ignore/skip any zero-length requests */
2809
2810                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2811                 length = (u64) blk_rq_bytes(rq);
2812
2813                 if (!length) {
2814                         dout("%s: zero-length request\n", __func__);
2815                         __blk_end_request_all(rq, 0);
2816                         continue;
2817                 }
2818
2819                 spin_unlock_irq(q->queue_lock);
2820
2821                 /* Disallow writes to a read-only device */
2822
2823                 if (write_request) {
2824                         result = -EROFS;
2825                         if (read_only)
2826                                 goto end_request;
2827                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2828                 }
2829
2830                 /*
2831                  * Quit early if the mapped snapshot no longer
2832                  * exists.  It's still possible the snapshot will
2833                  * have disappeared by the time our request arrives
2834                  * at the osd, but there's no sense in sending it if
2835                  * we already know.
2836                  */
2837                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2838                         dout("request for non-existent snapshot");
2839                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2840                         result = -ENXIO;
2841                         goto end_request;
2842                 }
2843
2844                 result = -EINVAL;
2845                 if (offset && length > U64_MAX - offset + 1) {
2846                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2847                                 offset, length);
2848                         goto end_request;       /* Shouldn't happen */
2849                 }
2850
2851                 result = -ENOMEM;
2852                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2853                                                         write_request, false);
2854                 if (!img_request)
2855                         goto end_request;
2856
2857                 img_request->rq = rq;
2858
2859                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2860                                                 rq->bio);
2861                 if (!result)
2862                         result = rbd_img_request_submit(img_request);
2863                 if (result)
2864                         rbd_img_request_put(img_request);
2865 end_request:
2866                 spin_lock_irq(q->queue_lock);
2867                 if (result < 0) {
2868                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2869                                 write_request ? "write" : "read",
2870                                 length, offset, result);
2871
2872                         __blk_end_request_all(rq, result);
2873                 }
2874         }
2875 }
2876
2877 /*
2878  * A queue callback.  Makes sure that we don't create a bio that
2879  * spans multiple osd objects.  One exception would be single-page
2880  * bios, which we handle later at bio_chain_clone_range().
2881  */
2882 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2883                           struct bio_vec *bvec)
2884 {
2885         struct rbd_device *rbd_dev = q->queuedata;
2886         sector_t sector_offset;
2887         sector_t sectors_per_obj;
2888         sector_t obj_sector_offset;
2889         int ret;
2890
2891         /*
2892          * Find how far into its rbd object the bio's starting sector
2893          * falls.  The bio start sector is partition-relative; adding
2894          * the partition's start makes it relative to the whole device.
2895          */
2896         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2897         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2898         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2899
2900         /*
2901          * Compute the number of bytes from that offset to the end
2902          * of the object.  Account for what's already used by the bio.
2903          */
2904         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2905         if (ret > bmd->bi_size)
2906                 ret -= bmd->bi_size;
2907         else
2908                 ret = 0;
2909
2910         /*
2911          * Don't send back more than was asked for.  And if the bio
2912          * was empty, let the whole thing through because:  "Note
2913          * that a block device *must* allow a single page to be
2914          * added to an empty bio."
2915          */
2916         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2917         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2918                 ret = (int) bvec->bv_len;
2919
2920         return ret;
2921 }
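/*
 * Worked example (hypothetical values): with 4 MiB objects
 * (obj_order == 22), sectors_per_obj is 8192.  For a device-relative
 * start sector of 8000 and 64 KiB already in the bio (bi_size):
 *
 *	obj_sector_offset   = 8000 & 8191	= 8000
 *	bytes to object end = 192 << 9		= 98304
 *	minus bi_size	    = 98304 - 65536	= 32768
 *
 * so at most 32 KiB more may be added, further clamped to bv_len
 * (and an empty bio is always allowed one page, as noted above).
 */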
2922
2923 static void rbd_free_disk(struct rbd_device *rbd_dev)
2924 {
2925         struct gendisk *disk = rbd_dev->disk;
2926
2927         if (!disk)
2928                 return;
2929
2930         rbd_dev->disk = NULL;
2931         if (disk->flags & GENHD_FL_UP) {
2932                 del_gendisk(disk);
2933                 if (disk->queue)
2934                         blk_cleanup_queue(disk->queue);
2935         }
2936         put_disk(disk);
2937 }
2938
2939 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2940                                 const char *object_name,
2941                                 u64 offset, u64 length, void *buf)
2942
2943 {
2944         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2945         struct rbd_obj_request *obj_request;
2946         struct page **pages = NULL;
2947         u32 page_count;
2948         size_t size;
2949         int ret;
2950
2951         page_count = (u32) calc_pages_for(offset, length);
2952         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2953         if (IS_ERR(pages))
2954                 return PTR_ERR(pages);
2955
2956         ret = -ENOMEM;
2957         obj_request = rbd_obj_request_create(object_name, offset, length,
2958                                                         OBJ_REQUEST_PAGES);
2959         if (!obj_request)
2960                 goto out;
2961
2962         obj_request->pages = pages;
2963         obj_request->page_count = page_count;
2964
2965         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2966         if (!obj_request->osd_req)
2967                 goto out;
2968
2969         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2970                                         offset, length, 0, 0);
2971         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2972                                         obj_request->pages,
2973                                         obj_request->length,
2974                                         obj_request->offset & ~PAGE_MASK,
2975                                         false, false);
2976         rbd_osd_req_format_read(obj_request);
2977
2978         ret = rbd_obj_request_submit(osdc, obj_request);
2979         if (ret)
2980                 goto out;
2981         ret = rbd_obj_request_wait(obj_request);
2982         if (ret)
2983                 goto out;
2984
2985         ret = obj_request->result;
2986         if (ret < 0)
2987                 goto out;
2988
2989         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2990         size = (size_t) obj_request->xferred;
2991         ceph_copy_from_page_vector(pages, buf, 0, size);
2992         rbd_assert(size <= (size_t)INT_MAX);
2993         ret = (int)size;
2994 out:
2995         if (obj_request)
2996                 rbd_obj_request_put(obj_request);
2997         else
2998                 ceph_release_page_vector(pages, page_count);
2999
3000         return ret;
3001 }
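/*
 * A minimal usage sketch (object name and size are hypothetical,
 * not from the original source):
 *
 *	char buf[128];
 *	int ret;
 *
 *	ret = rbd_obj_read_sync(rbd_dev, "some_object", 0,
 *				sizeof (buf), buf);
 *
 * On success ret is the number of bytes actually transferred, which
 * may be less than requested; on failure it is a negative errno from
 * request setup, submission, the wait, or the OSD result.
 */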
3002
3003 /*
3004  * Read the complete header for the given rbd device.
3005  *
3006  * Returns a pointer to a dynamically-allocated buffer containing
3007  * the complete and validated header.  The caller is responsible
3008  * for freeing the returned buffer (with kfree()) when it is no
3009  * longer needed.
3010  *
3011  * Returns a pointer-coded errno if a failure occurs.
3012  */
3013 static struct rbd_image_header_ondisk *
3014 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3015 {
3016         struct rbd_image_header_ondisk *ondisk = NULL;
3017         u32 snap_count = 0;
3018         u64 names_size = 0;
3019         u32 want_count;
3020         int ret;
3021
3022         /*
3023          * The complete header will include an array of its 64-bit
3024          * snapshot ids, followed by the names of those snapshots as
3025          * a contiguous block of NUL-terminated strings.  Note that
3026          * the number of snapshots could change by the time we read
3027          * it in, in which case we re-read it.
3028          */
3029         do {
3030                 size_t size;
3031
3032                 kfree(ondisk);
3033
3034                 size = sizeof (*ondisk);
3035                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3036                 size += names_size;
3037                 ondisk = kmalloc(size, GFP_KERNEL);
3038                 if (!ondisk)
3039                         return ERR_PTR(-ENOMEM);
3040
3041                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3042                                        0, size, ondisk);
3043                 if (ret < 0)
3044                         goto out_err;
3045                 if ((size_t)ret < size) {
3046                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3047                                 size, ret);
3048                         ret = -ENXIO;
3049                         goto out_err;
3050                 }
3051                 if (!rbd_dev_ondisk_valid(ondisk)) {
3052                         ret = -ENXIO;
3053                         rbd_warn(rbd_dev, "invalid header");
3054                         goto out_err;
3055                 }
3056
3057                 names_size = le64_to_cpu(ondisk->snap_names_len);
3058                 want_count = snap_count;
3059                 snap_count = le32_to_cpu(ondisk->snap_count);
3060         } while (snap_count != want_count);
3061
3062         return ondisk;
3063
3064 out_err:
3065         kfree(ondisk);
3066
3067         return ERR_PTR(ret);
3068 }
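/*
 * Example of the loop above, with made-up numbers:  the first pass
 * reads only sizeof (*ondisk) bytes and learns, say, snap_count = 2
 * and snap_names_len = 12 ("snap1\0snap2\0").  The second pass then
 * allocates and reads
 *
 *	sizeof (*ondisk) + 2 * sizeof (struct rbd_image_snap_ondisk) + 12
 *
 * bytes.  If a snapshot was created or deleted in the interim, the
 * snap_count embedded in the newly-read header differs from the
 * count the buffer was sized for (want_count) and we go around again.
 */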
3069
3070 /*
3071  * reload the on-disk header
3072  */
3073 static int rbd_read_header(struct rbd_device *rbd_dev,
3074                            struct rbd_image_header *header)
3075 {
3076         struct rbd_image_header_ondisk *ondisk;
3077         int ret;
3078
3079         ondisk = rbd_dev_v1_header_read(rbd_dev);
3080         if (IS_ERR(ondisk))
3081                 return PTR_ERR(ondisk);
3082         ret = rbd_header_from_disk(header, ondisk);
3083         kfree(ondisk);
3084
3085         return ret;
3086 }
3087
3088 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3089 {
3090         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3091                 return;
3092
3093         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3094                 sector_t size;
3095
3096                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3097                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3098                 dout("setting size to %llu sectors", (unsigned long long)size);
3099                 set_capacity(rbd_dev->disk, size);
3100         }
3101 }
3102
3103 /*
3104  * only read the first part of the ondisk header, without the snaps info
3105  */
3106 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3107 {
3108         int ret;
3109         struct rbd_image_header h;
3110
3111         ret = rbd_read_header(rbd_dev, &h);
3112         if (ret < 0)
3113                 return ret;
3114
3115         down_write(&rbd_dev->header_rwsem);
3116
3117         /* Update image size, and check for resize of mapped image */
3118         rbd_dev->header.image_size = h.image_size;
3119         rbd_update_mapping_size(rbd_dev);
3120
3121         /* rbd_dev->header.object_prefix shouldn't change */
3122         kfree(rbd_dev->header.snap_sizes);
3123         kfree(rbd_dev->header.snap_names);
3124         /* osd requests may still refer to snapc */
3125         ceph_put_snap_context(rbd_dev->header.snapc);
3126
3127         rbd_dev->header.image_size = h.image_size;
3128         rbd_dev->header.snapc = h.snapc;
3129         rbd_dev->header.snap_names = h.snap_names;
3130         rbd_dev->header.snap_sizes = h.snap_sizes;
3131         /* Free the extra copy of the object prefix */
3132         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3133                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3134         kfree(h.object_prefix);
3135
3136         up_write(&rbd_dev->header_rwsem);
3137
3138         return ret;
3139 }
3140
3141 /*
3142  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3143  * has disappeared from the (just updated) snapshot context.
3144  */
3145 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3146 {
3147         u64 snap_id;
3148
3149         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3150                 return;
3151
3152         snap_id = rbd_dev->spec->snap_id;
3153         if (snap_id == CEPH_NOSNAP)
3154                 return;
3155
3156         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3157                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3158 }
3159
3160 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3161 {
3162         u64 image_size;
3163         int ret;
3164
3165         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3166         image_size = rbd_dev->header.image_size;
3167         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3168         if (rbd_dev->image_format == 1)
3169                 ret = rbd_dev_v1_refresh(rbd_dev);
3170         else
3171                 ret = rbd_dev_v2_refresh(rbd_dev);
3172
3173         /* If it's a mapped snapshot, validate its EXISTS flag */
3174
3175         rbd_exists_validate(rbd_dev);
3176         mutex_unlock(&ctl_mutex);
3177         if (ret)
3178                 rbd_warn(rbd_dev, "got notification but failed to "
3179                            "update snaps: %d", ret);
3180         if (image_size != rbd_dev->header.image_size)
3181                 revalidate_disk(rbd_dev->disk);
3182
3183         return ret;
3184 }
3185
3186 static int rbd_init_disk(struct rbd_device *rbd_dev)
3187 {
3188         struct gendisk *disk;
3189         struct request_queue *q;
3190         u64 segment_size;
3191
3192         /* create gendisk info */
3193         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3194         if (!disk)
3195                 return -ENOMEM;
3196
3197         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3198                  rbd_dev->dev_id);
3199         disk->major = rbd_dev->major;
3200         disk->first_minor = 0;
3201         disk->fops = &rbd_bd_ops;
3202         disk->private_data = rbd_dev;
3203
3204         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3205         if (!q)
3206                 goto out_disk;
3207
3208         /* We use the default size, but let's be explicit about it. */
3209         blk_queue_physical_block_size(q, SECTOR_SIZE);
3210
3211         /* set io sizes to object size */
3212         segment_size = rbd_obj_bytes(&rbd_dev->header);
3213         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3214         blk_queue_max_segment_size(q, segment_size);
3215         blk_queue_io_min(q, segment_size);
3216         blk_queue_io_opt(q, segment_size);
3217
3218         blk_queue_merge_bvec(q, rbd_merge_bvec);
3219         disk->queue = q;
3220
3221         q->queuedata = rbd_dev;
3222
3223         rbd_dev->disk = disk;
3224
3225         return 0;
3226 out_disk:
3227         put_disk(disk);
3228
3229         return -ENOMEM;
3230 }
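/*
 * With the default object order of 22 (4 MiB objects) the limits set
 * above work out to:
 *
 *	segment_size	 = 4194304 bytes
 *	max_hw_sectors	 = 4194304 / 512 = 8192 sectors
 *	io_min = io_opt	 = 4194304 bytes
 *
 * so a single request never has to span more than one object
 * (rbd_merge_bvec() enforces the same boundary when merging).
 */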
3231
3232 /*
3233  * sysfs
3234  */
3235
3236 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3237 {
3238         return container_of(dev, struct rbd_device, dev);
3239 }
3240
3241 static ssize_t rbd_size_show(struct device *dev,
3242                              struct device_attribute *attr, char *buf)
3243 {
3244         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3245
3246         return sprintf(buf, "%llu\n",
3247                 (unsigned long long)rbd_dev->mapping.size);
3248 }
3249
3250 /*
3251  * Note this shows the features for whatever's mapped, which is not
3252  * necessarily the base image.
3253  */
3254 static ssize_t rbd_features_show(struct device *dev,
3255                              struct device_attribute *attr, char *buf)
3256 {
3257         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3258
3259         return sprintf(buf, "0x%016llx\n",
3260                         (unsigned long long)rbd_dev->mapping.features);
3261 }
3262
3263 static ssize_t rbd_major_show(struct device *dev,
3264                               struct device_attribute *attr, char *buf)
3265 {
3266         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3267
3268         if (rbd_dev->major)
3269                 return sprintf(buf, "%d\n", rbd_dev->major);
3270
3271         return sprintf(buf, "(none)\n");
3273 }
3274
3275 static ssize_t rbd_client_id_show(struct device *dev,
3276                                   struct device_attribute *attr, char *buf)
3277 {
3278         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3279
3280         return sprintf(buf, "client%lld\n",
3281                         ceph_client_id(rbd_dev->rbd_client->client));
3282 }
3283
3284 static ssize_t rbd_pool_show(struct device *dev,
3285                              struct device_attribute *attr, char *buf)
3286 {
3287         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3288
3289         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3290 }
3291
3292 static ssize_t rbd_pool_id_show(struct device *dev,
3293                              struct device_attribute *attr, char *buf)
3294 {
3295         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3296
3297         return sprintf(buf, "%llu\n",
3298                         (unsigned long long) rbd_dev->spec->pool_id);
3299 }
3300
3301 static ssize_t rbd_name_show(struct device *dev,
3302                              struct device_attribute *attr, char *buf)
3303 {
3304         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3305
3306         if (rbd_dev->spec->image_name)
3307                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3308
3309         return sprintf(buf, "(unknown)\n");
3310 }
3311
3312 static ssize_t rbd_image_id_show(struct device *dev,
3313                              struct device_attribute *attr, char *buf)
3314 {
3315         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3316
3317         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3318 }
3319
3320 /*
3321  * Shows the name of the currently-mapped snapshot (or
3322  * RBD_SNAP_HEAD_NAME for the base image).
3323  */
3324 static ssize_t rbd_snap_show(struct device *dev,
3325                              struct device_attribute *attr,
3326                              char *buf)
3327 {
3328         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3329
3330         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3331 }
3332
3333 /*
3334  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3335  * for the parent image.  If there is no parent, simply shows
3336  * "(no parent image)".
3337  */
3338 static ssize_t rbd_parent_show(struct device *dev,
3339                              struct device_attribute *attr,
3340                              char *buf)
3341 {
3342         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3343         struct rbd_spec *spec = rbd_dev->parent_spec;
3344         int count;
3345         char *bufp = buf;
3346
3347         if (!spec)
3348                 return sprintf(buf, "(no parent image)\n");
3349
3350         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3351                         (unsigned long long) spec->pool_id, spec->pool_name);
3352         if (count < 0)
3353                 return count;
3354         bufp += count;
3355
3356         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3357                         spec->image_name ? spec->image_name : "(unknown)");
3358         if (count < 0)
3359                 return count;
3360         bufp += count;
3361
3362         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3363                         (unsigned long long) spec->snap_id, spec->snap_name);
3364         if (count < 0)
3365                 return count;
3366         bufp += count;
3367
3368         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3369         if (count < 0)
3370                 return count;
3371         bufp += count;
3372
3373         return (ssize_t) (bufp - buf);
3374 }
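/*
 * Sample output for a mapped clone (all values hypothetical):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b2ae8944a
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 4194304
 */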
3375
3376 static ssize_t rbd_image_refresh(struct device *dev,
3377                                  struct device_attribute *attr,
3378                                  const char *buf,
3379                                  size_t size)
3380 {
3381         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3382         int ret;
3383
3384         ret = rbd_dev_refresh(rbd_dev);
3385
3386         return ret < 0 ? ret : size;
3387 }
3388
3389 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3390 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3391 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3392 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3393 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3394 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3395 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3396 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3397 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3398 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3399 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3400
3401 static struct attribute *rbd_attrs[] = {
3402         &dev_attr_size.attr,
3403         &dev_attr_features.attr,
3404         &dev_attr_major.attr,
3405         &dev_attr_client_id.attr,
3406         &dev_attr_pool.attr,
3407         &dev_attr_pool_id.attr,
3408         &dev_attr_name.attr,
3409         &dev_attr_image_id.attr,
3410         &dev_attr_current_snap.attr,
3411         &dev_attr_parent.attr,
3412         &dev_attr_refresh.attr,
3413         NULL
3414 };
3415
3416 static struct attribute_group rbd_attr_group = {
3417         .attrs = rbd_attrs,
3418 };
3419
3420 static const struct attribute_group *rbd_attr_groups[] = {
3421         &rbd_attr_group,
3422         NULL
3423 };
3424
3425 static void rbd_sysfs_dev_release(struct device *dev)
3426 {
3427 }
3428
3429 static struct device_type rbd_device_type = {
3430         .name           = "rbd",
3431         .groups         = rbd_attr_groups,
3432         .release        = rbd_sysfs_dev_release,
3433 };
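/*
 * Each mapped image thus appears as /sys/bus/rbd/devices/<dev-id>/,
 * exposing the read-only attributes in rbd_attrs[] plus the
 * write-only "refresh" trigger, e.g.:
 *
 *	/sys/bus/rbd/devices/0/size
 *	/sys/bus/rbd/devices/0/pool
 *	/sys/bus/rbd/devices/0/current_snap
 *
 * See Documentation/ABI/testing/sysfs-bus-rbd for the authoritative
 * list.
 */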
3434
3435 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3436 {
3437         kref_get(&spec->kref);
3438
3439         return spec;
3440 }
3441
3442 static void rbd_spec_free(struct kref *kref);
3443 static void rbd_spec_put(struct rbd_spec *spec)
3444 {
3445         if (spec)
3446                 kref_put(&spec->kref, rbd_spec_free);
3447 }
3448
3449 static struct rbd_spec *rbd_spec_alloc(void)
3450 {
3451         struct rbd_spec *spec;
3452
3453         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3454         if (!spec)
3455                 return NULL;
3456         kref_init(&spec->kref);
3457
3458         return spec;
3459 }
3460
3461 static void rbd_spec_free(struct kref *kref)
3462 {
3463         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3464
3465         kfree(spec->pool_name);
3466         kfree(spec->image_id);
3467         kfree(spec->image_name);
3468         kfree(spec->snap_name);
3469         kfree(spec);
3470 }
3471
3472 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3473                                 struct rbd_spec *spec)
3474 {
3475         struct rbd_device *rbd_dev;
3476
3477         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3478         if (!rbd_dev)
3479                 return NULL;
3480
3481         spin_lock_init(&rbd_dev->lock);
3482         rbd_dev->flags = 0;
3483         INIT_LIST_HEAD(&rbd_dev->node);
3484         init_rwsem(&rbd_dev->header_rwsem);
3485
3486         rbd_dev->spec = spec;
3487         rbd_dev->rbd_client = rbdc;
3488
3489         /* Initialize the layout used for all rbd requests */
3490
3491         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3492         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3493         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3494         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3495
3496         return rbd_dev;
3497 }
3498
3499 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3500 {
3501         rbd_put_client(rbd_dev->rbd_client);
3502         rbd_spec_put(rbd_dev->spec);
3503         kfree(rbd_dev);
3504 }
3505
3506 /*
3507  * Get the size and object order for an image snapshot, or if
3508  * snap_id is CEPH_NOSNAP, gets this information for the base
3509  * image.
3510  */
3511 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3512                                 u8 *order, u64 *snap_size)
3513 {
3514         __le64 snapid = cpu_to_le64(snap_id);
3515         int ret;
3516         struct {
3517                 u8 order;
3518                 __le64 size;
3519         } __attribute__ ((packed)) size_buf = { 0 };
3520
3521         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3522                                 "rbd", "get_size",
3523                                 &snapid, sizeof (snapid),
3524                                 &size_buf, sizeof (size_buf));
3525         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3526         if (ret < 0)
3527                 return ret;
3528         if (ret < sizeof (size_buf))
3529                 return -ERANGE;
3530
3531         if (order)
3532                 *order = size_buf.order;
3533         *snap_size = le64_to_cpu(size_buf.size);
3534
3535         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3536                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3537                 (unsigned long long)*snap_size);
3538
3539         return 0;
3540 }
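/*
 * The "get_size" reply decoded above is a packed 9-byte structure.
 * For example (hypothetical values), an unresized 1 GiB image with
 * default striping would come back as:
 *
 *	size_buf.order = 22			(4 MiB objects)
 *	size_buf.size  = cpu_to_le64(1073741824)
 *
 * yielding *snap_size = 1073741824 after byte-swapping.
 */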
3541
3542 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3543 {
3544         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3545                                         &rbd_dev->header.obj_order,
3546                                         &rbd_dev->header.image_size);
3547 }
3548
3549 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3550 {
3551         void *reply_buf;
3552         int ret;
3553         void *p;
3554
3555         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3556         if (!reply_buf)
3557                 return -ENOMEM;
3558
3559         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3560                                 "rbd", "get_object_prefix", NULL, 0,
3561                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3562         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3563         if (ret < 0)
3564                 goto out;
3565
3566         p = reply_buf;
3567         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3568                                                 p + ret, NULL, GFP_NOIO);
3569         ret = 0;
3570
3571         if (IS_ERR(rbd_dev->header.object_prefix)) {
3572                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3573                 rbd_dev->header.object_prefix = NULL;
3574         } else {
3575                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3576         }
3577 out:
3578         kfree(reply_buf);
3579
3580         return ret;
3581 }
3582
3583 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3584                 u64 *snap_features)
3585 {
3586         __le64 snapid = cpu_to_le64(snap_id);
3587         struct {
3588                 __le64 features;
3589                 __le64 incompat;
3590         } __attribute__ ((packed)) features_buf = { 0 };
3591         u64 incompat;
3592         int ret;
3593
3594         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3595                                 "rbd", "get_features",
3596                                 &snapid, sizeof (snapid),
3597                                 &features_buf, sizeof (features_buf));
3598         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3599         if (ret < 0)
3600                 return ret;
3601         if (ret < sizeof (features_buf))
3602                 return -ERANGE;
3603
3604         incompat = le64_to_cpu(features_buf.incompat);
3605         if (incompat & ~RBD_FEATURES_SUPPORTED)
3606                 return -ENXIO;
3607
3608         *snap_features = le64_to_cpu(features_buf.features);
3609
3610         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3611                 (unsigned long long)snap_id,
3612                 (unsigned long long)*snap_features,
3613                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3614
3615         return 0;
3616 }
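/*
 * Illustration of the incompat check above:  suppose a (hypothetical)
 * newer image reported
 *
 *	features = 0x7, incompat = 0x4
 *
 * Bit 0x4 is not in RBD_FEATURES_SUPPORTED (currently 0x3), so
 * incompat & ~RBD_FEATURES_SUPPORTED is nonzero and the image is
 * refused with -ENXIO rather than risk misinterpreting its data.
 */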
3617
3618 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3619 {
3620         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3621                                                 &rbd_dev->header.features);
3622 }
3623
3624 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3625 {
3626         struct rbd_spec *parent_spec;
3627         size_t size;
3628         void *reply_buf = NULL;
3629         __le64 snapid;
3630         void *p;
3631         void *end;
3632         char *image_id;
3633         u64 overlap;
3634         int ret;
3635
3636         parent_spec = rbd_spec_alloc();
3637         if (!parent_spec)
3638                 return -ENOMEM;
3639
3640         size = sizeof (__le64) +                                /* pool_id */
3641                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3642                 sizeof (__le64) +                               /* snap_id */
3643                 sizeof (__le64);                                /* overlap */
3644         reply_buf = kmalloc(size, GFP_KERNEL);
3645         if (!reply_buf) {
3646                 ret = -ENOMEM;
3647                 goto out_err;
3648         }
3649
3650         snapid = cpu_to_le64(CEPH_NOSNAP);
3651         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3652                                 "rbd", "get_parent",
3653                                 &snapid, sizeof (snapid),
3654                                 reply_buf, size);
3655         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3656         if (ret < 0)
3657                 goto out_err;
3658
3659         p = reply_buf;
3660         end = reply_buf + ret;
3661         ret = -ERANGE;
3662         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3663         if (parent_spec->pool_id == CEPH_NOPOOL)
3664                 goto out;       /* No parent?  No problem. */
3665
3666         /* The ceph file layout needs to fit pool id in 32 bits */
3667
3668         ret = -EIO;
3669         if (parent_spec->pool_id > (u64)U32_MAX) {
3670                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
3671                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3672                 goto out_err;
3673         }
3674
3675         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3676         if (IS_ERR(image_id)) {
3677                 ret = PTR_ERR(image_id);
3678                 goto out_err;
3679         }
3680         parent_spec->image_id = image_id;
3681         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3682         ceph_decode_64_safe(&p, end, overlap, out_err);
3683
3684         rbd_dev->parent_overlap = overlap;
3685         rbd_dev->parent_spec = parent_spec;
3686         parent_spec = NULL;     /* rbd_dev now owns this */
3687 out:
3688         ret = 0;
3689 out_err:
3690         kfree(reply_buf);
3691         rbd_spec_put(parent_spec);
3692
3693         return ret;
3694 }
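/*
 * For reference, the "get_parent" reply decoded above is laid out as:
 *
 *	__le64	pool_id		(CEPH_NOPOOL if there is no parent)
 *	string	image_id	(__le32 length followed by that many bytes)
 *	__le64	snap_id
 *	__le64	overlap		(bytes of the parent visible to the child)
 */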
3695
3696 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3697 {
3698         struct {
3699                 __le64 stripe_unit;
3700                 __le64 stripe_count;
3701         } __attribute__ ((packed)) striping_info_buf = { 0 };
3702         size_t size = sizeof (striping_info_buf);
3703         void *p;
3704         u64 obj_size;
3705         u64 stripe_unit;
3706         u64 stripe_count;
3707         int ret;
3708
3709         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3710                                 "rbd", "get_stripe_unit_count", NULL, 0,
3711                                 (char *)&striping_info_buf, size);
3712         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3713         if (ret < 0)
3714                 return ret;
3715         if (ret < size)
3716                 return -ERANGE;
3717
3718         /*
3719          * We don't actually support the "fancy striping" feature
3720          * (STRIPINGV2) yet, but if the striping sizes are the
3721          * defaults the behavior is the same as before.  So find
3722          * out, and only fail if the image has non-default values.
3723          */
3724         ret = -EINVAL;
3725         obj_size = (u64)1 << rbd_dev->header.obj_order;
3726         p = &striping_info_buf;
3727         stripe_unit = ceph_decode_64(&p);
3728         if (stripe_unit != obj_size) {
3729                 rbd_warn(rbd_dev, "unsupported stripe unit "
3730                                 "(got %llu want %llu)",
3731                                 stripe_unit, obj_size);
3732                 return -EINVAL;
3733         }
3734         stripe_count = ceph_decode_64(&p);
3735         if (stripe_count != 1) {
3736                 rbd_warn(rbd_dev, "unsupported stripe count "
3737                                 "(got %llu want 1)", stripe_count);
3738                 return -EINVAL;
3739         }
3740         rbd_dev->header.stripe_unit = stripe_unit;
3741         rbd_dev->header.stripe_count = stripe_count;
3742
3743         return 0;
3744 }
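/*
 * Concretely:  for an image with obj_order 22 the only reply accepted
 * above is stripe_unit = 4194304 and stripe_count = 1, the values for
 * which STRIPINGV2 behaves identically to no striping at all.  An
 * image striped with, say, a 64 KiB stripe_unit is rejected with
 * -EINVAL.
 */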
3745
3746 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3747 {
3748         size_t image_id_size;
3749         char *image_id;
3750         void *p;
3751         void *end;
3752         size_t size;
3753         void *reply_buf = NULL;
3754         size_t len = 0;
3755         char *image_name = NULL;
3756         int ret;
3757
3758         rbd_assert(!rbd_dev->spec->image_name);
3759
3760         len = strlen(rbd_dev->spec->image_id);
3761         image_id_size = sizeof (__le32) + len;
3762         image_id = kmalloc(image_id_size, GFP_KERNEL);
3763         if (!image_id)
3764                 return NULL;
3765
3766         p = image_id;
3767         end = image_id + image_id_size;
3768         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3769
3770         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3771         reply_buf = kmalloc(size, GFP_KERNEL);
3772         if (!reply_buf)
3773                 goto out;
3774
3775         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3776                                 "rbd", "dir_get_name",
3777                                 image_id, image_id_size,
3778                                 reply_buf, size);
3779         if (ret < 0)
3780                 goto out;
3781         p = reply_buf;
3782         end = reply_buf + ret;
3783
3784         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3785         if (IS_ERR(image_name))
3786                 image_name = NULL;
3787         else
3788                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3789 out:
3790         kfree(reply_buf);
3791         kfree(image_id);
3792
3793         return image_name;
3794 }
3795
3796 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3797 {
3798         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3799         const char *snap_name;
3800         u32 which = 0;
3801
3802         /* Skip over names until we find the one we are looking for */
3803
3804         snap_name = rbd_dev->header.snap_names;
3805         while (which < snapc->num_snaps) {
3806                 if (!strcmp(name, snap_name))
3807                         return snapc->snaps[which];
3808                 snap_name += strlen(snap_name) + 1;
3809                 which++;
3810         }
3811         return CEPH_NOSNAP;
3812 }
3813
3814 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3815 {
3816         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3817         u32 which;
3818         bool found = false;
3819         u64 snap_id;
3820
3821         for (which = 0; !found && which < snapc->num_snaps; which++) {
3822                 const char *snap_name;
3823
3824                 snap_id = snapc->snaps[which];
3825                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3826                 if (IS_ERR(snap_name))
3827                         break;
3828                 found = !strcmp(name, snap_name);
3829                 kfree(snap_name);
3830         }
3831         return found ? snap_id : CEPH_NOSNAP;
3832 }
3833
3834 /*
3835  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3836  * no snapshot by that name is found, or if an error occurs.
3837  */
3838 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3839 {
3840         if (rbd_dev->image_format == 1)
3841                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3842
3843         return rbd_v2_snap_id_by_name(rbd_dev, name);
3844 }
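/*
 * Both lookups above are linear scans keyed by snapshot *name*.
 * Lookups keyed by snapshot *id* (rbd_dev_snap_index()) can use
 * bsearch() from <linux/bsearch.h> instead, because the osds keep
 * snapc->snaps[] sorted in descending order.  A sketch of such a
 * comparator (reversed, since the array is highest-id first):
 *
 *	static int snapid_compare_reverse(const void *s1, const void *s2)
 *	{
 *		u64 snap_id1 = *(const u64 *)s1;
 *		u64 snap_id2 = *(const u64 *)s2;
 *
 *		if (snap_id1 < snap_id2)
 *			return 1;
 *		return snap_id1 == snap_id2 ? 0 : -1;
 *	}
 *
 *	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
 *			sizeof (snap_id), snapid_compare_reverse);
 */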
3845
3846 /*
3847  * When an rbd image has a parent image, it is identified by the
3848  * pool, image, and snapshot ids (not names).  This function fills
3849  * in the names for those ids.  (It's OK if we can't figure out the
3850  * name for an image id, but the pool and snapshot ids should always
3851  * exist and have names.)  All names in an rbd spec are dynamically
3852  * allocated.
3853  *
3854  * When an image being mapped (not a parent) is probed, we have the
3855  * pool name and pool id, image name and image id, and the snapshot
3856  * name.  The only thing we're missing is the snapshot id.
3857  */
3858 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3859 {
3860         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3861         struct rbd_spec *spec = rbd_dev->spec;
3862         const char *pool_name;
3863         const char *image_name;
3864         const char *snap_name;
3865         int ret;
3866
3867         /*
3868          * An image being mapped will have the pool name (etc.), but
3869          * we need to look up the snapshot id.
3870          */
3871         if (spec->pool_name) {
3872                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3873                         u64 snap_id;
3874
3875                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3876                         if (snap_id == CEPH_NOSNAP)
3877                                 return -ENOENT;
3878                         spec->snap_id = snap_id;
3879                 } else {
3880                         spec->snap_id = CEPH_NOSNAP;
3881                 }
3882
3883                 return 0;
3884         }
3885
3886         /* Get the pool name; we have to make our own copy of this */
3887
3888         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3889         if (!pool_name) {
3890                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3891                 return -EIO;
3892         }
3893         pool_name = kstrdup(pool_name, GFP_KERNEL);
3894         if (!pool_name)
3895                 return -ENOMEM;
3896
3897         /* Fetch the image name; tolerate failure here */
3898
3899         image_name = rbd_dev_image_name(rbd_dev);
3900         if (!image_name)
3901                 rbd_warn(rbd_dev, "unable to get image name");
3902
3903         /* Look up the snapshot name, and make a copy */
3904
3905         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3906         if (!snap_name) {
3907                 ret = -ENOMEM;
3908                 goto out_err;
3909         }
3910
3911         spec->pool_name = pool_name;
3912         spec->image_name = image_name;
3913         spec->snap_name = snap_name;
3914
3915         return 0;
3916 out_err:
3917         kfree(image_name);
3918         kfree(pool_name);
3919
3920         return ret;
3921 }
3922
3923 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3924 {
3925         size_t size;
3926         int ret;
3927         void *reply_buf;
3928         void *p;
3929         void *end;
3930         u64 seq;
3931         u32 snap_count;
3932         struct ceph_snap_context *snapc;
3933         u32 i;
3934
3935         /*
3936          * We'll need room for the seq value (maximum snapshot id),
3937          * snapshot count, and array of that many snapshot ids.
3938          * For now we have a fixed upper limit on the number we're
3939          * prepared to receive.
3940          */
3941         size = sizeof (__le64) + sizeof (__le32) +
3942                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
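        /*
         * Worked out:  8 + 4 + 510 * 8 = 4092 bytes, which is what
         * lets the largest snapshot context we accept (see
         * RBD_MAX_SNAP_COUNT) fit within a single 4 KiB page.
         */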
3943         reply_buf = kzalloc(size, GFP_KERNEL);
3944         if (!reply_buf)
3945                 return -ENOMEM;
3946
3947         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3948                                 "rbd", "get_snapcontext", NULL, 0,
3949                                 reply_buf, size);
3950         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3951         if (ret < 0)
3952                 goto out;
3953
3954         p = reply_buf;
3955         end = reply_buf + ret;
3956         ret = -ERANGE;
3957         ceph_decode_64_safe(&p, end, seq, out);
3958         ceph_decode_32_safe(&p, end, snap_count, out);
3959
3960         /*
3961          * Make sure the reported number of snapshot ids wouldn't go
3962          * beyond the end of our buffer.  But before checking that,
3963          * make sure the computed size of the snapshot context we
3964          * allocate is representable in a size_t.
3965          */
3966         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3967                                  / sizeof (u64)) {
3968                 ret = -EINVAL;
3969                 goto out;
3970         }
3971         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3972                 goto out;
3973         ret = 0;
3974
3975         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3976         if (!snapc) {
3977                 ret = -ENOMEM;
3978                 goto out;
3979         }
3980         snapc->seq = seq;
3981         for (i = 0; i < snap_count; i++)
3982                 snapc->snaps[i] = ceph_decode_64(&p);
3983
3984         rbd_dev->header.snapc = snapc;
3985
3986         dout("  snap context seq = %llu, snap_count = %u\n",
3987                 (unsigned long long)seq, (unsigned int)snap_count);
3988 out:
3989         kfree(reply_buf);
3990
3991         return ret;
3992 }
3993
3994 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3995                                         u64 snap_id)
3996 {
3997         size_t size;
3998         void *reply_buf;
3999         __le64 snapid;
4000         int ret;
4001         void *p;
4002         void *end;
4003         char *snap_name;
4004
4005         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4006         reply_buf = kmalloc(size, GFP_KERNEL);
4007         if (!reply_buf)
4008                 return ERR_PTR(-ENOMEM);
4009
4010         snapid = cpu_to_le64(snap_id);
4011         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4012                                 "rbd", "get_snapshot_name",
4013                                 &snapid, sizeof (snapid),
4014                                 reply_buf, size);
4015         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4016         if (ret < 0) {
4017                 snap_name = ERR_PTR(ret);
4018                 goto out;
4019         }
4020
4021         p = reply_buf;
4022         end = reply_buf + ret;
4023         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4024         if (IS_ERR(snap_name))
4025                 goto out;
4026
4027         dout("  snap_id 0x%016llx snap_name = %s\n",
4028                 (unsigned long long)snap_id, snap_name);
4029 out:
4030         kfree(reply_buf);
4031
4032         return snap_name;
4033 }
4034
4035 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4036 {
4037         int ret;
4038
4039         down_write(&rbd_dev->header_rwsem);
4040
4041         ret = rbd_dev_v2_image_size(rbd_dev);
4042         if (ret)
4043                 goto out;
4044         rbd_update_mapping_size(rbd_dev);
4045
4046         ret = rbd_dev_v2_snap_context(rbd_dev);
4047         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4050 out:
4051         up_write(&rbd_dev->header_rwsem);
4052
4053         return ret;
4054 }
4055
4056 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4057 {
4058         struct device *dev;
4059         int ret;
4060
4061         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4062
4063         dev = &rbd_dev->dev;
4064         dev->bus = &rbd_bus_type;
4065         dev->type = &rbd_device_type;
4066         dev->parent = &rbd_root_dev;
4067         dev->release = rbd_dev_device_release;
4068         dev_set_name(dev, "%d", rbd_dev->dev_id);
4069         ret = device_register(dev);
4070
4071         mutex_unlock(&ctl_mutex);
4072
4073         return ret;
4074 }
4075
4076 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4077 {
4078         device_unregister(&rbd_dev->dev);
4079 }
4080
4081 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4082
4083 /*
4084  * Get a unique rbd identifier for the given new rbd_dev, and add
4085  * the rbd_dev to the global list.  The minimum rbd id is 1.
4086  */
4087 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4088 {
4089         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4090
4091         spin_lock(&rbd_dev_list_lock);
4092         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4093         spin_unlock(&rbd_dev_list_lock);
4094         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4095                 (unsigned long long) rbd_dev->dev_id);
4096 }
4097
4098 /*
4099  * Remove an rbd_dev from the global list, and record that its
4100  * identifier is no longer in use.
4101  */
4102 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4103 {
4104         struct list_head *tmp;
4105         int rbd_id = rbd_dev->dev_id;
4106         int max_id;
4107
4108         rbd_assert(rbd_id > 0);
4109
4110         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4111                 (unsigned long long) rbd_dev->dev_id);
4112         spin_lock(&rbd_dev_list_lock);
4113         list_del_init(&rbd_dev->node);
4114
4115         /*
4116          * If the id being "put" is not the current maximum, there
4117          * is nothing special we need to do.
4118          */
4119         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4120                 spin_unlock(&rbd_dev_list_lock);
4121                 return;
4122         }
4123
4124         /*
4125          * We need to update the current maximum id.  Search the
4126          * list to find out what it is.  We're more likely to find
4127          * the maximum at the end, so search the list backward.
4128          */
4129         max_id = 0;
4130         list_for_each_prev(tmp, &rbd_dev_list) {
4131                 struct rbd_device *rbd_dev;
4132
4133                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4134                 if (rbd_dev->dev_id > max_id)
4135                         max_id = rbd_dev->dev_id;
4136         }
4137         spin_unlock(&rbd_dev_list_lock);
4138
4139         /*
4140          * The max id could have been updated by rbd_dev_id_get(), in
4141          * which case it now accurately reflects the new maximum.
4142          * Be careful not to overwrite the maximum value in that
4143          * case.
4144          */
4145         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4146         dout("  max dev id has been reset\n");
4147 }
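/*
 * Example of the race the cmpxchg above guards against:  ids {1, 2, 3}
 * are in use and id 3 is being put.  We read the current maximum (3),
 * scan the list, and compute max_id = 2.  If another thread meanwhile
 * grabs id 4 (bumping rbd_dev_id_max from 3 to 4), our
 * cmpxchg(&rbd_dev_id_max, 3, 2) sees 4 rather than 3 and fails,
 * correctly leaving the new maximum of 4 in place.
 */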
4148
4149 /*
4150  * Skips over white space at *buf, and updates *buf to point to the
4151  * first found non-space character (if any). Returns the length of
4152  * the token (string of non-white space characters) found.  Note
4153  * that *buf must be terminated with '\0'.
4154  */
4155 static inline size_t next_token(const char **buf)
4156 {
4157         /*
4158          * These are the characters that produce nonzero for
4159          * isspace() in the "C" and "POSIX" locales.
4160          */
4161         const char *spaces = " \f\n\r\t\v";
4162
4163         *buf += strspn(*buf, spaces);   /* Find start of token */
4164
4165         return strcspn(*buf, spaces);   /* Return token length */
4166 }
4167
4168 /*
4169  * Finds the next token in *buf, and if the provided token buffer is
4170  * big enough, copies the found token into it.  The result, if
4171  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4172  * must be terminated with '\0' on entry.
4173  *
4174  * Returns the length of the token found (not including the '\0').
4175  * Return value will be 0 if no token is found, and it will be >=
4176  * token_size if the token would not fit.
4177  *
4178  * The *buf pointer will be updated to point beyond the end of the
4179  * found token.  Note that this occurs even if the token buffer is
4180  * too small to hold it.
4181  */
4182 static inline size_t copy_token(const char **buf,
4183                                 char *token,
4184                                 size_t token_size)
4185 {
4186         size_t len;
4187
4188         len = next_token(buf);
4189         if (len < token_size) {
4190                 memcpy(token, *buf, len);
4191                 *(token + len) = '\0';
4192         }
4193         *buf += len;
4194
4195         return len;
4196 }
4197
4198 /*
4199  * Finds the next token in *buf, dynamically allocates a buffer big
4200  * enough to hold a copy of it, and copies the token into the new
4201  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4202  * that a duplicate buffer is created even for a zero-length token.
4203  *
4204  * Returns a pointer to the newly-allocated duplicate, or a null
4205  * pointer if memory for the duplicate was not available.  If
4206  * the lenp argument is a non-null pointer, the length of the token
4207  * (not including the '\0') is returned in *lenp.
4208  *
4209  * If successful, the *buf pointer will be updated to point beyond
4210  * the end of the found token.
4211  *
4212  * Note: uses GFP_KERNEL for allocation.
4213  */
4214 static inline char *dup_token(const char **buf, size_t *lenp)
4215 {
4216         char *dup;
4217         size_t len;
4218
4219         len = next_token(buf);
4220         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4221         if (!dup)
4222                 return NULL;
4223         *(dup + len) = '\0';
4224         *buf += len;
4225
4226         if (lenp)
4227                 *lenp = len;
4228
4229         return dup;
4230 }
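/*
 * Illustration of the helpers above (hypothetical input):
 *
 *	const char *buf = "  1.2.3.4:6789 name=admin";
 *
 *	len = next_token(&buf);		buf now points at the '1';
 *					len is 12, but buf has not
 *					been advanced past the token
 *	buf += len;			caller steps past it
 *	opts = dup_token(&buf, NULL);	opts is a kmalloc'd copy of
 *					"name=admin"; buf now points
 *					at the terminating '\0'
 */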
4231
4232 /*
4233  * Parse the options provided for an "rbd add" (i.e., rbd image
4234  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4235  * and the data written is passed here via a NUL-terminated buffer.
4236  * Returns 0 if successful or an error code otherwise.
4237  *
4238  * The information extracted from these options is recorded in
4239  * the other parameters which return dynamically-allocated
4240  * structures:
4241  *  ceph_opts
4242  *      The address of a pointer that will refer to a ceph options
4243  *      structure.  Caller must release the returned pointer using
4244  *      ceph_destroy_options() when it is no longer needed.
4245  *  rbd_opts
4246  *      Address of an rbd options pointer.  Fully initialized by
4247  *      this function; caller must release with kfree().
4248  *  spec
4249  *      Address of an rbd image specification pointer.  Fully
4250  *      initialized by this function based on parsed options.
4251  *      Caller must release with rbd_spec_put().
4252  *
4253  * The options passed take this form:
4254  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4255  * where:
4256  *  <mon_addrs>
4257  *      A comma-separated list of one or more monitor addresses.
4258  *      A monitor address is an ip address, optionally followed
4259  *      by a port number (separated by a colon).
4260  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4261  *  <options>
4262  *      A comma-separated list of ceph and/or rbd options.
4263  *  <pool_name>
4264  *      The name of the rados pool containing the rbd image.
4265  *  <image_name>
4266  *      The name of the image in that pool to map.
4267  *  <snap_name>
4268  *      An optional snapshot name.  If provided, the mapping will
4269  *      present data from the image at the time that snapshot was
4270  *      created.  The image head is used if no snapshot name is
4271  *      provided.  Snapshot mappings are always read-only.
4272  */
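/*
 * A complete (hypothetical) example of the above, as written to
 * /sys/bus/rbd/add:
 *
 *	1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap
 *
 * This maps snapshot "mysnap" of image "myimage" in pool "rbd",
 * read-only, using the monitor at 1.2.3.4:6789.
 */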
4273 static int rbd_add_parse_args(const char *buf,
4274                                 struct ceph_options **ceph_opts,
4275                                 struct rbd_options **opts,
4276                                 struct rbd_spec **rbd_spec)
4277 {
4278         size_t len;
4279         char *options;
4280         const char *mon_addrs;
4281         char *snap_name;
4282         size_t mon_addrs_size;
4283         struct rbd_spec *spec = NULL;
4284         struct rbd_options *rbd_opts = NULL;
4285         struct ceph_options *copts;
4286         int ret;
4287
4288         /* The first four tokens are required */
4289
4290         len = next_token(&buf);
4291         if (!len) {
4292                 rbd_warn(NULL, "no monitor address(es) provided");
4293                 return -EINVAL;
4294         }
4295         mon_addrs = buf;
4296         mon_addrs_size = len + 1;
4297         buf += len;
4298
4299         ret = -EINVAL;
4300         options = dup_token(&buf, NULL);
4301         if (!options)
4302                 return -ENOMEM;
4303         if (!*options) {
4304                 rbd_warn(NULL, "no options provided");
4305                 goto out_err;
4306         }
4307
4308         spec = rbd_spec_alloc();
4309         if (!spec)
4310                 goto out_mem;
4311
4312         spec->pool_name = dup_token(&buf, NULL);
4313         if (!spec->pool_name)
4314                 goto out_mem;
4315         if (!*spec->pool_name) {
4316                 rbd_warn(NULL, "no pool name provided");
4317                 goto out_err;
4318         }
4319
4320         spec->image_name = dup_token(&buf, NULL);
4321         if (!spec->image_name)
4322                 goto out_mem;
4323         if (!*spec->image_name) {
4324                 rbd_warn(NULL, "no image name provided");
4325                 goto out_err;
4326         }
4327
4328         /*
4329          * Snapshot name is optional; default is to use "-"
4330          * (indicating the head/no snapshot).
4331          */
4332         len = next_token(&buf);
4333         if (!len) {
4334                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4335                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4336         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4337                 ret = -ENAMETOOLONG;
4338                 goto out_err;
4339         }
4340         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4341         if (!snap_name)
4342                 goto out_mem;
4343         *(snap_name + len) = '\0';
4344         spec->snap_name = snap_name;
4345
4346         /* Initialize all rbd options to the defaults */
4347
4348         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4349         if (!rbd_opts)
4350                 goto out_mem;
4351
4352         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4353
4354         copts = ceph_parse_options(options, mon_addrs,
4355                                         mon_addrs + mon_addrs_size - 1,
4356                                         parse_rbd_opts_token, rbd_opts);
4357         if (IS_ERR(copts)) {
4358                 ret = PTR_ERR(copts);
4359                 goto out_err;
4360         }
4361         kfree(options);
4362
4363         *ceph_opts = copts;
4364         *opts = rbd_opts;
4365         *rbd_spec = spec;
4366
4367         return 0;
4368 out_mem:
4369         ret = -ENOMEM;
4370 out_err:
4371         kfree(rbd_opts);
4372         rbd_spec_put(spec);
4373         kfree(options);
4374
4375         return ret;
4376 }
4377
4378 /*
4379  * An rbd format 2 image has a unique identifier, distinct from the
4380  * name given to it by the user.  Internally, that identifier is
4381  * what's used to specify the names of objects related to the image.
4382  *
4383  * A special "rbd id" object is used to map an rbd image name to its
4384  * id.  If that object doesn't exist, then there is no v2 rbd image
4385  * with the supplied name.
4386  *
4387  * This function will record the given rbd_dev's image_id field if
4388  * it can be determined, and in that case will return 0.  If any
4389  * errors occur a negative errno will be returned and the rbd_dev's
4390  * image_id field will be unchanged (and should be NULL).
4391  */
4392 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4393 {
4394         int ret;
4395         size_t size;
4396         char *object_name;
4397         void *response;
4398         char *image_id;
4399
4400         /*
4401          * When probing a parent image, the image id is already
4402          * known (and the image name likely is not).  There's no
4403          * need to fetch the image id again in this case.  We
4404          * do still need to set the image format though.
4405          */
4406         if (rbd_dev->spec->image_id) {
4407                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4408
4409                 return 0;
4410         }
4411
4412         /*
4413          * First, see if the format 2 image id file exists, and if
4414          * so, get the image's persistent id from it.
4415          */
4416         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4417         object_name = kmalloc(size, GFP_NOIO);
4418         if (!object_name)
4419                 return -ENOMEM;
4420         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4421         dout("rbd id object name is %s\n", object_name);
4422
4423         /* Response will be an encoded string, which includes a length */
4424
4425         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4426         response = kzalloc(size, GFP_NOIO);
4427         if (!response) {
4428                 ret = -ENOMEM;
4429                 goto out;
4430         }
4431
4432         /* If it doesn't exist we'll assume it's a format 1 image */
4433
4434         ret = rbd_obj_method_sync(rbd_dev, object_name,
4435                                 "rbd", "get_id", NULL, 0,
4436                                 response, RBD_IMAGE_ID_LEN_MAX);
4437         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4438         if (ret == -ENOENT) {
4439                 image_id = kstrdup("", GFP_KERNEL);
4440                 ret = image_id ? 0 : -ENOMEM;
4441                 if (!ret)
4442                         rbd_dev->image_format = 1;
        } else if (ret < 0) {
                goto out;               /* propagate the error */
        } else if ((size_t) ret > sizeof (__le32)) {
4444                 void *p = response;
4445
4446                 image_id = ceph_extract_encoded_string(&p, p + ret,
4447                                                 NULL, GFP_NOIO);
4448                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4449                 if (!ret)
4450                         rbd_dev->image_format = 2;
4451         } else {
4452                 ret = -EINVAL;
4453         }
4454
4455         if (!ret) {
4456                 rbd_dev->spec->image_id = image_id;
4457                 dout("image_id is %s\n", image_id);
4458         }
4459 out:
4460         kfree(response);
4461         kfree(object_name);
4462
4463         return ret;
4464 }
4465
4466 /* Undo whatever state changes are made by v1 or v2 image probe */
4467
4468 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4469 {
4470         struct rbd_image_header *header;
4471
4472         rbd_dev_remove_parent(rbd_dev);
4473         rbd_spec_put(rbd_dev->parent_spec);
4474         rbd_dev->parent_spec = NULL;
4475         rbd_dev->parent_overlap = 0;
4476
4477         /* Free dynamic fields from the header, then zero it out */
4478
4479         header = &rbd_dev->header;
4480         ceph_put_snap_context(header->snapc);
4481         kfree(header->snap_sizes);
4482         kfree(header->snap_names);
4483         kfree(header->object_prefix);
4484         memset(header, 0, sizeof (*header));
4485 }
4486
4487 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4488 {
4489         int ret;
4490
4491         /* Populate rbd image metadata */
4492
4493         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4494         if (ret < 0)
4495                 goto out_err;
4496
4497         /* Version 1 images have no parent (no layering) */
4498
4499         rbd_dev->parent_spec = NULL;
4500         rbd_dev->parent_overlap = 0;
4501
4502         dout("discovered version 1 image, header name is %s\n",
4503                 rbd_dev->header_name);
4504
4505         return 0;
4506
4507 out_err:
4508         kfree(rbd_dev->header_name);
4509         rbd_dev->header_name = NULL;
4510         kfree(rbd_dev->spec->image_id);
4511         rbd_dev->spec->image_id = NULL;
4512
4513         return ret;
4514 }
4515
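     /*
      * Gather the metadata that describes a format 2 image: size,
      * object prefix, feature bits, parent information (for layered
      * clones), striping parameters, and the snapshot context.  Each
      * item is fetched from the image's header object via a class
      * method call.
      */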
4516 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4517 {
4518         int ret;
4519
4520         ret = rbd_dev_v2_image_size(rbd_dev);
4521         if (ret)
4522                 goto out_err;
4523
4524         /* Get the object prefix (a.k.a. block_name) for the image */
4525
4526         ret = rbd_dev_v2_object_prefix(rbd_dev);
4527         if (ret)
4528                 goto out_err;
4529
4530         /* Get and check the features for the image */
4531
4532         ret = rbd_dev_v2_features(rbd_dev);
4533         if (ret)
4534                 goto out_err;
4535
4536         /* If the image supports layering, get the parent info */
4537
4538         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4539                 ret = rbd_dev_v2_parent_info(rbd_dev);
4540                 if (ret)
4541                         goto out_err;
4542
4543                 /*
4544                  * Don't print a warning for parent images.  We can
4545                  * tell we have a parent at this point because we
4546                  * won't know its pool name yet (just its pool id).
4547                  */
4548                 if (rbd_dev->spec->pool_name)
4549                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4550                                         "is EXPERIMENTAL!");
4551         }
4552
4553         /* If the image supports fancy striping, get its parameters */
4554
4555         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4556                 ret = rbd_dev_v2_striping_info(rbd_dev);
4557                 if (ret < 0)
4558                         goto out_err;
4559         }
4560
4561         /* crypto and compression types aren't (yet) supported for v2 images */
4562
4563         rbd_dev->header.crypt_type = 0;
4564         rbd_dev->header.comp_type = 0;
4565
4566         /* Get the snapshot context */
4567
4568         ret = rbd_dev_v2_snap_context(rbd_dev);
4569         if (ret)
4570                 goto out_err;
4571
4572         dout("discovered version 2 image, header name is %s\n",
4573                 rbd_dev->header_name);
4574
4575         return 0;
4576 out_err:
4577         rbd_dev->parent_overlap = 0;
4578         rbd_spec_put(rbd_dev->parent_spec);
4579         rbd_dev->parent_spec = NULL;
4580         kfree(rbd_dev->header_name);
4581         rbd_dev->header_name = NULL;
4582         kfree(rbd_dev->header.object_prefix);
4583         rbd_dev->header.object_prefix = NULL;
4584
4585         return ret;
4586 }
4587
4588 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4589 {
4590         struct rbd_device *parent = NULL;
4591         struct rbd_spec *parent_spec;
4592         struct rbd_client *rbdc;
4593         int ret;
4594
4595         if (!rbd_dev->parent_spec)
4596                 return 0;
4597         /*
4598          * We need to pass a reference to the client and the parent
4599          * spec when creating the parent rbd_dev.  Images related by
4600          * parent/child relationships always share both.
4601          */
4602         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4603         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4604
4605         ret = -ENOMEM;
4606         parent = rbd_dev_create(rbdc, parent_spec);
4607         if (!parent)
4608                 goto out_err;
4609
4610         ret = rbd_dev_image_probe(parent);
4611         if (ret < 0)
4612                 goto out_err;
4613         rbd_dev->parent = parent;
4614
4615         return 0;
4616 out_err:
4617         if (parent) {
4618                 rbd_spec_put(rbd_dev->parent_spec);
4619                 kfree(rbd_dev->header_name);
4620                 rbd_dev_destroy(parent);
4621         } else {
4622                 rbd_put_client(rbdc);
4623                 rbd_spec_put(parent_spec);
4624         }
4625
4626         return ret;
4627 }
4628
4629 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4630 {
4631         int ret;
4632
4633         ret = rbd_dev_mapping_set(rbd_dev);
4634         if (ret)
4635                 return ret;
4636
4637         /* Generate a unique id: find the highest existing id and add one */
4638         rbd_dev_id_get(rbd_dev);
4639
4640         /* Fill in the device name, now that we have its id. */
4641         BUILD_BUG_ON(DEV_NAME_LEN
4642                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4643         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
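             /* e.g. dev_id 3 yields "rbd3", typically visible as /dev/rbd3 */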
4644
4645         /* Get our block major device number. */
4646
4647         ret = register_blkdev(0, rbd_dev->name);
4648         if (ret < 0)
4649                 goto err_out_id;
4650         rbd_dev->major = ret;
4651
4652         /* Set up the blkdev mapping. */
4653
4654         ret = rbd_init_disk(rbd_dev);
4655         if (ret)
4656                 goto err_out_blkdev;
4657
4658         ret = rbd_bus_add_dev(rbd_dev);
4659         if (ret)
4660                 goto err_out_disk;
4661
4662         /* Everything's ready.  Announce the disk to the world. */
4663
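             /* set_capacity() takes a count of SECTOR_SIZE (512-byte) sectors */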
4664         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4665         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4666         add_disk(rbd_dev->disk);
4667
4668         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4669                 (unsigned long long) rbd_dev->mapping.size);
4670
4671         return ret;
4672
4673 err_out_disk:
4674         rbd_free_disk(rbd_dev);
4675 err_out_blkdev:
4676         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4677 err_out_id:
4678         rbd_dev_id_put(rbd_dev);
4679         rbd_dev_mapping_clear(rbd_dev);
4680
4681         return ret;
4682 }
4683
4684 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4685 {
4686         struct rbd_spec *spec = rbd_dev->spec;
4687         size_t size;
4688
4689         /* Record the header object name for this rbd image. */
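             /*
              * Assuming the usual values from rbd_types.h, a format 1
              * image named "foo" uses header object "foo.rbd", while a
              * format 2 image with id "1234" uses "rbd_header.1234".
              */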
4690
4691         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4692
4693         if (rbd_dev->image_format == 1)
4694                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4695         else
4696                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4697
4698         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4699         if (!rbd_dev->header_name)
4700                 return -ENOMEM;
4701
4702         if (rbd_dev->image_format == 1)
4703                 sprintf(rbd_dev->header_name, "%s%s",
4704                         spec->image_name, RBD_SUFFIX);
4705         else
4706                 sprintf(rbd_dev->header_name, "%s%s",
4707                         RBD_HEADER_PREFIX, spec->image_id);
4708         return 0;
4709 }
4710
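     /*
      * Undo everything rbd_dev_image_probe() set up: the v1/v2
      * metadata, the header watch, the header object name, and the
      * image id, then drop the device itself.
      */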
4711 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4712 {
4713         int ret;
4714
4715         rbd_dev_unprobe(rbd_dev);
4716         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4717         if (ret)
4718                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4719         kfree(rbd_dev->header_name);
4720         rbd_dev->header_name = NULL;
4721         rbd_dev->image_format = 0;
4722         kfree(rbd_dev->spec->image_id);
4723         rbd_dev->spec->image_id = NULL;
4724
4725         rbd_dev_destroy(rbd_dev);
4726 }
4727
4728 /*
4729  * Probe for the existence of the header object for the given rbd
4730  * device.  For format 2 images this includes determining the image
4731  * id.
4732  */
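     /*
      * The probe runs in stages: determine the image id (and with it
      * the format), build the header object name, register a watch on
      * the header object, read the format 1 or format 2 metadata, fill
      * out the spec, and finally probe any parent image.  The error
      * labels below unwind exactly the stages that had completed.
      */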
4733 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4734 {
4735         int ret;
4736         int tmp;
4737
4738         /*
4739          * Get the id from the image id object.  If it's not a
4740          * format 2 image, we'll get ENOENT back, and we'll assume
4741          * it's a format 1 image.
4742          */
4743         ret = rbd_dev_image_id(rbd_dev);
4744         if (ret)
4745                 return ret;
4746         rbd_assert(rbd_dev->spec->image_id);
4747         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4748
4749         ret = rbd_dev_header_name(rbd_dev);
4750         if (ret)
4751                 goto err_out_format;
4752
4753         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4754         if (ret)
4755                 goto out_header_name;
4756
4757         if (rbd_dev->image_format == 1)
4758                 ret = rbd_dev_v1_probe(rbd_dev);
4759         else
4760                 ret = rbd_dev_v2_probe(rbd_dev);
4761         if (ret)
4762                 goto err_out_watch;
4763
4764         ret = rbd_dev_spec_update(rbd_dev);
4765         if (ret)
4766                 goto err_out_probe;
4767
4768         ret = rbd_dev_probe_parent(rbd_dev);
4769         if (!ret)
4770                 return 0;
4771
4772 err_out_probe:
4773         rbd_dev_unprobe(rbd_dev);
4774 err_out_watch:
4775         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4776         if (tmp)
4777                 rbd_warn(rbd_dev, "unable to tear down watch request");
4778 out_header_name:
4779         kfree(rbd_dev->header_name);
4780         rbd_dev->header_name = NULL;
4781 err_out_format:
4782         rbd_dev->image_format = 0;
4783         kfree(rbd_dev->spec->image_id);
4784         rbd_dev->spec->image_id = NULL;
4785
4786         dout("probe failed, returning %d\n", ret);
4787
4788         return ret;
4789 }
4790
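     /*
      * Handles a write to /sys/bus/rbd/add.  The buffer is a one-line
      * mapping spec as described in
      * Documentation/ABI/testing/sysfs-bus-rbd; with hypothetical
      * values it looks something like:
      *
      *      $ echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo" \
      *              > /sys/bus/rbd/add
      *
      * i.e. monitor address(es) and options, then pool name, image
      * name, and an optional snapshot name.
      */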
4791 static ssize_t rbd_add(struct bus_type *bus,
4792                        const char *buf,
4793                        size_t count)
4794 {
4795         struct rbd_device *rbd_dev = NULL;
4796         struct ceph_options *ceph_opts = NULL;
4797         struct rbd_options *rbd_opts = NULL;
4798         struct rbd_spec *spec = NULL;
4799         struct rbd_client *rbdc;
4800         struct ceph_osd_client *osdc;
4801         int rc = -ENOMEM;
4802
4803         if (!try_module_get(THIS_MODULE))
4804                 return -ENODEV;
4805
4806         /* parse add command */
4807         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4808         if (rc < 0)
4809                 goto err_out_module;
4810
4811         rbdc = rbd_get_client(ceph_opts);
4812         if (IS_ERR(rbdc)) {
4813                 rc = PTR_ERR(rbdc);
4814                 goto err_out_args;
4815         }
4816         ceph_opts = NULL;       /* the rbd client (rbdc) now owns this */
4817
4818         /* pick the pool */
4819         osdc = &rbdc->client->osdc;
4820         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4821         if (rc < 0)
4822                 goto err_out_client;
4823         spec->pool_id = (u64)rc;
4824
4825         /* The ceph file layout requires the pool id to fit in 32 bits */
4826
4827         if (spec->pool_id > (u64)U32_MAX) {
4828                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4829                                 (unsigned long long)spec->pool_id, U32_MAX);
4830                 rc = -EIO;
4831                 goto err_out_client;
4832         }
4833
4834         rbd_dev = rbd_dev_create(rbdc, spec);
4835         if (!rbd_dev)
4836                 goto err_out_client;
4837         rbdc = NULL;            /* rbd_dev now owns this */
4838         spec = NULL;            /* rbd_dev now owns this */
4839
4840         rbd_dev->mapping.read_only = rbd_opts->read_only;
4841         kfree(rbd_opts);
4842         rbd_opts = NULL;        /* done with this */
4843
4844         rc = rbd_dev_image_probe(rbd_dev);
4845         if (rc < 0)
4846                 goto err_out_rbd_dev;
4847
4848         rc = rbd_dev_device_setup(rbd_dev);
4849         if (!rc)
4850                 return count;
4851
4852         rbd_dev_image_release(rbd_dev);
4853 err_out_rbd_dev:
4854         rbd_dev_destroy(rbd_dev);
4855 err_out_client:
4856         rbd_put_client(rbdc);
4857 err_out_args:
4858         if (ceph_opts)
4859                 ceph_destroy_options(ceph_opts);
4860         kfree(rbd_opts);
4861         rbd_spec_put(spec);
4862 err_out_module:
4863         module_put(THIS_MODULE);
4864
4865         dout("Error adding device %s\n", buf);
4866
4867         return (ssize_t)rc;
4868 }
4869
4870 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4871 {
4872         struct list_head *tmp;
4873         struct rbd_device *rbd_dev;
4874
4875         spin_lock(&rbd_dev_list_lock);
4876         list_for_each(tmp, &rbd_dev_list) {
4877                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4878                 if (rbd_dev->dev_id == dev_id) {
4879                         spin_unlock(&rbd_dev_list_lock);
4880                         return rbd_dev;
4881                 }
4882         }
4883         spin_unlock(&rbd_dev_list_lock);
4884         return NULL;
4885 }
4886
4887 static void rbd_dev_device_release(struct device *dev)
4888 {
4889         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4890
4891         rbd_free_disk(rbd_dev);
4892         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4893         rbd_dev_clear_mapping(rbd_dev);
4894         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4895         rbd_dev->major = 0;
4896         rbd_dev_id_put(rbd_dev);
4897         rbd_dev_mapping_clear(rbd_dev);
4898 }
4899
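     /*
      * Tear down a chain of layered images starting from the deepest
      * ancestor.  For a (hypothetical) chain a -> b -> c, c is
      * released on the first pass and b on the second; once a no
      * longer has a parent, the loop ends.
      */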
4900 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4901 {
4902         while (rbd_dev->parent) {
4903                 struct rbd_device *first = rbd_dev;
4904                 struct rbd_device *second = first->parent;
4905                 struct rbd_device *third;
4906
4907                 /*
4908                  * Walk down to the parent that has no grandparent
4909                  * (the end of the chain) and remove it.
4910                  */
4911                 while (second && (third = second->parent)) {
4912                         first = second;
4913                         second = third;
4914                 }
4915                 rbd_assert(second);
4916                 rbd_dev_image_release(second);
4917                 first->parent = NULL;
4918                 first->parent_overlap = 0;
4919
4920                 rbd_assert(first->parent_spec);
4921                 rbd_spec_put(first->parent_spec);
4922                 first->parent_spec = NULL;
4923         }
4924 }
4925
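     /*
      * Handles a write to /sys/bus/rbd/remove.  The buffer holds the
      * decimal device id that was assigned at add time, e.g.:
      *
      *      $ echo 3 > /sys/bus/rbd/remove
      *
      * Removal fails with -EBUSY while the device is still open.
      */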
4926 static ssize_t rbd_remove(struct bus_type *bus,
4927                           const char *buf,
4928                           size_t count)
4929 {
4930         struct rbd_device *rbd_dev = NULL;
4931         int target_id;
4932         unsigned long ul;
4933         int ret;
4934
4935         ret = strict_strtoul(buf, 10, &ul);
4936         if (ret)
4937                 return ret;
4938
4939         /* convert to int; abort if we lost anything in the conversion */
4940         target_id = (int) ul;
4941         if (target_id != ul)
4942                 return -EINVAL;
4943
4944         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4945
4946         rbd_dev = __rbd_get_dev(target_id);
4947         if (!rbd_dev) {
4948                 ret = -ENOENT;
4949                 goto done;
4950         }
4951
4952         spin_lock_irq(&rbd_dev->lock);
4953         if (rbd_dev->open_count)
4954                 ret = -EBUSY;
4955         else
4956                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4957         spin_unlock_irq(&rbd_dev->lock);
4958         if (ret < 0)
4959                 goto done;
4960         ret = count;
4961         rbd_bus_del_dev(rbd_dev);
4962         rbd_dev_image_release(rbd_dev);
4963         module_put(THIS_MODULE);
4964 done:
4965         mutex_unlock(&ctl_mutex);
4966
4967         return ret;
4968 }
4969
4970 /*
4971  * create control files in sysfs
4972  * /sys/bus/rbd/...
4973  */
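     /*
      * Once this succeeds, writes to /sys/bus/rbd/add and
      * /sys/bus/rbd/remove are handled by rbd_add() and rbd_remove()
      * above, and rbd_root_dev anchors the per-device entries.
      */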
4974 static int rbd_sysfs_init(void)
4975 {
4976         int ret;
4977
4978         ret = device_register(&rbd_root_dev);
4979         if (ret < 0)
4980                 return ret;
4981
4982         ret = bus_register(&rbd_bus_type);
4983         if (ret < 0)
4984                 device_unregister(&rbd_root_dev);
4985
4986         return ret;
4987 }
4988
4989 static void rbd_sysfs_cleanup(void)
4990 {
4991         bus_unregister(&rbd_bus_type);
4992         device_unregister(&rbd_root_dev);
4993 }
4994
4995 static int __init rbd_init(void)
4996 {
4997         int rc;
4998
4999         if (!libceph_compatible(NULL)) {
5000                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5001
5002                 return -EINVAL;
5003         }
5004         rc = rbd_sysfs_init();
5005         if (rc)
5006                 return rc;
5007         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5008         return 0;
5009 }
5010
5011 static void __exit rbd_exit(void)
5012 {
5013         rbd_sysfs_cleanup();
5014 }
5015
5016 module_init(rbd_init);
5017 module_exit(rbd_exit);
5018
5019 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5020 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5021 MODULE_DESCRIPTION("rados block device");
5022
5023 /* following authorship retained from original osdblk.c */
5024 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5025
5026 MODULE_LICENSE("GPL");