/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
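
/*
 * Why that width works: each byte of an int contributes at most
 * log10(256) (about 2.41) decimal digits, so allowing 5/2 digits per
 * byte plus one character for a sign is always enough; for a 32-bit
 * int that is 11 characters, which covers "-2147483648".
 */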

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
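
/*
 * The _safe variant walks the list in reverse because object requests
 * must be torn down newest-first: rbd_img_obj_request_del() below
 * asserts that the request being removed was the last one added to
 * its image request.
 */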

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);

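/*
 * These attributes back /sys/bus/rbd/add and /sys/bus/rbd/remove, the
 * user-space interface for mapping and unmapping images (see
 * Documentation/ABI/testing/sysfs-bus-rbd for the accepted formats).
 */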
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}
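
/*
 * Note the interlock with unmapping: the REMOVING flag is tested and
 * open_count is bumped under rbd_dev->lock, so an open can never
 * succeed once removal has begun (and rbd_remove(), presumably for
 * the same reason, checks open_count under the same lock).
 */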

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
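
/*
 * For example (syntax per Documentation/ABI/testing/sysfs-bus-rbd;
 * the monitor address and names here are placeholders), rbd-specific
 * options like read_only ride in the same comma-separated option
 * string as the libceph options:
 *
 *   echo "1.2.3.4:6789 name=admin,read_only rbd myimage -" \
 *              > /sys/bus/rbd/add
 */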

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

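/*
 * A format 1 on-disk header (struct rbd_image_header_ondisk in
 * rbd_types.h) is laid out as a fixed-size portion, then an array of
 * snap_count {id, image_size} entries, then snap_names_len bytes of
 * NUL-terminated snapshot names in the same order as the id array.
 */
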
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* name came from the segment name slab cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
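
/*
 * Worked example, assuming the default object order of 22 (4 MiB
 * objects): an image I/O at offset 0x3ff000 for 0x2000 bytes maps to
 * segment 0 at intra-segment offset 0x3ff000, and rbd_segment_length()
 * trims it to 0x1000 bytes; the remaining 0x1000 bytes fall in
 * segment 1 and require a second object request.
 */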

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
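
/*
 * Worked example for the above: for a source bio with four 1 KiB
 * segments (bi_size 4096), cloning offset 1024 for 2048 bytes yields
 * idx == 1 with voff == 0 and end_idx == 2, so vcnt == 2; the first
 * vector is used whole and the last is trimmed to resid == 1024.
 */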

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
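
/*
 * The smp_mb() calls in the flag setters above pair with those in the
 * corresponding _test() helpers, so a flag set on one CPU is seen by
 * a subsequent test on another; nothing stronger appears to be
 * relied upon.
 */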

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it's not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
1467 static void img_request_write_set(struct rbd_img_request *img_request)
1468 {
1469         set_bit(IMG_REQ_WRITE, &img_request->flags);
1470         smp_mb();
1471 }
1472
1473 static bool img_request_write_test(struct rbd_img_request *img_request)
1474 {
1475         smp_mb();
1476         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1477 }
1478
1479 static void img_request_child_set(struct rbd_img_request *img_request)
1480 {
1481         set_bit(IMG_REQ_CHILD, &img_request->flags);
1482         smp_mb();
1483 }
1484
1485 static bool img_request_child_test(struct rbd_img_request *img_request)
1486 {
1487         smp_mb();
1488         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1489 }
1490
1491 static void img_request_layered_set(struct rbd_img_request *img_request)
1492 {
1493         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1494         smp_mb();
1495 }
1496
1497 static bool img_request_layered_test(struct rbd_img_request *img_request)
1498 {
1499         smp_mb();
1500         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1501 }
1502
1503 static void
1504 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1505 {
1506         u64 xferred = obj_request->xferred;
1507         u64 length = obj_request->length;
1508
1509         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1510                 obj_request, obj_request->img_request, obj_request->result,
1511                 xferred, length);
1512         /*
1513          * ENOENT means a hole in the image.  We zero-fill the
1514          * entire length of the request.  A short read also implies
1515          * zero-fill to the end of the request.  Either way we
1516          * update the xferred count to indicate the whole request
1517          * was satisfied.
1518          */
1519         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1520         if (obj_request->result == -ENOENT) {
1521                 if (obj_request->type == OBJ_REQUEST_BIO)
1522                         zero_bio_chain(obj_request->bio_list, 0);
1523                 else
1524                         zero_pages(obj_request->pages, 0, length);
1525                 obj_request->result = 0;
1526                 obj_request->xferred = length;
1527         } else if (xferred < length && !obj_request->result) {
1528                 if (obj_request->type == OBJ_REQUEST_BIO)
1529                         zero_bio_chain(obj_request->bio_list, xferred);
1530                 else
1531                         zero_pages(obj_request->pages, xferred, length);
1532                 obj_request->xferred = length;
1533         }
1534         obj_request_done_set(obj_request);
1535 }
1536
1537 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1538 {
1539         dout("%s: obj %p cb %p\n", __func__, obj_request,
1540                 obj_request->callback);
1541         if (obj_request->callback)
1542                 obj_request->callback(obj_request);
1543         else
1544                 complete_all(&obj_request->completion);
1545 }
1546
1547 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1548 {
1549         dout("%s: obj %p\n", __func__, obj_request);
1550         obj_request_done_set(obj_request);
1551 }
1552
1553 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1554 {
1555         struct rbd_img_request *img_request = NULL;
1556         struct rbd_device *rbd_dev = NULL;
1557         bool layered = false;
1558
1559         if (obj_request_img_data_test(obj_request)) {
1560                 img_request = obj_request->img_request;
1561                 layered = img_request && img_request_layered_test(img_request);
1562                 rbd_dev = img_request->rbd_dev;
1563         }
1564
1565         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1566                 obj_request, img_request, obj_request->result,
1567                 obj_request->xferred, obj_request->length);
1568         if (layered && obj_request->result == -ENOENT &&
1569                         obj_request->img_offset < rbd_dev->parent_overlap)
1570                 rbd_img_parent_read(obj_request);
1571         else if (img_request)
1572                 rbd_img_obj_request_read_callback(obj_request);
1573         else
1574                 obj_request_done_set(obj_request);
1575 }
1576
1577 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1578 {
1579         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1580                 obj_request->result, obj_request->length);
1581         /*
1582          * There is no such thing as a successful short write.  Set
1583          * it to our originally-requested length.
1584          */
1585         obj_request->xferred = obj_request->length;
1586         obj_request_done_set(obj_request);
1587 }
1588
1589 /*
1590  * For a simple stat call there's nothing to do.  We'll do more if
1591  * this is part of a write sequence for a layered image.
1592  */
1593 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1594 {
1595         dout("%s: obj %p\n", __func__, obj_request);
1596         obj_request_done_set(obj_request);
1597 }
1598
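/*
 * Completion callback for every osd request issued by rbd.  The
 * result is recorded and the request is dispatched to a handler
 * based on the opcode of its first op.
 */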
1599 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1600                                 struct ceph_msg *msg)
1601 {
1602         struct rbd_obj_request *obj_request = osd_req->r_priv;
1603         u16 opcode;
1604
1605         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1606         rbd_assert(osd_req == obj_request->osd_req);
1607         if (obj_request_img_data_test(obj_request)) {
1608                 rbd_assert(obj_request->img_request);
1609                 rbd_assert(obj_request->which != BAD_WHICH);
1610         } else {
1611                 rbd_assert(obj_request->which == BAD_WHICH);
1612         }
1613
1614         if (osd_req->r_result < 0)
1615                 obj_request->result = osd_req->r_result;
1616
1617         BUG_ON(osd_req->r_num_ops > 2);
1618
1619         /*
1620          * We support a 64-bit length, but ultimately it has to be
1621          * passed to blk_end_request(), which takes an unsigned int.
1622          */
1623         obj_request->xferred = osd_req->r_reply_op_len[0];
1624         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1625         opcode = osd_req->r_ops[0].op;
1626         switch (opcode) {
1627         case CEPH_OSD_OP_READ:
1628                 rbd_osd_read_callback(obj_request);
1629                 break;
1630         case CEPH_OSD_OP_WRITE:
1631                 rbd_osd_write_callback(obj_request);
1632                 break;
1633         case CEPH_OSD_OP_STAT:
1634                 rbd_osd_stat_callback(obj_request);
1635                 break;
1636         case CEPH_OSD_OP_CALL:
1637         case CEPH_OSD_OP_NOTIFY_ACK:
1638         case CEPH_OSD_OP_WATCH:
1639                 rbd_osd_trivial_callback(obj_request);
1640                 break;
1641         default:
1642                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1643                         obj_request->object_name, (unsigned short) opcode);
1644                 break;
1645         }
1646
1647         if (obj_request_done_test(obj_request))
1648                 rbd_obj_request_complete(obj_request);
1649 }
1650
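/*
 * Finalize an osd request before submission.  Reads are tagged with
 * the snapshot id to read from (CEPH_NOSNAP for the image head);
 * writes instead carry the snapshot context and a modification time.
 */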
1651 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1652 {
1653         struct rbd_img_request *img_request = obj_request->img_request;
1654         struct ceph_osd_request *osd_req = obj_request->osd_req;
1655         u64 snap_id;
1656
1657         rbd_assert(osd_req != NULL);
1658
1659         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1660         ceph_osdc_build_request(osd_req, obj_request->offset,
1661                         NULL, snap_id, NULL);
1662 }
1663
1664 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1665 {
1666         struct rbd_img_request *img_request = obj_request->img_request;
1667         struct ceph_osd_request *osd_req = obj_request->osd_req;
1668         struct ceph_snap_context *snapc;
1669         struct timespec mtime = CURRENT_TIME;
1670
1671         rbd_assert(osd_req != NULL);
1672
1673         snapc = img_request ? img_request->snapc : NULL;
1674         ceph_osdc_build_request(osd_req, obj_request->offset,
1675                         snapc, CEPH_NOSNAP, &mtime);
1676 }
1677
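/*
 * Allocate and partially initialize an osd request for a single op
 * on the given object.  For a write belonging to an image request,
 * the image's snapshot context is attached.  The op itself is set
 * up by the caller.
 */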
1678 static struct ceph_osd_request *rbd_osd_req_create(
1679                                         struct rbd_device *rbd_dev,
1680                                         bool write_request,
1681                                         struct rbd_obj_request *obj_request)
1682 {
1683         struct ceph_snap_context *snapc = NULL;
1684         struct ceph_osd_client *osdc;
1685         struct ceph_osd_request *osd_req;
1686
1687         if (obj_request_img_data_test(obj_request)) {
1688                 struct rbd_img_request *img_request = obj_request->img_request;
1689
1690                 rbd_assert(write_request ==
1691                                 img_request_write_test(img_request));
1692                 if (write_request)
1693                         snapc = img_request->snapc;
1694         }
1695
1696         /* Allocate and initialize the request, for the single op */
1697
1698         osdc = &rbd_dev->rbd_client->client->osdc;
1699         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1700         if (!osd_req)
1701                 return NULL;    /* ENOMEM */
1702
1703         if (write_request)
1704                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1705         else
1706                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1707
1708         osd_req->r_callback = rbd_osd_req_callback;
1709         osd_req->r_priv = obj_request;
1710
1711         osd_req->r_oid_len = strlen(obj_request->object_name);
1712         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1713         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1714
1715         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1716
1717         return osd_req;
1718 }
1719
1720 /*
1721  * Create a copyup osd request based on the information in the
1722  * object request supplied.  A copyup request has two osd ops:
1723  * a copyup method call and a "normal" write request.
1724  */
1725 static struct ceph_osd_request *
1726 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1727 {
1728         struct rbd_img_request *img_request;
1729         struct ceph_snap_context *snapc;
1730         struct rbd_device *rbd_dev;
1731         struct ceph_osd_client *osdc;
1732         struct ceph_osd_request *osd_req;
1733
1734         rbd_assert(obj_request_img_data_test(obj_request));
1735         img_request = obj_request->img_request;
1736         rbd_assert(img_request);
1737         rbd_assert(img_request_write_test(img_request));
1738
1739         /* Allocate and initialize the request, for the two ops */
1740
1741         snapc = img_request->snapc;
1742         rbd_dev = img_request->rbd_dev;
1743         osdc = &rbd_dev->rbd_client->client->osdc;
1744         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1745         if (!osd_req)
1746                 return NULL;    /* ENOMEM */
1747
1748         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1749         osd_req->r_callback = rbd_osd_req_callback;
1750         osd_req->r_priv = obj_request;
1751
1752         osd_req->r_oid_len = strlen(obj_request->object_name);
1753         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1754         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1755
1756         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1757
1758         return osd_req;
1759 }
1760
1761
1762 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1763 {
1764         ceph_osdc_put_request(osd_req);
1765 }
1766
1767 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1768
1769 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1770                                                 u64 offset, u64 length,
1771                                                 enum obj_request_type type)
1772 {
1773         struct rbd_obj_request *obj_request;
1774         size_t size;
1775         char *name;
1776
1777         rbd_assert(obj_request_type_valid(type));
1778
1779         size = strlen(object_name) + 1;
1780         name = kmalloc(size, GFP_KERNEL);
1781         if (!name)
1782                 return NULL;
1783
1784         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1785         if (!obj_request) {
1786                 kfree(name);
1787                 return NULL;
1788         }
1789
1790         obj_request->object_name = memcpy(name, object_name, size);
1791         obj_request->offset = offset;
1792         obj_request->length = length;
1793         obj_request->flags = 0;
1794         obj_request->which = BAD_WHICH;
1795         obj_request->type = type;
1796         INIT_LIST_HEAD(&obj_request->links);
1797         init_completion(&obj_request->completion);
1798         kref_init(&obj_request->kref);
1799
1800         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1801                 offset, length, (int)type, obj_request);
1802
1803         return obj_request;
1804 }
1805
1806 static void rbd_obj_request_destroy(struct kref *kref)
1807 {
1808         struct rbd_obj_request *obj_request;
1809
1810         obj_request = container_of(kref, struct rbd_obj_request, kref);
1811
1812         dout("%s: obj %p\n", __func__, obj_request);
1813
1814         rbd_assert(obj_request->img_request == NULL);
1815         rbd_assert(obj_request->which == BAD_WHICH);
1816
1817         if (obj_request->osd_req)
1818                 rbd_osd_req_destroy(obj_request->osd_req);
1819
1820         rbd_assert(obj_request_type_valid(obj_request->type));
1821         switch (obj_request->type) {
1822         case OBJ_REQUEST_NODATA:
1823                 break;          /* Nothing to do */
1824         case OBJ_REQUEST_BIO:
1825                 if (obj_request->bio_list)
1826                         bio_chain_put(obj_request->bio_list);
1827                 break;
1828         case OBJ_REQUEST_PAGES:
1829                 if (obj_request->pages)
1830                         ceph_release_page_vector(obj_request->pages,
1831                                                 obj_request->page_count);
1832                 break;
1833         }
1834
1835         kfree(obj_request->object_name);
1836         obj_request->object_name = NULL;
1837         kmem_cache_free(rbd_obj_request_cache, obj_request);
1838 }
1839
1840 /*
1841  * Caller is responsible for filling in the list of object requests
1842  * that comprises the image request, and the Linux request pointer
1843  * (if there is one).
1844  */
1845 static struct rbd_img_request *rbd_img_request_create(
1846                                         struct rbd_device *rbd_dev,
1847                                         u64 offset, u64 length,
1848                                         bool write_request,
1849                                         bool child_request)
1850 {
1851         struct rbd_img_request *img_request;
1852
1853         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1854         if (!img_request)
1855                 return NULL;
1856
1857         if (write_request) {
1858                 down_read(&rbd_dev->header_rwsem);
1859                 ceph_get_snap_context(rbd_dev->header.snapc);
1860                 up_read(&rbd_dev->header_rwsem);
1861         }
1862
1863         img_request->rq = NULL;
1864         img_request->rbd_dev = rbd_dev;
1865         img_request->offset = offset;
1866         img_request->length = length;
1867         img_request->flags = 0;
1868         if (write_request) {
1869                 img_request_write_set(img_request);
1870                 img_request->snapc = rbd_dev->header.snapc;
1871         } else {
1872                 img_request->snap_id = rbd_dev->spec->snap_id;
1873         }
1874         if (child_request)
1875                 img_request_child_set(img_request);
1876         if (rbd_dev->parent_spec)
1877                 img_request_layered_set(img_request);
1878         spin_lock_init(&img_request->completion_lock);
1879         img_request->next_completion = 0;
1880         img_request->callback = NULL;
1881         img_request->result = 0;
1882         img_request->obj_request_count = 0;
1883         INIT_LIST_HEAD(&img_request->obj_requests);
1884         kref_init(&img_request->kref);
1885
1886         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1887                 write_request ? "write" : "read", offset, length,
1888                 img_request);
1889
1890         return img_request;
1891 }
1892
1893 static void rbd_img_request_destroy(struct kref *kref)
1894 {
1895         struct rbd_img_request *img_request;
1896         struct rbd_obj_request *obj_request;
1897         struct rbd_obj_request *next_obj_request;
1898
1899         img_request = container_of(kref, struct rbd_img_request, kref);
1900
1901         dout("%s: img %p\n", __func__, img_request);
1902
1903         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1904                 rbd_img_obj_request_del(img_request, obj_request);
1905         rbd_assert(img_request->obj_request_count == 0);
1906
1907         if (img_request_write_test(img_request))
1908                 ceph_put_snap_context(img_request->snapc);
1909
1910         if (img_request_child_test(img_request))
1911                 rbd_obj_request_put(img_request->obj_request);
1912
1913         kmem_cache_free(rbd_img_request_cache, img_request);
1914 }
1915
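/*
 * Finish one object request within an image request: record its
 * result in the image request and, for a request driven by the
 * block layer, report the completed bytes upward.  Returns true if
 * there is more of the image request left to complete.
 */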
1916 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1917 {
1918         struct rbd_img_request *img_request;
1919         unsigned int xferred;
1920         int result;
1921         bool more;
1922
1923         rbd_assert(obj_request_img_data_test(obj_request));
1924         img_request = obj_request->img_request;
1925
1926         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1927         xferred = (unsigned int)obj_request->xferred;
1928         result = obj_request->result;
1929         if (result) {
1930                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1931
1932                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1933                         img_request_write_test(img_request) ? "write" : "read",
1934                         obj_request->length, obj_request->img_offset,
1935                         obj_request->offset);
1936                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1937                         result, xferred);
1938                 if (!img_request->result)
1939                         img_request->result = result;
1940         }
1941
1942         /* Image object requests don't own their page array */
1943
1944         if (obj_request->type == OBJ_REQUEST_PAGES) {
1945                 obj_request->pages = NULL;
1946                 obj_request->page_count = 0;
1947         }
1948
1949         if (img_request_child_test(img_request)) {
1950                 rbd_assert(img_request->obj_request != NULL);
1951                 more = obj_request->which < img_request->obj_request_count - 1;
1952         } else {
1953                 rbd_assert(img_request->rq != NULL);
1954                 more = blk_end_request(img_request->rq, result, xferred);
1955         }
1956
1957         return more;
1958 }
1959
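/*
 * Per-object completion callback for image requests.  Object
 * requests can finish in any order, but their results are handed
 * back in order: completion only advances when the request at
 * "next_completion" is done, sweeping forward over any later
 * requests that have already finished.
 */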
1960 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1961 {
1962         struct rbd_img_request *img_request;
1963         u32 which = obj_request->which;
1964         bool more = true;
1965
1966         rbd_assert(obj_request_img_data_test(obj_request));
1967         img_request = obj_request->img_request;
1968
1969         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1970         rbd_assert(img_request != NULL);
1971         rbd_assert(img_request->obj_request_count > 0);
1972         rbd_assert(which != BAD_WHICH);
1973         rbd_assert(which < img_request->obj_request_count);
1974         rbd_assert(which >= img_request->next_completion);
1975
1976         spin_lock_irq(&img_request->completion_lock);
1977         if (which != img_request->next_completion)
1978                 goto out;
1979
1980         for_each_obj_request_from(img_request, obj_request) {
1981                 rbd_assert(more);
1982                 rbd_assert(which < img_request->obj_request_count);
1983
1984                 if (!obj_request_done_test(obj_request))
1985                         break;
1986                 more = rbd_img_obj_end_request(obj_request);
1987                 which++;
1988         }
1989
1990         rbd_assert(more ^ (which == img_request->obj_request_count));
1991         img_request->next_completion = which;
1992 out:
1993         spin_unlock_irq(&img_request->completion_lock);
1994
1995         if (!more)
1996                 rbd_img_request_complete(img_request);
1997 }
1998
1999 /*
2000  * Split up an image request into one or more object requests, each
2001  * to a different object.  The "type" parameter indicates whether
2002  * "data_desc" is the pointer to the head of a list of bio
2003  * structures, or the base of a page array.  In either case this
2004  * function assumes data_desc describes memory sufficient to hold
2005  * all data described by the image request.
2006  */
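/*
 * For example (assuming 4 MiB objects, i.e. obj_order 22), a 6 MiB
 * request starting at image offset 3 MiB becomes three object
 * requests: 1 MiB at the tail of the first object, all 4 MiB of the
 * second, and 1 MiB at the head of the third.
 */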
2007 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2008                                         enum obj_request_type type,
2009                                         void *data_desc)
2010 {
2011         struct rbd_device *rbd_dev = img_request->rbd_dev;
2012         struct rbd_obj_request *obj_request = NULL;
2013         struct rbd_obj_request *next_obj_request;
2014         bool write_request = img_request_write_test(img_request);
2015         struct bio *bio_list;
2016         unsigned int bio_offset = 0;
2017         struct page **pages;
2018         u64 img_offset;
2019         u64 resid;
2020         u16 opcode;
2021
2022         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2023                 (int)type, data_desc);
2024
2025         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2026         img_offset = img_request->offset;
2027         resid = img_request->length;
2028         rbd_assert(resid > 0);
2029
2030         if (type == OBJ_REQUEST_BIO) {
2031                 bio_list = data_desc;
2032                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2033         } else {
2034                 rbd_assert(type == OBJ_REQUEST_PAGES);
2035                 pages = data_desc;
2036         }
2037
2038         while (resid) {
2039                 struct ceph_osd_request *osd_req;
2040                 const char *object_name;
2041                 u64 offset;
2042                 u64 length;
2043
2044                 object_name = rbd_segment_name(rbd_dev, img_offset);
2045                 if (!object_name)
2046                         goto out_unwind;
2047                 offset = rbd_segment_offset(rbd_dev, img_offset);
2048                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2049                 obj_request = rbd_obj_request_create(object_name,
2050                                                 offset, length, type);
2051                 /* object request has its own copy of the object name */
2052                 rbd_segment_name_free(object_name);
2053                 if (!obj_request)
2054                         goto out_unwind;
2055
2056                 if (type == OBJ_REQUEST_BIO) {
2057                         unsigned int clone_size;
2058
2059                         rbd_assert(length <= (u64)UINT_MAX);
2060                         clone_size = (unsigned int)length;
2061                         obj_request->bio_list =
2062                                         bio_chain_clone_range(&bio_list,
2063                                                                 &bio_offset,
2064                                                                 clone_size,
2065                                                                 GFP_ATOMIC);
2066                         if (!obj_request->bio_list)
2067                                 goto out_partial;
2068                 } else {
2069                         unsigned int page_count;
2070
2071                         obj_request->pages = pages;
2072                         page_count = (u32)calc_pages_for(offset, length);
2073                         obj_request->page_count = page_count;
2074                         if ((offset + length) & ~PAGE_MASK)
2075                                 page_count--;   /* more on last page */
2076                         pages += page_count;
2077                 }
2078
2079                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2080                                                 obj_request);
2081                 if (!osd_req)
2082                         goto out_partial;
2083                 obj_request->osd_req = osd_req;
2084                 obj_request->callback = rbd_img_obj_callback;
2085
2086                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2087                                                 0, 0);
2088                 if (type == OBJ_REQUEST_BIO)
2089                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2090                                         obj_request->bio_list, length);
2091                 else
2092                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2093                                         obj_request->pages, length,
2094                                         offset & ~PAGE_MASK, false, false);
2095
2096                 if (write_request)
2097                         rbd_osd_req_format_write(obj_request);
2098                 else
2099                         rbd_osd_req_format_read(obj_request);
2100
2101                 obj_request->img_offset = img_offset;
2102                 rbd_img_obj_request_add(img_request, obj_request);
2103
2104                 img_offset += length;
2105                 resid -= length;
2106         }
2107
2108         return 0;
2109
2110 out_partial:
2111         rbd_obj_request_put(obj_request);
2112 out_unwind:
2113         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2114                 rbd_obj_request_put(obj_request);
2115
2116         return -ENOMEM;
2117 }
2118
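/*
 * Completion callback for a copyup request.  The pages that carried
 * the parent data for the copyup method call are no longer needed,
 * so release them before finishing up with the normal image object
 * callback.
 */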
2119 static void
2120 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2121 {
2122         struct rbd_img_request *img_request;
2123         struct rbd_device *rbd_dev;
2124         struct page **pages;
2125         u32 page_count;
2126
2127         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2128         rbd_assert(obj_request_img_data_test(obj_request));
2129         img_request = obj_request->img_request;
2130         rbd_assert(img_request);
2131
2132         rbd_dev = img_request->rbd_dev;
2133         rbd_assert(rbd_dev);
2134
2135         pages = obj_request->copyup_pages;
2136         rbd_assert(pages != NULL);
2137         obj_request->copyup_pages = NULL;
2138         page_count = obj_request->copyup_page_count;
2139         rbd_assert(page_count);
2140         obj_request->copyup_page_count = 0;
2141         ceph_release_page_vector(pages, page_count);
2142
2143         /*
2144          * We want the transfer count to reflect the size of the
2145          * original write request.  There is no such thing as a
2146          * successful short write, so if the request was successful
2147          * we can just set it to the originally-requested length.
2148          */
2149         if (!obj_request->result)
2150                 obj_request->xferred = obj_request->length;
2151
2152         /* Finish up with the normal image object callback */
2153
2154         rbd_img_obj_callback(obj_request);
2155 }
2156
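/*
 * Called when the parent read issued by rbd_img_obj_parent_read_full()
 * completes.  Build a two-op copyup request (the "copyup" method
 * call carrying the parent data, followed by the original write) and
 * submit it in place of the original object request.
 */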
2157 static void
2158 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2159 {
2160         struct rbd_obj_request *orig_request;
2161         struct ceph_osd_request *osd_req;
2162         struct ceph_osd_client *osdc;
2163         struct rbd_device *rbd_dev;
2164         struct page **pages;
2165         u32 page_count;
2166         int result;
2167         u64 parent_length;
2168
2169         rbd_assert(img_request_child_test(img_request));
2170
2171         /* First get what we need from the image request */
2172
2173         pages = img_request->copyup_pages;
2174         rbd_assert(pages != NULL);
2175         img_request->copyup_pages = NULL;
2176         page_count = img_request->copyup_page_count;
2177         rbd_assert(page_count);
2178         img_request->copyup_page_count = 0;
2179
2180         orig_request = img_request->obj_request;
2181         rbd_assert(orig_request != NULL);
2182         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2183         result = img_request->result;
2184         parent_length = img_request->length;
2185         rbd_assert(parent_length == img_request->xferred);
2186         rbd_img_request_put(img_request);
2187
2188         rbd_assert(orig_request->img_request);
2189         rbd_dev = orig_request->img_request->rbd_dev;
2190         rbd_assert(rbd_dev);
2191
2192         if (result)
2193                 goto out_err;
2194
2195         /* Allocate the new copyup osd request for the original request */
2196
2197         result = -ENOMEM;
2198         rbd_assert(!orig_request->osd_req);
2199         osd_req = rbd_osd_req_create_copyup(orig_request);
2200         if (!osd_req)
2201                 goto out_err;
2202         orig_request->osd_req = osd_req;
2203         orig_request->copyup_pages = pages;
2204         orig_request->copyup_page_count = page_count;
2205
2206         /* Initialize the copyup op */
2207
2208         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2209         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2210                                                 false, false);
2211
2212         /* Then the original write request op */
2213
2214         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2215                                         orig_request->offset,
2216                                         orig_request->length, 0, 0);
2217         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2218                                         orig_request->length);
2219
2220         rbd_osd_req_format_write(orig_request);
2221
2222         /* All set, send it off. */
2223
2224         orig_request->callback = rbd_img_obj_copyup_callback;
2225         osdc = &rbd_dev->rbd_client->client->osdc;
2226         result = rbd_obj_request_submit(osdc, orig_request);
2227         if (!result)
2228                 return;
2229 out_err:
2230         /* Record the error code and complete the request */
2231
2232         orig_request->result = result;
2233         orig_request->xferred = 0;
2234         obj_request_done_set(orig_request);
2235         rbd_obj_request_complete(orig_request);
2236 }
2237
2238 /*
2239  * Read from the parent image the range of data that covers the
2240  * entire target of the given object request.  This is used for
2241  * satisfying a layered image write request when the target of an
2242  * object request from the image request does not exist.
2243  *
2244  * A page array big enough to hold the returned data is allocated
2245  * and supplied to rbd_img_request_fill() as the "data descriptor."
2246  * When the read completes, this page array will be transferred to
2247  * the original object request for the copyup operation.
2248  *
2249  * If an error occurs, record it as the result of the original
2250  * object request and mark it done so it gets completed.
2251  */
2252 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2253 {
2254         struct rbd_img_request *img_request = NULL;
2255         struct rbd_img_request *parent_request = NULL;
2256         struct rbd_device *rbd_dev;
2257         u64 img_offset;
2258         u64 length;
2259         struct page **pages = NULL;
2260         u32 page_count;
2261         int result;
2262
2263         rbd_assert(obj_request_img_data_test(obj_request));
2264         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2265
2266         img_request = obj_request->img_request;
2267         rbd_assert(img_request != NULL);
2268         rbd_dev = img_request->rbd_dev;
2269         rbd_assert(rbd_dev->parent != NULL);
2270
2271         /*
2272          * First things first.  The original osd request is of no
2273  * use to us any more; we'll need a new one that can hold
2274          * the two ops in a copyup request.  We'll get that later,
2275          * but for now we can release the old one.
2276          */
2277         rbd_osd_req_destroy(obj_request->osd_req);
2278         obj_request->osd_req = NULL;
2279
2280         /*
2281          * Determine the byte range covered by the object in the
2282          * child image to which the original request was to be sent.
2283          */
2284         img_offset = obj_request->img_offset - obj_request->offset;
2285         length = (u64)1 << rbd_dev->header.obj_order;
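        /*
         * For example, with 4 MiB objects, a request whose img_offset
         * is 5 MiB and whose offset within its object is 1 MiB maps
         * to the object covering image bytes 4 MiB..8 MiB.
         */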
2286
2287         /*
2288          * There is no defined parent data beyond the parent
2289          * overlap, so limit what we read at that boundary if
2290          * necessary.
2291          */
2292         if (img_offset + length > rbd_dev->parent_overlap) {
2293                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2294                 length = rbd_dev->parent_overlap - img_offset;
2295         }
2296
2297         /*
2298          * Allocate a page array big enough to receive the data read
2299          * from the parent.
2300          */
2301         page_count = (u32)calc_pages_for(0, length);
2302         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2303         if (IS_ERR(pages)) {
2304                 result = PTR_ERR(pages);
2305                 pages = NULL;
2306                 goto out_err;
2307         }
2308
2309         result = -ENOMEM;
2310         parent_request = rbd_img_request_create(rbd_dev->parent,
2311                                                 img_offset, length,
2312                                                 false, true);
2313         if (!parent_request)
2314                 goto out_err;
2315         rbd_obj_request_get(obj_request);
2316         parent_request->obj_request = obj_request;
2317
2318         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2319         if (result)
2320                 goto out_err;
2321         parent_request->copyup_pages = pages;
2322         parent_request->copyup_page_count = page_count;
2323
2324         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2325         result = rbd_img_request_submit(parent_request);
2326         if (!result)
2327                 return 0;
2328
2329         parent_request->copyup_pages = NULL;
2330         parent_request->copyup_page_count = 0;
2331         parent_request->obj_request = NULL;
2332         rbd_obj_request_put(obj_request);
2333 out_err:
2334         if (pages)
2335                 ceph_release_page_vector(pages, page_count);
2336         if (parent_request)
2337                 rbd_img_request_put(parent_request);
2338         obj_request->result = result;
2339         obj_request->xferred = 0;
2340         obj_request_done_set(obj_request);
2341
2342         return result;
2343 }
2344
2345 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2346 {
2347         struct rbd_obj_request *orig_request;
2348         int result;
2349
2350         rbd_assert(!obj_request_img_data_test(obj_request));
2351
2352         /*
2353          * All we need from the object request is the original
2354          * request and the result of the STAT op.  Grab those, then
2355          * we're done with the request.
2356          */
2357         orig_request = obj_request->obj_request;
2358         obj_request->obj_request = NULL;
2359         rbd_assert(orig_request);
2360         rbd_assert(orig_request->img_request);
2361
2362         result = obj_request->result;
2363         obj_request->result = 0;
2364
2365         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2366                 obj_request, orig_request, result,
2367                 obj_request->xferred, obj_request->length);
2368         rbd_obj_request_put(obj_request);
2372
2373         /*
2374          * Our only purpose here is to determine whether the object
2375          * exists, and we don't want to treat the non-existence as
2376          * an error.  If something else comes back, transfer the
2377          * error to the original request and complete it now.
2378          */
2379         if (!result) {
2380                 obj_request_existence_set(orig_request, true);
2381         } else if (result == -ENOENT) {
2382                 obj_request_existence_set(orig_request, false);
2383         } else if (result) {
2384                 orig_request->result = result;
2385                 goto out;
2386         }
2387
2388         /*
2389          * Resubmit the original request now that we have recorded
2390          * whether the target object exists.
2391          */
2392         orig_request->result = rbd_img_obj_request_submit(orig_request);
2393 out:
2394         if (orig_request->result)
2395                 rbd_obj_request_complete(orig_request);
2396         rbd_obj_request_put(orig_request);
2397 }
2398
2399 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2400 {
2401         struct rbd_obj_request *stat_request;
2402         struct rbd_device *rbd_dev;
2403         struct ceph_osd_client *osdc;
2404         struct page **pages = NULL;
2405         u32 page_count;
2406         size_t size;
2407         int ret;
2408
2409         /*
2410          * The response data for a STAT call consists of:
2411          *     le64 length;
2412          *     struct {
2413          *         le32 tv_sec;
2414          *         le32 tv_nsec;
2415          *     } mtime;
2416          */
2417         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
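        /* 16 bytes in all, so a single page holds the reply */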
2418         page_count = (u32)calc_pages_for(0, size);
2419         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2420         if (IS_ERR(pages))
2421                 return PTR_ERR(pages);
2422
2423         ret = -ENOMEM;
2424         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2425                                                         OBJ_REQUEST_PAGES);
2426         if (!stat_request)
2427                 goto out;
2428
2429         rbd_obj_request_get(obj_request);
2430         stat_request->obj_request = obj_request;
2431         stat_request->pages = pages;
2432         stat_request->page_count = page_count;
2433
2434         rbd_assert(obj_request->img_request);
2435         rbd_dev = obj_request->img_request->rbd_dev;
2436         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2437                                                 stat_request);
2438         if (!stat_request->osd_req)
2439                 goto out;
2440         stat_request->callback = rbd_img_obj_exists_callback;
2441
2442         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2443         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2444                                         false, false);
2445         rbd_osd_req_format_read(stat_request);
2446
2447         osdc = &rbd_dev->rbd_client->client->osdc;
2448         ret = rbd_obj_request_submit(osdc, stat_request);
2449 out:
2450         if (ret)
2451                 rbd_obj_request_put(obj_request);
2452
2453         return ret;
2454 }
2455
2456 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2457 {
2458         struct rbd_img_request *img_request;
2459         struct rbd_device *rbd_dev;
2460         bool known;
2461
2462         rbd_assert(obj_request_img_data_test(obj_request));
2463
2464         img_request = obj_request->img_request;
2465         rbd_assert(img_request);
2466         rbd_dev = img_request->rbd_dev;
2467
2468         /*
2469          * Only writes to layered images need special handling.
2470          * Reads and non-layered writes are simple object requests.
2471          * Layered writes that start beyond the end of the overlap
2472          * with the parent have no parent data, so they too are
2473          * simple object requests.  Finally, if the target object is
2474          * known to already exist, its parent data has already been
2475          * copied, so a write to the object can also be handled as a
2476          * simple object request.
2477          */
2478         if (!img_request_write_test(img_request) ||
2479                 !img_request_layered_test(img_request) ||
2480                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2481                 ((known = obj_request_known_test(obj_request)) &&
2482                         obj_request_exists_test(obj_request))) {
2483
2484                 struct rbd_device *rbd_dev;
2485                 struct ceph_osd_client *osdc;
2486
2487                 rbd_dev = obj_request->img_request->rbd_dev;
2488                 osdc = &rbd_dev->rbd_client->client->osdc;
2489
2490                 return rbd_obj_request_submit(osdc, obj_request);
2491         }
2492
2493         /*
2494          * It's a layered write.  The target object might exist but
2495          * we may not know that yet.  If we know it doesn't exist,
2496          * start by reading the data for the full target object from
2497          * the parent so we can use it for a copyup to the target.
2498          */
2499         if (known)
2500                 return rbd_img_obj_parent_read_full(obj_request);
2501
2502         /* We don't know whether the target exists.  Go find out. */
2503
2504         return rbd_img_obj_exists_submit(obj_request);
2505 }
2506
2507 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2508 {
2509         struct rbd_obj_request *obj_request;
2510         struct rbd_obj_request *next_obj_request;
2511
2512         dout("%s: img %p\n", __func__, img_request);
2513         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2514                 int ret;
2515
2516                 ret = rbd_img_obj_request_submit(obj_request);
2517                 if (ret)
2518                         return ret;
2519         }
2520
2521         return 0;
2522 }
2523
2524 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2525 {
2526         struct rbd_obj_request *obj_request;
2527         struct rbd_device *rbd_dev;
2528         u64 obj_end;
2529
2530         rbd_assert(img_request_child_test(img_request));
2531
2532         obj_request = img_request->obj_request;
2533         rbd_assert(obj_request);
2534         rbd_assert(obj_request->img_request);
2535
2536         obj_request->result = img_request->result;
2537         if (obj_request->result)
2538                 goto out;
2539
2540         /*
2541          * We need to zero anything beyond the parent overlap
2542          * boundary.  Since rbd_img_obj_request_read_callback()
2543          * will zero anything beyond the end of a short read, an
2544          * easy way to do this is to pretend the data from the
2545          * parent came up short--ending at the overlap boundary.
2546          */
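        /*
         * For example, if the parent overlap ends at 10 MiB and this
         * object request covers image bytes 8 MiB..12 MiB, at most
         * 2 MiB of parent data applies; the rest is zero-filled by
         * rbd_img_obj_request_read_callback().
         */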
2547         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2548         obj_end = obj_request->img_offset + obj_request->length;
2549         rbd_dev = obj_request->img_request->rbd_dev;
2550         if (obj_end > rbd_dev->parent_overlap) {
2551                 u64 xferred = 0;
2552
2553                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2554                         xferred = rbd_dev->parent_overlap -
2555                                         obj_request->img_offset;
2556
2557                 obj_request->xferred = min(img_request->xferred, xferred);
2558         } else {
2559                 obj_request->xferred = img_request->xferred;
2560         }
2561 out:
2562         rbd_img_request_put(img_request);
2563         rbd_img_obj_request_read_callback(obj_request);
2564         rbd_obj_request_complete(obj_request);
2565 }
2566
2567 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2568 {
2569         struct rbd_device *rbd_dev;
2570         struct rbd_img_request *img_request;
2571         int result;
2572
2573         rbd_assert(obj_request_img_data_test(obj_request));
2574         rbd_assert(obj_request->img_request != NULL);
2575         rbd_assert(obj_request->result == (s32) -ENOENT);
2576         rbd_assert(obj_request_type_valid(obj_request->type));
2577
2578         rbd_dev = obj_request->img_request->rbd_dev;
2579         rbd_assert(rbd_dev->parent != NULL);
2580         /* rbd_read_finish(obj_request, obj_request->length); */
2581         img_request = rbd_img_request_create(rbd_dev->parent,
2582                                                 obj_request->img_offset,
2583                                                 obj_request->length,
2584                                                 false, true);
2585         result = -ENOMEM;
2586         if (!img_request)
2587                 goto out_err;
2588
2589         rbd_obj_request_get(obj_request);
2590         img_request->obj_request = obj_request;
2591
2592         if (obj_request->type == OBJ_REQUEST_BIO)
2593                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2594                                                 obj_request->bio_list);
2595         else
2596                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2597                                                 obj_request->pages);
2598         if (result)
2599                 goto out_err;
2600
2601         img_request->callback = rbd_img_parent_read_callback;
2602         result = rbd_img_request_submit(img_request);
2603         if (result)
2604                 goto out_err;
2605
2606         return;
2607 out_err:
2608         if (img_request)
2609                 rbd_img_request_put(img_request);
2610         obj_request->result = result;
2611         obj_request->xferred = 0;
2612         obj_request_done_set(obj_request);
2613 }
2614
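/*
 * Acknowledge a notification received on the header object's watch.
 * The ack is fire-and-forget: the object request's callback simply
 * drops the request's reference once the osd replies.
 */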
2615 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2616 {
2617         struct rbd_obj_request *obj_request;
2618         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2619         int ret;
2620
2621         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2622                                                         OBJ_REQUEST_NODATA);
2623         if (!obj_request)
2624                 return -ENOMEM;
2625
2626         ret = -ENOMEM;
2627         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2628         if (!obj_request->osd_req)
2629                 goto out;
2630         obj_request->callback = rbd_obj_request_put;
2631
2632         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2633                                         notify_id, 0, 0);
2634         rbd_osd_req_format_read(obj_request);
2635
2636         ret = rbd_obj_request_submit(osdc, obj_request);
2637 out:
2638         if (ret)
2639                 rbd_obj_request_put(obj_request);
2640
2641         return ret;
2642 }
2643
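/*
 * Callback invoked when a watch notification arrives for the header
 * object, indicating the image header may have changed.  Refresh
 * the device, then acknowledge the notification.
 */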
2644 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2645 {
2646         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2647         int ret;
2648
2649         if (!rbd_dev)
2650                 return;
2651
2652         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2653                 rbd_dev->header_name, (unsigned long long)notify_id,
2654                 (unsigned int)opcode);
2655         ret = rbd_dev_refresh(rbd_dev);
2656         if (ret)
2657                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2658
2659         rbd_obj_notify_ack(rbd_dev, notify_id);
2660 }
2661
2662 /*
2663  * Request sync osd watch/unwatch.  The value of "start" determines
2664  * whether a watch request is being initiated or torn down.
2665  */
2666 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2667 {
2668         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2669         struct rbd_obj_request *obj_request;
2670         int ret;
2671
2672         rbd_assert(start ^ !!rbd_dev->watch_event);
2673         rbd_assert(start ^ !!rbd_dev->watch_request);
2674
2675         if (start) {
2676                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2677                                                 &rbd_dev->watch_event);
2678                 if (ret < 0)
2679                         return ret;
2680                 rbd_assert(rbd_dev->watch_event != NULL);
2681         }
2682
2683         ret = -ENOMEM;
2684         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2685                                                         OBJ_REQUEST_NODATA);
2686         if (!obj_request)
2687                 goto out_cancel;
2688
2689         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2690         if (!obj_request->osd_req)
2691                 goto out_cancel;
2692
2693         if (start)
2694                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2695         else
2696                 ceph_osdc_unregister_linger_request(osdc,
2697                                         rbd_dev->watch_request->osd_req);
2698
2699         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2700                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2701         rbd_osd_req_format_write(obj_request);
2702
2703         ret = rbd_obj_request_submit(osdc, obj_request);
2704         if (ret)
2705                 goto out_cancel;
2706         ret = rbd_obj_request_wait(obj_request);
2707         if (ret)
2708                 goto out_cancel;
2709         ret = obj_request->result;
2710         if (ret)
2711                 goto out_cancel;
2712
2713         /*
2714          * A watch request is set to linger, so the underlying osd
2715          * request won't go away until we unregister it.  We retain
2716          * a pointer to the object request during that time (in
2717          * rbd_dev->watch_request), so we'll keep a reference to
2718          * it.  We'll drop that reference (below) after we've
2719          * unregistered it.
2720          */
2721         if (start) {
2722                 rbd_dev->watch_request = obj_request;
2723
2724                 return 0;
2725         }
2726
2727         /* We have successfully torn down the watch request */
2728
2729         rbd_obj_request_put(rbd_dev->watch_request);
2730         rbd_dev->watch_request = NULL;
2731 out_cancel:
2732         /* Cancel the event if we're tearing down, or on error */
2733         ceph_osdc_cancel_event(rbd_dev->watch_event);
2734         rbd_dev->watch_event = NULL;
2735         if (obj_request)
2736                 rbd_obj_request_put(obj_request);
2737
2738         return ret;
2739 }
2740
2741 /*
2742  * Synchronous osd object method call.  Returns the number of bytes
2743  * returned in the inbound buffer, or a negative error code.
2744  */
2745 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2746                              const char *object_name,
2747                              const char *class_name,
2748                              const char *method_name,
2749                              const void *outbound,
2750                              size_t outbound_size,
2751                              void *inbound,
2752                              size_t inbound_size)
2753 {
2754         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2755         struct rbd_obj_request *obj_request;
2756         struct page **pages;
2757         u32 page_count;
2758         int ret;
2759
2760         /*
2761          * Method calls are ultimately read operations.  The result
2762  * should be placed into the inbound buffer provided.  They
2763          * also supply outbound data--parameters for the object
2764          * method.  Currently if this is present it will be a
2765          * snapshot id.
2766          */
2767         page_count = (u32)calc_pages_for(0, inbound_size);
2768         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2769         if (IS_ERR(pages))
2770                 return PTR_ERR(pages);
2771
2772         ret = -ENOMEM;
2773         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2774                                                         OBJ_REQUEST_PAGES);
2775         if (!obj_request)
2776                 goto out;
2777
2778         obj_request->pages = pages;
2779         obj_request->page_count = page_count;
2780
2781         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2782         if (!obj_request->osd_req)
2783                 goto out;
2784
2785         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2786                                         class_name, method_name);
2787         if (outbound_size) {
2788                 struct ceph_pagelist *pagelist;
2789
2790                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2791                 if (!pagelist)
2792                         goto out;
2793
2794                 ceph_pagelist_init(pagelist);
2795                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2796                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2797                                                 pagelist);
2798         }
2799         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2800                                         obj_request->pages, inbound_size,
2801                                         0, false, false);
2802         rbd_osd_req_format_read(obj_request);
2803
2804         ret = rbd_obj_request_submit(osdc, obj_request);
2805         if (ret)
2806                 goto out;
2807         ret = rbd_obj_request_wait(obj_request);
2808         if (ret)
2809                 goto out;
2810
2811         ret = obj_request->result;
2812         if (ret < 0)
2813                 goto out;
2814
2815         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2816         ret = (int)obj_request->xferred;
2817         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2818 out:
2819         if (obj_request)
2820                 rbd_obj_request_put(obj_request);
2821         else
2822                 ceph_release_page_vector(pages, page_count);
2823
2824         return ret;
2825 }
2826
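/*
 * The block layer's request function.  Pull requests off the queue,
 * validate them, and turn each one into an image request filled
 * from the request's bio chain.  The queue lock is dropped while an
 * individual request is set up and submitted.
 */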
2827 static void rbd_request_fn(struct request_queue *q)
2828                 __releases(q->queue_lock) __acquires(q->queue_lock)
2829 {
2830         struct rbd_device *rbd_dev = q->queuedata;
2831         bool read_only = rbd_dev->mapping.read_only;
2832         struct request *rq;
2833         int result;
2834
2835         while ((rq = blk_fetch_request(q))) {
2836                 bool write_request = rq_data_dir(rq) == WRITE;
2837                 struct rbd_img_request *img_request;
2838                 u64 offset;
2839                 u64 length;
2840
2841                 /* Ignore any non-FS requests that filter through. */
2842
2843                 if (rq->cmd_type != REQ_TYPE_FS) {
2844                         dout("%s: non-fs request type %d\n", __func__,
2845                                 (int) rq->cmd_type);
2846                         __blk_end_request_all(rq, 0);
2847                         continue;
2848                 }
2849
2850                 /* Ignore/skip any zero-length requests */
2851
2852                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2853                 length = (u64) blk_rq_bytes(rq);
2854
2855                 if (!length) {
2856                         dout("%s: zero-length request\n", __func__);
2857                         __blk_end_request_all(rq, 0);
2858                         continue;
2859                 }
2860
2861                 spin_unlock_irq(q->queue_lock);
2862
2863                 /* Disallow writes to a read-only device */
2864
2865                 if (write_request) {
2866                         result = -EROFS;
2867                         if (read_only)
2868                                 goto end_request;
2869                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2870                 }
2871
2872                 /*
2873                  * Quit early if the mapped snapshot no longer
2874                  * exists.  It's still possible the snapshot will
2875                  * have disappeared by the time our request arrives
2876                  * at the osd, but there's no sense in sending it if
2877                  * we already know.
2878                  */
2879                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2880                         dout("request for non-existent snapshot\n");
2881                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2882                         result = -ENXIO;
2883                         goto end_request;
2884                 }
2885
2886                 result = -EINVAL;
2887                 if (offset && length > U64_MAX - offset + 1) {
2888                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2889                                 offset, length);
2890                         goto end_request;       /* Shouldn't happen */
2891                 }
2892
2893                 result = -EIO;
2894                 if (offset + length > rbd_dev->mapping.size) {
2895                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2896                                 offset, length, rbd_dev->mapping.size);
2897                         goto end_request;
2898                 }
2899
2900                 result = -ENOMEM;
2901                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2902                                                         write_request, false);
2903                 if (!img_request)
2904                         goto end_request;
2905
2906                 img_request->rq = rq;
2907
2908                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2909                                                 rq->bio);
2910                 if (!result)
2911                         result = rbd_img_request_submit(img_request);
2912                 if (result)
2913                         rbd_img_request_put(img_request);
2914 end_request:
2915                 spin_lock_irq(q->queue_lock);
2916                 if (result < 0) {
2917                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2918                                 write_request ? "write" : "read",
2919                                 length, offset, result);
2920
2921                         __blk_end_request_all(rq, result);
2922                 }
2923         }
2924 }
2925
2926 /*
2927  * A queue callback.  Makes sure that we don't create a bio that spans
2928  * multiple osd objects.  One exception would be single-page bios,
2929  * which we handle later in bio_chain_clone_range().
2930  */
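/*
 * For example, with obj_order 22 (4 MiB objects) an object spans
 * 8192 sectors.  A bio starting 8000 sectors into its object is 192
 * sectors short of the boundary, so at most 192 << 9 = 98304 bytes
 * (less whatever the bio already holds) may be added to it.
 */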
2931 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2932                           struct bio_vec *bvec)
2933 {
2934         struct rbd_device *rbd_dev = q->queuedata;
2935         sector_t sector_offset;
2936         sector_t sectors_per_obj;
2937         sector_t obj_sector_offset;
2938         int ret;
2939
2940         /*
2941          * Convert the partition-relative bio start sector into an
2942          * offset relative to the enclosing device, then find how far
2943          * into its rbd object that device-relative offset falls.
2944          */
2945         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2946         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2947         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2948
2949         /*
2950          * Compute the number of bytes from that offset to the end
2951          * of the object.  Account for what's already used by the bio.
2952          */
2953         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2954         if (ret > bmd->bi_size)
2955                 ret -= bmd->bi_size;
2956         else
2957                 ret = 0;
2958
2959         /*
2960          * Don't send back more than was asked for.  And if the bio
2961          * was empty, let the whole thing through because:  "Note
2962          * that a block device *must* allow a single page to be
2963          * added to an empty bio."
2964          */
2965         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2966         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2967                 ret = (int) bvec->bv_len;
2968
2969         return ret;
2970 }
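/*
 * A worked example of the arithmetic above, assuming the common 4 MiB
 * object size (obj_order 22): sectors_per_obj = 1 << (22 - 9) = 8192.
 * A bio starting at device sector 8190 has obj_sector_offset = 8190,
 * leaving (8192 - 8190) << 9 = 1024 bytes to the object boundary; if
 * the bio already holds 512 bytes (bi_size), at most 512 more may be
 * merged.
 */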
2971
2972 static void rbd_free_disk(struct rbd_device *rbd_dev)
2973 {
2974         struct gendisk *disk = rbd_dev->disk;
2975
2976         if (!disk)
2977                 return;
2978
2979         rbd_dev->disk = NULL;
2980         if (disk->flags & GENHD_FL_UP) {
2981                 del_gendisk(disk);
2982                 if (disk->queue)
2983                         blk_cleanup_queue(disk->queue);
2984         }
2985         put_disk(disk);
2986 }
2987
2988 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2989                                 const char *object_name,
2990                                 u64 offset, u64 length, void *buf)
2992 {
2993         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2994         struct rbd_obj_request *obj_request;
2995         struct page **pages = NULL;
2996         u32 page_count;
2997         size_t size;
2998         int ret;
2999
3000         page_count = (u32) calc_pages_for(offset, length);
3001         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3002         if (IS_ERR(pages))
3003                 return PTR_ERR(pages);
3004
3005         ret = -ENOMEM;
3006         obj_request = rbd_obj_request_create(object_name, offset, length,
3007                                                         OBJ_REQUEST_PAGES);
3008         if (!obj_request)
3009                 goto out;
3010
3011         obj_request->pages = pages;
3012         obj_request->page_count = page_count;
3013
3014         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3015         if (!obj_request->osd_req)
3016                 goto out;
3017
3018         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3019                                         offset, length, 0, 0);
3020         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3021                                         obj_request->pages,
3022                                         obj_request->length,
3023                                         obj_request->offset & ~PAGE_MASK,
3024                                         false, false);
3025         rbd_osd_req_format_read(obj_request);
3026
3027         ret = rbd_obj_request_submit(osdc, obj_request);
3028         if (ret)
3029                 goto out;
3030         ret = rbd_obj_request_wait(obj_request);
3031         if (ret)
3032                 goto out;
3033
3034         ret = obj_request->result;
3035         if (ret < 0)
3036                 goto out;
3037
3038         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3039         size = (size_t) obj_request->xferred;
3040         ceph_copy_from_page_vector(pages, buf, 0, size);
3041         rbd_assert(size <= (size_t)INT_MAX);
3042         ret = (int)size;
3043 out:
3044         if (obj_request)
3045                 rbd_obj_request_put(obj_request);
3046         else
3047                 ceph_release_page_vector(pages, page_count);
3048
3049         return ret;
3050 }
3051
3052 /*
3053  * Read the complete header for the given rbd device.  On successful
3054  * return, the rbd_dev->header field will contain up-to-date
3055  * information about the image.
3056  */
3057 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3058 {
3059         struct rbd_image_header_ondisk *ondisk = NULL;
3060         u32 snap_count = 0;
3061         u64 names_size = 0;
3062         u32 want_count;
3063         int ret;
3064
3065         /*
3066          * The complete header will include an array of its 64-bit
3067          * snapshot ids, followed by the names of those snapshots as
3068          * a contiguous block of NUL-terminated strings.  Note that
3069          * the number of snapshots could change by the time we read
3070          * it in, in which case we re-read it.
3071          */
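        /*
         * Concretely (illustrative): the first pass allocates just
         * sizeof (*ondisk) bytes (snap_count and names_size start at
         * zero), learns the real snapshot count and name-block size
         * from that read, and the next pass re-reads with a buffer
         * sized for the full header.  The loop ends once two
         * consecutive reads agree on snap_count.
         */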
3072         do {
3073                 size_t size;
3074
3075                 kfree(ondisk);
3076
3077                 size = sizeof (*ondisk);
3078                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3079                 size += names_size;
3080                 ondisk = kmalloc(size, GFP_KERNEL);
3081                 if (!ondisk)
3082                         return -ENOMEM;
3083
3084                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3085                                        0, size, ondisk);
3086                 if (ret < 0)
3087                         goto out;
3088                 if ((size_t)ret < size) {
3089                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3090                                 size, ret);
3091                         ret = -ENXIO;
3092                         goto out;
3093                 }
3094                 if (!rbd_dev_ondisk_valid(ondisk)) {
3095                         ret = -ENXIO;
3096                         rbd_warn(rbd_dev, "invalid header");
3097                         goto out;
3098                 }
3099
3100                 names_size = le64_to_cpu(ondisk->snap_names_len);
3101                 want_count = snap_count;
3102                 snap_count = le32_to_cpu(ondisk->snap_count);
3103         } while (snap_count != want_count);
3104
3105         ret = rbd_header_from_disk(rbd_dev, ondisk);
3106 out:
3107         kfree(ondisk);
3108
3109         return ret;
3110 }
3111
3112 /*
3113  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3114  * has disappeared from the (just updated) snapshot context.
3115  */
3116 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3117 {
3118         u64 snap_id;
3119
3120         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3121                 return;
3122
3123         snap_id = rbd_dev->spec->snap_id;
3124         if (snap_id == CEPH_NOSNAP)
3125                 return;
3126
3127         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3128                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3129 }
3130
3131 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3132 {
3133         u64 mapping_size;
3134         int ret;
3135
3136         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3137         mapping_size = rbd_dev->mapping.size;
3138         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3139         if (rbd_dev->image_format == 1)
3140                 ret = rbd_dev_v1_header_info(rbd_dev);
3141         else
3142                 ret = rbd_dev_v2_header_info(rbd_dev);
3143
3144         /* If it's a mapped snapshot, validate its EXISTS flag */
3145
3146         rbd_exists_validate(rbd_dev);
3147         mutex_unlock(&ctl_mutex);
3148         if (mapping_size != rbd_dev->mapping.size) {
3149                 sector_t size;
3150
3151                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3152                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3153                 set_capacity(rbd_dev->disk, size);
3154                 revalidate_disk(rbd_dev->disk);
3155         }
3156
3157         return ret;
3158 }
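/*
 * Example of the capacity conversion above (illustrative): a 1 GiB
 * mapping is 1073741824 bytes, so set_capacity() is handed
 * 1073741824 / 512 = 2097152 sectors.
 */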
3159
3160 static int rbd_init_disk(struct rbd_device *rbd_dev)
3161 {
3162         struct gendisk *disk;
3163         struct request_queue *q;
3164         u64 segment_size;
3165
3166         /* create gendisk info */
3167         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3168         if (!disk)
3169                 return -ENOMEM;
3170
3171         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3172                  rbd_dev->dev_id);
3173         disk->major = rbd_dev->major;
3174         disk->first_minor = 0;
3175         disk->fops = &rbd_bd_ops;
3176         disk->private_data = rbd_dev;
3177
3178         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3179         if (!q)
3180                 goto out_disk;
3181
3182         /* We use the default size, but let's be explicit about it. */
3183         blk_queue_physical_block_size(q, SECTOR_SIZE);
3184
3185         /* set io sizes to object size */
3186         segment_size = rbd_obj_bytes(&rbd_dev->header);
3187         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3188         blk_queue_max_segment_size(q, segment_size);
3189         blk_queue_io_min(q, segment_size);
3190         blk_queue_io_opt(q, segment_size);
3191
3192         blk_queue_merge_bvec(q, rbd_merge_bvec);
3193         disk->queue = q;
3194
3195         q->queuedata = rbd_dev;
3196
3197         rbd_dev->disk = disk;
3198
3199         return 0;
3200 out_disk:
3201         put_disk(disk);
3202
3203         return -ENOMEM;
3204 }
3205
3206 /*
3207   sysfs
3208 */
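/*
 * These attributes appear under /sys/bus/rbd/devices/<id>/ (see
 * Documentation/ABI/testing/sysfs-bus-rbd).  For example, with
 * illustrative values:
 *
 *   $ cat /sys/bus/rbd/devices/0/size
 *   1073741824
 *   $ cat /sys/bus/rbd/devices/0/current_snap
 *   -
 */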
3209
3210 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3211 {
3212         return container_of(dev, struct rbd_device, dev);
3213 }
3214
3215 static ssize_t rbd_size_show(struct device *dev,
3216                              struct device_attribute *attr, char *buf)
3217 {
3218         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3219
3220         return sprintf(buf, "%llu\n",
3221                 (unsigned long long)rbd_dev->mapping.size);
3222 }
3223
3224 /*
3225  * Note this shows the features for whatever's mapped, which is not
3226  * necessarily the base image.
3227  */
3228 static ssize_t rbd_features_show(struct device *dev,
3229                              struct device_attribute *attr, char *buf)
3230 {
3231         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3232
3233         return sprintf(buf, "0x%016llx\n",
3234                         (unsigned long long)rbd_dev->mapping.features);
3235 }
3236
3237 static ssize_t rbd_major_show(struct device *dev,
3238                               struct device_attribute *attr, char *buf)
3239 {
3240         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3241
3242         if (rbd_dev->major)
3243                 return sprintf(buf, "%d\n", rbd_dev->major);
3244
3245         return sprintf(buf, "(none)\n");
3247 }
3248
3249 static ssize_t rbd_client_id_show(struct device *dev,
3250                                   struct device_attribute *attr, char *buf)
3251 {
3252         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3253
3254         return sprintf(buf, "client%lld\n",
3255                         ceph_client_id(rbd_dev->rbd_client->client));
3256 }
3257
3258 static ssize_t rbd_pool_show(struct device *dev,
3259                              struct device_attribute *attr, char *buf)
3260 {
3261         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3262
3263         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3264 }
3265
3266 static ssize_t rbd_pool_id_show(struct device *dev,
3267                              struct device_attribute *attr, char *buf)
3268 {
3269         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270
3271         return sprintf(buf, "%llu\n",
3272                         (unsigned long long) rbd_dev->spec->pool_id);
3273 }
3274
3275 static ssize_t rbd_name_show(struct device *dev,
3276                              struct device_attribute *attr, char *buf)
3277 {
3278         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3279
3280         if (rbd_dev->spec->image_name)
3281                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3282
3283         return sprintf(buf, "(unknown)\n");
3284 }
3285
3286 static ssize_t rbd_image_id_show(struct device *dev,
3287                              struct device_attribute *attr, char *buf)
3288 {
3289         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3290
3291         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3292 }
3293
3294 /*
3295  * Shows the name of the currently-mapped snapshot (or
3296  * RBD_SNAP_HEAD_NAME for the base image).
3297  */
3298 static ssize_t rbd_snap_show(struct device *dev,
3299                              struct device_attribute *attr,
3300                              char *buf)
3301 {
3302         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3303
3304         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3305 }
3306
3307 /*
3308  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3309  * for the parent image.  If there is no parent, simply shows
3310  * "(no parent image)".
3311  */
3312 static ssize_t rbd_parent_show(struct device *dev,
3313                              struct device_attribute *attr,
3314                              char *buf)
3315 {
3316         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3317         struct rbd_spec *spec = rbd_dev->parent_spec;
3318         int count;
3319         char *bufp = buf;
3320
3321         if (!spec)
3322                 return sprintf(buf, "(no parent image)\n");
3323
3324         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3325                         (unsigned long long) spec->pool_id, spec->pool_name);
3326         if (count < 0)
3327                 return count;
3328         bufp += count;
3329
3330         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3331                         spec->image_name ? spec->image_name : "(unknown)");
3332         if (count < 0)
3333                 return count;
3334         bufp += count;
3335
3336         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3337                         (unsigned long long) spec->snap_id, spec->snap_name);
3338         if (count < 0)
3339                 return count;
3340         bufp += count;
3341
3342         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3343         if (count < 0)
3344                 return count;
3345         bufp += count;
3346
3347         return (ssize_t) (bufp - buf);
3348 }
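/*
 * Illustrative output of the parent attribute for a mapped clone
 * (all values made up):
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 1014b76b8b4567
 *   image_name base-image
 *   snap_id 4
 *   snap_name snap1
 *   overlap 1073741824
 */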
3349
3350 static ssize_t rbd_image_refresh(struct device *dev,
3351                                  struct device_attribute *attr,
3352                                  const char *buf,
3353                                  size_t size)
3354 {
3355         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3356         int ret;
3357
3358         ret = rbd_dev_refresh(rbd_dev);
3359         if (ret)
3360                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3361
3362         return ret < 0 ? ret : size;
3363 }
3364
3365 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3366 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3367 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3368 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3369 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3370 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3371 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3372 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3373 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3374 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3375 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3376
3377 static struct attribute *rbd_attrs[] = {
3378         &dev_attr_size.attr,
3379         &dev_attr_features.attr,
3380         &dev_attr_major.attr,
3381         &dev_attr_client_id.attr,
3382         &dev_attr_pool.attr,
3383         &dev_attr_pool_id.attr,
3384         &dev_attr_name.attr,
3385         &dev_attr_image_id.attr,
3386         &dev_attr_current_snap.attr,
3387         &dev_attr_parent.attr,
3388         &dev_attr_refresh.attr,
3389         NULL
3390 };
3391
3392 static struct attribute_group rbd_attr_group = {
3393         .attrs = rbd_attrs,
3394 };
3395
3396 static const struct attribute_group *rbd_attr_groups[] = {
3397         &rbd_attr_group,
3398         NULL
3399 };
3400
3401 static void rbd_sysfs_dev_release(struct device *dev)
3402 {
3403 }
3404
3405 static struct device_type rbd_device_type = {
3406         .name           = "rbd",
3407         .groups         = rbd_attr_groups,
3408         .release        = rbd_sysfs_dev_release,
3409 };
3410
3411 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3412 {
3413         kref_get(&spec->kref);
3414
3415         return spec;
3416 }
3417
3418 static void rbd_spec_free(struct kref *kref);
3419 static void rbd_spec_put(struct rbd_spec *spec)
3420 {
3421         if (spec)
3422                 kref_put(&spec->kref, rbd_spec_free);
3423 }
3424
3425 static struct rbd_spec *rbd_spec_alloc(void)
3426 {
3427         struct rbd_spec *spec;
3428
3429         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3430         if (!spec)
3431                 return NULL;
3432         kref_init(&spec->kref);
3433
3434         return spec;
3435 }
3436
3437 static void rbd_spec_free(struct kref *kref)
3438 {
3439         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3440
3441         kfree(spec->pool_name);
3442         kfree(spec->image_id);
3443         kfree(spec->image_name);
3444         kfree(spec->snap_name);
3445         kfree(spec);
3446 }
3447
3448 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3449                                 struct rbd_spec *spec)
3450 {
3451         struct rbd_device *rbd_dev;
3452
3453         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3454         if (!rbd_dev)
3455                 return NULL;
3456
3457         spin_lock_init(&rbd_dev->lock);
3458         rbd_dev->flags = 0;
3459         INIT_LIST_HEAD(&rbd_dev->node);
3460         init_rwsem(&rbd_dev->header_rwsem);
3461
3462         rbd_dev->spec = spec;
3463         rbd_dev->rbd_client = rbdc;
3464
3465         /* Initialize the layout used for all rbd requests */
3466
3467         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3468         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3469         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3470         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3471
3472         return rbd_dev;
3473 }
3474
3475 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3476 {
3477         rbd_put_client(rbd_dev->rbd_client);
3478         rbd_spec_put(rbd_dev->spec);
3479         kfree(rbd_dev);
3480 }
3481
3482 /*
3483  * Get the size and object order for an image snapshot, or if
3484  * snap_id is CEPH_NOSNAP, gets this information for the base
3485  * image.
3486  */
3487 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3488                                 u8 *order, u64 *snap_size)
3489 {
3490         __le64 snapid = cpu_to_le64(snap_id);
3491         int ret;
3492         struct {
3493                 u8 order;
3494                 __le64 size;
3495         } __attribute__ ((packed)) size_buf = { 0 };
3496
3497         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3498                                 "rbd", "get_size",
3499                                 &snapid, sizeof (snapid),
3500                                 &size_buf, sizeof (size_buf));
3501         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3502         if (ret < 0)
3503                 return ret;
3504         if (ret < sizeof (size_buf))
3505                 return -ERANGE;
3506
3507         if (order) {
3508                 *order = size_buf.order;
3509                 dout("  order %u\n", (unsigned int)*order);
3510         }
3511         *snap_size = le64_to_cpu(size_buf.size);
3512         dout("  snap_id 0x%016llx snap_size = %llu\n",
3513                 (unsigned long long)snap_id, (unsigned long long)*snap_size);
3514
3515         return 0;
3516 }
3517
3518 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3519 {
3520         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3521                                         &rbd_dev->header.obj_order,
3522                                         &rbd_dev->header.image_size);
3523 }
3524
3525 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3526 {
3527         void *reply_buf;
3528         int ret;
3529         void *p;
3530
3531         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3532         if (!reply_buf)
3533                 return -ENOMEM;
3534
3535         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3536                                 "rbd", "get_object_prefix", NULL, 0,
3537                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3538         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3539         if (ret < 0)
3540                 goto out;
3541
3542         p = reply_buf;
3543         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3544                                                 p + ret, NULL, GFP_NOIO);
3545         ret = 0;
3546
3547         if (IS_ERR(rbd_dev->header.object_prefix)) {
3548                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3549                 rbd_dev->header.object_prefix = NULL;
3550         } else {
3551                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3552         }
3553 out:
3554         kfree(reply_buf);
3555
3556         return ret;
3557 }
3558
3559 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3560                 u64 *snap_features)
3561 {
3562         __le64 snapid = cpu_to_le64(snap_id);
3563         struct {
3564                 __le64 features;
3565                 __le64 incompat;
3566         } __attribute__ ((packed)) features_buf = { 0 };
3567         u64 incompat;
3568         int ret;
3569
3570         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3571                                 "rbd", "get_features",
3572                                 &snapid, sizeof (snapid),
3573                                 &features_buf, sizeof (features_buf));
3574         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3575         if (ret < 0)
3576                 return ret;
3577         if (ret < sizeof (features_buf))
3578                 return -ERANGE;
3579
3580         incompat = le64_to_cpu(features_buf.incompat);
3581         if (incompat & ~RBD_FEATURES_SUPPORTED)
3582                 return -ENXIO;
3583
3584         *snap_features = le64_to_cpu(features_buf.features);
3585
3586         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3587                 (unsigned long long)snap_id,
3588                 (unsigned long long)*snap_features,
3589                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3590
3591         return 0;
3592 }
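/*
 * Illustrative check: an image reporting features 0x3 and incompat
 * 0x3 (LAYERING | STRIPINGV2) is accepted, since both bits fall
 * within RBD_FEATURES_SUPPORTED; any incompat bit outside that mask
 * makes the image unusable by this client (-ENXIO above).
 */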
3593
3594 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3595 {
3596         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3597                                                 &rbd_dev->header.features);
3598 }
3599
3600 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3601 {
3602         struct rbd_spec *parent_spec;
3603         size_t size;
3604         void *reply_buf = NULL;
3605         __le64 snapid;
3606         void *p;
3607         void *end;
3608         char *image_id;
3609         u64 overlap;
3610         int ret;
3611
3612         parent_spec = rbd_spec_alloc();
3613         if (!parent_spec)
3614                 return -ENOMEM;
3615
3616         size = sizeof (__le64) +                                /* pool_id */
3617                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3618                 sizeof (__le64) +                               /* snap_id */
3619                 sizeof (__le64);                                /* overlap */
3620         reply_buf = kmalloc(size, GFP_KERNEL);
3621         if (!reply_buf) {
3622                 ret = -ENOMEM;
3623                 goto out_err;
3624         }
3625
3626         snapid = cpu_to_le64(CEPH_NOSNAP);
3627         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3628                                 "rbd", "get_parent",
3629                                 &snapid, sizeof (snapid),
3630                                 reply_buf, size);
3631         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3632         if (ret < 0)
3633                 goto out_err;
3634
3635         p = reply_buf;
3636         end = reply_buf + ret;
3637         ret = -ERANGE;
3638         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3639         if (parent_spec->pool_id == CEPH_NOPOOL)
3640                 goto out;       /* No parent?  No problem. */
3641
3642         /* The ceph file layout needs to fit pool id in 32 bits */
3643
3644         ret = -EIO;
3645         if (parent_spec->pool_id > (u64)U32_MAX) {
3646                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3647                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3648                 goto out_err;
3649         }
3650
3651         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3652         if (IS_ERR(image_id)) {
3653                 ret = PTR_ERR(image_id);
3654                 goto out_err;
3655         }
3656         parent_spec->image_id = image_id;
3657         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3658         ceph_decode_64_safe(&p, end, overlap, out_err);
3659
3660         rbd_dev->parent_overlap = overlap;
3661         rbd_dev->parent_spec = parent_spec;
3662         parent_spec = NULL;     /* rbd_dev now owns this */
3663 out:
3664         ret = 0;
3665 out_err:
3666         kfree(reply_buf);
3667         rbd_spec_put(parent_spec);
3668
3669         return ret;
3670 }
3671
3672 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3673 {
3674         struct {
3675                 __le64 stripe_unit;
3676                 __le64 stripe_count;
3677         } __attribute__ ((packed)) striping_info_buf = { 0 };
3678         size_t size = sizeof (striping_info_buf);
3679         void *p;
3680         u64 obj_size;
3681         u64 stripe_unit;
3682         u64 stripe_count;
3683         int ret;
3684
3685         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3686                                 "rbd", "get_stripe_unit_count", NULL, 0,
3687                                 (char *)&striping_info_buf, size);
3688         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3689         if (ret < 0)
3690                 return ret;
3691         if (ret < size)
3692                 return -ERANGE;
3693
3694         /*
3695          * We don't actually support the "fancy striping" feature
3696          * (STRIPINGV2) yet, but if the striping sizes are the
3697          * defaults the behavior is the same as before.  So find
3698          * out, and only fail if the image has non-default values.
3699          */
3700
3701         obj_size = (u64)1 << rbd_dev->header.obj_order;
3702         p = &striping_info_buf;
3703         stripe_unit = ceph_decode_64(&p);
3704         if (stripe_unit != obj_size) {
3705                 rbd_warn(rbd_dev, "unsupported stripe unit "
3706                                 "(got %llu want %llu)",
3707                                 stripe_unit, obj_size);
3708                 return -EINVAL;
3709         }
3710         stripe_count = ceph_decode_64(&p);
3711         if (stripe_count != 1) {
3712                 rbd_warn(rbd_dev, "unsupported stripe count "
3713                                 "(got %llu want 1)", stripe_count);
3714                 return -EINVAL;
3715         }
3716         rbd_dev->header.stripe_unit = stripe_unit;
3717         rbd_dev->header.stripe_count = stripe_count;
3718
3719         return 0;
3720 }
3721
3722 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3723 {
3724         size_t image_id_size;
3725         char *image_id;
3726         void *p;
3727         void *end;
3728         size_t size;
3729         void *reply_buf = NULL;
3730         size_t len = 0;
3731         char *image_name = NULL;
3732         int ret;
3733
3734         rbd_assert(!rbd_dev->spec->image_name);
3735
3736         len = strlen(rbd_dev->spec->image_id);
3737         image_id_size = sizeof (__le32) + len;
3738         image_id = kmalloc(image_id_size, GFP_KERNEL);
3739         if (!image_id)
3740                 return NULL;
3741
3742         p = image_id;
3743         end = image_id + image_id_size;
3744         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3745
3746         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3747         reply_buf = kmalloc(size, GFP_KERNEL);
3748         if (!reply_buf)
3749                 goto out;
3750
3751         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3752                                 "rbd", "dir_get_name",
3753                                 image_id, image_id_size,
3754                                 reply_buf, size);
3755         if (ret < 0)
3756                 goto out;
3757         p = reply_buf;
3758         end = reply_buf + ret;
3759
3760         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3761         if (IS_ERR(image_name))
3762                 image_name = NULL;
3763         else
3764                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3765 out:
3766         kfree(reply_buf);
3767         kfree(image_id);
3768
3769         return image_name;
3770 }
3771
3772 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3773 {
3774         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3775         const char *snap_name;
3776         u32 which = 0;
3777
3778         /* Skip over names until we find the one we are looking for */
3779
3780         snap_name = rbd_dev->header.snap_names;
3781         while (which < snapc->num_snaps) {
3782                 if (!strcmp(name, snap_name))
3783                         return snapc->snaps[which];
3784                 snap_name += strlen(snap_name) + 1;
3785                 which++;
3786         }
3787         return CEPH_NOSNAP;
3788 }
3789
3790 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3791 {
3792         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3793         u32 which;
3794         bool found = false;
3795         u64 snap_id;
3796
3797         for (which = 0; !found && which < snapc->num_snaps; which++) {
3798                 const char *snap_name;
3799
3800                 snap_id = snapc->snaps[which];
3801                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3802                 if (IS_ERR(snap_name))
3803                         break;
3804                 found = !strcmp(name, snap_name);
3805                 kfree(snap_name);
3806         }
3807         return found ? snap_id : CEPH_NOSNAP;
3808 }
3809
3810 /*
3811  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3812  * no snapshot by that name is found, or if an error occurs.
3813  */
3814 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3815 {
3816         if (rbd_dev->image_format == 1)
3817                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3818
3819         return rbd_v2_snap_id_by_name(rbd_dev, name);
3820 }
3821
3822 /*
3823  * When an rbd image has a parent image, it is identified by the
3824  * pool, image, and snapshot ids (not names).  This function fills
3825  * in the names for those ids.  (It's OK if we can't figure out the
3826  * name for an image id, but the pool and snapshot ids should always
3827  * exist and have names.)  All names in an rbd spec are dynamically
3828  * allocated.
3829  *
3830  * When an image being mapped (not a parent) is probed, we have the
3831  * pool name and pool id, image name and image id, and the snapshot
3832  * name.  The only thing we're missing is the snapshot id.
3833  */
3834 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3835 {
3836         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3837         struct rbd_spec *spec = rbd_dev->spec;
3838         const char *pool_name;
3839         const char *image_name;
3840         const char *snap_name;
3841         int ret;
3842
3843         /*
3844          * An image being mapped will have the pool name (etc.), but
3845          * we need to look up the snapshot id.
3846          */
3847         if (spec->pool_name) {
3848                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3849                         u64 snap_id;
3850
3851                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3852                         if (snap_id == CEPH_NOSNAP)
3853                                 return -ENOENT;
3854                         spec->snap_id = snap_id;
3855                 } else {
3856                         spec->snap_id = CEPH_NOSNAP;
3857                 }
3858
3859                 return 0;
3860         }
3861
3862         /* Get the pool name; we have to make our own copy of this */
3863
3864         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3865         if (!pool_name) {
3866                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3867                 return -EIO;
3868         }
3869         pool_name = kstrdup(pool_name, GFP_KERNEL);
3870         if (!pool_name)
3871                 return -ENOMEM;
3872
3873         /* Fetch the image name; tolerate failure here */
3874
3875         image_name = rbd_dev_image_name(rbd_dev);
3876         if (!image_name)
3877                 rbd_warn(rbd_dev, "unable to get image name");
3878
3879         /* Look up the snapshot name, and make a copy */
3880
3881         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3882         if (!snap_name) {
3883                 ret = -ENOMEM;
3884                 goto out_err;
3885         }
3886
3887         spec->pool_name = pool_name;
3888         spec->image_name = image_name;
3889         spec->snap_name = snap_name;
3890
3891         return 0;
3892 out_err:
3893         kfree(image_name);
3894         kfree(pool_name);
3895
3896         return ret;
3897 }
3898
3899 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3900 {
3901         size_t size;
3902         int ret;
3903         void *reply_buf;
3904         void *p;
3905         void *end;
3906         u64 seq;
3907         u32 snap_count;
3908         struct ceph_snap_context *snapc;
3909         u32 i;
3910
3911         /*
3912          * We'll need room for the seq value (maximum snapshot id),
3913          * snapshot count, and array of that many snapshot ids.
3914          * For now we have a fixed upper limit on the number we're
3915          * prepared to receive.
3916          */
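        /*
         * With RBD_MAX_SNAP_COUNT of 510 that works out to
         * 8 + 4 + 510 * 8 = 4092 bytes, so the reply buffer always
         * fits within a single 4 KB page.
         */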
3917         size = sizeof (__le64) + sizeof (__le32) +
3918                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3919         reply_buf = kzalloc(size, GFP_KERNEL);
3920         if (!reply_buf)
3921                 return -ENOMEM;
3922
3923         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3924                                 "rbd", "get_snapcontext", NULL, 0,
3925                                 reply_buf, size);
3926         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3927         if (ret < 0)
3928                 goto out;
3929
3930         p = reply_buf;
3931         end = reply_buf + ret;
3932         ret = -ERANGE;
3933         ceph_decode_64_safe(&p, end, seq, out);
3934         ceph_decode_32_safe(&p, end, snap_count, out);
3935
3936         /*
3937          * Make sure the reported number of snapshot ids wouldn't go
3938          * beyond the end of our buffer.  But before checking that,
3939          * make sure the computed size of the snapshot context we
3940          * allocate is representable in a size_t.
3941          */
3942         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3943                                  / sizeof (u64)) {
3944                 ret = -EINVAL;
3945                 goto out;
3946         }
3947         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3948                 goto out;
3949         ret = 0;
3950
3951         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3952         if (!snapc) {
3953                 ret = -ENOMEM;
3954                 goto out;
3955         }
3956         snapc->seq = seq;
3957         for (i = 0; i < snap_count; i++)
3958                 snapc->snaps[i] = ceph_decode_64(&p);
3959
3960         ceph_put_snap_context(rbd_dev->header.snapc);
3961         rbd_dev->header.snapc = snapc;
3962
3963         dout("  snap context seq = %llu, snap_count = %u\n",
3964                 (unsigned long long)seq, (unsigned int)snap_count);
3965 out:
3966         kfree(reply_buf);
3967
3968         return ret;
3969 }
3970
3971 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3972                                         u64 snap_id)
3973 {
3974         size_t size;
3975         void *reply_buf;
3976         __le64 snapid;
3977         int ret;
3978         void *p;
3979         void *end;
3980         char *snap_name;
3981
3982         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3983         reply_buf = kmalloc(size, GFP_KERNEL);
3984         if (!reply_buf)
3985                 return ERR_PTR(-ENOMEM);
3986
3987         snapid = cpu_to_le64(snap_id);
3988         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3989                                 "rbd", "get_snapshot_name",
3990                                 &snapid, sizeof (snapid),
3991                                 reply_buf, size);
3992         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3993         if (ret < 0) {
3994                 snap_name = ERR_PTR(ret);
3995                 goto out;
3996         }
3997
3998         p = reply_buf;
3999         end = reply_buf + ret;
4000         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4001         if (IS_ERR(snap_name))
4002                 goto out;
4003
4004         dout("  snap_id 0x%016llx snap_name = %s\n",
4005                 (unsigned long long)snap_id, snap_name);
4006 out:
4007         kfree(reply_buf);
4008
4009         return snap_name;
4010 }
4011
4012 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4013 {
4014         bool first_time = rbd_dev->header.object_prefix == NULL;
4015         int ret;
4016
4017         down_write(&rbd_dev->header_rwsem);
4018
4019         if (first_time) {
4020                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4021                 if (ret)
4022                         goto out;
4023         }
4024
4025         ret = rbd_dev_v2_image_size(rbd_dev);
4026         if (ret)
4027                 goto out;
4028         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4029             rbd_dev->mapping.size != rbd_dev->header.image_size)
4030                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4031
4032         ret = rbd_dev_v2_snap_context(rbd_dev);
4033         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4034         if (ret)
4035                 goto out;
4036 out:
4037         up_write(&rbd_dev->header_rwsem);
4038
4039         return ret;
4040 }
4041
4042 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4043 {
4044         struct device *dev;
4045         int ret;
4046
4047         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4048
4049         dev = &rbd_dev->dev;
4050         dev->bus = &rbd_bus_type;
4051         dev->type = &rbd_device_type;
4052         dev->parent = &rbd_root_dev;
4053         dev->release = rbd_dev_device_release;
4054         dev_set_name(dev, "%d", rbd_dev->dev_id);
4055         ret = device_register(dev);
4056
4057         mutex_unlock(&ctl_mutex);
4058
4059         return ret;
4060 }
4061
4062 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4063 {
4064         device_unregister(&rbd_dev->dev);
4065 }
4066
4067 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4068
4069 /*
4070  * Get a unique rbd identifier for the given new rbd_dev, and add
4071  * the rbd_dev to the global list.  The minimum rbd id is 1.
4072  */
4073 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4074 {
4075         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4076
4077         spin_lock(&rbd_dev_list_lock);
4078         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4079         spin_unlock(&rbd_dev_list_lock);
4080         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4081                 (unsigned long long) rbd_dev->dev_id);
4082 }
4083
4084 /*
4085  * Remove an rbd_dev from the global list, and record that its
4086  * identifier is no longer in use.
4087  */
4088 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4089 {
4090         struct list_head *tmp;
4091         int rbd_id = rbd_dev->dev_id;
4092         int max_id;
4093
4094         rbd_assert(rbd_id > 0);
4095
4096         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4097                 (unsigned long long) rbd_dev->dev_id);
4098         spin_lock(&rbd_dev_list_lock);
4099         list_del_init(&rbd_dev->node);
4100
4101         /*
4102          * If the id being "put" is not the current maximum, there
4103          * is nothing special we need to do.
4104          */
4105         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4106                 spin_unlock(&rbd_dev_list_lock);
4107                 return;
4108         }
4109
4110         /*
4111          * We need to update the current maximum id.  Search the
4112          * list to find out what it is.  We're more likely to find
4113          * the maximum at the end, so search the list backward.
4114          */
4115         max_id = 0;
4116         list_for_each_prev(tmp, &rbd_dev_list) {
4117                 struct rbd_device *rbd_dev;
4118
4119                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4120                 if (rbd_dev->dev_id > max_id)
4121                         max_id = rbd_dev->dev_id;
4122         }
4123         spin_unlock(&rbd_dev_list_lock);
4124
4125         /*
4126          * The max id could have been updated by rbd_dev_id_get(), in
4127          * which case it now accurately reflects the new maximum.
4128          * Be careful not to overwrite the maximum value in that
4129          * case.
4130          */
4131         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4132         dout("  max dev id has been reset\n");
4133 }
4134
4135 /*
4136  * Skips over white space at *buf, and updates *buf to point to the
4137  * first found non-space character (if any). Returns the length of
4138  * the token (string of non-white space characters) found.  Note
4139  * that *buf must be terminated with '\0'.
4140  */
4141 static inline size_t next_token(const char **buf)
4142 {
4143         /*
4144          * These are the characters that produce nonzero for
4145          * isspace() in the "C" and "POSIX" locales.
4146          */
4147         const char *spaces = " \f\n\r\t\v";
4148
4149         *buf += strspn(*buf, spaces);   /* Find start of token */
4150
4151         return strcspn(*buf, spaces);   /* Return token length */
4152 }
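/*
 * Illustrative use: with *buf pointing at "  rbd foo", next_token()
 * advances *buf to "rbd foo" and returns 3, the length of "rbd".
 */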
4153
4154 /*
4155  * Finds the next token in *buf, and if the provided token buffer is
4156  * big enough, copies the found token into it.  The result, if
4157  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4158  * must be terminated with '\0' on entry.
4159  *
4160  * Returns the length of the token found (not including the '\0').
4161  * Return value will be 0 if no token is found, and it will be >=
4162  * token_size if the token would not fit.
4163  *
4164  * The *buf pointer will be updated to point beyond the end of the
4165  * found token.  Note that this occurs even if the token buffer is
4166  * too small to hold it.
4167  */
4168 static inline size_t copy_token(const char **buf,
4169                                 char *token,
4170                                 size_t token_size)
4171 {
4172         size_t len;
4173
4174         len = next_token(buf);
4175         if (len < token_size) {
4176                 memcpy(token, *buf, len);
4177                 *(token + len) = '\0';
4178         }
4179         *buf += len;
4180
4181         return len;
4182 }
4183
4184 /*
4185  * Finds the next token in *buf, dynamically allocates a buffer big
4186  * enough to hold a copy of it, and copies the token into the new
4187  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4188  * that a duplicate buffer is created even for a zero-length token.
4189  *
4190  * Returns a pointer to the newly-allocated duplicate, or a null
4191  * pointer if memory for the duplicate was not available.  If
4192  * the lenp argument is a non-null pointer, the length of the token
4193  * (not including the '\0') is returned in *lenp.
4194  *
4195  * If successful, the *buf pointer will be updated to point beyond
4196  * the end of the found token.
4197  *
4198  * Note: uses GFP_KERNEL for allocation.
4199  */
4200 static inline char *dup_token(const char **buf, size_t *lenp)
4201 {
4202         char *dup;
4203         size_t len;
4204
4205         len = next_token(buf);
4206         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4207         if (!dup)
4208                 return NULL;
4209         *(dup + len) = '\0';
4210         *buf += len;
4211
4212         if (lenp)
4213                 *lenp = len;
4214
4215         return dup;
4216 }
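/*
 * Illustrative use: with *buf pointing at "rbd foo", dup_token()
 * returns a newly allocated "rbd" (which the caller must kfree()),
 * stores 3 in *lenp when lenp is non-NULL, and leaves *buf pointing
 * at " foo".
 */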
4217
4218 /*
4219  * Parse the options provided for an "rbd add" (i.e., rbd image
4220  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4221  * and the data written is passed here via a NUL-terminated buffer.
4222  * Returns 0 if successful or an error code otherwise.
4223  *
4224  * The information extracted from these options is recorded in
4225  * the other parameters which return dynamically-allocated
4226  * structures:
4227  *  ceph_opts
4228  *      The address of a pointer that will refer to a ceph options
4229  *      structure.  Caller must release the returned pointer using
4230  *      ceph_destroy_options() when it is no longer needed.
4231  *  rbd_opts
4232  *      Address of an rbd options pointer.  Fully initialized by
4233  *      this function; caller must release with kfree().
4234  *  spec
4235  *      Address of an rbd image specification pointer.  Fully
4236  *      initialized by this function based on parsed options.
4237  *      Caller must release with rbd_spec_put().
4238  *
4239  * The options passed take this form:
4240  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4241  * where:
4242  *  <mon_addrs>
4243  *      A comma-separated list of one or more monitor addresses.
4244  *      A monitor address is an ip address, optionally followed
4245  *      by a port number (separated by a colon).
4246  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4247  *  <options>
4248  *      A comma-separated list of ceph and/or rbd options.
4249  *  <pool_name>
4250  *      The name of the rados pool containing the rbd image.
4251  *  <image_name>
4252  *      The name of the image in that pool to map.
4253  *  <snap_name>
4254  *      An optional snapshot name.  If provided, the mapping will
4255  *      present data from the image at the time that snapshot was
4256  *      created.  The image head is used if no snapshot name is
4257  *      provided.  Snapshot mappings are always read-only.
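 *
 * For example (illustrative values only), image "foo" in pool "rbd"
 * could be mapped through a single monitor with:
 *
 *   $ echo "1.2.3.4:6789 name=admin rbd foo" > /sys/bus/rbd/add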
4258  */
4259 static int rbd_add_parse_args(const char *buf,
4260                                 struct ceph_options **ceph_opts,
4261                                 struct rbd_options **opts,
4262                                 struct rbd_spec **rbd_spec)
4263 {
4264         size_t len;
4265         char *options;
4266         const char *mon_addrs;
4267         char *snap_name;
4268         size_t mon_addrs_size;
4269         struct rbd_spec *spec = NULL;
4270         struct rbd_options *rbd_opts = NULL;
4271         struct ceph_options *copts;
4272         int ret;
4273
4274         /* The first four tokens are required */
4275
4276         len = next_token(&buf);
4277         if (!len) {
4278                 rbd_warn(NULL, "no monitor address(es) provided");
4279                 return -EINVAL;
4280         }
4281         mon_addrs = buf;
4282         mon_addrs_size = len + 1;
4283         buf += len;
4284
4285         ret = -EINVAL;
4286         options = dup_token(&buf, NULL);
4287         if (!options)
4288                 return -ENOMEM;
4289         if (!*options) {
4290                 rbd_warn(NULL, "no options provided");
4291                 goto out_err;
4292         }
4293
4294         spec = rbd_spec_alloc();
4295         if (!spec)
4296                 goto out_mem;
4297
4298         spec->pool_name = dup_token(&buf, NULL);
4299         if (!spec->pool_name)
4300                 goto out_mem;
4301         if (!*spec->pool_name) {
4302                 rbd_warn(NULL, "no pool name provided");
4303                 goto out_err;
4304         }
4305
4306         spec->image_name = dup_token(&buf, NULL);
4307         if (!spec->image_name)
4308                 goto out_mem;
4309         if (!*spec->image_name) {
4310                 rbd_warn(NULL, "no image name provided");
4311                 goto out_err;
4312         }
4313
4314         /*
4315          * Snapshot name is optional; default is to use "-"
4316          * (indicating the head/no snapshot).
4317          */
4318         len = next_token(&buf);
4319         if (!len) {
4320                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4321                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4322         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4323                 ret = -ENAMETOOLONG;
4324                 goto out_err;
4325         }
4326         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4327         if (!snap_name)
4328                 goto out_mem;
4329         *(snap_name + len) = '\0';
4330         spec->snap_name = snap_name;
4331
4332         /* Initialize all rbd options to the defaults */
4333
4334         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4335         if (!rbd_opts)
4336                 goto out_mem;
4337
4338         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4339
4340         copts = ceph_parse_options(options, mon_addrs,
4341                                         mon_addrs + mon_addrs_size - 1,
4342                                         parse_rbd_opts_token, rbd_opts);
4343         if (IS_ERR(copts)) {
4344                 ret = PTR_ERR(copts);
4345                 goto out_err;
4346         }
4347         kfree(options);
4348
4349         *ceph_opts = copts;
4350         *opts = rbd_opts;
4351         *rbd_spec = spec;
4352
4353         return 0;
4354 out_mem:
4355         ret = -ENOMEM;
4356 out_err:
4357         kfree(rbd_opts);
4358         rbd_spec_put(spec);
4359         kfree(options);
4360
4361         return ret;
4362 }
4363
4364 /*
4365  * An rbd format 2 image has a unique identifier, distinct from the
4366  * name given to it by the user.  Internally, that identifier is
4367  * what's used to specify the names of objects related to the image.
4368  *
4369  * A special "rbd id" object is used to map an rbd image name to its
4370  * id.  If that object doesn't exist, then there is no v2 rbd image
4371  * with the supplied name.
4372  *
4373  * This function will record the given rbd_dev's image_id field if
4374  * it can be determined, and in that case will return 0.  If any
4375  * errors occur a negative errno will be returned and the rbd_dev's
4376  * image_id field will be unchanged (and should be NULL).
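 *
 * The id object's name is RBD_ID_PREFIX followed by the image name;
 * for example, image "foo" is looked up via an object named
 * "rbd_id.foo".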
4377  */
4378 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4379 {
4380         int ret;
4381         size_t size;
4382         char *object_name;
4383         void *response;
4384         char *image_id;
4385
4386         /*
4387          * When probing a parent image, the image id is already
4388          * known (and the image name likely is not).  There's no
4389          * need to fetch the image id again in this case.  We
4390          * do still need to set the image format though.
4391          */
4392         if (rbd_dev->spec->image_id) {
4393                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4394
4395                 return 0;
4396         }
4397
4398         /*
4399          * First, see if the format 2 image id file exists, and if
4400          * so, get the image's persistent id from it.
4401          */
4402         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4403         object_name = kmalloc(size, GFP_NOIO);
4404         if (!object_name)
4405                 return -ENOMEM;
4406         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4407         dout("rbd id object name is %s\n", object_name);
4408
4409         /* Response will be an encoded string, which includes a length */
4410
4411         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4412         response = kzalloc(size, GFP_NOIO);
4413         if (!response) {
4414                 ret = -ENOMEM;
4415                 goto out;
4416         }
4417
4418         /* If it doesn't exist we'll assume it's a format 1 image */
4419
4420         ret = rbd_obj_method_sync(rbd_dev, object_name,
4421                                 "rbd", "get_id", NULL, 0,
4422                                 response, RBD_IMAGE_ID_LEN_MAX);
4423         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4424         if (ret == -ENOENT) {
4425                 image_id = kstrdup("", GFP_KERNEL);
4426                 ret = image_id ? 0 : -ENOMEM;
4427                 if (!ret)
4428                         rbd_dev->image_format = 1;
4429         } else if (ret > (int) sizeof (__le32)) {
4430                 void *p = response;
4431
4432                 image_id = ceph_extract_encoded_string(&p, p + ret,
4433                                                 NULL, GFP_NOIO);
4434                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4435                 if (!ret)
4436                         rbd_dev->image_format = 2;
4437         } else {
4438                 ret = -EINVAL;
4439         }
4440
4441         if (!ret) {
4442                 rbd_dev->spec->image_id = image_id;
4443                 dout("image_id is %s\n", image_id);
4444         }
4445 out:
4446         kfree(response);
4447         kfree(object_name);
4448
4449         return ret;
4450 }

/* Undo whatever state changes are made by v1 or v2 image probe */

static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
        struct rbd_image_header *header;

        rbd_dev_remove_parent(rbd_dev);
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;

        /* Free dynamic fields from the header, then zero it out */

        header = &rbd_dev->header;
        ceph_put_snap_context(header->snapc);
        kfree(header->snap_sizes);
        kfree(header->snap_names);
        kfree(header->object_prefix);
        memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;

        /*
         * Get and check the features for the image.  Currently the
         * features are assumed to never change.
         */
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;

        /* If the image supports layering, get the parent info */

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto out_err;
                /*
                 * Print a warning if this image has a parent.
                 * Don't print it if the image now being probed
                 * is itself a parent.  We can tell at this point
                 * because we won't know its pool name yet (just its
                 * pool id).
                 */
                if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
                        rbd_warn(rbd_dev, "WARNING: kernel layering "
                                        "is EXPERIMENTAL!");
        }

        /* If the image supports fancy striping, get its parameters */

        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
                ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }
        /* No support for the crypto and compression types of format 2 images */

        return 0;
out_err:
        rbd_dev->parent_overlap = 0;
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}

static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        struct rbd_spec *parent_spec;
        struct rbd_client *rbdc;
        int ret;

        if (!rbd_dev->parent_spec)
                return 0;
        /*
         * We need to pass a reference to the client and the parent
         * spec when creating the parent rbd_dev.  Images related by
         * parent/child relationships always share both.
         */
        parent_spec = rbd_spec_get(rbd_dev->parent_spec);
        rbdc = __rbd_get_client(rbd_dev->rbd_client);

        ret = -ENOMEM;
        parent = rbd_dev_create(rbdc, parent_spec);
        if (!parent)
                goto out_err;

        ret = rbd_dev_image_probe(parent, false);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;

        return 0;
out_err:
        if (parent) {
                rbd_spec_put(rbd_dev->parent_spec);
                kfree(rbd_dev->header_name);
                rbd_dev_destroy(parent);
        } else {
                rbd_put_client(rbdc);
                rbd_spec_put(parent_spec);
        }

        return ret;
}
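
/*
 * Illustrative note: for a layered chain such as clone2 -> clone1 ->
 * base (hypothetical image names), probing clone2 re-enters
 * rbd_dev_image_probe() here once per ancestor, so rbd_dev->parent
 * ends up heading a fully probed chain of rbd_device structures.
 */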

static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
        int ret;

        /* Generate a unique id: find the highest existing id and add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_disk;
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_mapping;

        /* Everything's ready.  Announce the disk to the world. */

        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_mapping:
        rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);

        return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
        struct rbd_spec *spec = rbd_dev->spec;
        size_t size;

        /* Record the header object name for this rbd image. */

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
        else
                size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;

        if (rbd_dev->image_format == 1)
                sprintf(rbd_dev->header_name, "%s%s",
                        spec->image_name, RBD_SUFFIX);
        else
                sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, spec->image_id);
        return 0;
}
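
/*
 * Example results, assuming the usual rbd_types.h definitions
 * (RBD_SUFFIX ".rbd", RBD_HEADER_PREFIX "rbd_header.") and a
 * hypothetical image:
 *
 *      format 1, image "foo"             -> "foo.rbd"
 *      format 2, image id "101a6b8b4567" -> "rbd_header.101a6b8b4567"
 */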

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        rbd_dev_unprobe(rbd_dev);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
        int ret;
        int tmp;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;
        rbd_assert(rbd_dev->spec->image_id);
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        if (mapping) {
                ret = rbd_dev_header_watch_sync(rbd_dev, true);
                if (ret)
                        goto out_header_name;
        }

        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_header_info(rbd_dev);
        else
                ret = rbd_dev_v2_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;

        ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_probe;

        ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_probe;

        dout("discovered format %u image, header name is %s\n",
                rbd_dev->image_format, rbd_dev->header_name);

        return 0;
err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        if (mapping) {
                tmp = rbd_dev_header_watch_sync(rbd_dev, false);
                if (tmp)
                        rbd_warn(rbd_dev, "unable to tear down "
                                        "watch request (%d)\n", tmp);
        }
out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        dout("probe failed, returning %d\n", ret);

        return ret;
}
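
/*
 * Probe sequence summary: image id -> header object name -> (optional)
 * watch -> header info -> spec update -> parent probe; the error
 * labels above unwind those steps in reverse order.
 */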

static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        bool read_only;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;
        read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;

        /* The ceph file layout needs to fit pool id in 32 bits */

        if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)\n",
                                (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rc = rbd_dev_image_probe(rbd_dev, true);
        if (rc < 0)
                goto err_out_rbd_dev;

        /* If we are mapping a snapshot it must be marked read-only */

        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                read_only = true;
        rbd_dev->mapping.read_only = read_only;

        rc = rbd_dev_device_setup(rbd_dev);
        if (rc) {
                /* rbd_dev_image_release() also destroys rbd_dev, so
                 * skip the labels below that would touch it again */
                rbd_dev_image_release(rbd_dev);
                goto err_out_module;
        }

        return count;

err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t)rc;
}
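
/*
 * Example "add" input, per Documentation/ABI/testing/sysfs-bus-rbd
 * (monitor address, options, pool and image names are hypothetical):
 *
 *      # echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *              > /sys/bus/rbd/add
 *
 * On success a device named rbd<id> appears under /dev.
 */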

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}
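
/*
 * Note: for a hypothetical chain clone2 -> clone1 -> base, the loop
 * above releases base first and clone1 second, always detaching the
 * deepest ancestor so no child is left pointing at a freed parent.
 */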

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id;
        unsigned long ul;
        int ret;

        ret = strict_strtoul(buf, 10, &ul);
        if (ret)
                return ret;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
        rbd_bus_del_dev(rbd_dev);
        ret = rbd_dev_header_watch_sync(rbd_dev, false);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
        ret = count;
done:
        mutex_unlock(&ctl_mutex);

        return ret;
}
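
/*
 * Example "remove" input: the decimal device id assigned at add time,
 * e.g. to unmap /dev/rbd0 (illustrative):
 *
 *      # echo 0 > /sys/bus/rbd/remove
 */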

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = kmem_cache_create("rbd_img_request",
                                        sizeof (struct rbd_img_request),
                                        __alignof__(struct rbd_img_request),
                                        0, NULL);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
                                        sizeof (struct rbd_obj_request),
                                        __alignof__(struct rbd_obj_request),
                                        0, NULL);
        if (!rbd_obj_request_cache)
                goto out_err;

        rbd_assert(!rbd_segment_name_cache);
        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
                                        MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
        if (rbd_segment_name_cache)
                return 0;
out_err:
        if (rbd_obj_request_cache) {
                kmem_cache_destroy(rbd_obj_request_cache);
                rbd_obj_request_cache = NULL;
        }

        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;

        return -ENOMEM;
}

static void rbd_slab_exit(void)
{
        rbd_assert(rbd_segment_name_cache);
        kmem_cache_destroy(rbd_segment_name_cache);
        rbd_segment_name_cache = NULL;

        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");

                return -EINVAL;
        }
        rc = rbd_slab_init();
        if (rc)
                return rc;
        rc = rbd_sysfs_init();
        if (rc)
                rbd_slab_exit();
        else
                pr_info("loaded " RBD_DRV_NAME_LONG "\n");

        return rc;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
        rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");