]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: encapsulate removing parent devices
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
88  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89  * enough to hold all possible device names.
90  */
91 #define DEV_NAME_LEN            32
92 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 stripe_unit;
112         u64 stripe_count;
113
114         u64 obj_version;
115 };
116
117 /*
118  * An rbd image specification.
119  *
120  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
121  * identify an image.  Each rbd_dev structure includes a pointer to
122  * an rbd_spec structure that encapsulates this identity.
123  *
124  * Each of the id's in an rbd_spec has an associated name.  For a
125  * user-mapped image, the names are supplied and the id's associated
126  * with them are looked up.  For a layered image, a parent image is
127  * defined by the tuple, and the names are looked up.
128  *
129  * An rbd_dev structure contains a parent_spec pointer which is
130  * non-null if the image it represents is a child in a layered
131  * image.  This pointer will refer to the rbd_spec structure used
132  * by the parent rbd_dev for its own identity (i.e., the structure
133  * is shared between the parent and child).
134  *
135  * Since these structures are populated once, during the discovery
136  * phase of image construction, they are effectively immutable so
137  * we make no effort to synchronize access to them.
138  *
139  * Note that code herein does not assume the image name is known (it
140  * could be a null pointer).
141  */
142 struct rbd_spec {
143         u64             pool_id;
144         const char      *pool_name;
145
146         const char      *image_id;
147         const char      *image_name;
148
149         u64             snap_id;
150         const char      *snap_name;
151
152         struct kref     kref;
153 };
154
155 /*
156  * an instance of the client.  multiple devices may share an rbd client.
157  */
158 struct rbd_client {
159         struct ceph_client      *client;
160         struct kref             kref;
161         struct list_head        node;
162 };
163
164 struct rbd_img_request;
165 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
166
167 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
168
169 struct rbd_obj_request;
170 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
171
172 enum obj_request_type {
173         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
174 };
175
176 enum obj_req_flags {
177         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
178         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
179         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
180         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
181 };
182
183 struct rbd_obj_request {
184         const char              *object_name;
185         u64                     offset;         /* object start byte */
186         u64                     length;         /* bytes from offset */
187         unsigned long           flags;
188
189         /*
190          * An object request associated with an image will have its
191          * img_data flag set; a standalone object request will not.
192          *
193          * A standalone object request will have which == BAD_WHICH
194          * and a null obj_request pointer.
195          *
196          * An object request initiated in support of a layered image
197          * object (to check for its existence before a write) will
198          * have which == BAD_WHICH and a non-null obj_request pointer.
199          *
200          * Finally, an object request for rbd image data will have
201          * which != BAD_WHICH, and will have a non-null img_request
202          * pointer.  The value of which will be in the range
203          * 0..(img_request->obj_request_count-1).
204          */
205         union {
206                 struct rbd_obj_request  *obj_request;   /* STAT op */
207                 struct {
208                         struct rbd_img_request  *img_request;
209                         u64                     img_offset;
210                         /* links for img_request->obj_requests list */
211                         struct list_head        links;
212                 };
213         };
214         u32                     which;          /* posn image request list */
215
216         enum obj_request_type   type;
217         union {
218                 struct bio      *bio_list;
219                 struct {
220                         struct page     **pages;
221                         u32             page_count;
222                 };
223         };
224         struct page             **copyup_pages;
225
226         struct ceph_osd_request *osd_req;
227
228         u64                     xferred;        /* bytes transferred */
229         u64                     version;
230         int                     result;
231
232         rbd_obj_callback_t      callback;
233         struct completion       completion;
234
235         struct kref             kref;
236 };
237
238 enum img_req_flags {
239         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
240         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
241         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
242 };
243
244 struct rbd_img_request {
245         struct rbd_device       *rbd_dev;
246         u64                     offset; /* starting image byte offset */
247         u64                     length; /* byte count from offset */
248         unsigned long           flags;
249         union {
250                 u64                     snap_id;        /* for reads */
251                 struct ceph_snap_context *snapc;        /* for writes */
252         };
253         union {
254                 struct request          *rq;            /* block request */
255                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
256         };
257         struct page             **copyup_pages;
258         spinlock_t              completion_lock;/* protects next_completion */
259         u32                     next_completion;
260         rbd_img_callback_t      callback;
261         u64                     xferred;/* aggregate bytes transferred */
262         int                     result; /* first nonzero obj_request result */
263
264         u32                     obj_request_count;
265         struct list_head        obj_requests;   /* rbd_obj_request structs */
266
267         struct kref             kref;
268 };
269
270 #define for_each_obj_request(ireq, oreq) \
271         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
272 #define for_each_obj_request_from(ireq, oreq) \
273         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
274 #define for_each_obj_request_safe(ireq, oreq, n) \
275         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
276
277 struct rbd_snap {
278         const char              *name;
279         u64                     size;
280         struct list_head        node;
281         u64                     id;
282         u64                     features;
283 };
284
285 struct rbd_mapping {
286         u64                     size;
287         u64                     features;
288         bool                    read_only;
289 };
290
291 /*
292  * a single device
293  */
294 struct rbd_device {
295         int                     dev_id;         /* blkdev unique id */
296
297         int                     major;          /* blkdev assigned major */
298         struct gendisk          *disk;          /* blkdev's gendisk and rq */
299
300         u32                     image_format;   /* Either 1 or 2 */
301         struct rbd_client       *rbd_client;
302
303         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
304
305         spinlock_t              lock;           /* queue, flags, open_count */
306
307         struct rbd_image_header header;
308         unsigned long           flags;          /* possibly lock protected */
309         struct rbd_spec         *spec;
310
311         char                    *header_name;
312
313         struct ceph_file_layout layout;
314
315         struct ceph_osd_event   *watch_event;
316         struct rbd_obj_request  *watch_request;
317
318         struct rbd_spec         *parent_spec;
319         u64                     parent_overlap;
320         struct rbd_device       *parent;
321
322         /* protects updating the header */
323         struct rw_semaphore     header_rwsem;
324
325         struct rbd_mapping      mapping;
326
327         struct list_head        node;
328
329         /* list of snapshots */
330         struct list_head        snaps;
331
332         /* sysfs related */
333         struct device           dev;
334         unsigned long           open_count;     /* protected by lock */
335 };
336
337 /*
338  * Flag bits for rbd_dev->flags.  If atomicity is required,
339  * rbd_dev->lock is used to protect access.
340  *
341  * Currently, only the "removing" flag (which is coupled with the
342  * "open_count" field) requires atomic access.
343  */
344 enum rbd_dev_flags {
345         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
346         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
347 };
348
349 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
350
351 static LIST_HEAD(rbd_dev_list);    /* devices */
352 static DEFINE_SPINLOCK(rbd_dev_list_lock);
353
354 static LIST_HEAD(rbd_client_list);              /* clients */
355 static DEFINE_SPINLOCK(rbd_client_list_lock);
356
357 static int rbd_img_request_submit(struct rbd_img_request *img_request);
358
359 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
360
361 static void rbd_dev_release(struct device *dev);
362 static void rbd_snap_destroy(struct rbd_snap *snap);
363
364 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
365                        size_t count);
366 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
367                           size_t count);
368 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
369
370 static struct bus_attribute rbd_bus_attrs[] = {
371         __ATTR(add, S_IWUSR, NULL, rbd_add),
372         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
373         __ATTR_NULL
374 };
375
376 static struct bus_type rbd_bus_type = {
377         .name           = "rbd",
378         .bus_attrs      = rbd_bus_attrs,
379 };
380
381 static void rbd_root_dev_release(struct device *dev)
382 {
383 }
384
385 static struct device rbd_root_dev = {
386         .init_name =    "rbd",
387         .release =      rbd_root_dev_release,
388 };
389
390 static __printf(2, 3)
391 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
392 {
393         struct va_format vaf;
394         va_list args;
395
396         va_start(args, fmt);
397         vaf.fmt = fmt;
398         vaf.va = &args;
399
400         if (!rbd_dev)
401                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
402         else if (rbd_dev->disk)
403                 printk(KERN_WARNING "%s: %s: %pV\n",
404                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
405         else if (rbd_dev->spec && rbd_dev->spec->image_name)
406                 printk(KERN_WARNING "%s: image %s: %pV\n",
407                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
408         else if (rbd_dev->spec && rbd_dev->spec->image_id)
409                 printk(KERN_WARNING "%s: id %s: %pV\n",
410                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
411         else    /* punt */
412                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
413                         RBD_DRV_NAME, rbd_dev, &vaf);
414         va_end(args);
415 }
416
417 #ifdef RBD_DEBUG
418 #define rbd_assert(expr)                                                \
419                 if (unlikely(!(expr))) {                                \
420                         printk(KERN_ERR "\nAssertion failure in %s() "  \
421                                                 "at line %d:\n\n"       \
422                                         "\trbd_assert(%s);\n\n",        \
423                                         __func__, __LINE__, #expr);     \
424                         BUG();                                          \
425                 }
426 #else /* !RBD_DEBUG */
427 #  define rbd_assert(expr)      ((void) 0)
428 #endif /* !RBD_DEBUG */
429
430 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
431 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
432 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
433
434 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
435 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
436
437 static int rbd_open(struct block_device *bdev, fmode_t mode)
438 {
439         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
440         bool removing = false;
441
442         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
443                 return -EROFS;
444
445         spin_lock_irq(&rbd_dev->lock);
446         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
447                 removing = true;
448         else
449                 rbd_dev->open_count++;
450         spin_unlock_irq(&rbd_dev->lock);
451         if (removing)
452                 return -ENOENT;
453
454         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
455         (void) get_device(&rbd_dev->dev);
456         set_device_ro(bdev, rbd_dev->mapping.read_only);
457         mutex_unlock(&ctl_mutex);
458
459         return 0;
460 }
461
462 static int rbd_release(struct gendisk *disk, fmode_t mode)
463 {
464         struct rbd_device *rbd_dev = disk->private_data;
465         unsigned long open_count_before;
466
467         spin_lock_irq(&rbd_dev->lock);
468         open_count_before = rbd_dev->open_count--;
469         spin_unlock_irq(&rbd_dev->lock);
470         rbd_assert(open_count_before > 0);
471
472         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
473         put_device(&rbd_dev->dev);
474         mutex_unlock(&ctl_mutex);
475
476         return 0;
477 }
478
479 static const struct block_device_operations rbd_bd_ops = {
480         .owner                  = THIS_MODULE,
481         .open                   = rbd_open,
482         .release                = rbd_release,
483 };
484
485 /*
486  * Initialize an rbd client instance.
487  * We own *ceph_opts.
488  */
489 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
490 {
491         struct rbd_client *rbdc;
492         int ret = -ENOMEM;
493
494         dout("%s:\n", __func__);
495         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
496         if (!rbdc)
497                 goto out_opt;
498
499         kref_init(&rbdc->kref);
500         INIT_LIST_HEAD(&rbdc->node);
501
502         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
503
504         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
505         if (IS_ERR(rbdc->client))
506                 goto out_mutex;
507         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
508
509         ret = ceph_open_session(rbdc->client);
510         if (ret < 0)
511                 goto out_err;
512
513         spin_lock(&rbd_client_list_lock);
514         list_add_tail(&rbdc->node, &rbd_client_list);
515         spin_unlock(&rbd_client_list_lock);
516
517         mutex_unlock(&ctl_mutex);
518         dout("%s: rbdc %p\n", __func__, rbdc);
519
520         return rbdc;
521
522 out_err:
523         ceph_destroy_client(rbdc->client);
524 out_mutex:
525         mutex_unlock(&ctl_mutex);
526         kfree(rbdc);
527 out_opt:
528         if (ceph_opts)
529                 ceph_destroy_options(ceph_opts);
530         dout("%s: error %d\n", __func__, ret);
531
532         return ERR_PTR(ret);
533 }
534
535 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
536 {
537         kref_get(&rbdc->kref);
538
539         return rbdc;
540 }
541
542 /*
543  * Find a ceph client with specific addr and configuration.  If
544  * found, bump its reference count.
545  */
546 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
547 {
548         struct rbd_client *client_node;
549         bool found = false;
550
551         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
552                 return NULL;
553
554         spin_lock(&rbd_client_list_lock);
555         list_for_each_entry(client_node, &rbd_client_list, node) {
556                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
557                         __rbd_get_client(client_node);
558
559                         found = true;
560                         break;
561                 }
562         }
563         spin_unlock(&rbd_client_list_lock);
564
565         return found ? client_node : NULL;
566 }
567
568 /*
569  * mount options
570  */
571 enum {
572         Opt_last_int,
573         /* int args above */
574         Opt_last_string,
575         /* string args above */
576         Opt_read_only,
577         Opt_read_write,
578         /* Boolean args above */
579         Opt_last_bool,
580 };
581
582 static match_table_t rbd_opts_tokens = {
583         /* int args above */
584         /* string args above */
585         {Opt_read_only, "read_only"},
586         {Opt_read_only, "ro"},          /* Alternate spelling */
587         {Opt_read_write, "read_write"},
588         {Opt_read_write, "rw"},         /* Alternate spelling */
589         /* Boolean args above */
590         {-1, NULL}
591 };
592
593 struct rbd_options {
594         bool    read_only;
595 };
596
597 #define RBD_READ_ONLY_DEFAULT   false
598
599 static int parse_rbd_opts_token(char *c, void *private)
600 {
601         struct rbd_options *rbd_opts = private;
602         substring_t argstr[MAX_OPT_ARGS];
603         int token, intval, ret;
604
605         token = match_token(c, rbd_opts_tokens, argstr);
606         if (token < 0)
607                 return -EINVAL;
608
609         if (token < Opt_last_int) {
610                 ret = match_int(&argstr[0], &intval);
611                 if (ret < 0) {
612                         pr_err("bad mount option arg (not int) "
613                                "at '%s'\n", c);
614                         return ret;
615                 }
616                 dout("got int token %d val %d\n", token, intval);
617         } else if (token > Opt_last_int && token < Opt_last_string) {
618                 dout("got string token %d val %s\n", token,
619                      argstr[0].from);
620         } else if (token > Opt_last_string && token < Opt_last_bool) {
621                 dout("got Boolean token %d\n", token);
622         } else {
623                 dout("got token %d\n", token);
624         }
625
626         switch (token) {
627         case Opt_read_only:
628                 rbd_opts->read_only = true;
629                 break;
630         case Opt_read_write:
631                 rbd_opts->read_only = false;
632                 break;
633         default:
634                 rbd_assert(false);
635                 break;
636         }
637         return 0;
638 }
639
640 /*
641  * Get a ceph client with specific addr and configuration, if one does
642  * not exist create it.
643  */
644 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
645 {
646         struct rbd_client *rbdc;
647
648         rbdc = rbd_client_find(ceph_opts);
649         if (rbdc)       /* using an existing client */
650                 ceph_destroy_options(ceph_opts);
651         else
652                 rbdc = rbd_client_create(ceph_opts);
653
654         return rbdc;
655 }
656
657 /*
658  * Destroy ceph client
659  *
660  * Caller must hold rbd_client_list_lock.
661  */
662 static void rbd_client_release(struct kref *kref)
663 {
664         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
665
666         dout("%s: rbdc %p\n", __func__, rbdc);
667         spin_lock(&rbd_client_list_lock);
668         list_del(&rbdc->node);
669         spin_unlock(&rbd_client_list_lock);
670
671         ceph_destroy_client(rbdc->client);
672         kfree(rbdc);
673 }
674
675 /* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */
676
677 static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
678 {
679         struct ceph_snap_context *snapc;
680         size_t size;
681
682         size = sizeof (struct ceph_snap_context);
683         size += snap_count * sizeof (snapc->snaps[0]);
684         snapc = kzalloc(size, GFP_KERNEL);
685         if (!snapc)
686                 return NULL;
687
688         atomic_set(&snapc->nref, 1);
689         snapc->num_snaps = snap_count;
690
691         return snapc;
692 }
693
694 static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
695 {
696         (void)ceph_get_snap_context(snapc);
697 }
698
699 static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
700 {
701         ceph_put_snap_context(snapc);
702 }
703
704 /*
705  * Drop reference to ceph client node. If it's not referenced anymore, release
706  * it.
707  */
708 static void rbd_put_client(struct rbd_client *rbdc)
709 {
710         if (rbdc)
711                 kref_put(&rbdc->kref, rbd_client_release);
712 }
713
714 static bool rbd_image_format_valid(u32 image_format)
715 {
716         return image_format == 1 || image_format == 2;
717 }
718
719 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
720 {
721         size_t size;
722         u32 snap_count;
723
724         /* The header has to start with the magic rbd header text */
725         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
726                 return false;
727
728         /* The bio layer requires at least sector-sized I/O */
729
730         if (ondisk->options.order < SECTOR_SHIFT)
731                 return false;
732
733         /* If we use u64 in a few spots we may be able to loosen this */
734
735         if (ondisk->options.order > 8 * sizeof (int) - 1)
736                 return false;
737
738         /*
739          * The size of a snapshot header has to fit in a size_t, and
740          * that limits the number of snapshots.
741          */
742         snap_count = le32_to_cpu(ondisk->snap_count);
743         size = SIZE_MAX - sizeof (struct ceph_snap_context);
744         if (snap_count > size / sizeof (__le64))
745                 return false;
746
747         /*
748          * Not only that, but the size of the entire the snapshot
749          * header must also be representable in a size_t.
750          */
751         size -= snap_count * sizeof (__le64);
752         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
753                 return false;
754
755         return true;
756 }
757
758 /*
759  * Create a new header structure, translate header format from the on-disk
760  * header.
761  */
762 static int rbd_header_from_disk(struct rbd_image_header *header,
763                                  struct rbd_image_header_ondisk *ondisk)
764 {
765         u32 snap_count;
766         size_t len;
767         size_t size;
768         u32 i;
769
770         memset(header, 0, sizeof (*header));
771
772         snap_count = le32_to_cpu(ondisk->snap_count);
773
774         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
775         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
776         if (!header->object_prefix)
777                 return -ENOMEM;
778         memcpy(header->object_prefix, ondisk->object_prefix, len);
779         header->object_prefix[len] = '\0';
780
781         if (snap_count) {
782                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
783
784                 /* Save a copy of the snapshot names */
785
786                 if (snap_names_len > (u64) SIZE_MAX)
787                         return -EIO;
788                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
789                 if (!header->snap_names)
790                         goto out_err;
791                 /*
792                  * Note that rbd_dev_v1_header_read() guarantees
793                  * the ondisk buffer we're working with has
794                  * snap_names_len bytes beyond the end of the
795                  * snapshot id array, this memcpy() is safe.
796                  */
797                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
798                         snap_names_len);
799
800                 /* Record each snapshot's size */
801
802                 size = snap_count * sizeof (*header->snap_sizes);
803                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
804                 if (!header->snap_sizes)
805                         goto out_err;
806                 for (i = 0; i < snap_count; i++)
807                         header->snap_sizes[i] =
808                                 le64_to_cpu(ondisk->snaps[i].image_size);
809         } else {
810                 header->snap_names = NULL;
811                 header->snap_sizes = NULL;
812         }
813
814         header->features = 0;   /* No features support in v1 images */
815         header->obj_order = ondisk->options.order;
816         header->crypt_type = ondisk->options.crypt_type;
817         header->comp_type = ondisk->options.comp_type;
818
819         /* Allocate and fill in the snapshot context */
820
821         header->image_size = le64_to_cpu(ondisk->image_size);
822
823         header->snapc = rbd_snap_context_create(snap_count);
824         if (!header->snapc)
825                 goto out_err;
826         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
827         for (i = 0; i < snap_count; i++)
828                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
829
830         return 0;
831
832 out_err:
833         kfree(header->snap_sizes);
834         header->snap_sizes = NULL;
835         kfree(header->snap_names);
836         header->snap_names = NULL;
837         kfree(header->object_prefix);
838         header->object_prefix = NULL;
839
840         return -ENOMEM;
841 }
842
843 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
844 {
845         struct rbd_snap *snap;
846
847         if (snap_id == CEPH_NOSNAP)
848                 return RBD_SNAP_HEAD_NAME;
849
850         list_for_each_entry(snap, &rbd_dev->snaps, node)
851                 if (snap_id == snap->id)
852                         return snap->name;
853
854         return NULL;
855 }
856
857 static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
858                                         const char *snap_name)
859 {
860         struct rbd_snap *snap;
861
862         list_for_each_entry(snap, &rbd_dev->snaps, node)
863                 if (!strcmp(snap_name, snap->name))
864                         return snap;
865
866         return NULL;
867 }
868
869 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
870 {
871         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
872                     sizeof (RBD_SNAP_HEAD_NAME))) {
873                 rbd_dev->mapping.size = rbd_dev->header.image_size;
874                 rbd_dev->mapping.features = rbd_dev->header.features;
875         } else {
876                 struct rbd_snap *snap;
877
878                 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
879                 if (!snap)
880                         return -ENOENT;
881                 rbd_dev->mapping.size = snap->size;
882                 rbd_dev->mapping.features = snap->features;
883                 rbd_dev->mapping.read_only = true;
884         }
885
886         return 0;
887 }
888
889 static void rbd_header_free(struct rbd_image_header *header)
890 {
891         kfree(header->object_prefix);
892         header->object_prefix = NULL;
893         kfree(header->snap_sizes);
894         header->snap_sizes = NULL;
895         kfree(header->snap_names);
896         header->snap_names = NULL;
897         rbd_snap_context_put(header->snapc);
898         header->snapc = NULL;
899 }
900
901 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
902 {
903         char *name;
904         u64 segment;
905         int ret;
906
907         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
908         if (!name)
909                 return NULL;
910         segment = offset >> rbd_dev->header.obj_order;
911         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
912                         rbd_dev->header.object_prefix, segment);
913         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
914                 pr_err("error formatting segment name for #%llu (%d)\n",
915                         segment, ret);
916                 kfree(name);
917                 name = NULL;
918         }
919
920         return name;
921 }
922
923 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
924 {
925         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
926
927         return offset & (segment_size - 1);
928 }
929
930 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
931                                 u64 offset, u64 length)
932 {
933         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
934
935         offset &= segment_size - 1;
936
937         rbd_assert(length <= U64_MAX - offset);
938         if (offset + length > segment_size)
939                 length = segment_size - offset;
940
941         return length;
942 }
943
944 /*
945  * returns the size of an object in the image
946  */
947 static u64 rbd_obj_bytes(struct rbd_image_header *header)
948 {
949         return 1 << header->obj_order;
950 }
951
952 /*
953  * bio helpers
954  */
955
956 static void bio_chain_put(struct bio *chain)
957 {
958         struct bio *tmp;
959
960         while (chain) {
961                 tmp = chain;
962                 chain = chain->bi_next;
963                 bio_put(tmp);
964         }
965 }
966
967 /*
968  * zeros a bio chain, starting at specific offset
969  */
970 static void zero_bio_chain(struct bio *chain, int start_ofs)
971 {
972         struct bio_vec *bv;
973         unsigned long flags;
974         void *buf;
975         int i;
976         int pos = 0;
977
978         while (chain) {
979                 bio_for_each_segment(bv, chain, i) {
980                         if (pos + bv->bv_len > start_ofs) {
981                                 int remainder = max(start_ofs - pos, 0);
982                                 buf = bvec_kmap_irq(bv, &flags);
983                                 memset(buf + remainder, 0,
984                                        bv->bv_len - remainder);
985                                 bvec_kunmap_irq(buf, &flags);
986                         }
987                         pos += bv->bv_len;
988                 }
989
990                 chain = chain->bi_next;
991         }
992 }
993
994 /*
995  * similar to zero_bio_chain(), zeros data defined by a page array,
996  * starting at the given byte offset from the start of the array and
997  * continuing up to the given end offset.  The pages array is
998  * assumed to be big enough to hold all bytes up to the end.
999  */
1000 static void zero_pages(struct page **pages, u64 offset, u64 end)
1001 {
1002         struct page **page = &pages[offset >> PAGE_SHIFT];
1003
1004         rbd_assert(end > offset);
1005         rbd_assert(end - offset <= (u64)SIZE_MAX);
1006         while (offset < end) {
1007                 size_t page_offset;
1008                 size_t length;
1009                 unsigned long flags;
1010                 void *kaddr;
1011
1012                 page_offset = (size_t)(offset & ~PAGE_MASK);
1013                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1014                 local_irq_save(flags);
1015                 kaddr = kmap_atomic(*page);
1016                 memset(kaddr + page_offset, 0, length);
1017                 kunmap_atomic(kaddr);
1018                 local_irq_restore(flags);
1019
1020                 offset += length;
1021                 page++;
1022         }
1023 }
1024
1025 /*
1026  * Clone a portion of a bio, starting at the given byte offset
1027  * and continuing for the number of bytes indicated.
1028  */
1029 static struct bio *bio_clone_range(struct bio *bio_src,
1030                                         unsigned int offset,
1031                                         unsigned int len,
1032                                         gfp_t gfpmask)
1033 {
1034         struct bio_vec *bv;
1035         unsigned int resid;
1036         unsigned short idx;
1037         unsigned int voff;
1038         unsigned short end_idx;
1039         unsigned short vcnt;
1040         struct bio *bio;
1041
1042         /* Handle the easy case for the caller */
1043
1044         if (!offset && len == bio_src->bi_size)
1045                 return bio_clone(bio_src, gfpmask);
1046
1047         if (WARN_ON_ONCE(!len))
1048                 return NULL;
1049         if (WARN_ON_ONCE(len > bio_src->bi_size))
1050                 return NULL;
1051         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1052                 return NULL;
1053
1054         /* Find first affected segment... */
1055
1056         resid = offset;
1057         __bio_for_each_segment(bv, bio_src, idx, 0) {
1058                 if (resid < bv->bv_len)
1059                         break;
1060                 resid -= bv->bv_len;
1061         }
1062         voff = resid;
1063
1064         /* ...and the last affected segment */
1065
1066         resid += len;
1067         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1068                 if (resid <= bv->bv_len)
1069                         break;
1070                 resid -= bv->bv_len;
1071         }
1072         vcnt = end_idx - idx + 1;
1073
1074         /* Build the clone */
1075
1076         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1077         if (!bio)
1078                 return NULL;    /* ENOMEM */
1079
1080         bio->bi_bdev = bio_src->bi_bdev;
1081         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1082         bio->bi_rw = bio_src->bi_rw;
1083         bio->bi_flags |= 1 << BIO_CLONED;
1084
1085         /*
1086          * Copy over our part of the bio_vec, then update the first
1087          * and last (or only) entries.
1088          */
1089         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1090                         vcnt * sizeof (struct bio_vec));
1091         bio->bi_io_vec[0].bv_offset += voff;
1092         if (vcnt > 1) {
1093                 bio->bi_io_vec[0].bv_len -= voff;
1094                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1095         } else {
1096                 bio->bi_io_vec[0].bv_len = len;
1097         }
1098
1099         bio->bi_vcnt = vcnt;
1100         bio->bi_size = len;
1101         bio->bi_idx = 0;
1102
1103         return bio;
1104 }
1105
1106 /*
1107  * Clone a portion of a bio chain, starting at the given byte offset
1108  * into the first bio in the source chain and continuing for the
1109  * number of bytes indicated.  The result is another bio chain of
1110  * exactly the given length, or a null pointer on error.
1111  *
1112  * The bio_src and offset parameters are both in-out.  On entry they
1113  * refer to the first source bio and the offset into that bio where
1114  * the start of data to be cloned is located.
1115  *
1116  * On return, bio_src is updated to refer to the bio in the source
1117  * chain that contains first un-cloned byte, and *offset will
1118  * contain the offset of that byte within that bio.
1119  */
1120 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1121                                         unsigned int *offset,
1122                                         unsigned int len,
1123                                         gfp_t gfpmask)
1124 {
1125         struct bio *bi = *bio_src;
1126         unsigned int off = *offset;
1127         struct bio *chain = NULL;
1128         struct bio **end;
1129
1130         /* Build up a chain of clone bios up to the limit */
1131
1132         if (!bi || off >= bi->bi_size || !len)
1133                 return NULL;            /* Nothing to clone */
1134
1135         end = &chain;
1136         while (len) {
1137                 unsigned int bi_size;
1138                 struct bio *bio;
1139
1140                 if (!bi) {
1141                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1142                         goto out_err;   /* EINVAL; ran out of bio's */
1143                 }
1144                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1145                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1146                 if (!bio)
1147                         goto out_err;   /* ENOMEM */
1148
1149                 *end = bio;
1150                 end = &bio->bi_next;
1151
1152                 off += bi_size;
1153                 if (off == bi->bi_size) {
1154                         bi = bi->bi_next;
1155                         off = 0;
1156                 }
1157                 len -= bi_size;
1158         }
1159         *bio_src = bi;
1160         *offset = off;
1161
1162         return chain;
1163 out_err:
1164         bio_chain_put(chain);
1165
1166         return NULL;
1167 }
1168
1169 /*
1170  * The default/initial value for all object request flags is 0.  For
1171  * each flag, once its value is set to 1 it is never reset to 0
1172  * again.
1173  */
1174 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1175 {
1176         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1177                 struct rbd_device *rbd_dev;
1178
1179                 rbd_dev = obj_request->img_request->rbd_dev;
1180                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1181                         obj_request);
1182         }
1183 }
1184
1185 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1186 {
1187         smp_mb();
1188         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1189 }
1190
1191 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1192 {
1193         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1194                 struct rbd_device *rbd_dev = NULL;
1195
1196                 if (obj_request_img_data_test(obj_request))
1197                         rbd_dev = obj_request->img_request->rbd_dev;
1198                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1199                         obj_request);
1200         }
1201 }
1202
1203 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1204 {
1205         smp_mb();
1206         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1207 }
1208
1209 /*
1210  * This sets the KNOWN flag after (possibly) setting the EXISTS
1211  * flag.  The latter is set based on the "exists" value provided.
1212  *
1213  * Note that for our purposes once an object exists it never goes
1214  * away again.  It's possible that the response from two existence
1215  * checks are separated by the creation of the target object, and
1216  * the first ("doesn't exist") response arrives *after* the second
1217  * ("does exist").  In that case we ignore the second one.
1218  */
1219 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1220                                 bool exists)
1221 {
1222         if (exists)
1223                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1224         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1225         smp_mb();
1226 }
1227
1228 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1229 {
1230         smp_mb();
1231         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1232 }
1233
1234 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1235 {
1236         smp_mb();
1237         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1238 }
1239
1240 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1241 {
1242         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1243                 atomic_read(&obj_request->kref.refcount));
1244         kref_get(&obj_request->kref);
1245 }
1246
1247 static void rbd_obj_request_destroy(struct kref *kref);
1248 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1249 {
1250         rbd_assert(obj_request != NULL);
1251         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1252                 atomic_read(&obj_request->kref.refcount));
1253         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1254 }
1255
1256 static void rbd_img_request_get(struct rbd_img_request *img_request)
1257 {
1258         dout("%s: img %p (was %d)\n", __func__, img_request,
1259                 atomic_read(&img_request->kref.refcount));
1260         kref_get(&img_request->kref);
1261 }
1262
1263 static void rbd_img_request_destroy(struct kref *kref);
1264 static void rbd_img_request_put(struct rbd_img_request *img_request)
1265 {
1266         rbd_assert(img_request != NULL);
1267         dout("%s: img %p (was %d)\n", __func__, img_request,
1268                 atomic_read(&img_request->kref.refcount));
1269         kref_put(&img_request->kref, rbd_img_request_destroy);
1270 }
1271
1272 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1273                                         struct rbd_obj_request *obj_request)
1274 {
1275         rbd_assert(obj_request->img_request == NULL);
1276
1277         /* Image request now owns object's original reference */
1278         obj_request->img_request = img_request;
1279         obj_request->which = img_request->obj_request_count;
1280         rbd_assert(!obj_request_img_data_test(obj_request));
1281         obj_request_img_data_set(obj_request);
1282         rbd_assert(obj_request->which != BAD_WHICH);
1283         img_request->obj_request_count++;
1284         list_add_tail(&obj_request->links, &img_request->obj_requests);
1285         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1286                 obj_request->which);
1287 }
1288
1289 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1290                                         struct rbd_obj_request *obj_request)
1291 {
1292         rbd_assert(obj_request->which != BAD_WHICH);
1293
1294         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1295                 obj_request->which);
1296         list_del(&obj_request->links);
1297         rbd_assert(img_request->obj_request_count > 0);
1298         img_request->obj_request_count--;
1299         rbd_assert(obj_request->which == img_request->obj_request_count);
1300         obj_request->which = BAD_WHICH;
1301         rbd_assert(obj_request_img_data_test(obj_request));
1302         rbd_assert(obj_request->img_request == img_request);
1303         obj_request->img_request = NULL;
1304         obj_request->callback = NULL;
1305         rbd_obj_request_put(obj_request);
1306 }
1307
1308 static bool obj_request_type_valid(enum obj_request_type type)
1309 {
1310         switch (type) {
1311         case OBJ_REQUEST_NODATA:
1312         case OBJ_REQUEST_BIO:
1313         case OBJ_REQUEST_PAGES:
1314                 return true;
1315         default:
1316                 return false;
1317         }
1318 }
1319
1320 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1321                                 struct rbd_obj_request *obj_request)
1322 {
1323         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1324
1325         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1326 }
1327
1328 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1329 {
1330
1331         dout("%s: img %p\n", __func__, img_request);
1332
1333         /*
1334          * If no error occurred, compute the aggregate transfer
1335          * count for the image request.  We could instead use
1336          * atomic64_cmpxchg() to update it as each object request
1337          * completes; not clear which way is better off hand.
1338          */
1339         if (!img_request->result) {
1340                 struct rbd_obj_request *obj_request;
1341                 u64 xferred = 0;
1342
1343                 for_each_obj_request(img_request, obj_request)
1344                         xferred += obj_request->xferred;
1345                 img_request->xferred = xferred;
1346         }
1347
1348         if (img_request->callback)
1349                 img_request->callback(img_request);
1350         else
1351                 rbd_img_request_put(img_request);
1352 }
1353
1354 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1355
1356 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1357 {
1358         dout("%s: obj %p\n", __func__, obj_request);
1359
1360         return wait_for_completion_interruptible(&obj_request->completion);
1361 }
1362
1363 /*
1364  * The default/initial value for all image request flags is 0.  Each
1365  * is conditionally set to 1 at image request initialization time
1366  * and currently never change thereafter.
1367  */
1368 static void img_request_write_set(struct rbd_img_request *img_request)
1369 {
1370         set_bit(IMG_REQ_WRITE, &img_request->flags);
1371         smp_mb();
1372 }
1373
1374 static bool img_request_write_test(struct rbd_img_request *img_request)
1375 {
1376         smp_mb();
1377         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1378 }
1379
1380 static void img_request_child_set(struct rbd_img_request *img_request)
1381 {
1382         set_bit(IMG_REQ_CHILD, &img_request->flags);
1383         smp_mb();
1384 }
1385
1386 static bool img_request_child_test(struct rbd_img_request *img_request)
1387 {
1388         smp_mb();
1389         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1390 }
1391
1392 static void img_request_layered_set(struct rbd_img_request *img_request)
1393 {
1394         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1395         smp_mb();
1396 }
1397
1398 static bool img_request_layered_test(struct rbd_img_request *img_request)
1399 {
1400         smp_mb();
1401         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1402 }
1403
1404 static void
1405 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1406 {
1407         u64 xferred = obj_request->xferred;
1408         u64 length = obj_request->length;
1409
1410         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1411                 obj_request, obj_request->img_request, obj_request->result,
1412                 xferred, length);
1413         /*
1414          * ENOENT means a hole in the image.  We zero-fill the
1415          * entire length of the request.  A short read also implies
1416          * zero-fill to the end of the request.  Either way we
1417          * update the xferred count to indicate the whole request
1418          * was satisfied.
1419          */
1420         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1421         if (obj_request->result == -ENOENT) {
1422                 if (obj_request->type == OBJ_REQUEST_BIO)
1423                         zero_bio_chain(obj_request->bio_list, 0);
1424                 else
1425                         zero_pages(obj_request->pages, 0, length);
1426                 obj_request->result = 0;
1427                 obj_request->xferred = length;
1428         } else if (xferred < length && !obj_request->result) {
1429                 if (obj_request->type == OBJ_REQUEST_BIO)
1430                         zero_bio_chain(obj_request->bio_list, xferred);
1431                 else
1432                         zero_pages(obj_request->pages, xferred, length);
1433                 obj_request->xferred = length;
1434         }
1435         obj_request_done_set(obj_request);
1436 }
1437
1438 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1439 {
1440         dout("%s: obj %p cb %p\n", __func__, obj_request,
1441                 obj_request->callback);
1442         if (obj_request->callback)
1443                 obj_request->callback(obj_request);
1444         else
1445                 complete_all(&obj_request->completion);
1446 }
1447
1448 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1449 {
1450         dout("%s: obj %p\n", __func__, obj_request);
1451         obj_request_done_set(obj_request);
1452 }
1453
1454 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1455 {
1456         struct rbd_img_request *img_request = NULL;
1457         struct rbd_device *rbd_dev = NULL;
1458         bool layered = false;
1459
1460         if (obj_request_img_data_test(obj_request)) {
1461                 img_request = obj_request->img_request;
1462                 layered = img_request && img_request_layered_test(img_request);
1463                 rbd_dev = img_request->rbd_dev;
1464         }
1465
1466         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1467                 obj_request, img_request, obj_request->result,
1468                 obj_request->xferred, obj_request->length);
1469         if (layered && obj_request->result == -ENOENT &&
1470                         obj_request->img_offset < rbd_dev->parent_overlap)
1471                 rbd_img_parent_read(obj_request);
1472         else if (img_request)
1473                 rbd_img_obj_request_read_callback(obj_request);
1474         else
1475                 obj_request_done_set(obj_request);
1476 }
1477
1478 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1479 {
1480         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1481                 obj_request->result, obj_request->length);
1482         /*
1483          * There is no such thing as a successful short write.  Set
1484          * it to our originally-requested length.
1485          */
1486         obj_request->xferred = obj_request->length;
1487         obj_request_done_set(obj_request);
1488 }
1489
1490 /*
1491  * For a simple stat call there's nothing to do.  We'll do more if
1492  * this is part of a write sequence for a layered image.
1493  */
1494 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1495 {
1496         dout("%s: obj %p\n", __func__, obj_request);
1497         obj_request_done_set(obj_request);
1498 }
1499
1500 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1501                                 struct ceph_msg *msg)
1502 {
1503         struct rbd_obj_request *obj_request = osd_req->r_priv;
1504         u16 opcode;
1505
1506         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1507         rbd_assert(osd_req == obj_request->osd_req);
1508         if (obj_request_img_data_test(obj_request)) {
1509                 rbd_assert(obj_request->img_request);
1510                 rbd_assert(obj_request->which != BAD_WHICH);
1511         } else {
1512                 rbd_assert(obj_request->which == BAD_WHICH);
1513         }
1514
1515         if (osd_req->r_result < 0)
1516                 obj_request->result = osd_req->r_result;
1517         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1518
1519         BUG_ON(osd_req->r_num_ops > 2);
1520
1521         /*
1522          * We support a 64-bit length, but ultimately it has to be
1523          * passed to blk_end_request(), which takes an unsigned int.
1524          */
1525         obj_request->xferred = osd_req->r_reply_op_len[0];
1526         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1527         opcode = osd_req->r_ops[0].op;
1528         switch (opcode) {
1529         case CEPH_OSD_OP_READ:
1530                 rbd_osd_read_callback(obj_request);
1531                 break;
1532         case CEPH_OSD_OP_WRITE:
1533                 rbd_osd_write_callback(obj_request);
1534                 break;
1535         case CEPH_OSD_OP_STAT:
1536                 rbd_osd_stat_callback(obj_request);
1537                 break;
1538         case CEPH_OSD_OP_CALL:
1539         case CEPH_OSD_OP_NOTIFY_ACK:
1540         case CEPH_OSD_OP_WATCH:
1541                 rbd_osd_trivial_callback(obj_request);
1542                 break;
1543         default:
1544                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1545                         obj_request->object_name, (unsigned short) opcode);
1546                 break;
1547         }
1548
1549         if (obj_request_done_test(obj_request))
1550                 rbd_obj_request_complete(obj_request);
1551 }
1552
1553 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1554 {
1555         struct rbd_img_request *img_request = obj_request->img_request;
1556         struct ceph_osd_request *osd_req = obj_request->osd_req;
1557         u64 snap_id;
1558
1559         rbd_assert(osd_req != NULL);
1560
1561         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1562         ceph_osdc_build_request(osd_req, obj_request->offset,
1563                         NULL, snap_id, NULL);
1564 }
1565
1566 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1567 {
1568         struct rbd_img_request *img_request = obj_request->img_request;
1569         struct ceph_osd_request *osd_req = obj_request->osd_req;
1570         struct ceph_snap_context *snapc;
1571         struct timespec mtime = CURRENT_TIME;
1572
1573         rbd_assert(osd_req != NULL);
1574
1575         snapc = img_request ? img_request->snapc : NULL;
1576         ceph_osdc_build_request(osd_req, obj_request->offset,
1577                         snapc, CEPH_NOSNAP, &mtime);
1578 }
1579
1580 static struct ceph_osd_request *rbd_osd_req_create(
1581                                         struct rbd_device *rbd_dev,
1582                                         bool write_request,
1583                                         struct rbd_obj_request *obj_request)
1584 {
1585         struct ceph_snap_context *snapc = NULL;
1586         struct ceph_osd_client *osdc;
1587         struct ceph_osd_request *osd_req;
1588
1589         if (obj_request_img_data_test(obj_request)) {
1590                 struct rbd_img_request *img_request = obj_request->img_request;
1591
1592                 rbd_assert(write_request ==
1593                                 img_request_write_test(img_request));
1594                 if (write_request)
1595                         snapc = img_request->snapc;
1596         }
1597
1598         /* Allocate and initialize the request, for the single op */
1599
1600         osdc = &rbd_dev->rbd_client->client->osdc;
1601         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1602         if (!osd_req)
1603                 return NULL;    /* ENOMEM */
1604
1605         if (write_request)
1606                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1607         else
1608                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1609
1610         osd_req->r_callback = rbd_osd_req_callback;
1611         osd_req->r_priv = obj_request;
1612
1613         osd_req->r_oid_len = strlen(obj_request->object_name);
1614         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1615         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1616
1617         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1618
1619         return osd_req;
1620 }
1621
1622 /*
1623  * Create a copyup osd request based on the information in the
1624  * object request supplied.  A copyup request has two osd ops,
1625  * a copyup method call, and a "normal" write request.
1626  */
1627 static struct ceph_osd_request *
1628 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1629 {
1630         struct rbd_img_request *img_request;
1631         struct ceph_snap_context *snapc;
1632         struct rbd_device *rbd_dev;
1633         struct ceph_osd_client *osdc;
1634         struct ceph_osd_request *osd_req;
1635
1636         rbd_assert(obj_request_img_data_test(obj_request));
1637         img_request = obj_request->img_request;
1638         rbd_assert(img_request);
1639         rbd_assert(img_request_write_test(img_request));
1640
1641         /* Allocate and initialize the request, for the two ops */
1642
1643         snapc = img_request->snapc;
1644         rbd_dev = img_request->rbd_dev;
1645         osdc = &rbd_dev->rbd_client->client->osdc;
1646         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1647         if (!osd_req)
1648                 return NULL;    /* ENOMEM */
1649
1650         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1651         osd_req->r_callback = rbd_osd_req_callback;
1652         osd_req->r_priv = obj_request;
1653
1654         osd_req->r_oid_len = strlen(obj_request->object_name);
1655         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1656         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1657
1658         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1659
1660         return osd_req;
1661 }
1662
1663
1664 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1665 {
1666         ceph_osdc_put_request(osd_req);
1667 }
1668
1669 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1670
1671 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1672                                                 u64 offset, u64 length,
1673                                                 enum obj_request_type type)
1674 {
1675         struct rbd_obj_request *obj_request;
1676         size_t size;
1677         char *name;
1678
1679         rbd_assert(obj_request_type_valid(type));
1680
1681         size = strlen(object_name) + 1;
1682         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1683         if (!obj_request)
1684                 return NULL;
1685
1686         name = (char *)(obj_request + 1);
1687         obj_request->object_name = memcpy(name, object_name, size);
1688         obj_request->offset = offset;
1689         obj_request->length = length;
1690         obj_request->flags = 0;
1691         obj_request->which = BAD_WHICH;
1692         obj_request->type = type;
1693         INIT_LIST_HEAD(&obj_request->links);
1694         init_completion(&obj_request->completion);
1695         kref_init(&obj_request->kref);
1696
1697         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1698                 offset, length, (int)type, obj_request);
1699
1700         return obj_request;
1701 }
1702
1703 static void rbd_obj_request_destroy(struct kref *kref)
1704 {
1705         struct rbd_obj_request *obj_request;
1706
1707         obj_request = container_of(kref, struct rbd_obj_request, kref);
1708
1709         dout("%s: obj %p\n", __func__, obj_request);
1710
1711         rbd_assert(obj_request->img_request == NULL);
1712         rbd_assert(obj_request->which == BAD_WHICH);
1713
1714         if (obj_request->osd_req)
1715                 rbd_osd_req_destroy(obj_request->osd_req);
1716
1717         rbd_assert(obj_request_type_valid(obj_request->type));
1718         switch (obj_request->type) {
1719         case OBJ_REQUEST_NODATA:
1720                 break;          /* Nothing to do */
1721         case OBJ_REQUEST_BIO:
1722                 if (obj_request->bio_list)
1723                         bio_chain_put(obj_request->bio_list);
1724                 break;
1725         case OBJ_REQUEST_PAGES:
1726                 if (obj_request->pages)
1727                         ceph_release_page_vector(obj_request->pages,
1728                                                 obj_request->page_count);
1729                 break;
1730         }
1731
1732         kfree(obj_request);
1733 }
1734
1735 /*
1736  * Caller is responsible for filling in the list of object requests
1737  * that comprises the image request, and the Linux request pointer
1738  * (if there is one).
1739  */
1740 static struct rbd_img_request *rbd_img_request_create(
1741                                         struct rbd_device *rbd_dev,
1742                                         u64 offset, u64 length,
1743                                         bool write_request,
1744                                         bool child_request)
1745 {
1746         struct rbd_img_request *img_request;
1747
1748         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1749         if (!img_request)
1750                 return NULL;
1751
1752         if (write_request) {
1753                 down_read(&rbd_dev->header_rwsem);
1754                 rbd_snap_context_get(rbd_dev->header.snapc);
1755                 up_read(&rbd_dev->header_rwsem);
1756         }
1757
1758         img_request->rq = NULL;
1759         img_request->rbd_dev = rbd_dev;
1760         img_request->offset = offset;
1761         img_request->length = length;
1762         img_request->flags = 0;
1763         if (write_request) {
1764                 img_request_write_set(img_request);
1765                 img_request->snapc = rbd_dev->header.snapc;
1766         } else {
1767                 img_request->snap_id = rbd_dev->spec->snap_id;
1768         }
1769         if (child_request)
1770                 img_request_child_set(img_request);
1771         if (rbd_dev->parent_spec)
1772                 img_request_layered_set(img_request);
1773         spin_lock_init(&img_request->completion_lock);
1774         img_request->next_completion = 0;
1775         img_request->callback = NULL;
1776         img_request->result = 0;
1777         img_request->obj_request_count = 0;
1778         INIT_LIST_HEAD(&img_request->obj_requests);
1779         kref_init(&img_request->kref);
1780
1781         rbd_img_request_get(img_request);       /* Avoid a warning */
1782         rbd_img_request_put(img_request);       /* TEMPORARY */
1783
1784         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1785                 write_request ? "write" : "read", offset, length,
1786                 img_request);
1787
1788         return img_request;
1789 }
1790
1791 static void rbd_img_request_destroy(struct kref *kref)
1792 {
1793         struct rbd_img_request *img_request;
1794         struct rbd_obj_request *obj_request;
1795         struct rbd_obj_request *next_obj_request;
1796
1797         img_request = container_of(kref, struct rbd_img_request, kref);
1798
1799         dout("%s: img %p\n", __func__, img_request);
1800
1801         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1802                 rbd_img_obj_request_del(img_request, obj_request);
1803         rbd_assert(img_request->obj_request_count == 0);
1804
1805         if (img_request_write_test(img_request))
1806                 rbd_snap_context_put(img_request->snapc);
1807
1808         if (img_request_child_test(img_request))
1809                 rbd_obj_request_put(img_request->obj_request);
1810
1811         kfree(img_request);
1812 }
1813
1814 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1815 {
1816         struct rbd_img_request *img_request;
1817         unsigned int xferred;
1818         int result;
1819         bool more;
1820
1821         rbd_assert(obj_request_img_data_test(obj_request));
1822         img_request = obj_request->img_request;
1823
1824         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1825         xferred = (unsigned int)obj_request->xferred;
1826         result = obj_request->result;
1827         if (result) {
1828                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1829
1830                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1831                         img_request_write_test(img_request) ? "write" : "read",
1832                         obj_request->length, obj_request->img_offset,
1833                         obj_request->offset);
1834                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1835                         result, xferred);
1836                 if (!img_request->result)
1837                         img_request->result = result;
1838         }
1839
1840         /* Image object requests don't own their page array */
1841
1842         if (obj_request->type == OBJ_REQUEST_PAGES) {
1843                 obj_request->pages = NULL;
1844                 obj_request->page_count = 0;
1845         }
1846
1847         if (img_request_child_test(img_request)) {
1848                 rbd_assert(img_request->obj_request != NULL);
1849                 more = obj_request->which < img_request->obj_request_count - 1;
1850         } else {
1851                 rbd_assert(img_request->rq != NULL);
1852                 more = blk_end_request(img_request->rq, result, xferred);
1853         }
1854
1855         return more;
1856 }
1857
1858 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1859 {
1860         struct rbd_img_request *img_request;
1861         u32 which = obj_request->which;
1862         bool more = true;
1863
1864         rbd_assert(obj_request_img_data_test(obj_request));
1865         img_request = obj_request->img_request;
1866
1867         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1868         rbd_assert(img_request != NULL);
1869         rbd_assert(img_request->obj_request_count > 0);
1870         rbd_assert(which != BAD_WHICH);
1871         rbd_assert(which < img_request->obj_request_count);
1872         rbd_assert(which >= img_request->next_completion);
1873
1874         spin_lock_irq(&img_request->completion_lock);
1875         if (which != img_request->next_completion)
1876                 goto out;
1877
1878         for_each_obj_request_from(img_request, obj_request) {
1879                 rbd_assert(more);
1880                 rbd_assert(which < img_request->obj_request_count);
1881
1882                 if (!obj_request_done_test(obj_request))
1883                         break;
1884                 more = rbd_img_obj_end_request(obj_request);
1885                 which++;
1886         }
1887
1888         rbd_assert(more ^ (which == img_request->obj_request_count));
1889         img_request->next_completion = which;
1890 out:
1891         spin_unlock_irq(&img_request->completion_lock);
1892
1893         if (!more)
1894                 rbd_img_request_complete(img_request);
1895 }
1896
1897 /*
1898  * Split up an image request into one or more object requests, each
1899  * to a different object.  The "type" parameter indicates whether
1900  * "data_desc" is the pointer to the head of a list of bio
1901  * structures, or the base of a page array.  In either case this
1902  * function assumes data_desc describes memory sufficient to hold
1903  * all data described by the image request.
1904  */
1905 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1906                                         enum obj_request_type type,
1907                                         void *data_desc)
1908 {
1909         struct rbd_device *rbd_dev = img_request->rbd_dev;
1910         struct rbd_obj_request *obj_request = NULL;
1911         struct rbd_obj_request *next_obj_request;
1912         bool write_request = img_request_write_test(img_request);
1913         struct bio *bio_list;
1914         unsigned int bio_offset = 0;
1915         struct page **pages;
1916         u64 img_offset;
1917         u64 resid;
1918         u16 opcode;
1919
1920         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1921                 (int)type, data_desc);
1922
1923         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1924         img_offset = img_request->offset;
1925         resid = img_request->length;
1926         rbd_assert(resid > 0);
1927
1928         if (type == OBJ_REQUEST_BIO) {
1929                 bio_list = data_desc;
1930                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1931         } else {
1932                 rbd_assert(type == OBJ_REQUEST_PAGES);
1933                 pages = data_desc;
1934         }
1935
1936         while (resid) {
1937                 struct ceph_osd_request *osd_req;
1938                 const char *object_name;
1939                 u64 offset;
1940                 u64 length;
1941
1942                 object_name = rbd_segment_name(rbd_dev, img_offset);
1943                 if (!object_name)
1944                         goto out_unwind;
1945                 offset = rbd_segment_offset(rbd_dev, img_offset);
1946                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1947                 obj_request = rbd_obj_request_create(object_name,
1948                                                 offset, length, type);
1949                 kfree(object_name);     /* object request has its own copy */
1950                 if (!obj_request)
1951                         goto out_unwind;
1952
1953                 if (type == OBJ_REQUEST_BIO) {
1954                         unsigned int clone_size;
1955
1956                         rbd_assert(length <= (u64)UINT_MAX);
1957                         clone_size = (unsigned int)length;
1958                         obj_request->bio_list =
1959                                         bio_chain_clone_range(&bio_list,
1960                                                                 &bio_offset,
1961                                                                 clone_size,
1962                                                                 GFP_ATOMIC);
1963                         if (!obj_request->bio_list)
1964                                 goto out_partial;
1965                 } else {
1966                         unsigned int page_count;
1967
1968                         obj_request->pages = pages;
1969                         page_count = (u32)calc_pages_for(offset, length);
1970                         obj_request->page_count = page_count;
1971                         if ((offset + length) & ~PAGE_MASK)
1972                                 page_count--;   /* more on last page */
1973                         pages += page_count;
1974                 }
1975
1976                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1977                                                 obj_request);
1978                 if (!osd_req)
1979                         goto out_partial;
1980                 obj_request->osd_req = osd_req;
1981                 obj_request->callback = rbd_img_obj_callback;
1982
1983                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1984                                                 0, 0);
1985                 if (type == OBJ_REQUEST_BIO)
1986                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1987                                         obj_request->bio_list, length);
1988                 else
1989                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1990                                         obj_request->pages, length,
1991                                         offset & ~PAGE_MASK, false, false);
1992
1993                 if (write_request)
1994                         rbd_osd_req_format_write(obj_request);
1995                 else
1996                         rbd_osd_req_format_read(obj_request);
1997
1998                 obj_request->img_offset = img_offset;
1999                 rbd_img_obj_request_add(img_request, obj_request);
2000
2001                 img_offset += length;
2002                 resid -= length;
2003         }
2004
2005         return 0;
2006
2007 out_partial:
2008         rbd_obj_request_put(obj_request);
2009 out_unwind:
2010         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2011                 rbd_obj_request_put(obj_request);
2012
2013         return -ENOMEM;
2014 }
2015
2016 static void
2017 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2018 {
2019         struct rbd_img_request *img_request;
2020         struct rbd_device *rbd_dev;
2021         u64 length;
2022         u32 page_count;
2023
2024         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2025         rbd_assert(obj_request_img_data_test(obj_request));
2026         img_request = obj_request->img_request;
2027         rbd_assert(img_request);
2028
2029         rbd_dev = img_request->rbd_dev;
2030         rbd_assert(rbd_dev);
2031         length = (u64)1 << rbd_dev->header.obj_order;
2032         page_count = (u32)calc_pages_for(0, length);
2033
2034         rbd_assert(obj_request->copyup_pages);
2035         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2036         obj_request->copyup_pages = NULL;
2037
2038         /*
2039          * We want the transfer count to reflect the size of the
2040          * original write request.  There is no such thing as a
2041          * successful short write, so if the request was successful
2042          * we can just set it to the originally-requested length.
2043          */
2044         if (!obj_request->result)
2045                 obj_request->xferred = obj_request->length;
2046
2047         /* Finish up with the normal image object callback */
2048
2049         rbd_img_obj_callback(obj_request);
2050 }
2051
2052 static void
2053 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2054 {
2055         struct rbd_obj_request *orig_request;
2056         struct ceph_osd_request *osd_req;
2057         struct ceph_osd_client *osdc;
2058         struct rbd_device *rbd_dev;
2059         struct page **pages;
2060         int result;
2061         u64 obj_size;
2062         u64 xferred;
2063
2064         rbd_assert(img_request_child_test(img_request));
2065
2066         /* First get what we need from the image request */
2067
2068         pages = img_request->copyup_pages;
2069         rbd_assert(pages != NULL);
2070         img_request->copyup_pages = NULL;
2071
2072         orig_request = img_request->obj_request;
2073         rbd_assert(orig_request != NULL);
2074         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2075         result = img_request->result;
2076         obj_size = img_request->length;
2077         xferred = img_request->xferred;
2078
2079         rbd_dev = img_request->rbd_dev;
2080         rbd_assert(rbd_dev);
2081         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2082
2083         rbd_img_request_put(img_request);
2084
2085         if (result)
2086                 goto out_err;
2087
2088         /* Allocate the new copyup osd request for the original request */
2089
2090         result = -ENOMEM;
2091         rbd_assert(!orig_request->osd_req);
2092         osd_req = rbd_osd_req_create_copyup(orig_request);
2093         if (!osd_req)
2094                 goto out_err;
2095         orig_request->osd_req = osd_req;
2096         orig_request->copyup_pages = pages;
2097
2098         /* Initialize the copyup op */
2099
2100         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2101         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2102                                                 false, false);
2103
2104         /* Then the original write request op */
2105
2106         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2107                                         orig_request->offset,
2108                                         orig_request->length, 0, 0);
2109         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2110                                         orig_request->length);
2111
2112         rbd_osd_req_format_write(orig_request);
2113
2114         /* All set, send it off. */
2115
2116         orig_request->callback = rbd_img_obj_copyup_callback;
2117         osdc = &rbd_dev->rbd_client->client->osdc;
2118         result = rbd_obj_request_submit(osdc, orig_request);
2119         if (!result)
2120                 return;
2121 out_err:
2122         /* Record the error code and complete the request */
2123
2124         orig_request->result = result;
2125         orig_request->xferred = 0;
2126         obj_request_done_set(orig_request);
2127         rbd_obj_request_complete(orig_request);
2128 }
2129
2130 /*
2131  * Read from the parent image the range of data that covers the
2132  * entire target of the given object request.  This is used for
2133  * satisfying a layered image write request when the target of an
2134  * object request from the image request does not exist.
2135  *
2136  * A page array big enough to hold the returned data is allocated
2137  * and supplied to rbd_img_request_fill() as the "data descriptor."
2138  * When the read completes, this page array will be transferred to
2139  * the original object request for the copyup operation.
2140  *
2141  * If an error occurs, record it as the result of the original
2142  * object request and mark it done so it gets completed.
2143  */
2144 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2145 {
2146         struct rbd_img_request *img_request = NULL;
2147         struct rbd_img_request *parent_request = NULL;
2148         struct rbd_device *rbd_dev;
2149         u64 img_offset;
2150         u64 length;
2151         struct page **pages = NULL;
2152         u32 page_count;
2153         int result;
2154
2155         rbd_assert(obj_request_img_data_test(obj_request));
2156         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2157
2158         img_request = obj_request->img_request;
2159         rbd_assert(img_request != NULL);
2160         rbd_dev = img_request->rbd_dev;
2161         rbd_assert(rbd_dev->parent != NULL);
2162
2163         /*
2164          * First things first.  The original osd request is of no
2165          * use to use any more, we'll need a new one that can hold
2166          * the two ops in a copyup request.  We'll get that later,
2167          * but for now we can release the old one.
2168          */
2169         rbd_osd_req_destroy(obj_request->osd_req);
2170         obj_request->osd_req = NULL;
2171
2172         /*
2173          * Determine the byte range covered by the object in the
2174          * child image to which the original request was to be sent.
2175          */
2176         img_offset = obj_request->img_offset - obj_request->offset;
2177         length = (u64)1 << rbd_dev->header.obj_order;
2178
2179         /*
2180          * There is no defined parent data beyond the parent
2181          * overlap, so limit what we read at that boundary if
2182          * necessary.
2183          */
2184         if (img_offset + length > rbd_dev->parent_overlap) {
2185                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2186                 length = rbd_dev->parent_overlap - img_offset;
2187         }
2188
2189         /*
2190          * Allocate a page array big enough to receive the data read
2191          * from the parent.
2192          */
2193         page_count = (u32)calc_pages_for(0, length);
2194         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2195         if (IS_ERR(pages)) {
2196                 result = PTR_ERR(pages);
2197                 pages = NULL;
2198                 goto out_err;
2199         }
2200
2201         result = -ENOMEM;
2202         parent_request = rbd_img_request_create(rbd_dev->parent,
2203                                                 img_offset, length,
2204                                                 false, true);
2205         if (!parent_request)
2206                 goto out_err;
2207         rbd_obj_request_get(obj_request);
2208         parent_request->obj_request = obj_request;
2209
2210         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2211         if (result)
2212                 goto out_err;
2213         parent_request->copyup_pages = pages;
2214
2215         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2216         result = rbd_img_request_submit(parent_request);
2217         if (!result)
2218                 return 0;
2219
2220         parent_request->copyup_pages = NULL;
2221         parent_request->obj_request = NULL;
2222         rbd_obj_request_put(obj_request);
2223 out_err:
2224         if (pages)
2225                 ceph_release_page_vector(pages, page_count);
2226         if (parent_request)
2227                 rbd_img_request_put(parent_request);
2228         obj_request->result = result;
2229         obj_request->xferred = 0;
2230         obj_request_done_set(obj_request);
2231
2232         return result;
2233 }
2234
2235 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2236 {
2237         struct rbd_obj_request *orig_request;
2238         int result;
2239
2240         rbd_assert(!obj_request_img_data_test(obj_request));
2241
2242         /*
2243          * All we need from the object request is the original
2244          * request and the result of the STAT op.  Grab those, then
2245          * we're done with the request.
2246          */
2247         orig_request = obj_request->obj_request;
2248         obj_request->obj_request = NULL;
2249         rbd_assert(orig_request);
2250         rbd_assert(orig_request->img_request);
2251
2252         result = obj_request->result;
2253         obj_request->result = 0;
2254
2255         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2256                 obj_request, orig_request, result,
2257                 obj_request->xferred, obj_request->length);
2258         rbd_obj_request_put(obj_request);
2259
2260         rbd_assert(orig_request);
2261         rbd_assert(orig_request->img_request);
2262
2263         /*
2264          * Our only purpose here is to determine whether the object
2265          * exists, and we don't want to treat the non-existence as
2266          * an error.  If something else comes back, transfer the
2267          * error to the original request and complete it now.
2268          */
2269         if (!result) {
2270                 obj_request_existence_set(orig_request, true);
2271         } else if (result == -ENOENT) {
2272                 obj_request_existence_set(orig_request, false);
2273         } else if (result) {
2274                 orig_request->result = result;
2275                 goto out;
2276         }
2277
2278         /*
2279          * Resubmit the original request now that we have recorded
2280          * whether the target object exists.
2281          */
2282         orig_request->result = rbd_img_obj_request_submit(orig_request);
2283 out:
2284         if (orig_request->result)
2285                 rbd_obj_request_complete(orig_request);
2286         rbd_obj_request_put(orig_request);
2287 }
2288
2289 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2290 {
2291         struct rbd_obj_request *stat_request;
2292         struct rbd_device *rbd_dev;
2293         struct ceph_osd_client *osdc;
2294         struct page **pages = NULL;
2295         u32 page_count;
2296         size_t size;
2297         int ret;
2298
2299         /*
2300          * The response data for a STAT call consists of:
2301          *     le64 length;
2302          *     struct {
2303          *         le32 tv_sec;
2304          *         le32 tv_nsec;
2305          *     } mtime;
2306          */
2307         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2308         page_count = (u32)calc_pages_for(0, size);
2309         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2310         if (IS_ERR(pages))
2311                 return PTR_ERR(pages);
2312
2313         ret = -ENOMEM;
2314         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2315                                                         OBJ_REQUEST_PAGES);
2316         if (!stat_request)
2317                 goto out;
2318
2319         rbd_obj_request_get(obj_request);
2320         stat_request->obj_request = obj_request;
2321         stat_request->pages = pages;
2322         stat_request->page_count = page_count;
2323
2324         rbd_assert(obj_request->img_request);
2325         rbd_dev = obj_request->img_request->rbd_dev;
2326         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2327                                                 stat_request);
2328         if (!stat_request->osd_req)
2329                 goto out;
2330         stat_request->callback = rbd_img_obj_exists_callback;
2331
2332         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2333         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2334                                         false, false);
2335         rbd_osd_req_format_read(stat_request);
2336
2337         osdc = &rbd_dev->rbd_client->client->osdc;
2338         ret = rbd_obj_request_submit(osdc, stat_request);
2339 out:
2340         if (ret)
2341                 rbd_obj_request_put(obj_request);
2342
2343         return ret;
2344 }
2345
2346 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2347 {
2348         struct rbd_img_request *img_request;
2349         struct rbd_device *rbd_dev;
2350         bool known;
2351
2352         rbd_assert(obj_request_img_data_test(obj_request));
2353
2354         img_request = obj_request->img_request;
2355         rbd_assert(img_request);
2356         rbd_dev = img_request->rbd_dev;
2357
2358         /*
2359          * Only writes to layered images need special handling.
2360          * Reads and non-layered writes are simple object requests.
2361          * Layered writes that start beyond the end of the overlap
2362          * with the parent have no parent data, so they too are
2363          * simple object requests.  Finally, if the target object is
2364          * known to already exist, its parent data has already been
2365          * copied, so a write to the object can also be handled as a
2366          * simple object request.
2367          */
2368         if (!img_request_write_test(img_request) ||
2369                 !img_request_layered_test(img_request) ||
2370                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2371                 ((known = obj_request_known_test(obj_request)) &&
2372                         obj_request_exists_test(obj_request))) {
2373
2374                 struct rbd_device *rbd_dev;
2375                 struct ceph_osd_client *osdc;
2376
2377                 rbd_dev = obj_request->img_request->rbd_dev;
2378                 osdc = &rbd_dev->rbd_client->client->osdc;
2379
2380                 return rbd_obj_request_submit(osdc, obj_request);
2381         }
2382
2383         /*
2384          * It's a layered write.  The target object might exist but
2385          * we may not know that yet.  If we know it doesn't exist,
2386          * start by reading the data for the full target object from
2387          * the parent so we can use it for a copyup to the target.
2388          */
2389         if (known)
2390                 return rbd_img_obj_parent_read_full(obj_request);
2391
2392         /* We don't know whether the target exists.  Go find out. */
2393
2394         return rbd_img_obj_exists_submit(obj_request);
2395 }
2396
2397 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2398 {
2399         struct rbd_obj_request *obj_request;
2400         struct rbd_obj_request *next_obj_request;
2401
2402         dout("%s: img %p\n", __func__, img_request);
2403         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2404                 int ret;
2405
2406                 ret = rbd_img_obj_request_submit(obj_request);
2407                 if (ret)
2408                         return ret;
2409         }
2410
2411         return 0;
2412 }
2413
2414 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2415 {
2416         struct rbd_obj_request *obj_request;
2417         struct rbd_device *rbd_dev;
2418         u64 obj_end;
2419
2420         rbd_assert(img_request_child_test(img_request));
2421
2422         obj_request = img_request->obj_request;
2423         rbd_assert(obj_request);
2424         rbd_assert(obj_request->img_request);
2425
2426         obj_request->result = img_request->result;
2427         if (obj_request->result)
2428                 goto out;
2429
2430         /*
2431          * We need to zero anything beyond the parent overlap
2432          * boundary.  Since rbd_img_obj_request_read_callback()
2433          * will zero anything beyond the end of a short read, an
2434          * easy way to do this is to pretend the data from the
2435          * parent came up short--ending at the overlap boundary.
2436          */
2437         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2438         obj_end = obj_request->img_offset + obj_request->length;
2439         rbd_dev = obj_request->img_request->rbd_dev;
2440         if (obj_end > rbd_dev->parent_overlap) {
2441                 u64 xferred = 0;
2442
2443                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2444                         xferred = rbd_dev->parent_overlap -
2445                                         obj_request->img_offset;
2446
2447                 obj_request->xferred = min(img_request->xferred, xferred);
2448         } else {
2449                 obj_request->xferred = img_request->xferred;
2450         }
2451 out:
2452         rbd_img_obj_request_read_callback(obj_request);
2453         rbd_obj_request_complete(obj_request);
2454 }
2455
2456 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2457 {
2458         struct rbd_device *rbd_dev;
2459         struct rbd_img_request *img_request;
2460         int result;
2461
2462         rbd_assert(obj_request_img_data_test(obj_request));
2463         rbd_assert(obj_request->img_request != NULL);
2464         rbd_assert(obj_request->result == (s32) -ENOENT);
2465         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2466
2467         rbd_dev = obj_request->img_request->rbd_dev;
2468         rbd_assert(rbd_dev->parent != NULL);
2469         /* rbd_read_finish(obj_request, obj_request->length); */
2470         img_request = rbd_img_request_create(rbd_dev->parent,
2471                                                 obj_request->img_offset,
2472                                                 obj_request->length,
2473                                                 false, true);
2474         result = -ENOMEM;
2475         if (!img_request)
2476                 goto out_err;
2477
2478         rbd_obj_request_get(obj_request);
2479         img_request->obj_request = obj_request;
2480
2481         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2482                                         obj_request->bio_list);
2483         if (result)
2484                 goto out_err;
2485
2486         img_request->callback = rbd_img_parent_read_callback;
2487         result = rbd_img_request_submit(img_request);
2488         if (result)
2489                 goto out_err;
2490
2491         return;
2492 out_err:
2493         if (img_request)
2494                 rbd_img_request_put(img_request);
2495         obj_request->result = result;
2496         obj_request->xferred = 0;
2497         obj_request_done_set(obj_request);
2498 }
2499
2500 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2501                                    u64 ver, u64 notify_id)
2502 {
2503         struct rbd_obj_request *obj_request;
2504         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2505         int ret;
2506
2507         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2508                                                         OBJ_REQUEST_NODATA);
2509         if (!obj_request)
2510                 return -ENOMEM;
2511
2512         ret = -ENOMEM;
2513         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2514         if (!obj_request->osd_req)
2515                 goto out;
2516         obj_request->callback = rbd_obj_request_put;
2517
2518         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2519                                         notify_id, ver, 0);
2520         rbd_osd_req_format_read(obj_request);
2521
2522         ret = rbd_obj_request_submit(osdc, obj_request);
2523 out:
2524         if (ret)
2525                 rbd_obj_request_put(obj_request);
2526
2527         return ret;
2528 }
2529
2530 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2531 {
2532         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2533         u64 hver;
2534
2535         if (!rbd_dev)
2536                 return;
2537
2538         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2539                 rbd_dev->header_name, (unsigned long long) notify_id,
2540                 (unsigned int) opcode);
2541         (void)rbd_dev_refresh(rbd_dev, &hver);
2542
2543         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2544 }
2545
2546 /*
2547  * Request sync osd watch/unwatch.  The value of "start" determines
2548  * whether a watch request is being initiated or torn down.
2549  */
2550 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2551 {
2552         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2553         struct rbd_obj_request *obj_request;
2554         int ret;
2555
2556         rbd_assert(start ^ !!rbd_dev->watch_event);
2557         rbd_assert(start ^ !!rbd_dev->watch_request);
2558
2559         if (start) {
2560                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2561                                                 &rbd_dev->watch_event);
2562                 if (ret < 0)
2563                         return ret;
2564                 rbd_assert(rbd_dev->watch_event != NULL);
2565         }
2566
2567         ret = -ENOMEM;
2568         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2569                                                         OBJ_REQUEST_NODATA);
2570         if (!obj_request)
2571                 goto out_cancel;
2572
2573         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2574         if (!obj_request->osd_req)
2575                 goto out_cancel;
2576
2577         if (start)
2578                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2579         else
2580                 ceph_osdc_unregister_linger_request(osdc,
2581                                         rbd_dev->watch_request->osd_req);
2582
2583         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2584                                 rbd_dev->watch_event->cookie,
2585                                 rbd_dev->header.obj_version, start);
2586         rbd_osd_req_format_write(obj_request);
2587
2588         ret = rbd_obj_request_submit(osdc, obj_request);
2589         if (ret)
2590                 goto out_cancel;
2591         ret = rbd_obj_request_wait(obj_request);
2592         if (ret)
2593                 goto out_cancel;
2594         ret = obj_request->result;
2595         if (ret)
2596                 goto out_cancel;
2597
2598         /*
2599          * A watch request is set to linger, so the underlying osd
2600          * request won't go away until we unregister it.  We retain
2601          * a pointer to the object request during that time (in
2602          * rbd_dev->watch_request), so we'll keep a reference to
2603          * it.  We'll drop that reference (below) after we've
2604          * unregistered it.
2605          */
2606         if (start) {
2607                 rbd_dev->watch_request = obj_request;
2608
2609                 return 0;
2610         }
2611
2612         /* We have successfully torn down the watch request */
2613
2614         rbd_obj_request_put(rbd_dev->watch_request);
2615         rbd_dev->watch_request = NULL;
2616 out_cancel:
2617         /* Cancel the event if we're tearing down, or on error */
2618         ceph_osdc_cancel_event(rbd_dev->watch_event);
2619         rbd_dev->watch_event = NULL;
2620         if (obj_request)
2621                 rbd_obj_request_put(obj_request);
2622
2623         return ret;
2624 }
2625
2626 /*
2627  * Synchronous osd object method call.  Returns the number of bytes
2628  * returned in the outbound buffer, or a negative error code.
2629  */
2630 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2631                              const char *object_name,
2632                              const char *class_name,
2633                              const char *method_name,
2634                              const void *outbound,
2635                              size_t outbound_size,
2636                              void *inbound,
2637                              size_t inbound_size,
2638                              u64 *version)
2639 {
2640         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2641         struct rbd_obj_request *obj_request;
2642         struct page **pages;
2643         u32 page_count;
2644         int ret;
2645
2646         /*
2647          * Method calls are ultimately read operations.  The result
2648          * should placed into the inbound buffer provided.  They
2649          * also supply outbound data--parameters for the object
2650          * method.  Currently if this is present it will be a
2651          * snapshot id.
2652          */
2653         page_count = (u32)calc_pages_for(0, inbound_size);
2654         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2655         if (IS_ERR(pages))
2656                 return PTR_ERR(pages);
2657
2658         ret = -ENOMEM;
2659         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2660                                                         OBJ_REQUEST_PAGES);
2661         if (!obj_request)
2662                 goto out;
2663
2664         obj_request->pages = pages;
2665         obj_request->page_count = page_count;
2666
2667         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2668         if (!obj_request->osd_req)
2669                 goto out;
2670
2671         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2672                                         class_name, method_name);
2673         if (outbound_size) {
2674                 struct ceph_pagelist *pagelist;
2675
2676                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2677                 if (!pagelist)
2678                         goto out;
2679
2680                 ceph_pagelist_init(pagelist);
2681                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2682                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2683                                                 pagelist);
2684         }
2685         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2686                                         obj_request->pages, inbound_size,
2687                                         0, false, false);
2688         rbd_osd_req_format_read(obj_request);
2689
2690         ret = rbd_obj_request_submit(osdc, obj_request);
2691         if (ret)
2692                 goto out;
2693         ret = rbd_obj_request_wait(obj_request);
2694         if (ret)
2695                 goto out;
2696
2697         ret = obj_request->result;
2698         if (ret < 0)
2699                 goto out;
2700
2701         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2702         ret = (int)obj_request->xferred;
2703         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2704         if (version)
2705                 *version = obj_request->version;
2706 out:
2707         if (obj_request)
2708                 rbd_obj_request_put(obj_request);
2709         else
2710                 ceph_release_page_vector(pages, page_count);
2711
2712         return ret;
2713 }
2714
2715 static void rbd_request_fn(struct request_queue *q)
2716                 __releases(q->queue_lock) __acquires(q->queue_lock)
2717 {
2718         struct rbd_device *rbd_dev = q->queuedata;
2719         bool read_only = rbd_dev->mapping.read_only;
2720         struct request *rq;
2721         int result;
2722
2723         while ((rq = blk_fetch_request(q))) {
2724                 bool write_request = rq_data_dir(rq) == WRITE;
2725                 struct rbd_img_request *img_request;
2726                 u64 offset;
2727                 u64 length;
2728
2729                 /* Ignore any non-FS requests that filter through. */
2730
2731                 if (rq->cmd_type != REQ_TYPE_FS) {
2732                         dout("%s: non-fs request type %d\n", __func__,
2733                                 (int) rq->cmd_type);
2734                         __blk_end_request_all(rq, 0);
2735                         continue;
2736                 }
2737
2738                 /* Ignore/skip any zero-length requests */
2739
2740                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2741                 length = (u64) blk_rq_bytes(rq);
2742
2743                 if (!length) {
2744                         dout("%s: zero-length request\n", __func__);
2745                         __blk_end_request_all(rq, 0);
2746                         continue;
2747                 }
2748
2749                 spin_unlock_irq(q->queue_lock);
2750
2751                 /* Disallow writes to a read-only device */
2752
2753                 if (write_request) {
2754                         result = -EROFS;
2755                         if (read_only)
2756                                 goto end_request;
2757                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2758                 }
2759
2760                 /*
2761                  * Quit early if the mapped snapshot no longer
2762                  * exists.  It's still possible the snapshot will
2763                  * have disappeared by the time our request arrives
2764                  * at the osd, but there's no sense in sending it if
2765                  * we already know.
2766                  */
2767                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2768                         dout("request for non-existent snapshot");
2769                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2770                         result = -ENXIO;
2771                         goto end_request;
2772                 }
2773
2774                 result = -EINVAL;
2775                 if (offset && length > U64_MAX - offset + 1) {
2776                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2777                                 offset, length);
2778                         goto end_request;       /* Shouldn't happen */
2779                 }
2780
2781                 result = -ENOMEM;
2782                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2783                                                         write_request, false);
2784                 if (!img_request)
2785                         goto end_request;
2786
2787                 img_request->rq = rq;
2788
2789                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2790                                                 rq->bio);
2791                 if (!result)
2792                         result = rbd_img_request_submit(img_request);
2793                 if (result)
2794                         rbd_img_request_put(img_request);
2795 end_request:
2796                 spin_lock_irq(q->queue_lock);
2797                 if (result < 0) {
2798                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2799                                 write_request ? "write" : "read",
2800                                 length, offset, result);
2801
2802                         __blk_end_request_all(rq, result);
2803                 }
2804         }
2805 }
2806
2807 /*
2808  * a queue callback. Makes sure that we don't create a bio that spans across
2809  * multiple osd objects. One exception would be with a single page bios,
2810  * which we handle later at bio_chain_clone_range()
2811  */
2812 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2813                           struct bio_vec *bvec)
2814 {
2815         struct rbd_device *rbd_dev = q->queuedata;
2816         sector_t sector_offset;
2817         sector_t sectors_per_obj;
2818         sector_t obj_sector_offset;
2819         int ret;
2820
2821         /*
2822          * Find how far into its rbd object the partition-relative
2823          * bio start sector is to offset relative to the enclosing
2824          * device.
2825          */
2826         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2827         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2828         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2829
2830         /*
2831          * Compute the number of bytes from that offset to the end
2832          * of the object.  Account for what's already used by the bio.
2833          */
2834         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2835         if (ret > bmd->bi_size)
2836                 ret -= bmd->bi_size;
2837         else
2838                 ret = 0;
2839
2840         /*
2841          * Don't send back more than was asked for.  And if the bio
2842          * was empty, let the whole thing through because:  "Note
2843          * that a block device *must* allow a single page to be
2844          * added to an empty bio."
2845          */
2846         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2847         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2848                 ret = (int) bvec->bv_len;
2849
2850         return ret;
2851 }
2852
2853 static void rbd_free_disk(struct rbd_device *rbd_dev)
2854 {
2855         struct gendisk *disk = rbd_dev->disk;
2856
2857         if (!disk)
2858                 return;
2859
2860         rbd_dev->disk = NULL;
2861         if (disk->flags & GENHD_FL_UP) {
2862                 del_gendisk(disk);
2863                 if (disk->queue)
2864                         blk_cleanup_queue(disk->queue);
2865         }
2866         put_disk(disk);
2867 }
2868
2869 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2870                                 const char *object_name,
2871                                 u64 offset, u64 length,
2872                                 void *buf, u64 *version)
2873
2874 {
2875         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2876         struct rbd_obj_request *obj_request;
2877         struct page **pages = NULL;
2878         u32 page_count;
2879         size_t size;
2880         int ret;
2881
2882         page_count = (u32) calc_pages_for(offset, length);
2883         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2884         if (IS_ERR(pages))
2885                 ret = PTR_ERR(pages);
2886
2887         ret = -ENOMEM;
2888         obj_request = rbd_obj_request_create(object_name, offset, length,
2889                                                         OBJ_REQUEST_PAGES);
2890         if (!obj_request)
2891                 goto out;
2892
2893         obj_request->pages = pages;
2894         obj_request->page_count = page_count;
2895
2896         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2897         if (!obj_request->osd_req)
2898                 goto out;
2899
2900         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2901                                         offset, length, 0, 0);
2902         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2903                                         obj_request->pages,
2904                                         obj_request->length,
2905                                         obj_request->offset & ~PAGE_MASK,
2906                                         false, false);
2907         rbd_osd_req_format_read(obj_request);
2908
2909         ret = rbd_obj_request_submit(osdc, obj_request);
2910         if (ret)
2911                 goto out;
2912         ret = rbd_obj_request_wait(obj_request);
2913         if (ret)
2914                 goto out;
2915
2916         ret = obj_request->result;
2917         if (ret < 0)
2918                 goto out;
2919
2920         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2921         size = (size_t) obj_request->xferred;
2922         ceph_copy_from_page_vector(pages, buf, 0, size);
2923         rbd_assert(size <= (size_t) INT_MAX);
2924         ret = (int) size;
2925         if (version)
2926                 *version = obj_request->version;
2927 out:
2928         if (obj_request)
2929                 rbd_obj_request_put(obj_request);
2930         else
2931                 ceph_release_page_vector(pages, page_count);
2932
2933         return ret;
2934 }
2935
2936 /*
2937  * Read the complete header for the given rbd device.
2938  *
2939  * Returns a pointer to a dynamically-allocated buffer containing
2940  * the complete and validated header.  Caller can pass the address
2941  * of a variable that will be filled in with the version of the
2942  * header object at the time it was read.
2943  *
2944  * Returns a pointer-coded errno if a failure occurs.
2945  */
2946 static struct rbd_image_header_ondisk *
2947 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2948 {
2949         struct rbd_image_header_ondisk *ondisk = NULL;
2950         u32 snap_count = 0;
2951         u64 names_size = 0;
2952         u32 want_count;
2953         int ret;
2954
2955         /*
2956          * The complete header will include an array of its 64-bit
2957          * snapshot ids, followed by the names of those snapshots as
2958          * a contiguous block of NUL-terminated strings.  Note that
2959          * the number of snapshots could change by the time we read
2960          * it in, in which case we re-read it.
2961          */
2962         do {
2963                 size_t size;
2964
2965                 kfree(ondisk);
2966
2967                 size = sizeof (*ondisk);
2968                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2969                 size += names_size;
2970                 ondisk = kmalloc(size, GFP_KERNEL);
2971                 if (!ondisk)
2972                         return ERR_PTR(-ENOMEM);
2973
2974                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2975                                        0, size, ondisk, version);
2976                 if (ret < 0)
2977                         goto out_err;
2978                 if ((size_t)ret < size) {
2979                         ret = -ENXIO;
2980                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2981                                 size, ret);
2982                         goto out_err;
2983                 }
2984                 if (!rbd_dev_ondisk_valid(ondisk)) {
2985                         ret = -ENXIO;
2986                         rbd_warn(rbd_dev, "invalid header");
2987                         goto out_err;
2988                 }
2989
2990                 names_size = le64_to_cpu(ondisk->snap_names_len);
2991                 want_count = snap_count;
2992                 snap_count = le32_to_cpu(ondisk->snap_count);
2993         } while (snap_count != want_count);
2994
2995         return ondisk;
2996
2997 out_err:
2998         kfree(ondisk);
2999
3000         return ERR_PTR(ret);
3001 }
3002
3003 /*
3004  * reload the ondisk the header
3005  */
3006 static int rbd_read_header(struct rbd_device *rbd_dev,
3007                            struct rbd_image_header *header)
3008 {
3009         struct rbd_image_header_ondisk *ondisk;
3010         u64 ver = 0;
3011         int ret;
3012
3013         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3014         if (IS_ERR(ondisk))
3015                 return PTR_ERR(ondisk);
3016         ret = rbd_header_from_disk(header, ondisk);
3017         if (ret >= 0)
3018                 header->obj_version = ver;
3019         kfree(ondisk);
3020
3021         return ret;
3022 }
3023
3024 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3025 {
3026         struct rbd_snap *snap;
3027         struct rbd_snap *next;
3028
3029         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3030                 list_del(&snap->node);
3031                 rbd_snap_destroy(snap);
3032         }
3033 }
3034
3035 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3036 {
3037         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3038                 return;
3039
3040         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3041                 sector_t size;
3042
3043                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3044                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3045                 dout("setting size to %llu sectors", (unsigned long long)size);
3046                 set_capacity(rbd_dev->disk, size);
3047         }
3048 }
3049
3050 /*
3051  * only read the first part of the ondisk header, without the snaps info
3052  */
3053 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3054 {
3055         int ret;
3056         struct rbd_image_header h;
3057
3058         ret = rbd_read_header(rbd_dev, &h);
3059         if (ret < 0)
3060                 return ret;
3061
3062         down_write(&rbd_dev->header_rwsem);
3063
3064         /* Update image size, and check for resize of mapped image */
3065         rbd_dev->header.image_size = h.image_size;
3066         rbd_update_mapping_size(rbd_dev);
3067
3068         /* rbd_dev->header.object_prefix shouldn't change */
3069         kfree(rbd_dev->header.snap_sizes);
3070         kfree(rbd_dev->header.snap_names);
3071         /* osd requests may still refer to snapc */
3072         rbd_snap_context_put(rbd_dev->header.snapc);
3073
3074         if (hver)
3075                 *hver = h.obj_version;
3076         rbd_dev->header.obj_version = h.obj_version;
3077         rbd_dev->header.image_size = h.image_size;
3078         rbd_dev->header.snapc = h.snapc;
3079         rbd_dev->header.snap_names = h.snap_names;
3080         rbd_dev->header.snap_sizes = h.snap_sizes;
3081         /* Free the extra copy of the object prefix */
3082         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3083                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3084         kfree(h.object_prefix);
3085
3086         ret = rbd_dev_snaps_update(rbd_dev);
3087
3088         up_write(&rbd_dev->header_rwsem);
3089
3090         return ret;
3091 }
3092
3093 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3094 {
3095         int ret;
3096
3097         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3098         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3099         if (rbd_dev->image_format == 1)
3100                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3101         else
3102                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3103         mutex_unlock(&ctl_mutex);
3104         revalidate_disk(rbd_dev->disk);
3105         if (ret)
3106                 rbd_warn(rbd_dev, "got notification but failed to "
3107                            " update snaps: %d\n", ret);
3108
3109         return ret;
3110 }
3111
3112 static int rbd_init_disk(struct rbd_device *rbd_dev)
3113 {
3114         struct gendisk *disk;
3115         struct request_queue *q;
3116         u64 segment_size;
3117
3118         /* create gendisk info */
3119         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3120         if (!disk)
3121                 return -ENOMEM;
3122
3123         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3124                  rbd_dev->dev_id);
3125         disk->major = rbd_dev->major;
3126         disk->first_minor = 0;
3127         disk->fops = &rbd_bd_ops;
3128         disk->private_data = rbd_dev;
3129
3130         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3131         if (!q)
3132                 goto out_disk;
3133
3134         /* We use the default size, but let's be explicit about it. */
3135         blk_queue_physical_block_size(q, SECTOR_SIZE);
3136
3137         /* set io sizes to object size */
3138         segment_size = rbd_obj_bytes(&rbd_dev->header);
3139         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3140         blk_queue_max_segment_size(q, segment_size);
3141         blk_queue_io_min(q, segment_size);
3142         blk_queue_io_opt(q, segment_size);
3143
3144         blk_queue_merge_bvec(q, rbd_merge_bvec);
3145         disk->queue = q;
3146
3147         q->queuedata = rbd_dev;
3148
3149         rbd_dev->disk = disk;
3150
3151         return 0;
3152 out_disk:
3153         put_disk(disk);
3154
3155         return -ENOMEM;
3156 }
3157
3158 /*
3159   sysfs
3160 */
3161
3162 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3163 {
3164         return container_of(dev, struct rbd_device, dev);
3165 }
3166
3167 static ssize_t rbd_size_show(struct device *dev,
3168                              struct device_attribute *attr, char *buf)
3169 {
3170         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3171
3172         return sprintf(buf, "%llu\n",
3173                 (unsigned long long)rbd_dev->mapping.size);
3174 }
3175
3176 /*
3177  * Note this shows the features for whatever's mapped, which is not
3178  * necessarily the base image.
3179  */
3180 static ssize_t rbd_features_show(struct device *dev,
3181                              struct device_attribute *attr, char *buf)
3182 {
3183         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3184
3185         return sprintf(buf, "0x%016llx\n",
3186                         (unsigned long long)rbd_dev->mapping.features);
3187 }
3188
3189 static ssize_t rbd_major_show(struct device *dev,
3190                               struct device_attribute *attr, char *buf)
3191 {
3192         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3193
3194         if (rbd_dev->major)
3195                 return sprintf(buf, "%d\n", rbd_dev->major);
3196
3197         return sprintf(buf, "(none)\n");
3198
3199 }
3200
3201 static ssize_t rbd_client_id_show(struct device *dev,
3202                                   struct device_attribute *attr, char *buf)
3203 {
3204         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3205
3206         return sprintf(buf, "client%lld\n",
3207                         ceph_client_id(rbd_dev->rbd_client->client));
3208 }
3209
3210 static ssize_t rbd_pool_show(struct device *dev,
3211                              struct device_attribute *attr, char *buf)
3212 {
3213         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3214
3215         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3216 }
3217
3218 static ssize_t rbd_pool_id_show(struct device *dev,
3219                              struct device_attribute *attr, char *buf)
3220 {
3221         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3222
3223         return sprintf(buf, "%llu\n",
3224                         (unsigned long long) rbd_dev->spec->pool_id);
3225 }
3226
3227 static ssize_t rbd_name_show(struct device *dev,
3228                              struct device_attribute *attr, char *buf)
3229 {
3230         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3231
3232         if (rbd_dev->spec->image_name)
3233                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3234
3235         return sprintf(buf, "(unknown)\n");
3236 }
3237
3238 static ssize_t rbd_image_id_show(struct device *dev,
3239                              struct device_attribute *attr, char *buf)
3240 {
3241         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3242
3243         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3244 }
3245
3246 /*
3247  * Shows the name of the currently-mapped snapshot (or
3248  * RBD_SNAP_HEAD_NAME for the base image).
3249  */
3250 static ssize_t rbd_snap_show(struct device *dev,
3251                              struct device_attribute *attr,
3252                              char *buf)
3253 {
3254         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3255
3256         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3257 }
3258
3259 /*
3260  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3261  * for the parent image.  If there is no parent, simply shows
3262  * "(no parent image)".
3263  */
3264 static ssize_t rbd_parent_show(struct device *dev,
3265                              struct device_attribute *attr,
3266                              char *buf)
3267 {
3268         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3269         struct rbd_spec *spec = rbd_dev->parent_spec;
3270         int count;
3271         char *bufp = buf;
3272
3273         if (!spec)
3274                 return sprintf(buf, "(no parent image)\n");
3275
3276         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3277                         (unsigned long long) spec->pool_id, spec->pool_name);
3278         if (count < 0)
3279                 return count;
3280         bufp += count;
3281
3282         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3283                         spec->image_name ? spec->image_name : "(unknown)");
3284         if (count < 0)
3285                 return count;
3286         bufp += count;
3287
3288         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3289                         (unsigned long long) spec->snap_id, spec->snap_name);
3290         if (count < 0)
3291                 return count;
3292         bufp += count;
3293
3294         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3295         if (count < 0)
3296                 return count;
3297         bufp += count;
3298
3299         return (ssize_t) (bufp - buf);
3300 }
3301
3302 static ssize_t rbd_image_refresh(struct device *dev,
3303                                  struct device_attribute *attr,
3304                                  const char *buf,
3305                                  size_t size)
3306 {
3307         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3308         int ret;
3309
3310         ret = rbd_dev_refresh(rbd_dev, NULL);
3311
3312         return ret < 0 ? ret : size;
3313 }
3314
3315 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3316 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3317 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3318 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3319 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3320 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3321 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3322 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3323 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3324 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3325 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3326
3327 static struct attribute *rbd_attrs[] = {
3328         &dev_attr_size.attr,
3329         &dev_attr_features.attr,
3330         &dev_attr_major.attr,
3331         &dev_attr_client_id.attr,
3332         &dev_attr_pool.attr,
3333         &dev_attr_pool_id.attr,
3334         &dev_attr_name.attr,
3335         &dev_attr_image_id.attr,
3336         &dev_attr_current_snap.attr,
3337         &dev_attr_parent.attr,
3338         &dev_attr_refresh.attr,
3339         NULL
3340 };
3341
3342 static struct attribute_group rbd_attr_group = {
3343         .attrs = rbd_attrs,
3344 };
3345
3346 static const struct attribute_group *rbd_attr_groups[] = {
3347         &rbd_attr_group,
3348         NULL
3349 };
3350
3351 static void rbd_sysfs_dev_release(struct device *dev)
3352 {
3353 }
3354
3355 static struct device_type rbd_device_type = {
3356         .name           = "rbd",
3357         .groups         = rbd_attr_groups,
3358         .release        = rbd_sysfs_dev_release,
3359 };
3360
3361 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3362 {
3363         kref_get(&spec->kref);
3364
3365         return spec;
3366 }
3367
3368 static void rbd_spec_free(struct kref *kref);
3369 static void rbd_spec_put(struct rbd_spec *spec)
3370 {
3371         if (spec)
3372                 kref_put(&spec->kref, rbd_spec_free);
3373 }
3374
3375 static struct rbd_spec *rbd_spec_alloc(void)
3376 {
3377         struct rbd_spec *spec;
3378
3379         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3380         if (!spec)
3381                 return NULL;
3382         kref_init(&spec->kref);
3383
3384         return spec;
3385 }
3386
3387 static void rbd_spec_free(struct kref *kref)
3388 {
3389         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3390
3391         kfree(spec->pool_name);
3392         kfree(spec->image_id);
3393         kfree(spec->image_name);
3394         kfree(spec->snap_name);
3395         kfree(spec);
3396 }
3397
3398 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3399                                 struct rbd_spec *spec)
3400 {
3401         struct rbd_device *rbd_dev;
3402
3403         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3404         if (!rbd_dev)
3405                 return NULL;
3406
3407         spin_lock_init(&rbd_dev->lock);
3408         rbd_dev->flags = 0;
3409         INIT_LIST_HEAD(&rbd_dev->node);
3410         INIT_LIST_HEAD(&rbd_dev->snaps);
3411         init_rwsem(&rbd_dev->header_rwsem);
3412
3413         rbd_dev->spec = spec;
3414         rbd_dev->rbd_client = rbdc;
3415
3416         /* Initialize the layout used for all rbd requests */
3417
3418         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3419         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3420         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3421         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3422
3423         return rbd_dev;
3424 }
3425
3426 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3427 {
3428         rbd_put_client(rbd_dev->rbd_client);
3429         rbd_spec_put(rbd_dev->spec);
3430         kfree(rbd_dev);
3431 }
3432
3433 static void rbd_snap_destroy(struct rbd_snap *snap)
3434 {
3435         kfree(snap->name);
3436         kfree(snap);
3437 }
3438
3439 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3440                                                 const char *snap_name,
3441                                                 u64 snap_id, u64 snap_size,
3442                                                 u64 snap_features)
3443 {
3444         struct rbd_snap *snap;
3445
3446         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3447         if (!snap)
3448                 return ERR_PTR(-ENOMEM);
3449
3450         snap->name = snap_name;
3451         snap->id = snap_id;
3452         snap->size = snap_size;
3453         snap->features = snap_features;
3454
3455         return snap;
3456 }
3457
3458 /*
3459  * Returns a dynamically-allocated snapshot name if successful, or a
3460  * pointer-coded error otherwise.
3461  */
3462 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3463                 u64 *snap_size, u64 *snap_features)
3464 {
3465         char *snap_name;
3466         int i;
3467
3468         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3469
3470         /* Skip over names until we find the one we are looking for */
3471
3472         snap_name = rbd_dev->header.snap_names;
3473         for (i = 0; i < which; i++)
3474                 snap_name += strlen(snap_name) + 1;
3475
3476         snap_name = kstrdup(snap_name, GFP_KERNEL);
3477         if (!snap_name)
3478                 return ERR_PTR(-ENOMEM);
3479
3480         *snap_size = rbd_dev->header.snap_sizes[which];
3481         *snap_features = 0;     /* No features for v1 */
3482
3483         return snap_name;
3484 }
3485
3486 /*
3487  * Get the size and object order for an image snapshot, or if
3488  * snap_id is CEPH_NOSNAP, gets this information for the base
3489  * image.
3490  */
3491 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3492                                 u8 *order, u64 *snap_size)
3493 {
3494         __le64 snapid = cpu_to_le64(snap_id);
3495         int ret;
3496         struct {
3497                 u8 order;
3498                 __le64 size;
3499         } __attribute__ ((packed)) size_buf = { 0 };
3500
3501         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3502                                 "rbd", "get_size",
3503                                 &snapid, sizeof (snapid),
3504                                 &size_buf, sizeof (size_buf), NULL);
3505         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3506         if (ret < 0)
3507                 return ret;
3508         if (ret < sizeof (size_buf))
3509                 return -ERANGE;
3510
3511         if (order)
3512                 *order = size_buf.order;
3513         *snap_size = le64_to_cpu(size_buf.size);
3514
3515         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3516                 (unsigned long long)snap_id, (unsigned int)*order,
3517                 (unsigned long long)*snap_size);
3518
3519         return 0;
3520 }
3521
3522 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3523 {
3524         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3525                                         &rbd_dev->header.obj_order,
3526                                         &rbd_dev->header.image_size);
3527 }
3528
3529 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3530 {
3531         void *reply_buf;
3532         int ret;
3533         void *p;
3534
3535         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3536         if (!reply_buf)
3537                 return -ENOMEM;
3538
3539         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3540                                 "rbd", "get_object_prefix", NULL, 0,
3541                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3542         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3543         if (ret < 0)
3544                 goto out;
3545
3546         p = reply_buf;
3547         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3548                                                 p + ret, NULL, GFP_NOIO);
3549         ret = 0;
3550
3551         if (IS_ERR(rbd_dev->header.object_prefix)) {
3552                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3553                 rbd_dev->header.object_prefix = NULL;
3554         } else {
3555                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3556         }
3557 out:
3558         kfree(reply_buf);
3559
3560         return ret;
3561 }
3562
3563 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3564                 u64 *snap_features)
3565 {
3566         __le64 snapid = cpu_to_le64(snap_id);
3567         struct {
3568                 __le64 features;
3569                 __le64 incompat;
3570         } __attribute__ ((packed)) features_buf = { 0 };
3571         u64 incompat;
3572         int ret;
3573
3574         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3575                                 "rbd", "get_features",
3576                                 &snapid, sizeof (snapid),
3577                                 &features_buf, sizeof (features_buf), NULL);
3578         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3579         if (ret < 0)
3580                 return ret;
3581         if (ret < sizeof (features_buf))
3582                 return -ERANGE;
3583
3584         incompat = le64_to_cpu(features_buf.incompat);
3585         if (incompat & ~RBD_FEATURES_SUPPORTED)
3586                 return -ENXIO;
3587
3588         *snap_features = le64_to_cpu(features_buf.features);
3589
3590         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3591                 (unsigned long long)snap_id,
3592                 (unsigned long long)*snap_features,
3593                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3594
3595         return 0;
3596 }
3597
3598 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3599 {
3600         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3601                                                 &rbd_dev->header.features);
3602 }
3603
3604 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3605 {
3606         struct rbd_spec *parent_spec;
3607         size_t size;
3608         void *reply_buf = NULL;
3609         __le64 snapid;
3610         void *p;
3611         void *end;
3612         char *image_id;
3613         u64 overlap;
3614         int ret;
3615
3616         parent_spec = rbd_spec_alloc();
3617         if (!parent_spec)
3618                 return -ENOMEM;
3619
3620         size = sizeof (__le64) +                                /* pool_id */
3621                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3622                 sizeof (__le64) +                               /* snap_id */
3623                 sizeof (__le64);                                /* overlap */
3624         reply_buf = kmalloc(size, GFP_KERNEL);
3625         if (!reply_buf) {
3626                 ret = -ENOMEM;
3627                 goto out_err;
3628         }
3629
3630         snapid = cpu_to_le64(CEPH_NOSNAP);
3631         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3632                                 "rbd", "get_parent",
3633                                 &snapid, sizeof (snapid),
3634                                 reply_buf, size, NULL);
3635         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3636         if (ret < 0)
3637                 goto out_err;
3638
3639         p = reply_buf;
3640         end = reply_buf + ret;
3641         ret = -ERANGE;
3642         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3643         if (parent_spec->pool_id == CEPH_NOPOOL)
3644                 goto out;       /* No parent?  No problem. */
3645
3646         /* The ceph file layout needs to fit pool id in 32 bits */
3647
3648         ret = -EIO;
3649         if (parent_spec->pool_id > (u64)U32_MAX) {
3650                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3651                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3652                 goto out_err;
3653         }
3654
3655         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3656         if (IS_ERR(image_id)) {
3657                 ret = PTR_ERR(image_id);
3658                 goto out_err;
3659         }
3660         parent_spec->image_id = image_id;
3661         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3662         ceph_decode_64_safe(&p, end, overlap, out_err);
3663
3664         rbd_dev->parent_overlap = overlap;
3665         rbd_dev->parent_spec = parent_spec;
3666         parent_spec = NULL;     /* rbd_dev now owns this */
3667 out:
3668         ret = 0;
3669 out_err:
3670         kfree(reply_buf);
3671         rbd_spec_put(parent_spec);
3672
3673         return ret;
3674 }
3675
3676 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3677 {
3678         struct {
3679                 __le64 stripe_unit;
3680                 __le64 stripe_count;
3681         } __attribute__ ((packed)) striping_info_buf = { 0 };
3682         size_t size = sizeof (striping_info_buf);
3683         void *p;
3684         u64 obj_size;
3685         u64 stripe_unit;
3686         u64 stripe_count;
3687         int ret;
3688
3689         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3690                                 "rbd", "get_stripe_unit_count", NULL, 0,
3691                                 (char *)&striping_info_buf, size, NULL);
3692         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3693         if (ret < 0)
3694                 return ret;
3695         if (ret < size)
3696                 return -ERANGE;
3697
3698         /*
3699          * We don't actually support the "fancy striping" feature
3700          * (STRIPINGV2) yet, but if the striping sizes are the
3701          * defaults the behavior is the same as before.  So find
3702          * out, and only fail if the image has non-default values.
3703          */
3704         ret = -EINVAL;
3705         obj_size = (u64)1 << rbd_dev->header.obj_order;
3706         p = &striping_info_buf;
3707         stripe_unit = ceph_decode_64(&p);
3708         if (stripe_unit != obj_size) {
3709                 rbd_warn(rbd_dev, "unsupported stripe unit "
3710                                 "(got %llu want %llu)",
3711                                 stripe_unit, obj_size);
3712                 return -EINVAL;
3713         }
3714         stripe_count = ceph_decode_64(&p);
3715         if (stripe_count != 1) {
3716                 rbd_warn(rbd_dev, "unsupported stripe count "
3717                                 "(got %llu want 1)", stripe_count);
3718                 return -EINVAL;
3719         }
3720         rbd_dev->header.stripe_unit = stripe_unit;
3721         rbd_dev->header.stripe_count = stripe_count;
3722
3723         return 0;
3724 }
3725
3726 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3727 {
3728         size_t image_id_size;
3729         char *image_id;
3730         void *p;
3731         void *end;
3732         size_t size;
3733         void *reply_buf = NULL;
3734         size_t len = 0;
3735         char *image_name = NULL;
3736         int ret;
3737
3738         rbd_assert(!rbd_dev->spec->image_name);
3739
3740         len = strlen(rbd_dev->spec->image_id);
3741         image_id_size = sizeof (__le32) + len;
3742         image_id = kmalloc(image_id_size, GFP_KERNEL);
3743         if (!image_id)
3744                 return NULL;
3745
3746         p = image_id;
3747         end = image_id + image_id_size;
3748         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3749
3750         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3751         reply_buf = kmalloc(size, GFP_KERNEL);
3752         if (!reply_buf)
3753                 goto out;
3754
3755         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3756                                 "rbd", "dir_get_name",
3757                                 image_id, image_id_size,
3758                                 reply_buf, size, NULL);
3759         if (ret < 0)
3760                 goto out;
3761         p = reply_buf;
3762         end = reply_buf + ret;
3763
3764         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3765         if (IS_ERR(image_name))
3766                 image_name = NULL;
3767         else
3768                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3769 out:
3770         kfree(reply_buf);
3771         kfree(image_id);
3772
3773         return image_name;
3774 }
3775
3776 /*
3777  * When an rbd image has a parent image, it is identified by the
3778  * pool, image, and snapshot ids (not names).  This function fills
3779  * in the names for those ids.  (It's OK if we can't figure out the
3780  * name for an image id, but the pool and snapshot ids should always
3781  * exist and have names.)  All names in an rbd spec are dynamically
3782  * allocated.
3783  *
3784  * When an image being mapped (not a parent) is probed, we have the
3785  * pool name and pool id, image name and image id, and the snapshot
3786  * name.  The only thing we're missing is the snapshot id.
3787  *
3788  * The set of snapshots for an image is not known until they have
3789  * been read by rbd_dev_snaps_update(), so we can't completely fill
3790  * in this information until after that has been called.
3791  */
3792 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3793 {
3794         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3795         struct rbd_spec *spec = rbd_dev->spec;
3796         const char *pool_name;
3797         const char *image_name;
3798         const char *snap_name;
3799         int ret;
3800
3801         /*
3802          * An image being mapped will have the pool name (etc.), but
3803          * we need to look up the snapshot id.
3804          */
3805         if (spec->pool_name) {
3806                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3807                         struct rbd_snap *snap;
3808
3809                         snap = snap_by_name(rbd_dev, spec->snap_name);
3810                         if (!snap)
3811                                 return -ENOENT;
3812                         spec->snap_id = snap->id;
3813                 } else {
3814                         spec->snap_id = CEPH_NOSNAP;
3815                 }
3816
3817                 return 0;
3818         }
3819
3820         /* Get the pool name; we have to make our own copy of this */
3821
3822         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3823         if (!pool_name) {
3824                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3825                 return -EIO;
3826         }
3827         pool_name = kstrdup(pool_name, GFP_KERNEL);
3828         if (!pool_name)
3829                 return -ENOMEM;
3830
3831         /* Fetch the image name; tolerate failure here */
3832
3833         image_name = rbd_dev_image_name(rbd_dev);
3834         if (!image_name)
3835                 rbd_warn(rbd_dev, "unable to get image name");
3836
3837         /* Look up the snapshot name, and make a copy */
3838
3839         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3840         if (!snap_name) {
3841                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3842                 ret = -EIO;
3843                 goto out_err;
3844         }
3845         snap_name = kstrdup(snap_name, GFP_KERNEL);
3846         if (!snap_name) {
3847                 ret = -ENOMEM;
3848                 goto out_err;
3849         }
3850
3851         spec->pool_name = pool_name;
3852         spec->image_name = image_name;
3853         spec->snap_name = snap_name;
3854
3855         return 0;
3856 out_err:
3857         kfree(image_name);
3858         kfree(pool_name);
3859
3860         return ret;
3861 }
3862
3863 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3864 {
3865         size_t size;
3866         int ret;
3867         void *reply_buf;
3868         void *p;
3869         void *end;
3870         u64 seq;
3871         u32 snap_count;
3872         struct ceph_snap_context *snapc;
3873         u32 i;
3874
3875         /*
3876          * We'll need room for the seq value (maximum snapshot id),
3877          * snapshot count, and array of that many snapshot ids.
3878          * For now we have a fixed upper limit on the number we're
3879          * prepared to receive.
3880          */
3881         size = sizeof (__le64) + sizeof (__le32) +
3882                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3883         reply_buf = kzalloc(size, GFP_KERNEL);
3884         if (!reply_buf)
3885                 return -ENOMEM;
3886
3887         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3888                                 "rbd", "get_snapcontext", NULL, 0,
3889                                 reply_buf, size, ver);
3890         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3891         if (ret < 0)
3892                 goto out;
3893
3894         p = reply_buf;
3895         end = reply_buf + ret;
3896         ret = -ERANGE;
3897         ceph_decode_64_safe(&p, end, seq, out);
3898         ceph_decode_32_safe(&p, end, snap_count, out);
3899
3900         /*
3901          * Make sure the reported number of snapshot ids wouldn't go
3902          * beyond the end of our buffer.  But before checking that,
3903          * make sure the computed size of the snapshot context we
3904          * allocate is representable in a size_t.
3905          */
3906         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3907                                  / sizeof (u64)) {
3908                 ret = -EINVAL;
3909                 goto out;
3910         }
3911         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3912                 goto out;
3913         ret = 0;
3914
3915         snapc = rbd_snap_context_create(snap_count);
3916         if (!snapc) {
3917                 ret = -ENOMEM;
3918                 goto out;
3919         }
3920         snapc->seq = seq;
3921         for (i = 0; i < snap_count; i++)
3922                 snapc->snaps[i] = ceph_decode_64(&p);
3923
3924         rbd_dev->header.snapc = snapc;
3925
3926         dout("  snap context seq = %llu, snap_count = %u\n",
3927                 (unsigned long long)seq, (unsigned int)snap_count);
3928 out:
3929         kfree(reply_buf);
3930
3931         return ret;
3932 }
3933
3934 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3935 {
3936         size_t size;
3937         void *reply_buf;
3938         __le64 snap_id;
3939         int ret;
3940         void *p;
3941         void *end;
3942         char *snap_name;
3943
3944         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3945         reply_buf = kmalloc(size, GFP_KERNEL);
3946         if (!reply_buf)
3947                 return ERR_PTR(-ENOMEM);
3948
3949         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3950         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3951         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3952                                 "rbd", "get_snapshot_name",
3953                                 &snap_id, sizeof (snap_id),
3954                                 reply_buf, size, NULL);
3955         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3956         if (ret < 0) {
3957                 snap_name = ERR_PTR(ret);
3958                 goto out;
3959         }
3960
3961         p = reply_buf;
3962         end = reply_buf + ret;
3963         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3964         if (IS_ERR(snap_name))
3965                 goto out;
3966
3967         dout("  snap_id 0x%016llx snap_name = %s\n",
3968                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3969 out:
3970         kfree(reply_buf);
3971
3972         return snap_name;
3973 }
3974
3975 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3976                 u64 *snap_size, u64 *snap_features)
3977 {
3978         u64 snap_id;
3979         u64 size;
3980         u64 features;
3981         char *snap_name;
3982         int ret;
3983
3984         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3985         snap_id = rbd_dev->header.snapc->snaps[which];
3986         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3987         if (ret)
3988                 goto out_err;
3989
3990         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3991         if (ret)
3992                 goto out_err;
3993
3994         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3995         if (!IS_ERR(snap_name)) {
3996                 *snap_size = size;
3997                 *snap_features = features;
3998         }
3999
4000         return snap_name;
4001 out_err:
4002         return ERR_PTR(ret);
4003 }
4004
4005 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4006                 u64 *snap_size, u64 *snap_features)
4007 {
4008         if (rbd_dev->image_format == 1)
4009                 return rbd_dev_v1_snap_info(rbd_dev, which,
4010                                         snap_size, snap_features);
4011         if (rbd_dev->image_format == 2)
4012                 return rbd_dev_v2_snap_info(rbd_dev, which,
4013                                         snap_size, snap_features);
4014         return ERR_PTR(-EINVAL);
4015 }
4016
4017 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4018 {
4019         int ret;
4020         __u8 obj_order;
4021
4022         down_write(&rbd_dev->header_rwsem);
4023
4024         /* Grab old order first, to see if it changes */
4025
4026         obj_order = rbd_dev->header.obj_order,
4027         ret = rbd_dev_v2_image_size(rbd_dev);
4028         if (ret)
4029                 goto out;
4030         if (rbd_dev->header.obj_order != obj_order) {
4031                 ret = -EIO;
4032                 goto out;
4033         }
4034         rbd_update_mapping_size(rbd_dev);
4035
4036         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4037         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4038         if (ret)
4039                 goto out;
4040         ret = rbd_dev_snaps_update(rbd_dev);
4041         dout("rbd_dev_snaps_update returned %d\n", ret);
4042         if (ret)
4043                 goto out;
4044 out:
4045         up_write(&rbd_dev->header_rwsem);
4046
4047         return ret;
4048 }
4049
4050 /*
4051  * Scan the rbd device's current snapshot list and compare it to the
4052  * newly-received snapshot context.  Remove any existing snapshots
4053  * not present in the new snapshot context.  Add a new snapshot for
4054  * any snaphots in the snapshot context not in the current list.
4055  * And verify there are no changes to snapshots we already know
4056  * about.
4057  *
4058  * Assumes the snapshots in the snapshot context are sorted by
4059  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4060  * are also maintained in that order.)
4061  *
4062  * Note that any error occurs while updating the snapshot list
4063  * aborts the update, and the entire list is cleared.  The snapshot
4064  * list becomes inconsistent at that point anyway, so it might as
4065  * well be empty.
4066  */
4067 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4068 {
4069         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4070         const u32 snap_count = snapc->num_snaps;
4071         struct list_head *head = &rbd_dev->snaps;
4072         struct list_head *links = head->next;
4073         u32 index = 0;
4074         int ret = 0;
4075
4076         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4077         while (index < snap_count || links != head) {
4078                 u64 snap_id;
4079                 struct rbd_snap *snap;
4080                 char *snap_name;
4081                 u64 snap_size = 0;
4082                 u64 snap_features = 0;
4083
4084                 snap_id = index < snap_count ? snapc->snaps[index]
4085                                              : CEPH_NOSNAP;
4086                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4087                                      : NULL;
4088                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4089
4090                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4091                         struct list_head *next = links->next;
4092
4093                         /*
4094                          * A previously-existing snapshot is not in
4095                          * the new snap context.
4096                          *
4097                          * If the now-missing snapshot is the one
4098                          * the image represents, clear its existence
4099                          * flag so we can avoid sending any more
4100                          * requests to it.
4101                          */
4102                         if (rbd_dev->spec->snap_id == snap->id)
4103                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4104                         dout("removing %ssnap id %llu\n",
4105                                 rbd_dev->spec->snap_id == snap->id ?
4106                                                         "mapped " : "",
4107                                 (unsigned long long)snap->id);
4108
4109                         list_del(&snap->node);
4110                         rbd_snap_destroy(snap);
4111
4112                         /* Done with this list entry; advance */
4113
4114                         links = next;
4115                         continue;
4116                 }
4117
4118                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4119                                         &snap_size, &snap_features);
4120                 if (IS_ERR(snap_name)) {
4121                         ret = PTR_ERR(snap_name);
4122                         dout("failed to get snap info, error %d\n", ret);
4123                         goto out_err;
4124                 }
4125
4126                 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4127                         (unsigned long long)snap_id);
4128                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4129                         struct rbd_snap *new_snap;
4130
4131                         /* We haven't seen this snapshot before */
4132
4133                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4134                                         snap_id, snap_size, snap_features);
4135                         if (IS_ERR(new_snap)) {
4136                                 ret = PTR_ERR(new_snap);
4137                                 dout("  failed to add dev, error %d\n", ret);
4138                                 goto out_err;
4139                         }
4140
4141                         /* New goes before existing, or at end of list */
4142
4143                         dout("  added dev%s\n", snap ? "" : " at end\n");
4144                         if (snap)
4145                                 list_add_tail(&new_snap->node, &snap->node);
4146                         else
4147                                 list_add_tail(&new_snap->node, head);
4148                 } else {
4149                         /* Already have this one */
4150
4151                         dout("  already present\n");
4152
4153                         rbd_assert(snap->size == snap_size);
4154                         rbd_assert(!strcmp(snap->name, snap_name));
4155                         rbd_assert(snap->features == snap_features);
4156
4157                         /* Done with this list entry; advance */
4158
4159                         links = links->next;
4160                 }
4161
4162                 /* Advance to the next entry in the snapshot context */
4163
4164                 index++;
4165         }
4166         dout("%s: done\n", __func__);
4167
4168         return 0;
4169 out_err:
4170         rbd_remove_all_snaps(rbd_dev);
4171
4172         return ret;
4173 }
4174
4175 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4176 {
4177         struct device *dev;
4178         int ret;
4179
4180         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4181
4182         dev = &rbd_dev->dev;
4183         dev->bus = &rbd_bus_type;
4184         dev->type = &rbd_device_type;
4185         dev->parent = &rbd_root_dev;
4186         dev->release = rbd_dev_release;
4187         dev_set_name(dev, "%d", rbd_dev->dev_id);
4188         ret = device_register(dev);
4189
4190         mutex_unlock(&ctl_mutex);
4191
4192         return ret;
4193 }
4194
4195 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4196 {
4197         device_unregister(&rbd_dev->dev);
4198 }
4199
4200 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4201
4202 /*
4203  * Get a unique rbd identifier for the given new rbd_dev, and add
4204  * the rbd_dev to the global list.  The minimum rbd id is 1.
4205  */
4206 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4207 {
4208         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4209
4210         spin_lock(&rbd_dev_list_lock);
4211         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4212         spin_unlock(&rbd_dev_list_lock);
4213         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4214                 (unsigned long long) rbd_dev->dev_id);
4215 }
4216
4217 /*
4218  * Remove an rbd_dev from the global list, and record that its
4219  * identifier is no longer in use.
4220  */
4221 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4222 {
4223         struct list_head *tmp;
4224         int rbd_id = rbd_dev->dev_id;
4225         int max_id;
4226
4227         rbd_assert(rbd_id > 0);
4228
4229         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4230                 (unsigned long long) rbd_dev->dev_id);
4231         spin_lock(&rbd_dev_list_lock);
4232         list_del_init(&rbd_dev->node);
4233
4234         /*
4235          * If the id being "put" is not the current maximum, there
4236          * is nothing special we need to do.
4237          */
4238         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4239                 spin_unlock(&rbd_dev_list_lock);
4240                 return;
4241         }
4242
4243         /*
4244          * We need to update the current maximum id.  Search the
4245          * list to find out what it is.  We're more likely to find
4246          * the maximum at the end, so search the list backward.
4247          */
4248         max_id = 0;
4249         list_for_each_prev(tmp, &rbd_dev_list) {
4250                 struct rbd_device *rbd_dev;
4251
4252                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4253                 if (rbd_dev->dev_id > max_id)
4254                         max_id = rbd_dev->dev_id;
4255         }
4256         spin_unlock(&rbd_dev_list_lock);
4257
4258         /*
4259          * The max id could have been updated by rbd_dev_id_get(), in
4260          * which case it now accurately reflects the new maximum.
4261          * Be careful not to overwrite the maximum value in that
4262          * case.
4263          */
4264         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4265         dout("  max dev id has been reset\n");
4266 }
4267
4268 /*
4269  * Skips over white space at *buf, and updates *buf to point to the
4270  * first found non-space character (if any). Returns the length of
4271  * the token (string of non-white space characters) found.  Note
4272  * that *buf must be terminated with '\0'.
4273  */
4274 static inline size_t next_token(const char **buf)
4275 {
4276         /*
4277         * These are the characters that produce nonzero for
4278         * isspace() in the "C" and "POSIX" locales.
4279         */
4280         const char *spaces = " \f\n\r\t\v";
4281
4282         *buf += strspn(*buf, spaces);   /* Find start of token */
4283
4284         return strcspn(*buf, spaces);   /* Return token length */
4285 }
4286
4287 /*
4288  * Finds the next token in *buf, and if the provided token buffer is
4289  * big enough, copies the found token into it.  The result, if
4290  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4291  * must be terminated with '\0' on entry.
4292  *
4293  * Returns the length of the token found (not including the '\0').
4294  * Return value will be 0 if no token is found, and it will be >=
4295  * token_size if the token would not fit.
4296  *
4297  * The *buf pointer will be updated to point beyond the end of the
4298  * found token.  Note that this occurs even if the token buffer is
4299  * too small to hold it.
4300  */
4301 static inline size_t copy_token(const char **buf,
4302                                 char *token,
4303                                 size_t token_size)
4304 {
4305         size_t len;
4306
4307         len = next_token(buf);
4308         if (len < token_size) {
4309                 memcpy(token, *buf, len);
4310                 *(token + len) = '\0';
4311         }
4312         *buf += len;
4313
4314         return len;
4315 }
4316
4317 /*
4318  * Finds the next token in *buf, dynamically allocates a buffer big
4319  * enough to hold a copy of it, and copies the token into the new
4320  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4321  * that a duplicate buffer is created even for a zero-length token.
4322  *
4323  * Returns a pointer to the newly-allocated duplicate, or a null
4324  * pointer if memory for the duplicate was not available.  If
4325  * the lenp argument is a non-null pointer, the length of the token
4326  * (not including the '\0') is returned in *lenp.
4327  *
4328  * If successful, the *buf pointer will be updated to point beyond
4329  * the end of the found token.
4330  *
4331  * Note: uses GFP_KERNEL for allocation.
4332  */
4333 static inline char *dup_token(const char **buf, size_t *lenp)
4334 {
4335         char *dup;
4336         size_t len;
4337
4338         len = next_token(buf);
4339         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4340         if (!dup)
4341                 return NULL;
4342         *(dup + len) = '\0';
4343         *buf += len;
4344
4345         if (lenp)
4346                 *lenp = len;
4347
4348         return dup;
4349 }
4350
4351 /*
4352  * Parse the options provided for an "rbd add" (i.e., rbd image
4353  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4354  * and the data written is passed here via a NUL-terminated buffer.
4355  * Returns 0 if successful or an error code otherwise.
4356  *
4357  * The information extracted from these options is recorded in
4358  * the other parameters which return dynamically-allocated
4359  * structures:
4360  *  ceph_opts
4361  *      The address of a pointer that will refer to a ceph options
4362  *      structure.  Caller must release the returned pointer using
4363  *      ceph_destroy_options() when it is no longer needed.
4364  *  rbd_opts
4365  *      Address of an rbd options pointer.  Fully initialized by
4366  *      this function; caller must release with kfree().
4367  *  spec
4368  *      Address of an rbd image specification pointer.  Fully
4369  *      initialized by this function based on parsed options.
4370  *      Caller must release with rbd_spec_put().
4371  *
4372  * The options passed take this form:
4373  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4374  * where:
4375  *  <mon_addrs>
4376  *      A comma-separated list of one or more monitor addresses.
4377  *      A monitor address is an ip address, optionally followed
4378  *      by a port number (separated by a colon).
4379  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4380  *  <options>
4381  *      A comma-separated list of ceph and/or rbd options.
4382  *  <pool_name>
4383  *      The name of the rados pool containing the rbd image.
4384  *  <image_name>
4385  *      The name of the image in that pool to map.
4386  *  <snap_id>
4387  *      An optional snapshot id.  If provided, the mapping will
4388  *      present data from the image at the time that snapshot was
4389  *      created.  The image head is used if no snapshot id is
4390  *      provided.  Snapshot mappings are always read-only.
4391  */
4392 static int rbd_add_parse_args(const char *buf,
4393                                 struct ceph_options **ceph_opts,
4394                                 struct rbd_options **opts,
4395                                 struct rbd_spec **rbd_spec)
4396 {
4397         size_t len;
4398         char *options;
4399         const char *mon_addrs;
4400         char *snap_name;
4401         size_t mon_addrs_size;
4402         struct rbd_spec *spec = NULL;
4403         struct rbd_options *rbd_opts = NULL;
4404         struct ceph_options *copts;
4405         int ret;
4406
4407         /* The first four tokens are required */
4408
4409         len = next_token(&buf);
4410         if (!len) {
4411                 rbd_warn(NULL, "no monitor address(es) provided");
4412                 return -EINVAL;
4413         }
4414         mon_addrs = buf;
4415         mon_addrs_size = len + 1;
4416         buf += len;
4417
4418         ret = -EINVAL;
4419         options = dup_token(&buf, NULL);
4420         if (!options)
4421                 return -ENOMEM;
4422         if (!*options) {
4423                 rbd_warn(NULL, "no options provided");
4424                 goto out_err;
4425         }
4426
4427         spec = rbd_spec_alloc();
4428         if (!spec)
4429                 goto out_mem;
4430
4431         spec->pool_name = dup_token(&buf, NULL);
4432         if (!spec->pool_name)
4433                 goto out_mem;
4434         if (!*spec->pool_name) {
4435                 rbd_warn(NULL, "no pool name provided");
4436                 goto out_err;
4437         }
4438
4439         spec->image_name = dup_token(&buf, NULL);
4440         if (!spec->image_name)
4441                 goto out_mem;
4442         if (!*spec->image_name) {
4443                 rbd_warn(NULL, "no image name provided");
4444                 goto out_err;
4445         }
4446
4447         /*
4448          * Snapshot name is optional; default is to use "-"
4449          * (indicating the head/no snapshot).
4450          */
4451         len = next_token(&buf);
4452         if (!len) {
4453                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4454                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4455         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4456                 ret = -ENAMETOOLONG;
4457                 goto out_err;
4458         }
4459         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4460         if (!snap_name)
4461                 goto out_mem;
4462         *(snap_name + len) = '\0';
4463         spec->snap_name = snap_name;
4464
4465         /* Initialize all rbd options to the defaults */
4466
4467         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4468         if (!rbd_opts)
4469                 goto out_mem;
4470
4471         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4472
4473         copts = ceph_parse_options(options, mon_addrs,
4474                                         mon_addrs + mon_addrs_size - 1,
4475                                         parse_rbd_opts_token, rbd_opts);
4476         if (IS_ERR(copts)) {
4477                 ret = PTR_ERR(copts);
4478                 goto out_err;
4479         }
4480         kfree(options);
4481
4482         *ceph_opts = copts;
4483         *opts = rbd_opts;
4484         *rbd_spec = spec;
4485
4486         return 0;
4487 out_mem:
4488         ret = -ENOMEM;
4489 out_err:
4490         kfree(rbd_opts);
4491         rbd_spec_put(spec);
4492         kfree(options);
4493
4494         return ret;
4495 }
4496
4497 /*
4498  * An rbd format 2 image has a unique identifier, distinct from the
4499  * name given to it by the user.  Internally, that identifier is
4500  * what's used to specify the names of objects related to the image.
4501  *
4502  * A special "rbd id" object is used to map an rbd image name to its
4503  * id.  If that object doesn't exist, then there is no v2 rbd image
4504  * with the supplied name.
4505  *
4506  * This function will record the given rbd_dev's image_id field if
4507  * it can be determined, and in that case will return 0.  If any
4508  * errors occur a negative errno will be returned and the rbd_dev's
4509  * image_id field will be unchanged (and should be NULL).
4510  */
4511 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4512 {
4513         int ret;
4514         size_t size;
4515         char *object_name;
4516         void *response;
4517         char *image_id;
4518
4519         /*
4520          * When probing a parent image, the image id is already
4521          * known (and the image name likely is not).  There's no
4522          * need to fetch the image id again in this case.  We
4523          * do still need to set the image format though.
4524          */
4525         if (rbd_dev->spec->image_id) {
4526                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4527
4528                 return 0;
4529         }
4530
4531         /*
4532          * First, see if the format 2 image id file exists, and if
4533          * so, get the image's persistent id from it.
4534          */
4535         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4536         object_name = kmalloc(size, GFP_NOIO);
4537         if (!object_name)
4538                 return -ENOMEM;
4539         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4540         dout("rbd id object name is %s\n", object_name);
4541
4542         /* Response will be an encoded string, which includes a length */
4543
4544         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4545         response = kzalloc(size, GFP_NOIO);
4546         if (!response) {
4547                 ret = -ENOMEM;
4548                 goto out;
4549         }
4550
4551         /* If it doesn't exist we'll assume it's a format 1 image */
4552
4553         ret = rbd_obj_method_sync(rbd_dev, object_name,
4554                                 "rbd", "get_id", NULL, 0,
4555                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4556         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4557         if (ret == -ENOENT) {
4558                 image_id = kstrdup("", GFP_KERNEL);
4559                 ret = image_id ? 0 : -ENOMEM;
4560                 if (!ret)
4561                         rbd_dev->image_format = 1;
4562         } else if (ret > sizeof (__le32)) {
4563                 void *p = response;
4564
4565                 image_id = ceph_extract_encoded_string(&p, p + ret,
4566                                                 NULL, GFP_NOIO);
4567                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4568                 if (!ret)
4569                         rbd_dev->image_format = 2;
4570         } else {
4571                 ret = -EINVAL;
4572         }
4573
4574         if (!ret) {
4575                 rbd_dev->spec->image_id = image_id;
4576                 dout("image_id is %s\n", image_id);
4577         }
4578 out:
4579         kfree(response);
4580         kfree(object_name);
4581
4582         return ret;
4583 }
4584
4585 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4586 {
4587         int ret;
4588         size_t size;
4589
4590         /* Record the header object name for this rbd image. */
4591
4592         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4593         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4594         if (!rbd_dev->header_name) {
4595                 ret = -ENOMEM;
4596                 goto out_err;
4597         }
4598         sprintf(rbd_dev->header_name, "%s%s",
4599                 rbd_dev->spec->image_name, RBD_SUFFIX);
4600
4601         /* Populate rbd image metadata */
4602
4603         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4604         if (ret < 0)
4605                 goto out_err;
4606
4607         /* Version 1 images have no parent (no layering) */
4608
4609         rbd_dev->parent_spec = NULL;
4610         rbd_dev->parent_overlap = 0;
4611
4612         dout("discovered version 1 image, header name is %s\n",
4613                 rbd_dev->header_name);
4614
4615         return 0;
4616
4617 out_err:
4618         kfree(rbd_dev->header_name);
4619         rbd_dev->header_name = NULL;
4620         kfree(rbd_dev->spec->image_id);
4621         rbd_dev->spec->image_id = NULL;
4622
4623         return ret;
4624 }
4625
4626 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4627 {
4628         size_t size;
4629         int ret;
4630         u64 ver = 0;
4631
4632         /*
4633          * Image id was filled in by the caller.  Record the header
4634          * object name for this rbd image.
4635          */
4636         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4637         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4638         if (!rbd_dev->header_name)
4639                 return -ENOMEM;
4640         sprintf(rbd_dev->header_name, "%s%s",
4641                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4642
4643         /* Get the size and object order for the image */
4644         ret = rbd_dev_v2_image_size(rbd_dev);
4645         if (ret)
4646                 goto out_err;
4647
4648         /* Get the object prefix (a.k.a. block_name) for the image */
4649
4650         ret = rbd_dev_v2_object_prefix(rbd_dev);
4651         if (ret)
4652                 goto out_err;
4653
4654         /* Get the and check features for the image */
4655
4656         ret = rbd_dev_v2_features(rbd_dev);
4657         if (ret)
4658                 goto out_err;
4659
4660         /* If the image supports layering, get the parent info */
4661
4662         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4663                 ret = rbd_dev_v2_parent_info(rbd_dev);
4664                 if (ret)
4665                         goto out_err;
4666                 rbd_warn(rbd_dev, "WARNING: kernel support for "
4667                                         "layered rbd images is EXPERIMENTAL!");
4668         }
4669
4670         /* If the image supports fancy striping, get its parameters */
4671
4672         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4673                 ret = rbd_dev_v2_striping_info(rbd_dev);
4674                 if (ret < 0)
4675                         goto out_err;
4676         }
4677
4678         /* crypto and compression type aren't (yet) supported for v2 images */
4679
4680         rbd_dev->header.crypt_type = 0;
4681         rbd_dev->header.comp_type = 0;
4682
4683         /* Get the snapshot context, plus the header version */
4684
4685         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4686         if (ret)
4687                 goto out_err;
4688         rbd_dev->header.obj_version = ver;
4689
4690         dout("discovered version 2 image, header name is %s\n",
4691                 rbd_dev->header_name);
4692
4693         return 0;
4694 out_err:
4695         rbd_dev->parent_overlap = 0;
4696         rbd_spec_put(rbd_dev->parent_spec);
4697         rbd_dev->parent_spec = NULL;
4698         kfree(rbd_dev->header_name);
4699         rbd_dev->header_name = NULL;
4700         kfree(rbd_dev->header.object_prefix);
4701         rbd_dev->header.object_prefix = NULL;
4702
4703         return ret;
4704 }
4705
4706 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4707 {
4708         struct rbd_device *parent = NULL;
4709         struct rbd_spec *parent_spec;
4710         struct rbd_client *rbdc;
4711         int ret;
4712
4713         if (!rbd_dev->parent_spec)
4714                 return 0;
4715         /*
4716          * We need to pass a reference to the client and the parent
4717          * spec when creating the parent rbd_dev.  Images related by
4718          * parent/child relationships always share both.
4719          */
4720         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4721         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4722
4723         ret = -ENOMEM;
4724         parent = rbd_dev_create(rbdc, parent_spec);
4725         if (!parent)
4726                 goto out_err;
4727
4728         ret = rbd_dev_image_probe(parent);
4729         if (ret < 0)
4730                 goto out_err;
4731         rbd_dev->parent = parent;
4732
4733         return 0;
4734 out_err:
4735         if (parent) {
4736                 rbd_spec_put(rbd_dev->parent_spec);
4737                 kfree(rbd_dev->header_name);
4738                 rbd_dev_destroy(parent);
4739         } else {
4740                 rbd_put_client(rbdc);
4741                 rbd_spec_put(parent_spec);
4742         }
4743
4744         return ret;
4745 }
4746
4747 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4748 {
4749         int ret;
4750
4751         /* no need to lock here, as rbd_dev is not registered yet */
4752         ret = rbd_dev_snaps_update(rbd_dev);
4753         if (ret)
4754                 return ret;
4755
4756         ret = rbd_dev_spec_update(rbd_dev);
4757         if (ret)
4758                 goto err_out_snaps;
4759
4760         ret = rbd_dev_set_mapping(rbd_dev);
4761         if (ret)
4762                 goto err_out_snaps;
4763
4764         /* generate unique id: find highest unique id, add one */
4765         rbd_dev_id_get(rbd_dev);
4766
4767         /* Fill in the device name, now that we have its id. */
4768         BUILD_BUG_ON(DEV_NAME_LEN
4769                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4770         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4771
4772         /* Get our block major device number. */
4773
4774         ret = register_blkdev(0, rbd_dev->name);
4775         if (ret < 0)
4776                 goto err_out_id;
4777         rbd_dev->major = ret;
4778
4779         /* Set up the blkdev mapping. */
4780
4781         ret = rbd_init_disk(rbd_dev);
4782         if (ret)
4783                 goto err_out_blkdev;
4784
4785         ret = rbd_bus_add_dev(rbd_dev);
4786         if (ret)
4787                 goto err_out_disk;
4788
4789         ret = rbd_dev_probe_parent(rbd_dev);
4790         if (ret)
4791                 goto err_out_bus;
4792
4793         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4794         if (ret)
4795                 goto err_out_bus;
4796
4797         /* Everything's ready.  Announce the disk to the world. */
4798
4799         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4800         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4801         add_disk(rbd_dev->disk);
4802
4803         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4804                 (unsigned long long) rbd_dev->mapping.size);
4805
4806         return ret;
4807
4808 err_out_bus:
4809         /* this will also clean up rest of rbd_dev stuff */
4810
4811         rbd_bus_del_dev(rbd_dev);
4812
4813         return ret;
4814 err_out_disk:
4815         rbd_free_disk(rbd_dev);
4816 err_out_blkdev:
4817         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4818 err_out_id:
4819         rbd_dev_id_put(rbd_dev);
4820 err_out_snaps:
4821         rbd_remove_all_snaps(rbd_dev);
4822
4823         return ret;
4824 }
4825
4826 /*
4827  * Probe for the existence of the header object for the given rbd
4828  * device.  For format 2 images this includes determining the image
4829  * id.
4830  */
4831 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4832 {
4833         int ret;
4834
4835         /*
4836          * Get the id from the image id object.  If it's not a
4837          * format 2 image, we'll get ENOENT back, and we'll assume
4838          * it's a format 1 image.
4839          */
4840         ret = rbd_dev_image_id(rbd_dev);
4841         if (ret)
4842                 return ret;
4843         rbd_assert(rbd_dev->spec->image_id);
4844         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4845
4846         if (rbd_dev->image_format == 1)
4847                 ret = rbd_dev_v1_probe(rbd_dev);
4848         else
4849                 ret = rbd_dev_v2_probe(rbd_dev);
4850         if (ret)
4851                 goto out_err;
4852
4853         ret = rbd_dev_probe_finish(rbd_dev);
4854         if (ret)
4855                 rbd_header_free(&rbd_dev->header);
4856
4857         return ret;
4858 out_err:
4859         kfree(rbd_dev->spec->image_id);
4860         rbd_dev->spec->image_id = NULL;
4861
4862         dout("probe failed, returning %d\n", ret);
4863
4864         return ret;
4865 }
4866
4867 static ssize_t rbd_add(struct bus_type *bus,
4868                        const char *buf,
4869                        size_t count)
4870 {
4871         struct rbd_device *rbd_dev = NULL;
4872         struct ceph_options *ceph_opts = NULL;
4873         struct rbd_options *rbd_opts = NULL;
4874         struct rbd_spec *spec = NULL;
4875         struct rbd_client *rbdc;
4876         struct ceph_osd_client *osdc;
4877         int rc = -ENOMEM;
4878
4879         if (!try_module_get(THIS_MODULE))
4880                 return -ENODEV;
4881
4882         /* parse add command */
4883         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4884         if (rc < 0)
4885                 goto err_out_module;
4886
4887         rbdc = rbd_get_client(ceph_opts);
4888         if (IS_ERR(rbdc)) {
4889                 rc = PTR_ERR(rbdc);
4890                 goto err_out_args;
4891         }
4892         ceph_opts = NULL;       /* rbd_dev client now owns this */
4893
4894         /* pick the pool */
4895         osdc = &rbdc->client->osdc;
4896         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4897         if (rc < 0)
4898                 goto err_out_client;
4899         spec->pool_id = (u64)rc;
4900
4901         /* The ceph file layout needs to fit pool id in 32 bits */
4902
4903         if (spec->pool_id > (u64)U32_MAX) {
4904                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4905                                 (unsigned long long)spec->pool_id, U32_MAX);
4906                 rc = -EIO;
4907                 goto err_out_client;
4908         }
4909
4910         rbd_dev = rbd_dev_create(rbdc, spec);
4911         if (!rbd_dev)
4912                 goto err_out_client;
4913         rbdc = NULL;            /* rbd_dev now owns this */
4914         spec = NULL;            /* rbd_dev now owns this */
4915
4916         rbd_dev->mapping.read_only = rbd_opts->read_only;
4917         kfree(rbd_opts);
4918         rbd_opts = NULL;        /* done with this */
4919
4920         rc = rbd_dev_image_probe(rbd_dev);
4921         if (rc < 0)
4922                 goto err_out_rbd_dev;
4923
4924         return count;
4925 err_out_rbd_dev:
4926         rbd_spec_put(rbd_dev->parent_spec);
4927         kfree(rbd_dev->header_name);
4928         rbd_dev_destroy(rbd_dev);
4929 err_out_client:
4930         rbd_put_client(rbdc);
4931 err_out_args:
4932         if (ceph_opts)
4933                 ceph_destroy_options(ceph_opts);
4934         kfree(rbd_opts);
4935         rbd_spec_put(spec);
4936 err_out_module:
4937         module_put(THIS_MODULE);
4938
4939         dout("Error adding device %s\n", buf);
4940
4941         return (ssize_t)rc;
4942 }
4943
4944 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4945 {
4946         struct list_head *tmp;
4947         struct rbd_device *rbd_dev;
4948
4949         spin_lock(&rbd_dev_list_lock);
4950         list_for_each(tmp, &rbd_dev_list) {
4951                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4952                 if (rbd_dev->dev_id == dev_id) {
4953                         spin_unlock(&rbd_dev_list_lock);
4954                         return rbd_dev;
4955                 }
4956         }
4957         spin_unlock(&rbd_dev_list_lock);
4958         return NULL;
4959 }
4960
4961 static void rbd_dev_release(struct device *dev)
4962 {
4963         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4964
4965         if (rbd_dev->watch_event)
4966                 rbd_dev_header_watch_sync(rbd_dev, 0);
4967
4968         /* clean up and free blkdev */
4969         rbd_free_disk(rbd_dev);
4970         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4971
4972         /* release allocated disk header fields */
4973         rbd_header_free(&rbd_dev->header);
4974
4975         /* done with the id, and with the rbd_dev */
4976         rbd_dev_id_put(rbd_dev);
4977         rbd_assert(rbd_dev->rbd_client != NULL);
4978         rbd_spec_put(rbd_dev->parent_spec);
4979         kfree(rbd_dev->header_name);
4980         rbd_dev_destroy(rbd_dev);
4981
4982         /* release module ref */
4983         module_put(THIS_MODULE);
4984 }
4985
4986 static void __rbd_remove(struct rbd_device *rbd_dev)
4987 {
4988         rbd_remove_all_snaps(rbd_dev);
4989         rbd_bus_del_dev(rbd_dev);
4990 }
4991
4992 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4993 {
4994         while (rbd_dev->parent_spec) {
4995                 struct rbd_device *first = rbd_dev;
4996                 struct rbd_device *second = first->parent;
4997                 struct rbd_device *third;
4998
4999                 /*
5000                  * Follow to the parent with no grandparent and
5001                  * remove it.
5002                  */
5003                 while (second && (third = second->parent)) {
5004                         first = second;
5005                         second = third;
5006                 }
5007                 __rbd_remove(second);
5008                 rbd_spec_put(first->parent_spec);
5009                 first->parent_spec = NULL;
5010                 first->parent_overlap = 0;
5011                 first->parent = NULL;
5012         }
5013 }
5014
5015 static ssize_t rbd_remove(struct bus_type *bus,
5016                           const char *buf,
5017                           size_t count)
5018 {
5019         struct rbd_device *rbd_dev = NULL;
5020         int target_id, rc;
5021         unsigned long ul;
5022         int ret = count;
5023
5024         rc = strict_strtoul(buf, 10, &ul);
5025         if (rc)
5026                 return rc;
5027
5028         /* convert to int; abort if we lost anything in the conversion */
5029         target_id = (int) ul;
5030         if (target_id != ul)
5031                 return -EINVAL;
5032
5033         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5034
5035         rbd_dev = __rbd_get_dev(target_id);
5036         if (!rbd_dev) {
5037                 ret = -ENOENT;
5038                 goto done;
5039         }
5040
5041         spin_lock_irq(&rbd_dev->lock);
5042         if (rbd_dev->open_count)
5043                 ret = -EBUSY;
5044         else
5045                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5046         spin_unlock_irq(&rbd_dev->lock);
5047         if (ret < 0)
5048                 goto done;
5049
5050         rbd_dev_remove_parent(rbd_dev);
5051
5052         __rbd_remove(rbd_dev);
5053
5054 done:
5055         mutex_unlock(&ctl_mutex);
5056
5057         return ret;
5058 }
5059
5060 /*
5061  * create control files in sysfs
5062  * /sys/bus/rbd/...
5063  */
5064 static int rbd_sysfs_init(void)
5065 {
5066         int ret;
5067
5068         ret = device_register(&rbd_root_dev);
5069         if (ret < 0)
5070                 return ret;
5071
5072         ret = bus_register(&rbd_bus_type);
5073         if (ret < 0)
5074                 device_unregister(&rbd_root_dev);
5075
5076         return ret;
5077 }
5078
5079 static void rbd_sysfs_cleanup(void)
5080 {
5081         bus_unregister(&rbd_bus_type);
5082         device_unregister(&rbd_root_dev);
5083 }
5084
5085 static int __init rbd_init(void)
5086 {
5087         int rc;
5088
5089         if (!libceph_compatible(NULL)) {
5090                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5091
5092                 return -EINVAL;
5093         }
5094         rc = rbd_sysfs_init();
5095         if (rc)
5096                 return rc;
5097         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5098         return 0;
5099 }
5100
5101 static void __exit rbd_exit(void)
5102 {
5103         rbd_sysfs_cleanup();
5104 }
5105
5106 module_init(rbd_init);
5107 module_exit(rbd_exit);
5108
5109 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5110 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5111 MODULE_DESCRIPTION("rados block device");
5112
5113 /* following authorship retained from original osdblk.c */
5114 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5115
5116 MODULE_LICENSE("GPL");