
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

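/*
 * Worked example (illustrative): with NAME_MAX at 255 and the "snap_"
 * prefix occupying 5 characters, RBD_MAX_SNAP_NAME_LEN comes out to 250.
 */
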
#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

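/*
 * A sketch of the arithmetic behind MAX_INT_FORMAT_WIDTH: printing a
 * byte in decimal needs log10(256), roughly 2.41, digits, which
 * (5 * sizeof (int)) / 2 over-approximates at 2.5 digits per byte;
 * the + 1 leaves room for a minus sign.  For a 4-byte int this gives
 * 11, enough for "-2147483648".
 */
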
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* position in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

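/*
 * Illustrative use of the iterators above (a sketch; img_req and
 * obj_req are hypothetical locals, not code elsewhere in this file):
 *
 *      struct rbd_obj_request *obj_req;
 *
 *      for_each_obj_request(img_req, obj_req)
 *              dout("obj_request %p which %u\n", obj_req, obj_req->which);
 */
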
struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
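
/*
 * Example interaction with the bus attributes above (a sketch; see
 * Documentation/ABI/testing/sysfs-bus-rbd for the authoritative
 * format, and note the monitor address, pool and image here are
 * made up):
 *
 *      # echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *      # echo 0 > /sys/bus/rbd/remove
 */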

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR "\nAssertion failure in %s() " \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
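
/*
 * Illustrative use (a sketch; the option string and rbd_opts value are
 * made up).  This callback is normally handed to ceph_parse_options()
 * and invoked for each token libceph itself does not recognize:
 *
 *      struct rbd_options rbd_opts = { .read_only = RBD_READ_ONLY_DEFAULT };
 *
 *      parse_rbd_opts_token("ro", &rbd_opts);  (sets read_only to true)
 *      parse_rbd_opts_token("rw", &rbd_opts);  (clears it again)
 */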

/*
 * Get a ceph client with a specific address and configuration; create
 * one if it does not already exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.
 *
 * Takes and drops rbd_client_list_lock itself to remove the client
 * from the client list, so the caller must not hold that lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees that
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more; fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
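
/*
 * Worked example (illustrative): with snapc->snaps = { 10, 7, 3 },
 * kept in descending order as the osd maintains it, looking up
 * snap_id 7 yields index 1, while snap_id 5 is absent and yields
 * BAD_SNAP_INDEX.
 */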

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* name came from the slab cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}
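
/*
 * Example (illustrative, with made-up values): for object_prefix
 * "rb.0.1234.5678" and obj_order 22 (4 MiB objects), an image offset
 * of (5 << 22) falls in segment 5, producing the object name
 * "rb.0.1234.5678.000000000005".
 */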

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
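
/*
 * Worked example (illustrative): with obj_order 22, segment_size is
 * 4 MiB.  For a request starting 1 KiB before a segment boundary with
 * length 8 KiB, rbd_segment_length() clamps the length to the 1 KiB
 * remaining in the segment; the caller issues the rest against the
 * next object.
 */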

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1ULL << header->obj_order;       /* 1ULL: obj_order may be up to 31 */
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
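
/*
 * Example (illustrative, assuming 4 KiB pages): zero_pages(pages, 512,
 * 8192) clears the last 3584 bytes of pages[0] and all of pages[1].
 */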

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
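
/*
 * Sketch of a call (illustrative; bio, off and length are made-up
 * locals).  Cloning is typically done in object-sized pieces, with
 * the in-out parameters carrying the position between calls:
 *
 *      struct bio *clone;
 *
 *      clone = bio_chain_clone_range(&bio, &off, length, GFP_NOIO);
 *
 * On success, bio and off identify where the next clone begins.
 */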

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}
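
/*
 * Worked example (illustrative): for a read of length 8192 that
 * transfers only 4096 bytes with no error, the tail from byte 4096 to
 * 8192 is zero-filled and xferred is raised to 8192; an -ENOENT result
 * instead zero-fills the whole 8192 bytes.
 */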
1536
1537 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1538 {
1539         dout("%s: obj %p cb %p\n", __func__, obj_request,
1540                 obj_request->callback);
1541         if (obj_request->callback)
1542                 obj_request->callback(obj_request);
1543         else
1544                 complete_all(&obj_request->completion);
1545 }
1546
1547 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1548 {
1549         dout("%s: obj %p\n", __func__, obj_request);
1550         obj_request_done_set(obj_request);
1551 }
1552
1553 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1554 {
1555         struct rbd_img_request *img_request = NULL;
1556         struct rbd_device *rbd_dev = NULL;
1557         bool layered = false;
1558
1559         if (obj_request_img_data_test(obj_request)) {
1560                 img_request = obj_request->img_request;
1561                 layered = img_request && img_request_layered_test(img_request);
1562                 rbd_dev = img_request->rbd_dev;
1563         }
1564
1565         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1566                 obj_request, img_request, obj_request->result,
1567                 obj_request->xferred, obj_request->length);
1568         if (layered && obj_request->result == -ENOENT &&
1569                         obj_request->img_offset < rbd_dev->parent_overlap)
1570                 rbd_img_parent_read(obj_request);
1571         else if (img_request)
1572                 rbd_img_obj_request_read_callback(obj_request);
1573         else
1574                 obj_request_done_set(obj_request);
1575 }
1576
1577 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1578 {
1579         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1580                 obj_request->result, obj_request->length);
1581         /*
1582          * There is no such thing as a successful short write.  Set
1583          * it to our originally-requested length.
1584          */
1585         obj_request->xferred = obj_request->length;
1586         obj_request_done_set(obj_request);
1587 }
1588
1589 /*
1590  * For a simple stat call there's nothing to do.  We'll do more if
1591  * this is part of a write sequence for a layered image.
1592  */
1593 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1594 {
1595         dout("%s: obj %p\n", __func__, obj_request);
1596         obj_request_done_set(obj_request);
1597 }
1598
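/*
 * Completion callback for every rbd osd request.  Records the result
 * and dispatches on the opcode of the request's first op; a request
 * carries at most two ops (the copyup case).
 */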
1599 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1600                                 struct ceph_msg *msg)
1601 {
1602         struct rbd_obj_request *obj_request = osd_req->r_priv;
1603         u16 opcode;
1604
1605         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1606         rbd_assert(osd_req == obj_request->osd_req);
1607         if (obj_request_img_data_test(obj_request)) {
1608                 rbd_assert(obj_request->img_request);
1609                 rbd_assert(obj_request->which != BAD_WHICH);
1610         } else {
1611                 rbd_assert(obj_request->which == BAD_WHICH);
1612         }
1613
1614         if (osd_req->r_result < 0)
1615                 obj_request->result = osd_req->r_result;
1616
1617         BUG_ON(osd_req->r_num_ops > 2);
1618
1619         /*
1620          * We support a 64-bit length, but ultimately it has to be
1621          * passed to blk_end_request(), which takes an unsigned int.
1622          */
1623         obj_request->xferred = osd_req->r_reply_op_len[0];
1624         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1625         opcode = osd_req->r_ops[0].op;
1626         switch (opcode) {
1627         case CEPH_OSD_OP_READ:
1628                 rbd_osd_read_callback(obj_request);
1629                 break;
1630         case CEPH_OSD_OP_WRITE:
1631                 rbd_osd_write_callback(obj_request);
1632                 break;
1633         case CEPH_OSD_OP_STAT:
1634                 rbd_osd_stat_callback(obj_request);
1635                 break;
1636         case CEPH_OSD_OP_CALL:
1637         case CEPH_OSD_OP_NOTIFY_ACK:
1638         case CEPH_OSD_OP_WATCH:
1639                 rbd_osd_trivial_callback(obj_request);
1640                 break;
1641         default:
1642                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1643                         obj_request->object_name, (unsigned short) opcode);
1644                 break;
1645         }
1646
1647         if (obj_request_done_test(obj_request))
1648                 rbd_obj_request_complete(obj_request);
1649 }
1650
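/*
 * Finish building an osd request:  a read is tagged with the snapshot
 * id to read from, while a write carries the snapshot context and a
 * modification time.
 */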
1651 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1652 {
1653         struct rbd_img_request *img_request = obj_request->img_request;
1654         struct ceph_osd_request *osd_req = obj_request->osd_req;
1655         u64 snap_id;
1656
1657         rbd_assert(osd_req != NULL);
1658
1659         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1660         ceph_osdc_build_request(osd_req, obj_request->offset,
1661                         NULL, snap_id, NULL);
1662 }
1663
1664 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1665 {
1666         struct rbd_img_request *img_request = obj_request->img_request;
1667         struct ceph_osd_request *osd_req = obj_request->osd_req;
1668         struct ceph_snap_context *snapc;
1669         struct timespec mtime = CURRENT_TIME;
1670
1671         rbd_assert(osd_req != NULL);
1672
1673         snapc = img_request ? img_request->snapc : NULL;
1674         ceph_osdc_build_request(osd_req, obj_request->offset,
1675                         snapc, CEPH_NOSNAP, &mtime);
1676 }
1677
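/*
 * Create a single-op osd request for the given object request.  For
 * an image write the snapshot context is taken from the image
 * request.  The caller is responsible for adding the op and
 * formatting the request before submitting it.
 */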
1678 static struct ceph_osd_request *rbd_osd_req_create(
1679                                         struct rbd_device *rbd_dev,
1680                                         bool write_request,
1681                                         struct rbd_obj_request *obj_request)
1682 {
1683         struct ceph_snap_context *snapc = NULL;
1684         struct ceph_osd_client *osdc;
1685         struct ceph_osd_request *osd_req;
1686
1687         if (obj_request_img_data_test(obj_request)) {
1688                 struct rbd_img_request *img_request = obj_request->img_request;
1689
1690                 rbd_assert(write_request ==
1691                                 img_request_write_test(img_request));
1692                 if (write_request)
1693                         snapc = img_request->snapc;
1694         }
1695
1696         /* Allocate and initialize the request, for the single op */
1697
1698         osdc = &rbd_dev->rbd_client->client->osdc;
1699         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1700         if (!osd_req)
1701                 return NULL;    /* ENOMEM */
1702
1703         if (write_request)
1704                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1705         else
1706                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1707
1708         osd_req->r_callback = rbd_osd_req_callback;
1709         osd_req->r_priv = obj_request;
1710
1711         osd_req->r_oid_len = strlen(obj_request->object_name);
1712         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1713         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1714
1715         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1716
1717         return osd_req;
1718 }
1719
1720 /*
1721  * Create a copyup osd request based on the information in the
1722  * object request supplied.  A copyup request has two osd ops:
1723  * a copyup method call and a "normal" write request.
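 * As assembled in rbd_img_obj_parent_read_full_callback():
 *	op 0: CEPH_OSD_OP_CALL "rbd"/"copyup", carrying the parent data
 *	op 1: CEPH_OSD_OP_WRITE, the original write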
1724  */
1725 static struct ceph_osd_request *
1726 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1727 {
1728         struct rbd_img_request *img_request;
1729         struct ceph_snap_context *snapc;
1730         struct rbd_device *rbd_dev;
1731         struct ceph_osd_client *osdc;
1732         struct ceph_osd_request *osd_req;
1733
1734         rbd_assert(obj_request_img_data_test(obj_request));
1735         img_request = obj_request->img_request;
1736         rbd_assert(img_request);
1737         rbd_assert(img_request_write_test(img_request));
1738
1739         /* Allocate and initialize the request, for the two ops */
1740
1741         snapc = img_request->snapc;
1742         rbd_dev = img_request->rbd_dev;
1743         osdc = &rbd_dev->rbd_client->client->osdc;
1744         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1745         if (!osd_req)
1746                 return NULL;    /* ENOMEM */
1747
1748         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1749         osd_req->r_callback = rbd_osd_req_callback;
1750         osd_req->r_priv = obj_request;
1751
1752         osd_req->r_oid_len = strlen(obj_request->object_name);
1753         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1754         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1755
1756         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1757
1758         return osd_req;
1759 }
1760
1761
1762 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1763 {
1764         ceph_osdc_put_request(osd_req);
1765 }
1766
1767 /* object_name is assumed to be a non-null pointer to a NUL-terminated string */
1768
1769 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1770                                                 u64 offset, u64 length,
1771                                                 enum obj_request_type type)
1772 {
1773         struct rbd_obj_request *obj_request;
1774         size_t size;
1775         char *name;
1776
1777         rbd_assert(obj_request_type_valid(type));
1778
1779         size = strlen(object_name) + 1;
1780         name = kmalloc(size, GFP_KERNEL);
1781         if (!name)
1782                 return NULL;
1783
1784         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1785         if (!obj_request) {
1786                 kfree(name);
1787                 return NULL;
1788         }
1789
1790         obj_request->object_name = memcpy(name, object_name, size);
1791         obj_request->offset = offset;
1792         obj_request->length = length;
1793         obj_request->flags = 0;
1794         obj_request->which = BAD_WHICH;
1795         obj_request->type = type;
1796         INIT_LIST_HEAD(&obj_request->links);
1797         init_completion(&obj_request->completion);
1798         kref_init(&obj_request->kref);
1799
1800         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1801                 offset, length, (int)type, obj_request);
1802
1803         return obj_request;
1804 }
1805
1806 static void rbd_obj_request_destroy(struct kref *kref)
1807 {
1808         struct rbd_obj_request *obj_request;
1809
1810         obj_request = container_of(kref, struct rbd_obj_request, kref);
1811
1812         dout("%s: obj %p\n", __func__, obj_request);
1813
1814         rbd_assert(obj_request->img_request == NULL);
1815         rbd_assert(obj_request->which == BAD_WHICH);
1816
1817         if (obj_request->osd_req)
1818                 rbd_osd_req_destroy(obj_request->osd_req);
1819
1820         rbd_assert(obj_request_type_valid(obj_request->type));
1821         switch (obj_request->type) {
1822         case OBJ_REQUEST_NODATA:
1823                 break;          /* Nothing to do */
1824         case OBJ_REQUEST_BIO:
1825                 if (obj_request->bio_list)
1826                         bio_chain_put(obj_request->bio_list);
1827                 break;
1828         case OBJ_REQUEST_PAGES:
1829                 if (obj_request->pages)
1830                         ceph_release_page_vector(obj_request->pages,
1831                                                 obj_request->page_count);
1832                 break;
1833         }
1834
1835         kfree(obj_request->object_name);
1836         obj_request->object_name = NULL;
1837         kmem_cache_free(rbd_obj_request_cache, obj_request);
1838 }
1839
1840 /*
1841  * Caller is responsible for filling in the list of object requests
1842  * that comprises the image request, and the Linux request pointer
1843  * (if there is one).
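 *
 * For a write request, a reference to the device's current snapshot
 * context is taken here (under header_rwsem) and dropped again in
 * rbd_img_request_destroy().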
1844  */
1845 static struct rbd_img_request *rbd_img_request_create(
1846                                         struct rbd_device *rbd_dev,
1847                                         u64 offset, u64 length,
1848                                         bool write_request,
1849                                         bool child_request)
1850 {
1851         struct rbd_img_request *img_request;
1852
1853         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1854         if (!img_request)
1855                 return NULL;
1856
1857         if (write_request) {
1858                 down_read(&rbd_dev->header_rwsem);
1859                 ceph_get_snap_context(rbd_dev->header.snapc);
1860                 up_read(&rbd_dev->header_rwsem);
1861         }
1862
1863         img_request->rq = NULL;
1864         img_request->rbd_dev = rbd_dev;
1865         img_request->offset = offset;
1866         img_request->length = length;
1867         img_request->flags = 0;
1868         if (write_request) {
1869                 img_request_write_set(img_request);
1870                 img_request->snapc = rbd_dev->header.snapc;
1871         } else {
1872                 img_request->snap_id = rbd_dev->spec->snap_id;
1873         }
1874         if (child_request)
1875                 img_request_child_set(img_request);
1876         if (rbd_dev->parent_spec)
1877                 img_request_layered_set(img_request);
1878         spin_lock_init(&img_request->completion_lock);
1879         img_request->next_completion = 0;
1880         img_request->callback = NULL;
1881         img_request->result = 0;
1882         img_request->obj_request_count = 0;
1883         INIT_LIST_HEAD(&img_request->obj_requests);
1884         kref_init(&img_request->kref);
1885
1886         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1887                 write_request ? "write" : "read", offset, length,
1888                 img_request);
1889
1890         return img_request;
1891 }
1892
1893 static void rbd_img_request_destroy(struct kref *kref)
1894 {
1895         struct rbd_img_request *img_request;
1896         struct rbd_obj_request *obj_request;
1897         struct rbd_obj_request *next_obj_request;
1898
1899         img_request = container_of(kref, struct rbd_img_request, kref);
1900
1901         dout("%s: img %p\n", __func__, img_request);
1902
1903         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1904                 rbd_img_obj_request_del(img_request, obj_request);
1905         rbd_assert(img_request->obj_request_count == 0);
1906
1907         if (img_request_write_test(img_request))
1908                 ceph_put_snap_context(img_request->snapc);
1909
1910         if (img_request_child_test(img_request))
1911                 rbd_obj_request_put(img_request->obj_request);
1912
1913         kmem_cache_free(rbd_img_request_cache, img_request);
1914 }
1915
1916 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1917 {
1918         struct rbd_img_request *img_request;
1919         unsigned int xferred;
1920         int result;
1921         bool more;
1922
1923         rbd_assert(obj_request_img_data_test(obj_request));
1924         img_request = obj_request->img_request;
1925
1926         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1927         xferred = (unsigned int)obj_request->xferred;
1928         result = obj_request->result;
1929         if (result) {
1930                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1931
1932                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1933                         img_request_write_test(img_request) ? "write" : "read",
1934                         obj_request->length, obj_request->img_offset,
1935                         obj_request->offset);
1936                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1937                         result, xferred);
1938                 if (!img_request->result)
1939                         img_request->result = result;
1940         }
1941
1942         /* Image object requests don't own their page array */
1943
1944         if (obj_request->type == OBJ_REQUEST_PAGES) {
1945                 obj_request->pages = NULL;
1946                 obj_request->page_count = 0;
1947         }
1948
1949         if (img_request_child_test(img_request)) {
1950                 rbd_assert(img_request->obj_request != NULL);
1951                 more = obj_request->which < img_request->obj_request_count - 1;
1952         } else {
1953                 rbd_assert(img_request->rq != NULL);
1954                 more = blk_end_request(img_request->rq, result, xferred);
1955         }
1956
1957         return more;
1958 }
1959
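/*
 * Per-object completion callback for image requests.  Object requests
 * may complete in any order, but completions are processed (and, for
 * block requests, reported via blk_end_request()) strictly in "which"
 * order; an out-of-order completion is simply recorded via its done
 * bit and picked up when its turn comes.
 */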
1960 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1961 {
1962         struct rbd_img_request *img_request;
1963         u32 which = obj_request->which;
1964         bool more = true;
1965
1966         rbd_assert(obj_request_img_data_test(obj_request));
1967         img_request = obj_request->img_request;
1968
1969         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1970         rbd_assert(img_request != NULL);
1971         rbd_assert(img_request->obj_request_count > 0);
1972         rbd_assert(which != BAD_WHICH);
1973         rbd_assert(which < img_request->obj_request_count);
1974         rbd_assert(which >= img_request->next_completion);
1975
1976         spin_lock_irq(&img_request->completion_lock);
1977         if (which != img_request->next_completion)
1978                 goto out;
1979
1980         for_each_obj_request_from(img_request, obj_request) {
1981                 rbd_assert(more);
1982                 rbd_assert(which < img_request->obj_request_count);
1983
1984                 if (!obj_request_done_test(obj_request))
1985                         break;
1986                 more = rbd_img_obj_end_request(obj_request);
1987                 which++;
1988         }
1989
1990         rbd_assert(more ^ (which == img_request->obj_request_count));
1991         img_request->next_completion = which;
1992 out:
1993         spin_unlock_irq(&img_request->completion_lock);
1994
1995         if (!more)
1996                 rbd_img_request_complete(img_request);
1997 }
1998
1999 /*
2000  * Split up an image request into one or more object requests, each
2001  * to a different object.  The "type" parameter indicates whether
2002  * "data_desc" is the pointer to the head of a list of bio
2003  * structures, or the base of a page array.  In either case this
2004  * function assumes data_desc describes memory sufficient to hold
2005  * all data described by the image request.
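 *
 * For example (illustrative, assuming the common 4 MiB object size),
 * a 6 MiB request starting at image offset 3 MiB is split into three
 * object requests of 1 MiB, 4 MiB and 1 MiB.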
2006  */
2007 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2008                                         enum obj_request_type type,
2009                                         void *data_desc)
2010 {
2011         struct rbd_device *rbd_dev = img_request->rbd_dev;
2012         struct rbd_obj_request *obj_request = NULL;
2013         struct rbd_obj_request *next_obj_request;
2014         bool write_request = img_request_write_test(img_request);
2015         struct bio *bio_list;
2016         unsigned int bio_offset = 0;
2017         struct page **pages;
2018         u64 img_offset;
2019         u64 resid;
2020         u16 opcode;
2021
2022         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2023                 (int)type, data_desc);
2024
2025         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2026         img_offset = img_request->offset;
2027         resid = img_request->length;
2028         rbd_assert(resid > 0);
2029
2030         if (type == OBJ_REQUEST_BIO) {
2031                 bio_list = data_desc;
2032                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2033         } else {
2034                 rbd_assert(type == OBJ_REQUEST_PAGES);
2035                 pages = data_desc;
2036         }
2037
2038         while (resid) {
2039                 struct ceph_osd_request *osd_req;
2040                 const char *object_name;
2041                 u64 offset;
2042                 u64 length;
2043
2044                 object_name = rbd_segment_name(rbd_dev, img_offset);
2045                 if (!object_name)
2046                         goto out_unwind;
2047                 offset = rbd_segment_offset(rbd_dev, img_offset);
2048                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2049                 obj_request = rbd_obj_request_create(object_name,
2050                                                 offset, length, type);
2051                 /* object request has its own copy of the object name */
2052                 rbd_segment_name_free(object_name);
2053                 if (!obj_request)
2054                         goto out_unwind;
2055
2056                 if (type == OBJ_REQUEST_BIO) {
2057                         unsigned int clone_size;
2058
2059                         rbd_assert(length <= (u64)UINT_MAX);
2060                         clone_size = (unsigned int)length;
2061                         obj_request->bio_list =
2062                                         bio_chain_clone_range(&bio_list,
2063                                                                 &bio_offset,
2064                                                                 clone_size,
2065                                                                 GFP_ATOMIC);
2066                         if (!obj_request->bio_list)
2067                                 goto out_partial;
2068                 } else {
2069                         unsigned int page_count;
2070
2071                         obj_request->pages = pages;
2072                         page_count = (u32)calc_pages_for(offset, length);
2073                         obj_request->page_count = page_count;
2074                         if ((offset + length) & ~PAGE_MASK)
2075                                 page_count--;   /* more on last page */
2076                         pages += page_count;
2077                 }
2078
2079                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2080                                                 obj_request);
2081                 if (!osd_req)
2082                         goto out_partial;
2083                 obj_request->osd_req = osd_req;
2084                 obj_request->callback = rbd_img_obj_callback;
2085
2086                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2087                                                 0, 0);
2088                 if (type == OBJ_REQUEST_BIO)
2089                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2090                                         obj_request->bio_list, length);
2091                 else
2092                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2093                                         obj_request->pages, length,
2094                                         offset & ~PAGE_MASK, false, false);
2095
2096                 if (write_request)
2097                         rbd_osd_req_format_write(obj_request);
2098                 else
2099                         rbd_osd_req_format_read(obj_request);
2100
2101                 obj_request->img_offset = img_offset;
2102                 rbd_img_obj_request_add(img_request, obj_request);
2103
2104                 img_offset += length;
2105                 resid -= length;
2106         }
2107
2108         return 0;
2109
2110 out_partial:
2111         rbd_obj_request_put(obj_request);
2112 out_unwind:
2113         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2114                 rbd_obj_request_put(obj_request);
2115
2116         return -ENOMEM;
2117 }
2118
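/*
 * Completion callback for a copyup request.  Releases the page vector
 * that held the parent data, then finishes the request as a normal
 * image write.
 */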
2119 static void
2120 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2121 {
2122         struct rbd_img_request *img_request;
2123         struct rbd_device *rbd_dev;
2124         struct page **pages;
2125         u32 page_count;
2126
2127         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2128         rbd_assert(obj_request_img_data_test(obj_request));
2129         img_request = obj_request->img_request;
2130         rbd_assert(img_request);
2131
2132         rbd_dev = img_request->rbd_dev;
2133         rbd_assert(rbd_dev);
2134
2135         pages = obj_request->copyup_pages;
2136         rbd_assert(pages != NULL);
2137         obj_request->copyup_pages = NULL;
2138         page_count = obj_request->copyup_page_count;
2139         rbd_assert(page_count);
2140         obj_request->copyup_page_count = 0;
2141         ceph_release_page_vector(pages, page_count);
2142
2143         /*
2144          * We want the transfer count to reflect the size of the
2145          * original write request.  There is no such thing as a
2146          * successful short write, so if the request was successful
2147          * we can just set it to the originally-requested length.
2148          */
2149         if (!obj_request->result)
2150                 obj_request->xferred = obj_request->length;
2151
2152         /* Finish up with the normal image object callback */
2153
2154         rbd_img_obj_callback(obj_request);
2155 }
2156
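/*
 * Called when the parent image read issued by
 * rbd_img_obj_parent_read_full() completes.  Builds the two-op copyup
 * request (method call plus the original write) from the data read
 * and submits it in place of the original osd request.
 */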
2157 static void
2158 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2159 {
2160         struct rbd_obj_request *orig_request;
2161         struct ceph_osd_request *osd_req;
2162         struct ceph_osd_client *osdc;
2163         struct rbd_device *rbd_dev;
2164         struct page **pages;
2165         u32 page_count;
2166         int result;
2167         u64 parent_length;
2168         u64 offset;
2169         u64 length;
2170
2171         rbd_assert(img_request_child_test(img_request));
2172
2173         /* First get what we need from the image request */
2174
2175         pages = img_request->copyup_pages;
2176         rbd_assert(pages != NULL);
2177         img_request->copyup_pages = NULL;
2178         page_count = img_request->copyup_page_count;
2179         rbd_assert(page_count);
2180         img_request->copyup_page_count = 0;
2181
2182         orig_request = img_request->obj_request;
2183         rbd_assert(orig_request != NULL);
2184         rbd_assert(obj_request_type_valid(orig_request->type));
2185         result = img_request->result;
2186         parent_length = img_request->length;
2187         rbd_assert(parent_length == img_request->xferred);
2188         rbd_img_request_put(img_request);
2189
2190         rbd_assert(orig_request->img_request);
2191         rbd_dev = orig_request->img_request->rbd_dev;
2192         rbd_assert(rbd_dev);
2193
2194         if (result)
2195                 goto out_err;
2196
2197         /* Allocate the new copyup osd request for the original request */
2198
2199         result = -ENOMEM;
2200         rbd_assert(!orig_request->osd_req);
2201         osd_req = rbd_osd_req_create_copyup(orig_request);
2202         if (!osd_req)
2203                 goto out_err;
2204         orig_request->osd_req = osd_req;
2205         orig_request->copyup_pages = pages;
2206         orig_request->copyup_page_count = page_count;
2207
2208         /* Initialize the copyup op */
2209
2210         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2211         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2212                                                 false, false);
2213
2214         /* Then the original write request op */
2215
2216         offset = orig_request->offset;
2217         length = orig_request->length;
2218         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2219                                         offset, length, 0, 0);
2220         if (orig_request->type == OBJ_REQUEST_BIO)
2221                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2222                                         orig_request->bio_list, length);
2223         else
2224                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2225                                         orig_request->pages, length,
2226                                         offset & ~PAGE_MASK, false, false);
2227
2228         rbd_osd_req_format_write(orig_request);
2229
2230         /* All set, send it off. */
2231
2232         orig_request->callback = rbd_img_obj_copyup_callback;
2233         osdc = &rbd_dev->rbd_client->client->osdc;
2234         result = rbd_obj_request_submit(osdc, orig_request);
2235         if (!result)
2236                 return;
2237 out_err:
2238         /* Record the error code and complete the request */
2239
2240         orig_request->result = result;
2241         orig_request->xferred = 0;
2242         obj_request_done_set(orig_request);
2243         rbd_obj_request_complete(orig_request);
2244 }
2245
2246 /*
2247  * Read from the parent image the range of data that covers the
2248  * entire target of the given object request.  This is used for
2249  * satisfying a layered image write request when the target of an
2250  * object request from the image request does not exist.
2251  *
2252  * A page array big enough to hold the returned data is allocated
2253  * and supplied to rbd_img_request_fill() as the "data descriptor."
2254  * When the read completes, this page array will be transferred to
2255  * the original object request for the copyup operation.
2256  *
2257  * If an error occurs, record it as the result of the original
2258  * object request and mark it done so it gets completed.
2259  */
2260 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2261 {
2262         struct rbd_img_request *img_request = NULL;
2263         struct rbd_img_request *parent_request = NULL;
2264         struct rbd_device *rbd_dev;
2265         u64 img_offset;
2266         u64 length;
2267         struct page **pages = NULL;
2268         u32 page_count;
2269         int result;
2270
2271         rbd_assert(obj_request_img_data_test(obj_request));
2272         rbd_assert(obj_request_type_valid(obj_request->type));
2273
2274         img_request = obj_request->img_request;
2275         rbd_assert(img_request != NULL);
2276         rbd_dev = img_request->rbd_dev;
2277         rbd_assert(rbd_dev->parent != NULL);
2278
2279         /*
2280          * First things first.  The original osd request is of no
2281          * use to us any more; we'll need a new one that can hold
2282          * the two ops in a copyup request.  We'll get that later,
2283          * but for now we can release the old one.
2284          */
2285         rbd_osd_req_destroy(obj_request->osd_req);
2286         obj_request->osd_req = NULL;
2287
2288         /*
2289          * Determine the byte range covered by the object in the
2290          * child image to which the original request was to be sent.
2291          */
2292         img_offset = obj_request->img_offset - obj_request->offset;
2293         length = (u64)1 << rbd_dev->header.obj_order;
2294
2295         /*
2296          * There is no defined parent data beyond the parent
2297          * overlap, so limit what we read at that boundary if
2298          * necessary.
2299          */
2300         if (img_offset + length > rbd_dev->parent_overlap) {
2301                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2302                 length = rbd_dev->parent_overlap - img_offset;
2303         }
2304
2305         /*
2306          * Allocate a page array big enough to receive the data read
2307          * from the parent.
2308          */
2309         page_count = (u32)calc_pages_for(0, length);
2310         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2311         if (IS_ERR(pages)) {
2312                 result = PTR_ERR(pages);
2313                 pages = NULL;
2314                 goto out_err;
2315         }
2316
2317         result = -ENOMEM;
2318         parent_request = rbd_img_request_create(rbd_dev->parent,
2319                                                 img_offset, length,
2320                                                 false, true);
2321         if (!parent_request)
2322                 goto out_err;
2323         rbd_obj_request_get(obj_request);
2324         parent_request->obj_request = obj_request;
2325
2326         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2327         if (result)
2328                 goto out_err;
2329         parent_request->copyup_pages = pages;
2330         parent_request->copyup_page_count = page_count;
2331
2332         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2333         result = rbd_img_request_submit(parent_request);
2334         if (!result)
2335                 return 0;
2336
2337         parent_request->copyup_pages = NULL;
2338         parent_request->copyup_page_count = 0;
2339         parent_request->obj_request = NULL;
2340         rbd_obj_request_put(obj_request);
2341 out_err:
2342         if (pages)
2343                 ceph_release_page_vector(pages, page_count);
2344         if (parent_request)
2345                 rbd_img_request_put(parent_request);
2346         obj_request->result = result;
2347         obj_request->xferred = 0;
2348         obj_request_done_set(obj_request);
2349
2350         return result;
2351 }
2352
2353 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2354 {
2355         struct rbd_obj_request *orig_request;
2356         int result;
2357
2358         rbd_assert(!obj_request_img_data_test(obj_request));
2359
2360         /*
2361          * All we need from the object request is the original
2362          * request and the result of the STAT op.  Grab those, then
2363          * we're done with the request.
2364          */
2365         orig_request = obj_request->obj_request;
2366         obj_request->obj_request = NULL;
2367         rbd_assert(orig_request);
2368         rbd_assert(orig_request->img_request);
2369
2370         result = obj_request->result;
2371         obj_request->result = 0;
2372
2373         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2374                 obj_request, orig_request, result,
2375                 obj_request->xferred, obj_request->length);
2376         rbd_obj_request_put(obj_request);
2377
2381         /*
2382          * Our only purpose here is to determine whether the object
2383          * exists, and we don't want to treat the non-existence as
2384          * an error.  If something else comes back, transfer the
2385          * error to the original request and complete it now.
2386          */
2387         if (!result) {
2388                 obj_request_existence_set(orig_request, true);
2389         } else if (result == -ENOENT) {
2390                 obj_request_existence_set(orig_request, false);
2391         } else if (result) {
2392                 orig_request->result = result;
2393                 goto out;
2394         }
2395
2396         /*
2397          * Resubmit the original request now that we have recorded
2398          * whether the target object exists.
2399          */
2400         orig_request->result = rbd_img_obj_request_submit(orig_request);
2401 out:
2402         if (orig_request->result)
2403                 rbd_obj_request_complete(orig_request);
2404         rbd_obj_request_put(orig_request);
2405 }
2406
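/*
 * Issue a STAT for the target object of a layered write to find out
 * whether it exists.  rbd_img_obj_exists_callback() records the
 * answer on the original object request and resubmits it.
 */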
2407 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2408 {
2409         struct rbd_obj_request *stat_request;
2410         struct rbd_device *rbd_dev;
2411         struct ceph_osd_client *osdc;
2412         struct page **pages = NULL;
2413         u32 page_count;
2414         size_t size;
2415         int ret;
2416
2417         /*
2418          * The response data for a STAT call consists of:
2419          *     le64 length;
2420          *     struct {
2421          *         le32 tv_sec;
2422          *         le32 tv_nsec;
2423          *     } mtime;
2424          */
2425         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2426         page_count = (u32)calc_pages_for(0, size);
2427         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2428         if (IS_ERR(pages))
2429                 return PTR_ERR(pages);
2430
2431         ret = -ENOMEM;
2432         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2433                                                         OBJ_REQUEST_PAGES);
2434         if (!stat_request)
2435                 goto out;
2436
2437         rbd_obj_request_get(obj_request);
2438         stat_request->obj_request = obj_request;
2439         stat_request->pages = pages;
2440         stat_request->page_count = page_count;
2441
2442         rbd_assert(obj_request->img_request);
2443         rbd_dev = obj_request->img_request->rbd_dev;
2444         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2445                                                 stat_request);
2446         if (!stat_request->osd_req)
2447                 goto out;
2448         stat_request->callback = rbd_img_obj_exists_callback;
2449
2450         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2451         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2452                                         false, false);
2453         rbd_osd_req_format_read(stat_request);
2454
2455         osdc = &rbd_dev->rbd_client->client->osdc;
2456         ret = rbd_obj_request_submit(osdc, stat_request);
2457 out:
2458         if (ret)
2459                 rbd_obj_request_put(obj_request);
2460
2461         return ret;
2462 }
2463
2464 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2465 {
2466         struct rbd_img_request *img_request;
2467         struct rbd_device *rbd_dev;
2468         bool known;
2469
2470         rbd_assert(obj_request_img_data_test(obj_request));
2471
2472         img_request = obj_request->img_request;
2473         rbd_assert(img_request);
2474         rbd_dev = img_request->rbd_dev;
2475
2476         /*
2477          * Only writes to layered images need special handling.
2478          * Reads and non-layered writes are simple object requests.
2479          * Layered writes that start beyond the end of the overlap
2480          * with the parent have no parent data, so they too are
2481          * simple object requests.  Finally, if the target object is
2482          * known to already exist, its parent data has already been
2483          * copied, so a write to the object can also be handled as a
2484          * simple object request.
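         *
         * In short, only a layered write that starts within the
         * parent overlap and whose target object is not known to
         * exist needs the copyup handling below.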
2485          */
2486         if (!img_request_write_test(img_request) ||
2487                 !img_request_layered_test(img_request) ||
2488                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2489                 ((known = obj_request_known_test(obj_request)) &&
2490                         obj_request_exists_test(obj_request))) {
2491
2492                 struct rbd_device *rbd_dev;
2493                 struct ceph_osd_client *osdc;
2494
2495                 rbd_dev = obj_request->img_request->rbd_dev;
2496                 osdc = &rbd_dev->rbd_client->client->osdc;
2497
2498                 return rbd_obj_request_submit(osdc, obj_request);
2499         }
2500
2501         /*
2502          * It's a layered write.  The target object might exist but
2503          * we may not know that yet.  If we know it doesn't exist,
2504          * start by reading the data for the full target object from
2505          * the parent so we can use it for a copyup to the target.
2506          */
2507         if (known)
2508                 return rbd_img_obj_parent_read_full(obj_request);
2509
2510         /* We don't know whether the target exists.  Go find out. */
2511
2512         return rbd_img_obj_exists_submit(obj_request);
2513 }
2514
2515 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2516 {
2517         struct rbd_obj_request *obj_request;
2518         struct rbd_obj_request *next_obj_request;
2519
2520         dout("%s: img %p\n", __func__, img_request);
2521         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2522                 int ret;
2523
2524                 ret = rbd_img_obj_request_submit(obj_request);
2525                 if (ret)
2526                         return ret;
2527         }
2528
2529         return 0;
2530 }
2531
2532 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2533 {
2534         struct rbd_obj_request *obj_request;
2535         struct rbd_device *rbd_dev;
2536         u64 obj_end;
2537
2538         rbd_assert(img_request_child_test(img_request));
2539
2540         obj_request = img_request->obj_request;
2541         rbd_assert(obj_request);
2542         rbd_assert(obj_request->img_request);
2543
2544         obj_request->result = img_request->result;
2545         if (obj_request->result)
2546                 goto out;
2547
2548         /*
2549          * We need to zero anything beyond the parent overlap
2550          * boundary.  Since rbd_img_obj_request_read_callback()
2551          * will zero anything beyond the end of a short read, an
2552          * easy way to do this is to pretend the data from the
2553          * parent came up short--ending at the overlap boundary.
2554          */
2555         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2556         obj_end = obj_request->img_offset + obj_request->length;
2557         rbd_dev = obj_request->img_request->rbd_dev;
2558         if (obj_end > rbd_dev->parent_overlap) {
2559                 u64 xferred = 0;
2560
2561                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2562                         xferred = rbd_dev->parent_overlap -
2563                                         obj_request->img_offset;
2564
2565                 obj_request->xferred = min(img_request->xferred, xferred);
2566         } else {
2567                 obj_request->xferred = img_request->xferred;
2568         }
2569 out:
2570         rbd_img_request_put(img_request);
2571         rbd_img_obj_request_read_callback(obj_request);
2572         rbd_obj_request_complete(obj_request);
2573 }
2574
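/*
 * Satisfy an object read from the parent image.  Called when a read
 * from a layered image returns -ENOENT and the requested range starts
 * within the parent overlap.
 */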
2575 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2576 {
2577         struct rbd_device *rbd_dev;
2578         struct rbd_img_request *img_request;
2579         int result;
2580
2581         rbd_assert(obj_request_img_data_test(obj_request));
2582         rbd_assert(obj_request->img_request != NULL);
2583         rbd_assert(obj_request->result == (s32) -ENOENT);
2584         rbd_assert(obj_request_type_valid(obj_request->type));
2585
2586         rbd_dev = obj_request->img_request->rbd_dev;
2587         rbd_assert(rbd_dev->parent != NULL);
2588         /* rbd_read_finish(obj_request, obj_request->length); */
2589         img_request = rbd_img_request_create(rbd_dev->parent,
2590                                                 obj_request->img_offset,
2591                                                 obj_request->length,
2592                                                 false, true);
2593         result = -ENOMEM;
2594         if (!img_request)
2595                 goto out_err;
2596
2597         rbd_obj_request_get(obj_request);
2598         img_request->obj_request = obj_request;
2599
2600         if (obj_request->type == OBJ_REQUEST_BIO)
2601                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2602                                                 obj_request->bio_list);
2603         else
2604                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2605                                                 obj_request->pages);
2606         if (result)
2607                 goto out_err;
2608
2609         img_request->callback = rbd_img_parent_read_callback;
2610         result = rbd_img_request_submit(img_request);
2611         if (result)
2612                 goto out_err;
2613
2614         return;
2615 out_err:
2616         if (img_request)
2617                 rbd_img_request_put(img_request);
2618         obj_request->result = result;
2619         obj_request->xferred = 0;
2620         obj_request_done_set(obj_request);
2621 }
2622
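/*
 * Acknowledge a notification received on the header object, so the
 * osd does not keep the notify pending for this client.
 */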
2623 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2624 {
2625         struct rbd_obj_request *obj_request;
2626         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2627         int ret;
2628
2629         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2630                                                         OBJ_REQUEST_NODATA);
2631         if (!obj_request)
2632                 return -ENOMEM;
2633
2634         ret = -ENOMEM;
2635         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2636         if (!obj_request->osd_req)
2637                 goto out;
2638         obj_request->callback = rbd_obj_request_put;
2639
2640         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2641                                         notify_id, 0, 0);
2642         rbd_osd_req_format_read(obj_request);
2643
2644         ret = rbd_obj_request_submit(osdc, obj_request);
2645 out:
2646         if (ret)
2647                 rbd_obj_request_put(obj_request);
2648
2649         return ret;
2650 }
2651
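/*
 * Callback invoked when a watched header object changes.  Refreshes
 * the device's view of the image header, then acknowledges the
 * notification.
 */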
2652 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2653 {
2654         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2655         int ret;
2656
2657         if (!rbd_dev)
2658                 return;
2659
2660         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2661                 rbd_dev->header_name, (unsigned long long)notify_id,
2662                 (unsigned int)opcode);
2663         ret = rbd_dev_refresh(rbd_dev);
2664         if (ret)
2665                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2666
2667         rbd_obj_notify_ack(rbd_dev, notify_id);
2668 }
2669
2670 /*
2671  * Request sync osd watch/unwatch.  The value of "start" determines
2672  * whether a watch request is being initiated or torn down.
2673  */
2674 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2675 {
2676         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2677         struct rbd_obj_request *obj_request;
2678         int ret;
2679
2680         rbd_assert(start ^ !!rbd_dev->watch_event);
2681         rbd_assert(start ^ !!rbd_dev->watch_request);
2682
2683         if (start) {
2684                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2685                                                 &rbd_dev->watch_event);
2686                 if (ret < 0)
2687                         return ret;
2688                 rbd_assert(rbd_dev->watch_event != NULL);
2689         }
2690
2691         ret = -ENOMEM;
2692         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2693                                                         OBJ_REQUEST_NODATA);
2694         if (!obj_request)
2695                 goto out_cancel;
2696
2697         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2698         if (!obj_request->osd_req)
2699                 goto out_cancel;
2700
2701         if (start)
2702                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2703         else
2704                 ceph_osdc_unregister_linger_request(osdc,
2705                                         rbd_dev->watch_request->osd_req);
2706
2707         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2708                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2709         rbd_osd_req_format_write(obj_request);
2710
2711         ret = rbd_obj_request_submit(osdc, obj_request);
2712         if (ret)
2713                 goto out_cancel;
2714         ret = rbd_obj_request_wait(obj_request);
2715         if (ret)
2716                 goto out_cancel;
2717         ret = obj_request->result;
2718         if (ret)
2719                 goto out_cancel;
2720
2721         /*
2722          * A watch request is set to linger, so the underlying osd
2723          * request won't go away until we unregister it.  We retain
2724          * a pointer to the object request during that time (in
2725          * rbd_dev->watch_request), so we'll keep a reference to
2726          * it.  We'll drop that reference (below) after we've
2727          * unregistered it.
2728          */
2729         if (start) {
2730                 rbd_dev->watch_request = obj_request;
2731
2732                 return 0;
2733         }
2734
2735         /* We have successfully torn down the watch request */
2736
2737         rbd_obj_request_put(rbd_dev->watch_request);
2738         rbd_dev->watch_request = NULL;
2739 out_cancel:
2740         /* Cancel the event if we're tearing down, or on error */
2741         ceph_osdc_cancel_event(rbd_dev->watch_event);
2742         rbd_dev->watch_event = NULL;
2743         if (obj_request)
2744                 rbd_obj_request_put(obj_request);
2745
2746         return ret;
2747 }
2748
2749 /*
2750  * Synchronous osd object method call.  Returns the number of bytes
2751  * returned in the inbound buffer, or a negative error code.
2752  */
2753 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2754                              const char *object_name,
2755                              const char *class_name,
2756                              const char *method_name,
2757                              const void *outbound,
2758                              size_t outbound_size,
2759                              void *inbound,
2760                              size_t inbound_size)
2761 {
2762         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2763         struct rbd_obj_request *obj_request;
2764         struct page **pages;
2765         u32 page_count;
2766         int ret;
2767
2768         /*
2769          * Method calls are ultimately read operations.  The result
2770          * should be placed into the inbound buffer provided.  They
2771          * also supply outbound data--parameters for the object
2772          * method.  Currently if this is present it will be a
2773          * snapshot id.
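         * For example (illustrative), the v2 image probes elsewhere
         * in this file use class "rbd" with methods such as
         * "get_size", passing a snapshot id as the outbound data.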
2774          */
2775         page_count = (u32)calc_pages_for(0, inbound_size);
2776         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2777         if (IS_ERR(pages))
2778                 return PTR_ERR(pages);
2779
2780         ret = -ENOMEM;
2781         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2782                                                         OBJ_REQUEST_PAGES);
2783         if (!obj_request)
2784                 goto out;
2785
2786         obj_request->pages = pages;
2787         obj_request->page_count = page_count;
2788
2789         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2790         if (!obj_request->osd_req)
2791                 goto out;
2792
2793         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2794                                         class_name, method_name);
2795         if (outbound_size) {
2796                 struct ceph_pagelist *pagelist;
2797
2798                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2799                 if (!pagelist)
2800                         goto out;
2801
2802                 ceph_pagelist_init(pagelist);
2803                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2804                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2805                                                 pagelist);
2806         }
2807         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2808                                         obj_request->pages, inbound_size,
2809                                         0, false, false);
2810         rbd_osd_req_format_read(obj_request);
2811
2812         ret = rbd_obj_request_submit(osdc, obj_request);
2813         if (ret)
2814                 goto out;
2815         ret = rbd_obj_request_wait(obj_request);
2816         if (ret)
2817                 goto out;
2818
2819         ret = obj_request->result;
2820         if (ret < 0)
2821                 goto out;
2822
2823         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2824         ret = (int)obj_request->xferred;
2825         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2826 out:
2827         if (obj_request)
2828                 rbd_obj_request_put(obj_request);
2829         else
2830                 ceph_release_page_vector(pages, page_count);
2831
2832         return ret;
2833 }
2834
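/*
 * The block layer request function.  Pulls requests off the queue,
 * dropping the queue lock while each image request is built and
 * submitted, and fails writes early on read-only mappings.
 */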
2835 static void rbd_request_fn(struct request_queue *q)
2836                 __releases(q->queue_lock) __acquires(q->queue_lock)
2837 {
2838         struct rbd_device *rbd_dev = q->queuedata;
2839         bool read_only = rbd_dev->mapping.read_only;
2840         struct request *rq;
2841         int result;
2842
2843         while ((rq = blk_fetch_request(q))) {
2844                 bool write_request = rq_data_dir(rq) == WRITE;
2845                 struct rbd_img_request *img_request;
2846                 u64 offset;
2847                 u64 length;
2848
2849                 /* Ignore any non-FS requests that filter through. */
2850
2851                 if (rq->cmd_type != REQ_TYPE_FS) {
2852                         dout("%s: non-fs request type %d\n", __func__,
2853                                 (int) rq->cmd_type);
2854                         __blk_end_request_all(rq, 0);
2855                         continue;
2856                 }
2857
2858                 /* Ignore/skip any zero-length requests */
2859
2860                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2861                 length = (u64) blk_rq_bytes(rq);
2862
2863                 if (!length) {
2864                         dout("%s: zero-length request\n", __func__);
2865                         __blk_end_request_all(rq, 0);
2866                         continue;
2867                 }
2868
2869                 spin_unlock_irq(q->queue_lock);
2870
2871                 /* Disallow writes to a read-only device */
2872
2873                 if (write_request) {
2874                         result = -EROFS;
2875                         if (read_only)
2876                                 goto end_request;
2877                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2878                 }
2879
2880                 /*
2881                  * Quit early if the mapped snapshot no longer
2882                  * exists.  It's still possible the snapshot will
2883                  * have disappeared by the time our request arrives
2884                  * at the osd, but there's no sense in sending it if
2885                  * we already know.
2886                  */
2887                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2888                         dout("request for non-existent snapshot");
2889                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2890                         result = -ENXIO;
2891                         goto end_request;
2892                 }
2893
2894                 result = -EINVAL;
2895                 if (offset && length > U64_MAX - offset + 1) {
2896                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2897                                 offset, length);
2898                         goto end_request;       /* Shouldn't happen */
2899                 }
2900
2901                 result = -EIO;
2902                 if (offset + length > rbd_dev->mapping.size) {
2903                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2904                                 offset, length, rbd_dev->mapping.size);
2905                         goto end_request;
2906                 }
2907
2908                 result = -ENOMEM;
2909                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2910                                                         write_request, false);
2911                 if (!img_request)
2912                         goto end_request;
2913
2914                 img_request->rq = rq;
2915
2916                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2917                                                 rq->bio);
2918                 if (!result)
2919                         result = rbd_img_request_submit(img_request);
2920                 if (result)
2921                         rbd_img_request_put(img_request);
2922 end_request:
2923                 spin_lock_irq(q->queue_lock);
2924                 if (result < 0) {
2925                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2926                                 write_request ? "write" : "read",
2927                                 length, offset, result);
2928
2929                         __blk_end_request_all(rq, result);
2930                 }
2931         }
2932 }
2933
2934 /*
2935  * A queue callback.  Makes sure that we don't create a bio that
2936  * spans across multiple osd objects.  One exception would be
2937  * single-page bios, which we handle later in bio_chain_clone_range().
2938  */
2939 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2940                           struct bio_vec *bvec)
2941 {
2942         struct rbd_device *rbd_dev = q->queuedata;
2943         sector_t sector_offset;
2944         sector_t sectors_per_obj;
2945         sector_t obj_sector_offset;
2946         int ret;
2947
2948         /*
2949          * Convert the partition-relative bio start sector into an
2950          * offset relative to the enclosing whole device, then find
2951          * how far into its rbd object that offset falls.
2952          */
2953         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2954         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2955         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2956
2957         /*
2958          * Compute the number of bytes from that offset to the end
2959          * of the object.  Account for what's already used by the bio.
2960          */
2961         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2962         if (ret > bmd->bi_size)
2963                 ret -= bmd->bi_size;
2964         else
2965                 ret = 0;
2966
2967         /*
2968          * Don't send back more than was asked for.  And if the bio
2969          * was empty, let the whole thing through because:  "Note
2970          * that a block device *must* allow a single page to be
2971          * added to an empty bio."
2972          */
2973         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2974         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2975                 ret = (int) bvec->bv_len;
2976
2977         return ret;
2978 }
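
/*
 * Worked example (values illustrative): with the default 4 MB objects
 * (obj_order 22), sectors_per_obj is 1 << (22 - 9) == 8192.  A bio
 * starting at whole-device sector 8190 is obj_sector_offset 8190 into
 * its object, leaving (8192 - 8190) << 9 == 1024 bytes before the
 * object boundary.  If the bio already carries 512 bytes, at most 512
 * more are allowed, so a larger bvec is not merged and a new bio is
 * started at the object boundary instead.
 */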
2979
2980 static void rbd_free_disk(struct rbd_device *rbd_dev)
2981 {
2982         struct gendisk *disk = rbd_dev->disk;
2983
2984         if (!disk)
2985                 return;
2986
2987         rbd_dev->disk = NULL;
2988         if (disk->flags & GENHD_FL_UP) {
2989                 del_gendisk(disk);
2990                 if (disk->queue)
2991                         blk_cleanup_queue(disk->queue);
2992         }
2993         put_disk(disk);
2994 }
2995
2996 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2997                                 const char *object_name,
2998                                 u64 offset, u64 length, void *buf)
3000 {
3001         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3002         struct rbd_obj_request *obj_request;
3003         struct page **pages = NULL;
3004         u32 page_count;
3005         size_t size;
3006         int ret;
3007
3008         page_count = (u32) calc_pages_for(offset, length);
3009         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3010         if (IS_ERR(pages))
3011                         return PTR_ERR(pages);
3012
3013         ret = -ENOMEM;
3014         obj_request = rbd_obj_request_create(object_name, offset, length,
3015                                                         OBJ_REQUEST_PAGES);
3016         if (!obj_request)
3017                 goto out;
3018
3019         obj_request->pages = pages;
3020         obj_request->page_count = page_count;
3021
3022         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3023         if (!obj_request->osd_req)
3024                 goto out;
3025
3026         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3027                                         offset, length, 0, 0);
3028         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3029                                         obj_request->pages,
3030                                         obj_request->length,
3031                                         obj_request->offset & ~PAGE_MASK,
3032                                         false, false);
3033         rbd_osd_req_format_read(obj_request);
3034
3035         ret = rbd_obj_request_submit(osdc, obj_request);
3036         if (ret)
3037                 goto out;
3038         ret = rbd_obj_request_wait(obj_request);
3039         if (ret)
3040                 goto out;
3041
3042         ret = obj_request->result;
3043         if (ret < 0)
3044                 goto out;
3045
3046         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3047         size = (size_t) obj_request->xferred;
3048         ceph_copy_from_page_vector(pages, buf, 0, size);
3049         rbd_assert(size <= (size_t)INT_MAX);
3050         ret = (int)size;
3051 out:
3052         if (obj_request)
3053                 rbd_obj_request_put(obj_request);
3054         else
3055                 ceph_release_page_vector(pages, page_count);
3056
3057         return ret;
3058 }
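
/*
 * Illustrative example (assuming 4 KB pages): a read of length 6000 at
 * object offset 4000 covers bytes 4000..9999 and so touches three
 * pages; calc_pages_for(4000, 6000) returns 3, and the data begins at
 * byte 4000 & ~PAGE_MASK within the first page of the vector.
 */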
3059
3060 /*
3061  * Read the complete header for the given rbd device.  On successful
3062  * return, the rbd_dev->header field will contain up-to-date
3063  * information about the image.
3064  */
3065 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3066 {
3067         struct rbd_image_header_ondisk *ondisk = NULL;
3068         u32 snap_count = 0;
3069         u64 names_size = 0;
3070         u32 want_count;
3071         int ret;
3072
3073         /*
3074          * The complete header will include an array of its 64-bit
3075          * snapshot ids, followed by the names of those snapshots as
3076          * a contiguous block of NUL-terminated strings.  Note that
3077          * the number of snapshots could change by the time we read
3078          * it in, in which case we re-read it.
3079          */
3080         do {
3081                 size_t size;
3082
3083                 kfree(ondisk);
3084
3085                 size = sizeof (*ondisk);
3086                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3087                 size += names_size;
3088                 ondisk = kmalloc(size, GFP_KERNEL);
3089                 if (!ondisk)
3090                         return -ENOMEM;
3091
3092                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3093                                        0, size, ondisk);
3094                 if (ret < 0)
3095                         goto out;
3096                 if ((size_t)ret < size) {
3097                         ret = -ENXIO;
3098                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3099                                 size, ret);
3100                         goto out;
3101                 }
3102                 if (!rbd_dev_ondisk_valid(ondisk)) {
3103                         ret = -ENXIO;
3104                         rbd_warn(rbd_dev, "invalid header");
3105                         goto out;
3106                 }
3107
3108                 names_size = le64_to_cpu(ondisk->snap_names_len);
3109                 want_count = snap_count;
3110                 snap_count = le32_to_cpu(ondisk->snap_count);
3111         } while (snap_count != want_count);
3112
3113         ret = rbd_header_from_disk(rbd_dev, ondisk);
3114 out:
3115         kfree(ondisk);
3116
3117         return ret;
3118 }
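
/*
 * Illustrative example: probing an image that currently has two
 * snapshots, the first pass reads only sizeof (*ondisk) bytes (both
 * snap_count and names_size start at 0), learns snap_count == 2 and
 * the names blob length from that header, then re-reads with a buffer
 * of sizeof (*ondisk) + 2 * sizeof (struct rbd_image_snap_ondisk) +
 * names_size bytes.  The loop only exits once the count read back
 * matches the count the buffer was sized for.
 */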
3119
3120 /*
3121  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3122  * has disappeared from the (just updated) snapshot context.
3123  */
3124 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3125 {
3126         u64 snap_id;
3127
3128         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3129                 return;
3130
3131         snap_id = rbd_dev->spec->snap_id;
3132         if (snap_id == CEPH_NOSNAP)
3133                 return;
3134
3135         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3136                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3137 }
3138
3139 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3140 {
3141         u64 mapping_size;
3142         int ret;
3143
3144         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3145         mapping_size = rbd_dev->mapping.size;
3146         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3147         if (rbd_dev->image_format == 1)
3148                 ret = rbd_dev_v1_header_info(rbd_dev);
3149         else
3150                 ret = rbd_dev_v2_header_info(rbd_dev);
3151
3152         /* If it's a mapped snapshot, validate its EXISTS flag */
3153
3154         rbd_exists_validate(rbd_dev);
3155         mutex_unlock(&ctl_mutex);
3156         if (mapping_size != rbd_dev->mapping.size) {
3157                 sector_t size;
3158
3159                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3160                 dout("setting size to %llu sectors", (unsigned long long)size);
3161                 set_capacity(rbd_dev->disk, size);
3162                 revalidate_disk(rbd_dev->disk);
3163         }
3164
3165         return ret;
3166 }
3167
3168 static int rbd_init_disk(struct rbd_device *rbd_dev)
3169 {
3170         struct gendisk *disk;
3171         struct request_queue *q;
3172         u64 segment_size;
3173
3174         /* create gendisk info */
3175         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3176         if (!disk)
3177                 return -ENOMEM;
3178
3179         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3180                  rbd_dev->dev_id);
3181         disk->major = rbd_dev->major;
3182         disk->first_minor = 0;
3183         disk->fops = &rbd_bd_ops;
3184         disk->private_data = rbd_dev;
3185
3186         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3187         if (!q)
3188                 goto out_disk;
3189
3190         /* We use the default size, but let's be explicit about it. */
3191         blk_queue_physical_block_size(q, SECTOR_SIZE);
3192
3193         /* set io sizes to object size */
3194         segment_size = rbd_obj_bytes(&rbd_dev->header);
3195         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3196         blk_queue_max_segment_size(q, segment_size);
3197         blk_queue_io_min(q, segment_size);
3198         blk_queue_io_opt(q, segment_size);
3199
3200         blk_queue_merge_bvec(q, rbd_merge_bvec);
3201         disk->queue = q;
3202
3203         q->queuedata = rbd_dev;
3204
3205         rbd_dev->disk = disk;
3206
3207         return 0;
3208 out_disk:
3209         put_disk(disk);
3210
3211         return -ENOMEM;
3212 }
3213
3214 /*
3215   sysfs
3216 */
3217
3218 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3219 {
3220         return container_of(dev, struct rbd_device, dev);
3221 }
3222
3223 static ssize_t rbd_size_show(struct device *dev,
3224                              struct device_attribute *attr, char *buf)
3225 {
3226         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3227
3228         return sprintf(buf, "%llu\n",
3229                 (unsigned long long)rbd_dev->mapping.size);
3230 }
3231
3232 /*
3233  * Note this shows the features for whatever's mapped, which is not
3234  * necessarily the base image.
3235  */
3236 static ssize_t rbd_features_show(struct device *dev,
3237                              struct device_attribute *attr, char *buf)
3238 {
3239         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3240
3241         return sprintf(buf, "0x%016llx\n",
3242                         (unsigned long long)rbd_dev->mapping.features);
3243 }
3244
3245 static ssize_t rbd_major_show(struct device *dev,
3246                               struct device_attribute *attr, char *buf)
3247 {
3248         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3249
3250         if (rbd_dev->major)
3251                 return sprintf(buf, "%d\n", rbd_dev->major);
3252
3253         return sprintf(buf, "(none)\n");
3254
3255 }
3256
3257 static ssize_t rbd_client_id_show(struct device *dev,
3258                                   struct device_attribute *attr, char *buf)
3259 {
3260         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3261
3262         return sprintf(buf, "client%lld\n",
3263                         ceph_client_id(rbd_dev->rbd_client->client));
3264 }
3265
3266 static ssize_t rbd_pool_show(struct device *dev,
3267                              struct device_attribute *attr, char *buf)
3268 {
3269         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270
3271         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3272 }
3273
3274 static ssize_t rbd_pool_id_show(struct device *dev,
3275                              struct device_attribute *attr, char *buf)
3276 {
3277         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3278
3279         return sprintf(buf, "%llu\n",
3280                         (unsigned long long) rbd_dev->spec->pool_id);
3281 }
3282
3283 static ssize_t rbd_name_show(struct device *dev,
3284                              struct device_attribute *attr, char *buf)
3285 {
3286         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3287
3288         if (rbd_dev->spec->image_name)
3289                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3290
3291         return sprintf(buf, "(unknown)\n");
3292 }
3293
3294 static ssize_t rbd_image_id_show(struct device *dev,
3295                              struct device_attribute *attr, char *buf)
3296 {
3297         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3298
3299         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3300 }
3301
3302 /*
3303  * Shows the name of the currently-mapped snapshot (or
3304  * RBD_SNAP_HEAD_NAME for the base image).
3305  */
3306 static ssize_t rbd_snap_show(struct device *dev,
3307                              struct device_attribute *attr,
3308                              char *buf)
3309 {
3310         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3311
3312         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3313 }
3314
3315 /*
3316  * For an rbd v2 image, shows the ids and names of the pool, image,
3317  * and snapshot for the parent image, plus the parent overlap.  If
3318  * there is no parent, simply shows "(no parent image)".
3319  */
3320 static ssize_t rbd_parent_show(struct device *dev,
3321                              struct device_attribute *attr,
3322                              char *buf)
3323 {
3324         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3325         struct rbd_spec *spec = rbd_dev->parent_spec;
3326         int count;
3327         char *bufp = buf;
3328
3329         if (!spec)
3330                 return sprintf(buf, "(no parent image)\n");
3331
3332         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3333                         (unsigned long long) spec->pool_id, spec->pool_name);
3334         if (count < 0)
3335                 return count;
3336         bufp += count;
3337
3338         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3339                         spec->image_name ? spec->image_name : "(unknown)");
3340         if (count < 0)
3341                 return count;
3342         bufp += count;
3343
3344         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3345                         (unsigned long long) spec->snap_id, spec->snap_name);
3346         if (count < 0)
3347                 return count;
3348         bufp += count;
3349
3350         count = sprintf(bufp, "overlap %llu\n", (unsigned long long) rbd_dev->parent_overlap);
3351         if (count < 0)
3352                 return count;
3353         bufp += count;
3354
3355         return (ssize_t) (bufp - buf);
3356 }
3357
3358 static ssize_t rbd_image_refresh(struct device *dev,
3359                                  struct device_attribute *attr,
3360                                  const char *buf,
3361                                  size_t size)
3362 {
3363         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3364         int ret;
3365
3366         ret = rbd_dev_refresh(rbd_dev);
3367         if (ret)
3368                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3369
3370         return ret < 0 ? ret : size;
3371 }
3372
3373 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3374 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3375 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3376 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3377 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3378 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3379 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3380 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3381 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3382 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3383 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3384
3385 static struct attribute *rbd_attrs[] = {
3386         &dev_attr_size.attr,
3387         &dev_attr_features.attr,
3388         &dev_attr_major.attr,
3389         &dev_attr_client_id.attr,
3390         &dev_attr_pool.attr,
3391         &dev_attr_pool_id.attr,
3392         &dev_attr_name.attr,
3393         &dev_attr_image_id.attr,
3394         &dev_attr_current_snap.attr,
3395         &dev_attr_parent.attr,
3396         &dev_attr_refresh.attr,
3397         NULL
3398 };
3399
3400 static struct attribute_group rbd_attr_group = {
3401         .attrs = rbd_attrs,
3402 };
3403
3404 static const struct attribute_group *rbd_attr_groups[] = {
3405         &rbd_attr_group,
3406         NULL
3407 };
3408
3409 static void rbd_sysfs_dev_release(struct device *dev)
3410 {
3411 }
3412
3413 static struct device_type rbd_device_type = {
3414         .name           = "rbd",
3415         .groups         = rbd_attr_groups,
3416         .release        = rbd_sysfs_dev_release,
3417 };
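
/*
 * Example sysfs usage (device id and output values illustrative; the
 * attribute files live under /sys/bus/rbd/devices/<id>/):
 *
 *	$ cat /sys/bus/rbd/devices/0/pool
 *	rbd
 *	$ cat /sys/bus/rbd/devices/0/current_snap
 *	-
 *	$ echo 1 > /sys/bus/rbd/devices/0/refresh    (force a header re-read)
 */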
3418
3419 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3420 {
3421         kref_get(&spec->kref);
3422
3423         return spec;
3424 }
3425
3426 static void rbd_spec_free(struct kref *kref);
3427 static void rbd_spec_put(struct rbd_spec *spec)
3428 {
3429         if (spec)
3430                 kref_put(&spec->kref, rbd_spec_free);
3431 }
3432
3433 static struct rbd_spec *rbd_spec_alloc(void)
3434 {
3435         struct rbd_spec *spec;
3436
3437         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3438         if (!spec)
3439                 return NULL;
3440         kref_init(&spec->kref);
3441
3442         return spec;
3443 }
3444
3445 static void rbd_spec_free(struct kref *kref)
3446 {
3447         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3448
3449         kfree(spec->pool_name);
3450         kfree(spec->image_id);
3451         kfree(spec->image_name);
3452         kfree(spec->snap_name);
3453         kfree(spec);
3454 }
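
/*
 * Lifetime note: rbd_spec_alloc() returns a spec holding one kref
 * reference.  A holder that stores the pointer long-term takes its
 * own reference with rbd_spec_get(); rbd_spec_free() runs only when
 * the final rbd_spec_put() drops the count to zero.  Ownership may
 * also be transferred outright, as rbd_dev_v2_parent_info() does when
 * it hands its reference to rbd_dev ("rbd_dev now owns this").
 */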
3455
3456 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3457                                 struct rbd_spec *spec)
3458 {
3459         struct rbd_device *rbd_dev;
3460
3461         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3462         if (!rbd_dev)
3463                 return NULL;
3464
3465         spin_lock_init(&rbd_dev->lock);
3466         rbd_dev->flags = 0;
3467         INIT_LIST_HEAD(&rbd_dev->node);
3468         init_rwsem(&rbd_dev->header_rwsem);
3469
3470         rbd_dev->spec = spec;
3471         rbd_dev->rbd_client = rbdc;
3472
3473         /* Initialize the layout used for all rbd requests */
3474
3475         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3476         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3477         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3478         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3479
3480         return rbd_dev;
3481 }
3482
3483 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3484 {
3485         rbd_put_client(rbd_dev->rbd_client);
3486         rbd_spec_put(rbd_dev->spec);
3487         kfree(rbd_dev);
3488 }
3489
3490 /*
3491  * Get the size and object order for an image snapshot, or if
3492  * snap_id is CEPH_NOSNAP, gets this information for the base
3493  * image.
3494  */
3495 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3496                                 u8 *order, u64 *snap_size)
3497 {
3498         __le64 snapid = cpu_to_le64(snap_id);
3499         int ret;
3500         struct {
3501                 u8 order;
3502                 __le64 size;
3503         } __attribute__ ((packed)) size_buf = { 0 };
3504
3505         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3506                                 "rbd", "get_size",
3507                                 &snapid, sizeof (snapid),
3508                                 &size_buf, sizeof (size_buf));
3509         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3510         if (ret < 0)
3511                 return ret;
3512         if (ret < sizeof (size_buf))
3513                 return -ERANGE;
3514
3515         if (order) {
3516                 *order = size_buf.order;
3517                 dout("  order %u\n", (unsigned int)*order);
3518         }
3519         *snap_size = le64_to_cpu(size_buf.size);
3520         dout("  snap_id 0x%016llx snap_size = %llu\n",
3521                 (unsigned long long)snap_id, (unsigned long long)*snap_size);
3522
3523         return 0;
3524 }
3525
3526 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3527 {
3528         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3529                                         &rbd_dev->header.obj_order,
3530                                         &rbd_dev->header.image_size);
3531 }
3532
3533 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3534 {
3535         void *reply_buf;
3536         int ret;
3537         void *p;
3538
3539         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3540         if (!reply_buf)
3541                 return -ENOMEM;
3542
3543         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3544                                 "rbd", "get_object_prefix", NULL, 0,
3545                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3546         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3547         if (ret < 0)
3548                 goto out;
3549
3550         p = reply_buf;
3551         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3552                                                 p + ret, NULL, GFP_NOIO);
3553         if (IS_ERR(rbd_dev->header.object_prefix)) {
3554                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3555                 rbd_dev->header.object_prefix = NULL;
3556         } else {
3557                 ret = 0;
3558                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3559         }
3561 out:
3562         kfree(reply_buf);
3563
3564         return ret;
3565 }
3566
3567 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3568                 u64 *snap_features)
3569 {
3570         __le64 snapid = cpu_to_le64(snap_id);
3571         struct {
3572                 __le64 features;
3573                 __le64 incompat;
3574         } __attribute__ ((packed)) features_buf = { 0 };
3575         u64 incompat;
3576         int ret;
3577
3578         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3579                                 "rbd", "get_features",
3580                                 &snapid, sizeof (snapid),
3581                                 &features_buf, sizeof (features_buf));
3582         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3583         if (ret < 0)
3584                 return ret;
3585         if (ret < sizeof (features_buf))
3586                 return -ERANGE;
3587
3588         incompat = le64_to_cpu(features_buf.incompat);
3589         if (incompat & ~RBD_FEATURES_SUPPORTED)
3590                 return -ENXIO;
3591
3592         *snap_features = le64_to_cpu(features_buf.features);
3593
3594         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3595                 (unsigned long long)snap_id,
3596                 (unsigned long long)*snap_features,
3597                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3598
3599         return 0;
3600 }
3601
3602 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3603 {
3604         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3605                                                 &rbd_dev->header.features);
3606 }
3607
3608 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3609 {
3610         struct rbd_spec *parent_spec;
3611         size_t size;
3612         void *reply_buf = NULL;
3613         __le64 snapid;
3614         void *p;
3615         void *end;
3616         char *image_id;
3617         u64 overlap;
3618         int ret;
3619
3620         parent_spec = rbd_spec_alloc();
3621         if (!parent_spec)
3622                 return -ENOMEM;
3623
3624         size = sizeof (__le64) +                                /* pool_id */
3625                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3626                 sizeof (__le64) +                               /* snap_id */
3627                 sizeof (__le64);                                /* overlap */
3628         reply_buf = kmalloc(size, GFP_KERNEL);
3629         if (!reply_buf) {
3630                 ret = -ENOMEM;
3631                 goto out_err;
3632         }
3633
3634         snapid = cpu_to_le64(CEPH_NOSNAP);
3635         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3636                                 "rbd", "get_parent",
3637                                 &snapid, sizeof (snapid),
3638                                 reply_buf, size);
3639         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3640         if (ret < 0)
3641                 goto out_err;
3642
3643         p = reply_buf;
3644         end = reply_buf + ret;
3645         ret = -ERANGE;
3646         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3647         if (parent_spec->pool_id == CEPH_NOPOOL)
3648                 goto out;       /* No parent?  No problem. */
3649
3650         /* The ceph file layout needs to fit pool id in 32 bits */
3651
3652         ret = -EIO;
3653         if (parent_spec->pool_id > (u64)U32_MAX) {
3654                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3655                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3656                 goto out_err;
3657         }
3658
3659         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3660         if (IS_ERR(image_id)) {
3661                 ret = PTR_ERR(image_id);
3662                 goto out_err;
3663         }
3664         parent_spec->image_id = image_id;
3665         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3666         ceph_decode_64_safe(&p, end, overlap, out_err);
3667
3668         if (overlap) {
3669                 rbd_dev->parent_spec = parent_spec;
3670                 parent_spec = NULL;     /* rbd_dev now owns this */
3671                 rbd_dev->parent_overlap = overlap;
3672         } else {
3673                 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3674         }
3675 out:
3676         ret = 0;
3677 out_err:
3678         kfree(reply_buf);
3679         rbd_spec_put(parent_spec);
3680
3681         return ret;
3682 }
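
/*
 * Layout of the "get_parent" reply decoded above (inferred from the
 * decode calls; integers are little-endian, strings are a __le32
 * length followed by that many bytes):
 *
 *	__le64  pool_id         CEPH_NOPOOL if the image has no parent
 *	string  image_id
 *	__le64  snap_id
 *	__le64  overlap         bytes of the child backed by the parent
 */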
3683
3684 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3685 {
3686         struct {
3687                 __le64 stripe_unit;
3688                 __le64 stripe_count;
3689         } __attribute__ ((packed)) striping_info_buf = { 0 };
3690         size_t size = sizeof (striping_info_buf);
3691         void *p;
3692         u64 obj_size;
3693         u64 stripe_unit;
3694         u64 stripe_count;
3695         int ret;
3696
3697         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3698                                 "rbd", "get_stripe_unit_count", NULL, 0,
3699                                 (char *)&striping_info_buf, size);
3700         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3701         if (ret < 0)
3702                 return ret;
3703         if (ret < size)
3704                 return -ERANGE;
3705
3706         /*
3707          * We don't actually support the "fancy striping" feature
3708          * (STRIPINGV2) yet, but if the striping sizes are the
3709          * defaults the behavior is the same as before.  So find
3710          * out, and only fail if the image has non-default values.
3711          */
3712         ret = -EINVAL;
3713         obj_size = (u64)1 << rbd_dev->header.obj_order;
3714         p = &striping_info_buf;
3715         stripe_unit = ceph_decode_64(&p);
3716         if (stripe_unit != obj_size) {
3717                 rbd_warn(rbd_dev, "unsupported stripe unit "
3718                                 "(got %llu want %llu)",
3719                                 stripe_unit, obj_size);
3720                 return -EINVAL;
3721         }
3722         stripe_count = ceph_decode_64(&p);
3723         if (stripe_count != 1) {
3724                 rbd_warn(rbd_dev, "unsupported stripe count "
3725                                 "(got %llu want 1)", stripe_count);
3726                 return -EINVAL;
3727         }
3728         rbd_dev->header.stripe_unit = stripe_unit;
3729         rbd_dev->header.stripe_count = stripe_count;
3730
3731         return 0;
3732 }
3733
3734 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3735 {
3736         size_t image_id_size;
3737         char *image_id;
3738         void *p;
3739         void *end;
3740         size_t size;
3741         void *reply_buf = NULL;
3742         size_t len = 0;
3743         char *image_name = NULL;
3744         int ret;
3745
3746         rbd_assert(!rbd_dev->spec->image_name);
3747
3748         len = strlen(rbd_dev->spec->image_id);
3749         image_id_size = sizeof (__le32) + len;
3750         image_id = kmalloc(image_id_size, GFP_KERNEL);
3751         if (!image_id)
3752                 return NULL;
3753
3754         p = image_id;
3755         end = image_id + image_id_size;
3756         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3757
3758         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3759         reply_buf = kmalloc(size, GFP_KERNEL);
3760         if (!reply_buf)
3761                 goto out;
3762
3763         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3764                                 "rbd", "dir_get_name",
3765                                 image_id, image_id_size,
3766                                 reply_buf, size);
3767         if (ret < 0)
3768                 goto out;
3769         p = reply_buf;
3770         end = reply_buf + ret;
3771
3772         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3773         if (IS_ERR(image_name))
3774                 image_name = NULL;
3775         else
3776                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3777 out:
3778         kfree(reply_buf);
3779         kfree(image_id);
3780
3781         return image_name;
3782 }
3783
3784 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3785 {
3786         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3787         const char *snap_name;
3788         u32 which = 0;
3789
3790         /* Skip over names until we find the one we are looking for */
3791
3792         snap_name = rbd_dev->header.snap_names;
3793         while (which < snapc->num_snaps) {
3794                 if (!strcmp(name, snap_name))
3795                         return snapc->snaps[which];
3796                 snap_name += strlen(snap_name) + 1;
3797                 which++;
3798         }
3799         return CEPH_NOSNAP;
3800 }
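
/*
 * Illustrative example: for snapc->snaps == { 12, 7, 3 } with
 * header.snap_names == "c\0b\0a\0", looking up "b" compares against
 * "c", steps strlen("c") + 1 == 2 bytes into the blob, matches at
 * which == 1, and returns snaps[1] == 7.  The names blob holds one
 * NUL-terminated string per snapshot, in the same order as the ids.
 */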
3801
3802 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3803 {
3804         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3805         u32 which;
3806         bool found = false;
3807         u64 snap_id;
3808
3809         for (which = 0; !found && which < snapc->num_snaps; which++) {
3810                 const char *snap_name;
3811
3812                 snap_id = snapc->snaps[which];
3813                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3814                 if (IS_ERR(snap_name))
3815                         break;
3816                 found = !strcmp(name, snap_name);
3817                 kfree(snap_name);
3818         }
3819         return found ? snap_id : CEPH_NOSNAP;
3820 }
3821
3822 /*
3823  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3824  * no snapshot by that name is found, or if an error occurs.
3825  */
3826 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3827 {
3828         if (rbd_dev->image_format == 1)
3829                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3830
3831         return rbd_v2_snap_id_by_name(rbd_dev, name);
3832 }
3833
3834 /*
3835  * When an rbd image has a parent image, it is identified by the
3836  * pool, image, and snapshot ids (not names).  This function fills
3837  * in the names for those ids.  (It's OK if we can't figure out the
3838  * name for an image id, but the pool and snapshot ids should always
3839  * exist and have names.)  All names in an rbd spec are dynamically
3840  * allocated.
3841  *
3842  * When an image being mapped (not a parent) is probed, we have the
3843  * pool name and pool id, image name and image id, and the snapshot
3844  * name.  The only thing we're missing is the snapshot id.
3845  */
3846 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3847 {
3848         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3849         struct rbd_spec *spec = rbd_dev->spec;
3850         const char *pool_name;
3851         const char *image_name;
3852         const char *snap_name;
3853         int ret;
3854
3855         /*
3856          * An image being mapped will have the pool name (etc.), but
3857          * we need to look up the snapshot id.
3858          */
3859         if (spec->pool_name) {
3860                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3861                         u64 snap_id;
3862
3863                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3864                         if (snap_id == CEPH_NOSNAP)
3865                                 return -ENOENT;
3866                         spec->snap_id = snap_id;
3867                 } else {
3868                         spec->snap_id = CEPH_NOSNAP;
3869                 }
3870
3871                 return 0;
3872         }
3873
3874         /* Get the pool name; we have to make our own copy of this */
3875
3876         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3877         if (!pool_name) {
3878                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3879                 return -EIO;
3880         }
3881         pool_name = kstrdup(pool_name, GFP_KERNEL);
3882         if (!pool_name)
3883                 return -ENOMEM;
3884
3885         /* Fetch the image name; tolerate failure here */
3886
3887         image_name = rbd_dev_image_name(rbd_dev);
3888         if (!image_name)
3889                 rbd_warn(rbd_dev, "unable to get image name");
3890
3891         /* Look up the snapshot name, and make a copy */
3892
3893         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3894         if (!snap_name) {
3895                 ret = -ENOMEM;
3896                 goto out_err;
3897         }
3898
3899         spec->pool_name = pool_name;
3900         spec->image_name = image_name;
3901         spec->snap_name = snap_name;
3902
3903         return 0;
3904 out_err:
3905         kfree(image_name);
3906         kfree(pool_name);
3907
3908         return ret;
3909 }
3910
3911 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3912 {
3913         size_t size;
3914         int ret;
3915         void *reply_buf;
3916         void *p;
3917         void *end;
3918         u64 seq;
3919         u32 snap_count;
3920         struct ceph_snap_context *snapc;
3921         u32 i;
3922
3923         /*
3924          * We'll need room for the seq value (maximum snapshot id),
3925          * snapshot count, and array of that many snapshot ids.
3926          * For now we have a fixed upper limit on the number we're
3927          * prepared to receive.
3928          */
3929         size = sizeof (__le64) + sizeof (__le32) +
3930                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3931         reply_buf = kzalloc(size, GFP_KERNEL);
3932         if (!reply_buf)
3933                 return -ENOMEM;
3934
3935         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3936                                 "rbd", "get_snapcontext", NULL, 0,
3937                                 reply_buf, size);
3938         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3939         if (ret < 0)
3940                 goto out;
3941
3942         p = reply_buf;
3943         end = reply_buf + ret;
3944         ret = -ERANGE;
3945         ceph_decode_64_safe(&p, end, seq, out);
3946         ceph_decode_32_safe(&p, end, snap_count, out);
3947
3948         /*
3949          * Make sure the reported number of snapshot ids wouldn't go
3950          * beyond the end of our buffer.  But before checking that,
3951          * make sure the computed size of the snapshot context we
3952          * allocate is representable in a size_t.
3953          */
3954         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3955                                  / sizeof (u64)) {
3956                 ret = -EINVAL;
3957                 goto out;
3958         }
3959         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3960                 goto out;
3961         ret = 0;
3962
3963         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3964         if (!snapc) {
3965                 ret = -ENOMEM;
3966                 goto out;
3967         }
3968         snapc->seq = seq;
3969         for (i = 0; i < snap_count; i++)
3970                 snapc->snaps[i] = ceph_decode_64(&p);
3971
3972         ceph_put_snap_context(rbd_dev->header.snapc);
3973         rbd_dev->header.snapc = snapc;
3974
3975         dout("  snap context seq = %llu, snap_count = %u\n",
3976                 (unsigned long long)seq, (unsigned int)snap_count);
3977 out:
3978         kfree(reply_buf);
3979
3980         return ret;
3981 }
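
/*
 * Layout of the "get_snapcontext" reply decoded above (inferred from
 * the decode calls; integers are little-endian):
 *
 *	__le64  seq                     maximum snapshot id
 *	__le32  snap_count
 *	__le64  snaps[snap_count]       snapshot ids
 */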
3982
3983 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3984                                         u64 snap_id)
3985 {
3986         size_t size;
3987         void *reply_buf;
3988         __le64 snapid;
3989         int ret;
3990         void *p;
3991         void *end;
3992         char *snap_name;
3993
3994         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3995         reply_buf = kmalloc(size, GFP_KERNEL);
3996         if (!reply_buf)
3997                 return ERR_PTR(-ENOMEM);
3998
3999         snapid = cpu_to_le64(snap_id);
4000         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4001                                 "rbd", "get_snapshot_name",
4002                                 &snapid, sizeof (snapid),
4003                                 reply_buf, size);
4004         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4005         if (ret < 0) {
4006                 snap_name = ERR_PTR(ret);
4007                 goto out;
4008         }
4009
4010         p = reply_buf;
4011         end = reply_buf + ret;
4012         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4013         if (IS_ERR(snap_name))
4014                 goto out;
4015
4016         dout("  snap_id 0x%016llx snap_name = %s\n",
4017                 (unsigned long long)snap_id, snap_name);
4018 out:
4019         kfree(reply_buf);
4020
4021         return snap_name;
4022 }
4023
4024 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4025 {
4026         bool first_time = rbd_dev->header.object_prefix == NULL;
4027         int ret;
4028
4029         down_write(&rbd_dev->header_rwsem);
4030
4031         if (first_time) {
4032                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4033                 if (ret)
4034                         goto out;
4035         }
4036
4037         ret = rbd_dev_v2_image_size(rbd_dev);
4038         if (ret)
4039                 goto out;
4040         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4041             rbd_dev->mapping.size != rbd_dev->header.image_size)
4042                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4043
4044         ret = rbd_dev_v2_snap_context(rbd_dev);
4045         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4048 out:
4049         up_write(&rbd_dev->header_rwsem);
4050
4051         return ret;
4052 }
4053
4054 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4055 {
4056         struct device *dev;
4057         int ret;
4058
4059         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4060
4061         dev = &rbd_dev->dev;
4062         dev->bus = &rbd_bus_type;
4063         dev->type = &rbd_device_type;
4064         dev->parent = &rbd_root_dev;
4065         dev->release = rbd_dev_device_release;
4066         dev_set_name(dev, "%d", rbd_dev->dev_id);
4067         ret = device_register(dev);
4068
4069         mutex_unlock(&ctl_mutex);
4070
4071         return ret;
4072 }
4073
4074 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4075 {
4076         device_unregister(&rbd_dev->dev);
4077 }
4078
4079 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4080
4081 /*
4082  * Get a unique rbd identifier for the given new rbd_dev, and add
4083  * the rbd_dev to the global list.  The minimum rbd id is 1.
4084  */
4085 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4086 {
4087         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4088
4089         spin_lock(&rbd_dev_list_lock);
4090         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4091         spin_unlock(&rbd_dev_list_lock);
4092         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4093                 (unsigned long long) rbd_dev->dev_id);
4094 }
4095
4096 /*
4097  * Remove an rbd_dev from the global list, and record that its
4098  * identifier is no longer in use.
4099  */
4100 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4101 {
4102         struct list_head *tmp;
4103         int rbd_id = rbd_dev->dev_id;
4104         int max_id;
4105
4106         rbd_assert(rbd_id > 0);
4107
4108         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4109                 (unsigned long long) rbd_dev->dev_id);
4110         spin_lock(&rbd_dev_list_lock);
4111         list_del_init(&rbd_dev->node);
4112
4113         /*
4114          * If the id being "put" is not the current maximum, there
4115          * is nothing special we need to do.
4116          */
4117         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4118                 spin_unlock(&rbd_dev_list_lock);
4119                 return;
4120         }
4121
4122         /*
4123          * We need to update the current maximum id.  Search the
4124          * list to find out what it is.  We're more likely to find
4125          * the maximum at the end, so search the list backward.
4126          */
4127         max_id = 0;
4128         list_for_each_prev(tmp, &rbd_dev_list) {
4129                 struct rbd_device *rbd_dev;
4130
4131                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4132                 if (rbd_dev->dev_id > max_id)
4133                         max_id = rbd_dev->dev_id;
4134         }
4135         spin_unlock(&rbd_dev_list_lock);
4136
4137         /*
4138          * The max id could have been updated by rbd_dev_id_get(), in
4139          * which case it now accurately reflects the new maximum.
4140          * Be careful not to overwrite the maximum value in that
4141          * case.
4142          */
4143         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4144         dout("  max dev id has been reset\n");
4145 }
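
/*
 * Example of the race the cmpxchg above tolerates (ids illustrative):
 * with devices {1, 2, 3} mapped, putting id 3 computes max_id == 2
 * from the list.  If rbd_dev_id_get() concurrently hands out id 4,
 * rbd_dev_id_max no longer equals 3, so the cmpxchg fails and the
 * newer maximum 4 is preserved rather than being overwritten with 2.
 */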
4146
4147 /*
4148  * Skips over white space at *buf, and updates *buf to point to the
4149  * first found non-space character (if any). Returns the length of
4150  * the token (string of non-white space characters) found.  Note
4151  * that *buf must be terminated with '\0'.
4152  */
4153 static inline size_t next_token(const char **buf)
4154 {
4155         /*
4156          * These are the characters that produce nonzero for
4157          * isspace() in the "C" and "POSIX" locales.
4158          */
4159         const char *spaces = " \f\n\r\t\v";
4160
4161         *buf += strspn(*buf, spaces);   /* Find start of token */
4162
4163         return strcspn(*buf, spaces);   /* Return token length */
4164 }
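
/*
 * For example (illustrative only):
 *
 *	const char *buf = "  rbd foo\n";
 *	size_t len;
 *
 *	len = next_token(&buf);		buf now points at "rbd foo\n", len == 3
 *	buf += len;
 *	len = next_token(&buf);		buf now points at "foo\n", len == 3
 */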
4165
4166 /*
4167  * Finds the next token in *buf, and if the provided token buffer is
4168  * big enough, copies the found token into it.  The result, if
4169  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4170  * must be terminated with '\0' on entry.
4171  *
4172  * Returns the length of the token found (not including the '\0').
4173  * Return value will be 0 if no token is found, and it will be >=
4174  * token_size if the token would not fit.
4175  *
4176  * The *buf pointer will be updated to point beyond the end of the
4177  * found token.  Note that this occurs even if the token buffer is
4178  * too small to hold it.
4179  */
4180 static inline size_t copy_token(const char **buf,
4181                                 char *token,
4182                                 size_t token_size)
4183 {
4184         size_t len;
4185
4186         len = next_token(buf);
4187         if (len < token_size) {
4188                 memcpy(token, *buf, len);
4189                 *(token + len) = '\0';
4190         }
4191         *buf += len;
4192
4193         return len;
4194 }
4195
4196 /*
4197  * Finds the next token in *buf, dynamically allocates a buffer big
4198  * enough to hold a copy of it, and copies the token into the new
4199  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4200  * that a duplicate buffer is created even for a zero-length token.
4201  *
4202  * Returns a pointer to the newly-allocated duplicate, or a null
4203  * pointer if memory for the duplicate was not available.  If
4204  * the lenp argument is a non-null pointer, the length of the token
4205  * (not including the '\0') is returned in *lenp.
4206  *
4207  * If successful, the *buf pointer will be updated to point beyond
4208  * the end of the found token.
4209  *
4210  * Note: uses GFP_KERNEL for allocation.
4211  */
4212 static inline char *dup_token(const char **buf, size_t *lenp)
4213 {
4214         char *dup;
4215         size_t len;
4216
4217         len = next_token(buf);
4218         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4219         if (!dup)
4220                 return NULL;
4221         *(dup + len) = '\0';
4222         *buf += len;
4223
4224         if (lenp)
4225                 *lenp = len;
4226
4227         return dup;
4228 }
4229
4230 /*
4231  * Parse the options provided for an "rbd add" (i.e., rbd image
4232  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4233  * and the data written is passed here via a NUL-terminated buffer.
4234  * Returns 0 if successful or an error code otherwise.
4235  *
4236  * The information extracted from these options is recorded in
4237  * the other parameters which return dynamically-allocated
4238  * structures:
4239  *  ceph_opts
4240  *      The address of a pointer that will refer to a ceph options
4241  *      structure.  Caller must release the returned pointer using
4242  *      ceph_destroy_options() when it is no longer needed.
4243  *  rbd_opts
4244  *      Address of an rbd options pointer.  Fully initialized by
4245  *      this function; caller must release with kfree().
4246  *  spec
4247  *      Address of an rbd image specification pointer.  Fully
4248  *      initialized by this function based on parsed options.
4249  *      Caller must release with rbd_spec_put().
4250  *
4251  * The options passed take this form:
4252  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4253  * where:
4254  *  <mon_addrs>
4255  *      A comma-separated list of one or more monitor addresses.
4256  *      A monitor address is an ip address, optionally followed
4257  *      by a port number (separated by a colon).
4258  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4259  *  <options>
4260  *      A comma-separated list of ceph and/or rbd options.
4261  *  <pool_name>
4262  *      The name of the rados pool containing the rbd image.
4263  *  <image_name>
4264  *      The name of the image in that pool to map.
4265  *  <snap_name>
4266  *      An optional snapshot name.  If provided, the mapping will
4267  *      present data from the image as of the time that snapshot
4268  *      was created.  The image head is used if no snapshot name
4269  *      is provided.  Snapshot mappings are always read-only.
4270  */
4271 static int rbd_add_parse_args(const char *buf,
4272                                 struct ceph_options **ceph_opts,
4273                                 struct rbd_options **opts,
4274                                 struct rbd_spec **rbd_spec)
4275 {
4276         size_t len;
4277         char *options;
4278         const char *mon_addrs;
4279         char *snap_name;
4280         size_t mon_addrs_size;
4281         struct rbd_spec *spec = NULL;
4282         struct rbd_options *rbd_opts = NULL;
4283         struct ceph_options *copts;
4284         int ret;
4285
4286         /* The first four tokens are required */
4287
4288         len = next_token(&buf);
4289         if (!len) {
4290                 rbd_warn(NULL, "no monitor address(es) provided");
4291                 return -EINVAL;
4292         }
4293         mon_addrs = buf;
4294         mon_addrs_size = len + 1;
4295         buf += len;
4296
4297         ret = -EINVAL;
4298         options = dup_token(&buf, NULL);
4299         if (!options)
4300                 return -ENOMEM;
4301         if (!*options) {
4302                 rbd_warn(NULL, "no options provided");
4303                 goto out_err;
4304         }
4305
4306         spec = rbd_spec_alloc();
4307         if (!spec)
4308                 goto out_mem;
4309
4310         spec->pool_name = dup_token(&buf, NULL);
4311         if (!spec->pool_name)
4312                 goto out_mem;
4313         if (!*spec->pool_name) {
4314                 rbd_warn(NULL, "no pool name provided");
4315                 goto out_err;
4316         }
4317
4318         spec->image_name = dup_token(&buf, NULL);
4319         if (!spec->image_name)
4320                 goto out_mem;
4321         if (!*spec->image_name) {
4322                 rbd_warn(NULL, "no image name provided");
4323                 goto out_err;
4324         }
4325
4326         /*
4327          * Snapshot name is optional; default is to use "-"
4328          * (indicating the head/no snapshot).
4329          */
4330         len = next_token(&buf);
4331         if (!len) {
4332                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4333                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4334         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4335                 ret = -ENAMETOOLONG;
4336                 goto out_err;
4337         }
4338         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4339         if (!snap_name)
4340                 goto out_mem;
4341         *(snap_name + len) = '\0';
4342         spec->snap_name = snap_name;
4343
4344         /* Initialize all rbd options to the defaults */
4345
4346         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4347         if (!rbd_opts)
4348                 goto out_mem;
4349
4350         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4351
4352         copts = ceph_parse_options(options, mon_addrs,
4353                                         mon_addrs + mon_addrs_size - 1,
4354                                         parse_rbd_opts_token, rbd_opts);
4355         if (IS_ERR(copts)) {
4356                 ret = PTR_ERR(copts);
4357                 goto out_err;
4358         }
4359         kfree(options);
4360
4361         *ceph_opts = copts;
4362         *opts = rbd_opts;
4363         *rbd_spec = spec;
4364
4365         return 0;
4366 out_mem:
4367         ret = -ENOMEM;
4368 out_err:
4369         kfree(rbd_opts);
4370         rbd_spec_put(spec);
4371         kfree(options);
4372
4373         return ret;
4374 }
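
/*
 * Example inputs to /sys/bus/rbd/add (all values illustrative):
 *
 *	# map the head of image "myimage" from pool "rbd"
 *	1.2.3.4:6789 name=admin,secret=<key> rbd myimage
 *
 *	# map its snapshot "snap1" instead (always read-only)
 *	1.2.3.4:6789,5.6.7.8:6789 name=admin,secret=<key> rbd myimage snap1
 */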
4375
4376 /*
4377  * An rbd format 2 image has a unique identifier, distinct from the
4378  * name given to it by the user.  Internally, that identifier is
4379  * what's used to specify the names of objects related to the image.
4380  *
4381  * A special "rbd id" object is used to map an rbd image name to its
4382  * id.  If that object doesn't exist, then there is no v2 rbd image
4383  * with the supplied name.
4384  *
4385  * This function will record the given rbd_dev's image_id field if
4386  * it can be determined, and in that case will return 0.  If any
4387  * errors occur a negative errno will be returned and the rbd_dev's
4388  * image_id field will be unchanged (and should be NULL).
4389  */
4390 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4391 {
4392         int ret;
4393         size_t size;
4394         char *object_name;
4395         void *response;
4396         char *image_id;
4397
4398         /*
4399          * When probing a parent image, the image id is already
4400          * known (and the image name likely is not).  There's no
4401          * need to fetch the image id again in this case.  We
4402          * do still need to set the image format though.
4403          */
4404         if (rbd_dev->spec->image_id) {
4405                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4406
4407                 return 0;
4408         }
4409
4410         /*
4411          * First, see if the format 2 image id file exists, and if
4412          * so, get the image's persistent id from it.
4413          */
4414         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4415         object_name = kmalloc(size, GFP_NOIO);
4416         if (!object_name)
4417                 return -ENOMEM;
4418         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4419         dout("rbd id object name is %s\n", object_name);
4420
4421         /* Response will be an encoded string, which includes a length */
4422
4423         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4424         response = kzalloc(size, GFP_NOIO);
4425         if (!response) {
4426                 ret = -ENOMEM;
4427                 goto out;
4428         }
4429
4430         /* If it doesn't exist we'll assume it's a format 1 image */
4431
4432         ret = rbd_obj_method_sync(rbd_dev, object_name,
4433                                 "rbd", "get_id", NULL, 0,
4434                                 response, RBD_IMAGE_ID_LEN_MAX);
4435         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4436         if (ret == -ENOENT) {
4437                 image_id = kstrdup("", GFP_KERNEL);
4438                 ret = image_id ? 0 : -ENOMEM;
4439                 if (!ret)
4440                         rbd_dev->image_format = 1;
4441         } else if (ret > sizeof (__le32)) {
4442                 void *p = response;
4443
4444                 image_id = ceph_extract_encoded_string(&p, p + ret,
4445                                                 NULL, GFP_NOIO);
4446                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4447                 if (!ret)
4448                         rbd_dev->image_format = 2;
4449         } else {
4450                 ret = -EINVAL;
4451         }
4452
4453         if (!ret) {
4454                 rbd_dev->spec->image_id = image_id;
4455                 dout("image_id is %s\n", image_id);
4456         }
4457 out:
4458         kfree(response);
4459         kfree(object_name);
4460
4461         return ret;
4462 }
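
/*
 * For example (names illustrative, and assuming the usual "rbd_id."
 * value of RBD_ID_PREFIX): probing image "myimage" reads the object
 * "rbd_id.myimage"; if the "get_id" call returns an encoded string
 * such as "1018e56e1cae", the image is format 2 and that string
 * becomes spec->image_id, while -ENOENT means a format 1 image,
 * recorded with an empty image id.
 */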
4463
4464 /* Undo whatever state changes are made by v1 or v2 image probe */
4465
4466 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4467 {
4468         struct rbd_image_header *header;
4469
4470         rbd_dev_remove_parent(rbd_dev);
4471         rbd_spec_put(rbd_dev->parent_spec);
4472         rbd_dev->parent_spec = NULL;
4473         rbd_dev->parent_overlap = 0;
4474
4475         /* Free dynamic fields from the header, then zero it out */
4476
4477         header = &rbd_dev->header;
4478         ceph_put_snap_context(header->snapc);
4479         kfree(header->snap_sizes);
4480         kfree(header->snap_names);
4481         kfree(header->object_prefix);
4482         memset(header, 0, sizeof (*header));
4483 }
4484
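/*
 * Fetch the pieces of a format 2 image's metadata that never change
 * for the life of the image: the object prefix, the feature bits,
 * parent information (if the image is layered), and striping
 * parameters (if fancy striping is enabled).  Because these values
 * are fixed, they only need to be gathered once, at probe time.
 */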
4485 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4486 {
4487         int ret;
4488
4489         ret = rbd_dev_v2_object_prefix(rbd_dev);
4490         if (ret)
4491                 goto out_err;
4492
4493         /*
         * Get and check the features for the image.  Currently the
         * features are assumed to never change.
4496          */
4497         ret = rbd_dev_v2_features(rbd_dev);
4498         if (ret)
4499                 goto out_err;
4500
4501         /* If the image supports layering, get the parent info */
4502
4503         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4504                 ret = rbd_dev_v2_parent_info(rbd_dev);
4505                 if (ret)
4506                         goto out_err;
4507                 /*
4508                  * Print a warning if this image has a parent.
4509                  * Don't print it if the image now being probed
4510                  * is itself a parent.  We can tell at this point
4511                  * because we won't know its pool name yet (just its
4512                  * pool id).
4513                  */
4514                 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4515                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4516                                         "is EXPERIMENTAL!");
4517         }
4518
4519         /* If the image supports fancy striping, get its parameters */
4520
4521         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4522                 ret = rbd_dev_v2_striping_info(rbd_dev);
4523                 if (ret < 0)
4524                         goto out_err;
4525         }
        /* No support for crypto or compression in format 2 images */
4527
4528         return 0;
4529 out_err:
4530         rbd_dev->parent_overlap = 0;
4531         rbd_spec_put(rbd_dev->parent_spec);
4532         rbd_dev->parent_spec = NULL;
4533         kfree(rbd_dev->header_name);
4534         rbd_dev->header_name = NULL;
4535         kfree(rbd_dev->header.object_prefix);
4536         rbd_dev->header.object_prefix = NULL;
4537
4538         return ret;
4539 }
4540
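/*
 * If the image just probed has a parent, create an rbd_device for
 * that parent and probe it in turn (which may recurse further up the
 * chain).  The parent device shares our ceph client and uses the
 * parent spec recorded by rbd_dev_v2_parent_info().
 */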
4541 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4542 {
4543         struct rbd_device *parent = NULL;
4544         struct rbd_spec *parent_spec;
4545         struct rbd_client *rbdc;
4546         int ret;
4547
4548         if (!rbd_dev->parent_spec)
4549                 return 0;
4550         /*
4551          * We need to pass a reference to the client and the parent
4552          * spec when creating the parent rbd_dev.  Images related by
4553          * parent/child relationships always share both.
4554          */
4555         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4556         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4557
4558         ret = -ENOMEM;
4559         parent = rbd_dev_create(rbdc, parent_spec);
4560         if (!parent)
4561                 goto out_err;
4562
4563         ret = rbd_dev_image_probe(parent, false);
4564         if (ret < 0)
4565                 goto out_err;
4566         rbd_dev->parent = parent;
4567
4568         return 0;
4569 out_err:
4570         if (parent) {
                /*
                 * The parent now owns the spec and client references
                 * passed to rbd_dev_create(); rbd_dev_destroy() will
                 * drop both.  Don't put the child's parent_spec or
                 * free its header_name here.
                 */
4573                 rbd_dev_destroy(parent);
4574         } else {
4575                 rbd_put_client(rbdc);
4576                 rbd_spec_put(parent_spec);
4577         }
4578
4579         return ret;
4580 }
4581
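/*
 * Set up the Linux block device half of a mapping: assign a device
 * id and name, register a block major number, create the disk and
 * its sysfs entries, then announce the disk.  The image itself must
 * already have been probed.
 */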
4582 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4583 {
4584         int ret;
4585
4586         /* generate unique id: find highest unique id, add one */
4587         rbd_dev_id_get(rbd_dev);
4588
4589         /* Fill in the device name, now that we have its id. */
4590         BUILD_BUG_ON(DEV_NAME_LEN
4591                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4592         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4593
4594         /* Get our block major device number. */
4595
4596         ret = register_blkdev(0, rbd_dev->name);
4597         if (ret < 0)
4598                 goto err_out_id;
4599         rbd_dev->major = ret;
4600
4601         /* Set up the blkdev mapping. */
4602
4603         ret = rbd_init_disk(rbd_dev);
4604         if (ret)
4605                 goto err_out_blkdev;
4606
4607         ret = rbd_dev_mapping_set(rbd_dev);
4608         if (ret)
4609                 goto err_out_disk;
4610         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4611
4612         ret = rbd_bus_add_dev(rbd_dev);
4613         if (ret)
4614                 goto err_out_mapping;
4615
4616         /* Everything's ready.  Announce the disk to the world. */
4617
4618         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4619         add_disk(rbd_dev->disk);
4620
4621         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4622                 (unsigned long long) rbd_dev->mapping.size);
4623
4624         return ret;
4625
4626 err_out_mapping:
4627         rbd_dev_mapping_clear(rbd_dev);
4628 err_out_disk:
4629         rbd_free_disk(rbd_dev);
4630 err_out_blkdev:
4631         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4632 err_out_id:
4633         rbd_dev_id_put(rbd_dev);
4635
4636         return ret;
4637 }
4638
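/*
 * Compute the name of the image's header object.  Format 1 images
 * use the image name with RBD_SUFFIX appended; format 2 images use
 * RBD_HEADER_PREFIX followed by the image id (both constants are
 * defined in rbd_types.h).
 */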
4639 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4640 {
4641         struct rbd_spec *spec = rbd_dev->spec;
4642         size_t size;
4643
4644         /* Record the header object name for this rbd image. */
4645
4646         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4647
4648         if (rbd_dev->image_format == 1)
4649                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4650         else
4651                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4652
4653         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4654         if (!rbd_dev->header_name)
4655                 return -ENOMEM;
4656
4657         if (rbd_dev->image_format == 1)
4658                 sprintf(rbd_dev->header_name, "%s%s",
4659                         spec->image_name, RBD_SUFFIX);
4660         else
4661                 sprintf(rbd_dev->header_name, "%s%s",
4662                         RBD_HEADER_PREFIX, spec->image_id);
4663         return 0;
4664 }
4665
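/*
 * Undo everything rbd_dev_image_probe() set up for an image, then
 * destroy the rbd_dev itself.
 */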
4666 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4667 {
4668         rbd_dev_unprobe(rbd_dev);
4669         kfree(rbd_dev->header_name);
4670         rbd_dev->header_name = NULL;
4671         rbd_dev->image_format = 0;
4672         kfree(rbd_dev->spec->image_id);
4673         rbd_dev->spec->image_id = NULL;
4674
4675         rbd_dev_destroy(rbd_dev);
4676 }
4677
4678 /*
4679  * Probe for the existence of the header object for the given rbd
4680  * device.  If this image is the one being mapped (i.e., not a
4681  * parent), initiate a watch on its header object before using that
4682  * object to get detailed information about the rbd image.
4683  */
4684 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4685 {
4686         int ret;
4687         int tmp;
4688
4689         /*
4690          * Get the id from the image id object.  If it's not a
4691          * format 2 image, we'll get ENOENT back, and we'll assume
4692          * it's a format 1 image.
4693          */
4694         ret = rbd_dev_image_id(rbd_dev);
4695         if (ret)
4696                 return ret;
4697         rbd_assert(rbd_dev->spec->image_id);
4698         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4699
4700         ret = rbd_dev_header_name(rbd_dev);
4701         if (ret)
4702                 goto err_out_format;
4703
4704         if (mapping) {
4705                 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4706                 if (ret)
4707                         goto out_header_name;
4708         }
4709
4710         if (rbd_dev->image_format == 1)
4711                 ret = rbd_dev_v1_header_info(rbd_dev);
4712         else
4713                 ret = rbd_dev_v2_header_info(rbd_dev);
4714         if (ret)
4715                 goto err_out_watch;
4716
4717         ret = rbd_dev_spec_update(rbd_dev);
4718         if (ret)
4719                 goto err_out_probe;
4720
4721         ret = rbd_dev_probe_parent(rbd_dev);
4722         if (ret)
4723                 goto err_out_probe;
4724
4725         dout("discovered format %u image, header name is %s\n",
4726                 rbd_dev->image_format, rbd_dev->header_name);
4727
4728         return 0;
4729 err_out_probe:
4730         rbd_dev_unprobe(rbd_dev);
4731 err_out_watch:
4732         if (mapping) {
4733                 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4734                 if (tmp)
4735                         rbd_warn(rbd_dev, "unable to tear down "
4736                                         "watch request (%d)\n", tmp);
4737         }
4738 out_header_name:
4739         kfree(rbd_dev->header_name);
4740         rbd_dev->header_name = NULL;
4741 err_out_format:
4742         rbd_dev->image_format = 0;
4743         kfree(rbd_dev->spec->image_id);
4744         rbd_dev->spec->image_id = NULL;
4745
4746         dout("probe failed, returning %d\n", ret);
4747
4748         return ret;
4749 }
4750
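/*
 * Handle a write to the sysfs "add" bus attribute.  The buffer names
 * the monitors to contact, any ceph/rbd options, and the pool and
 * image (and optional snapshot) to map.  An illustrative invocation
 * (the exact syntax is defined by rbd_add_parse_args()):
 *
 *   echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 */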
4751 static ssize_t rbd_add(struct bus_type *bus,
4752                        const char *buf,
4753                        size_t count)
4754 {
4755         struct rbd_device *rbd_dev = NULL;
4756         struct ceph_options *ceph_opts = NULL;
4757         struct rbd_options *rbd_opts = NULL;
4758         struct rbd_spec *spec = NULL;
4759         struct rbd_client *rbdc;
4760         struct ceph_osd_client *osdc;
4761         bool read_only;
4762         int rc = -ENOMEM;
4763
4764         if (!try_module_get(THIS_MODULE))
4765                 return -ENODEV;
4766
4767         /* parse add command */
4768         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4769         if (rc < 0)
4770                 goto err_out_module;
4771         read_only = rbd_opts->read_only;
4772         kfree(rbd_opts);
4773         rbd_opts = NULL;        /* done with this */
4774
4775         rbdc = rbd_get_client(ceph_opts);
4776         if (IS_ERR(rbdc)) {
4777                 rc = PTR_ERR(rbdc);
4778                 goto err_out_args;
4779         }
4780         ceph_opts = NULL;       /* rbd_dev client now owns this */
4781
4782         /* pick the pool */
4783         osdc = &rbdc->client->osdc;
4784         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4785         if (rc < 0)
4786                 goto err_out_client;
4787         spec->pool_id = (u64)rc;
4788
4789         /* The ceph file layout needs to fit pool id in 32 bits */
4790
4791         if (spec->pool_id > (u64)U32_MAX) {
4792                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4793                                 (unsigned long long)spec->pool_id, U32_MAX);
4794                 rc = -EIO;
4795                 goto err_out_client;
4796         }
4797
        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev) {
                rc = -ENOMEM;   /* rc still holds the pool id here */
                goto err_out_client;
        }
4801         rbdc = NULL;            /* rbd_dev now owns this */
4802         spec = NULL;            /* rbd_dev now owns this */
4803
4804         rc = rbd_dev_image_probe(rbd_dev, true);
4805         if (rc < 0)
4806                 goto err_out_rbd_dev;
4807
4808         /* If we are mapping a snapshot it must be marked read-only */
4809
4810         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4811                 read_only = true;
4812         rbd_dev->mapping.read_only = read_only;
4813
        rc = rbd_dev_device_setup(rbd_dev);
        if (rc) {
                /*
                 * rbd_dev_image_release() ends by destroying the
                 * device, so don't fall through to err_out_rbd_dev
                 * and destroy it a second time.
                 */
                rbd_dev_image_release(rbd_dev);
                goto err_out_module;
        }

        return count;

4819 err_out_rbd_dev:
4820         rbd_dev_destroy(rbd_dev);
4821 err_out_client:
4822         rbd_put_client(rbdc);
4823 err_out_args:
4824         if (ceph_opts)
4825                 ceph_destroy_options(ceph_opts);
4826         kfree(rbd_opts);
4827         rbd_spec_put(spec);
4828 err_out_module:
4829         module_put(THIS_MODULE);
4830
4831         dout("Error adding device %s\n", buf);
4832
4833         return (ssize_t)rc;
4834 }
4835
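/*
 * Look up an rbd device by its id.  The device list lock is dropped
 * before returning, so the caller must otherwise keep the device
 * from going away (rbd_remove() does so by holding ctl_mutex).
 */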
4836 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4837 {
4838         struct list_head *tmp;
4839         struct rbd_device *rbd_dev;
4840
4841         spin_lock(&rbd_dev_list_lock);
4842         list_for_each(tmp, &rbd_dev_list) {
4843                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4844                 if (rbd_dev->dev_id == dev_id) {
4845                         spin_unlock(&rbd_dev_list_lock);
4846                         return rbd_dev;
4847                 }
4848         }
4849         spin_unlock(&rbd_dev_list_lock);
4850         return NULL;
4851 }
4852
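/*
 * Release callback for the rbd device; undoes rbd_dev_device_setup().
 */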
4853 static void rbd_dev_device_release(struct device *dev)
4854 {
4855         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4856
4857         rbd_free_disk(rbd_dev);
4858         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4859         rbd_dev_mapping_clear(rbd_dev);
4860         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4861         rbd_dev->major = 0;
4862         rbd_dev_id_put(rbd_dev);
4864 }
4865
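/*
 * Tear down an image's whole chain of parents, releasing the deepest
 * (grandparent-less) ancestor first so that a parent is never
 * destroyed before its own parent.
 */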
4866 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4867 {
4868         while (rbd_dev->parent) {
4869                 struct rbd_device *first = rbd_dev;
4870                 struct rbd_device *second = first->parent;
4871                 struct rbd_device *third;
4872
4873                 /*
4874                  * Follow to the parent with no grandparent and
4875                  * remove it.
4876                  */
4877                 while (second && (third = second->parent)) {
4878                         first = second;
4879                         second = third;
4880                 }
4881                 rbd_assert(second);
4882                 rbd_dev_image_release(second);
4883                 first->parent = NULL;
4884                 first->parent_overlap = 0;
4885
4886                 rbd_assert(first->parent_spec);
4887                 rbd_spec_put(first->parent_spec);
4888                 first->parent_spec = NULL;
4889         }
4890 }
4891
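/*
 * Handle a write to the sysfs "remove" bus attribute.  The buffer
 * holds the decimal id of the device to unmap, for example:
 *
 *   echo 1 > /sys/bus/rbd/remove
 *
 * Removal fails with -EBUSY if the device is still open.
 */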
4892 static ssize_t rbd_remove(struct bus_type *bus,
4893                           const char *buf,
4894                           size_t count)
4895 {
4896         struct rbd_device *rbd_dev = NULL;
4897         int target_id;
4898         unsigned long ul;
4899         int ret;
4900
4901         ret = strict_strtoul(buf, 10, &ul);
4902         if (ret)
4903                 return ret;
4904
4905         /* convert to int; abort if we lost anything in the conversion */
4906         target_id = (int) ul;
4907         if (target_id != ul)
4908                 return -EINVAL;
4909
4910         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4911
4912         rbd_dev = __rbd_get_dev(target_id);
4913         if (!rbd_dev) {
4914                 ret = -ENOENT;
4915                 goto done;
4916         }
4917
4918         spin_lock_irq(&rbd_dev->lock);
4919         if (rbd_dev->open_count)
4920                 ret = -EBUSY;
4921         else
4922                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4923         spin_unlock_irq(&rbd_dev->lock);
4924         if (ret < 0)
4925                 goto done;
4926         rbd_bus_del_dev(rbd_dev);
4927         ret = rbd_dev_header_watch_sync(rbd_dev, false);
4928         if (ret)
4929                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4930         rbd_dev_image_release(rbd_dev);
4931         module_put(THIS_MODULE);
4932         ret = count;
4933 done:
4934         mutex_unlock(&ctl_mutex);
4935
4936         return ret;
4937 }
4938
4939 /*
4940  * create control files in sysfs
4941  * /sys/bus/rbd/...
4942  */
4943 static int rbd_sysfs_init(void)
4944 {
4945         int ret;
4946
4947         ret = device_register(&rbd_root_dev);
4948         if (ret < 0)
4949                 return ret;
4950
4951         ret = bus_register(&rbd_bus_type);
4952         if (ret < 0)
4953                 device_unregister(&rbd_root_dev);
4954
4955         return ret;
4956 }
4957
4958 static void rbd_sysfs_cleanup(void)
4959 {
4960         bus_unregister(&rbd_bus_type);
4961         device_unregister(&rbd_root_dev);
4962 }
4963
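/*
 * Create the slab caches used for image requests, object requests,
 * and segment (object) names.  All three must be created for the
 * module to load.
 */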
4964 static int rbd_slab_init(void)
4965 {
4966         rbd_assert(!rbd_img_request_cache);
4967         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4968                                         sizeof (struct rbd_img_request),
4969                                         __alignof__(struct rbd_img_request),
4970                                         0, NULL);
4971         if (!rbd_img_request_cache)
4972                 return -ENOMEM;
4973
4974         rbd_assert(!rbd_obj_request_cache);
4975         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4976                                         sizeof (struct rbd_obj_request),
4977                                         __alignof__(struct rbd_obj_request),
4978                                         0, NULL);
4979         if (!rbd_obj_request_cache)
4980                 goto out_err;
4981
4982         rbd_assert(!rbd_segment_name_cache);
4983         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
4984                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
4985         if (rbd_segment_name_cache)
4986                 return 0;
4987 out_err:
4988         if (rbd_obj_request_cache) {
4989                 kmem_cache_destroy(rbd_obj_request_cache);
4990                 rbd_obj_request_cache = NULL;
4991         }
4992
4993         kmem_cache_destroy(rbd_img_request_cache);
4994         rbd_img_request_cache = NULL;
4995
4996         return -ENOMEM;
4997 }
4998
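/* Destroy the caches created by rbd_slab_init() */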
4999 static void rbd_slab_exit(void)
5000 {
5001         rbd_assert(rbd_segment_name_cache);
5002         kmem_cache_destroy(rbd_segment_name_cache);
5003         rbd_segment_name_cache = NULL;
5004
5005         rbd_assert(rbd_obj_request_cache);
5006         kmem_cache_destroy(rbd_obj_request_cache);
5007         rbd_obj_request_cache = NULL;
5008
5009         rbd_assert(rbd_img_request_cache);
5010         kmem_cache_destroy(rbd_img_request_cache);
5011         rbd_img_request_cache = NULL;
5012 }
5013
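/*
 * Module entry point: verify libceph compatibility, then create the
 * slab caches and the sysfs bus entries.
 */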
5014 static int __init rbd_init(void)
5015 {
5016         int rc;
5017
5018         if (!libceph_compatible(NULL)) {
5019                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5020
5021                 return -EINVAL;
5022         }
5023         rc = rbd_slab_init();
5024         if (rc)
5025                 return rc;
5026         rc = rbd_sysfs_init();
5027         if (rc)
5028                 rbd_slab_exit();
5029         else
5030                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5031
5032         return rc;
5033 }
5034
5035 static void __exit rbd_exit(void)
5036 {
5037         rbd_sysfs_cleanup();
5038         rbd_slab_exit();
5039 }
5040
5041 module_init(rbd_init);
5042 module_exit(rbd_exit);
5043
5044 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5045 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5046 MODULE_DESCRIPTION("rados block device");
5047
5048 /* following authorship retained from original osdblk.c */
5049 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5050
5051 MODULE_LICENSE("GPL");