/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

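/*
 * A snapshot context is a small fixed-size header followed by one
 * 64-bit snapshot id per snapshot, so 510 ids (510 * 8 = 4080 bytes)
 * is about the most that still fits within a single 4KB page.
 */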
#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
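/*
 * (5 * sizeof (int)) / 2 over-estimates the decimal digits an int can
 * take (2.5 digits per byte vs. the true ~2.41), and the + 1 leaves
 * room for a sign, so the bound holds for any int size without
 * computing a log10 at compile time.
 */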
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

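/*
 * "which" is an object request's index within its image request's
 * object request list; BAD_WHICH marks a request that is not on
 * any such list.
 */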
#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* position in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

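/*
 * Iterators over an image request's object requests.  The _safe
 * variant walks the list in reverse and tolerates removal of the
 * current entry, making it suitable for teardown.
 */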
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
/* do/while (0) keeps rbd_assert() safe to use in if/else bodies */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);

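/*
 * An open counts against open_count under rbd_dev->lock and fails
 * with ENOENT once the REMOVING flag is set; that is how an
 * in-progress unmap keeps new users out.
 */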
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client)) {
                /* Propagate the real error rather than always -ENOMEM */
                ret = PTR_ERR(rbdc->client);
                goto out_mutex;
        }
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with specific addr and configuration, creating
 * a new one if an existing client cannot be found.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * rbd_client_list_lock is taken here, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot header
         * must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the prefix copied above on this path */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

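/*
 * Image data is striped across backing objects ("segments") of
 * 2^obj_order bytes each.  With the default obj_order of 22 (4MB
 * objects), for example, image byte 10MB lands in segment 2 at
 * offset 2MB within that object.  The helpers below compute a
 * segment's object name, the offset within it, and how much of a
 * request fits before the segment boundary.
 */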
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        /* 1ULL: a plain 1 << 31 would overflow a signed int */
        return 1ULL << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
1090  * contain the offset of that byte within that bio.
1091  */
1092 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1093                                         unsigned int *offset,
1094                                         unsigned int len,
1095                                         gfp_t gfpmask)
1096 {
1097         struct bio *bi = *bio_src;
1098         unsigned int off = *offset;
1099         struct bio *chain = NULL;
1100         struct bio **end;
1101
1102         /* Build up a chain of clone bios up to the limit */
1103
1104         if (!bi || off >= bi->bi_size || !len)
1105                 return NULL;            /* Nothing to clone */
1106
1107         end = &chain;
1108         while (len) {
1109                 unsigned int bi_size;
1110                 struct bio *bio;
1111
1112                 if (!bi) {
1113                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bios */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.  The smp_mb() calls pair
 * the flag updates with the tests below so other CPUs never see a
 * stale flag value.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

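/*
 * Reads are built with the snapshot id to read from; writes instead
 * carry the snapshot context and a modification time.
 */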
1525 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1526 {
1527         struct rbd_img_request *img_request = obj_request->img_request;
1528         struct ceph_osd_request *osd_req = obj_request->osd_req;
1529         u64 snap_id;
1530
1531         rbd_assert(osd_req != NULL);
1532
1533         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1534         ceph_osdc_build_request(osd_req, obj_request->offset,
1535                         NULL, snap_id, NULL);
1536 }
1537
1538 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1539 {
1540         struct rbd_img_request *img_request = obj_request->img_request;
1541         struct ceph_osd_request *osd_req = obj_request->osd_req;
1542         struct ceph_snap_context *snapc;
1543         struct timespec mtime = CURRENT_TIME;
1544
1545         rbd_assert(osd_req != NULL);
1546
1547         snapc = img_request ? img_request->snapc : NULL;
1548         ceph_osdc_build_request(osd_req, obj_request->offset,
1549                         snapc, CEPH_NOSNAP, &mtime);
1550 }
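/*
 * The two format helpers above differ only in what they hand to
 * ceph_osdc_build_request():  a read carries the snapshot id being
 * read (CEPH_NOSNAP for the head), while a write carries the image's
 * snapshot context and a modification time.  Illustrative summary
 * (not a definitive statement of the osd client interface):
 *
 *	read:   snapc = NULL,                snap_id = mapped snap
 *	write:  snapc = image snap context,  snap_id = CEPH_NOSNAP
 */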
1551
1552 static struct ceph_osd_request *rbd_osd_req_create(
1553                                         struct rbd_device *rbd_dev,
1554                                         bool write_request,
1555                                         struct rbd_obj_request *obj_request)
1556 {
1557         struct ceph_snap_context *snapc = NULL;
1558         struct ceph_osd_client *osdc;
1559         struct ceph_osd_request *osd_req;
1560
1561         if (obj_request_img_data_test(obj_request)) {
1562                 struct rbd_img_request *img_request = obj_request->img_request;
1563
1564                 rbd_assert(write_request ==
1565                                 img_request_write_test(img_request));
1566                 if (write_request)
1567                         snapc = img_request->snapc;
1568         }
1569
1570         /* Allocate and initialize the request, for the single op */
1571
1572         osdc = &rbd_dev->rbd_client->client->osdc;
1573         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1574         if (!osd_req)
1575                 return NULL;    /* ENOMEM */
1576
1577         if (write_request)
1578                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1579         else
1580                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1581
1582         osd_req->r_callback = rbd_osd_req_callback;
1583         osd_req->r_priv = obj_request;
1584
1585         osd_req->r_oid_len = strlen(obj_request->object_name);
1586         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1587         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1588
1589         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1590
1591         return osd_req;
1592 }
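/*
 * Sketch of how a single-op osd request is typically built on top of
 * rbd_osd_req_create() (hypothetical read path; real call sites vary):
 *
 *	osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
 *	if (!osd_req)
 *		return -ENOMEM;	(or unwind)
 *	obj_request->osd_req = osd_req;
 *	osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
 *					offset, length, 0, 0);
 *	rbd_osd_req_format_read(obj_request);
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 */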
1593
1594 /*
1595  * Create a copyup osd request based on the information in the
1596  * object request supplied.  A copyup request has two osd ops:
1597  * a copyup method call and a "normal" write request.
1598  */
1599 static struct ceph_osd_request *
1600 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1601 {
1602         struct rbd_img_request *img_request;
1603         struct ceph_snap_context *snapc;
1604         struct rbd_device *rbd_dev;
1605         struct ceph_osd_client *osdc;
1606         struct ceph_osd_request *osd_req;
1607
1608         rbd_assert(obj_request_img_data_test(obj_request));
1609         img_request = obj_request->img_request;
1610         rbd_assert(img_request);
1611         rbd_assert(img_request_write_test(img_request));
1612
1613         /* Allocate and initialize the request, for the two ops */
1614
1615         snapc = img_request->snapc;
1616         rbd_dev = img_request->rbd_dev;
1617         osdc = &rbd_dev->rbd_client->client->osdc;
1618         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1619         if (!osd_req)
1620                 return NULL;    /* ENOMEM */
1621
1622         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1623         osd_req->r_callback = rbd_osd_req_callback;
1624         osd_req->r_priv = obj_request;
1625
1626         osd_req->r_oid_len = strlen(obj_request->object_name);
1627         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1628         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1629
1630         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1631
1632         return osd_req;
1633 }
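/*
 * The two ops of a copyup request are filled in later by the caller;
 * the finished layout (see rbd_img_obj_parent_read_full_callback()
 * below) is:
 *
 *	op 0:  CEPH_OSD_OP_CALL   class "rbd", method "copyup",
 *	                          request data = parent object contents
 *	op 1:  CEPH_OSD_OP_WRITE  the original write extent and payload
 */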
1634
1635
1636 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1637 {
1638         ceph_osdc_put_request(osd_req);
1639 }
1640
1641 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1642
1643 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1644                                                 u64 offset, u64 length,
1645                                                 enum obj_request_type type)
1646 {
1647         struct rbd_obj_request *obj_request;
1648         size_t size;
1649         char *name;
1650
1651         rbd_assert(obj_request_type_valid(type));
1652
1653         size = strlen(object_name) + 1;
1654         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1655         if (!obj_request)
1656                 return NULL;
1657
1658         name = (char *)(obj_request + 1);
1659         obj_request->object_name = memcpy(name, object_name, size);
1660         obj_request->offset = offset;
1661         obj_request->length = length;
1662         obj_request->flags = 0;
1663         obj_request->which = BAD_WHICH;
1664         obj_request->type = type;
1665         INIT_LIST_HEAD(&obj_request->links);
1666         init_completion(&obj_request->completion);
1667         kref_init(&obj_request->kref);
1668
1669         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1670                 offset, length, (int)type, obj_request);
1671
1672         return obj_request;
1673 }
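/*
 * Note the single-allocation trick above:  the object name lives in
 * the same kzalloc()'d block, immediately after the structure itself:
 *
 *	+-------------------------+---------------------------+
 *	| struct rbd_obj_request  | object name, NUL-terminated |
 *	+-------------------------+---------------------------+
 *	^ obj_request             ^ (char *)(obj_request + 1)
 *
 * which is why rbd_obj_request_destroy() needs only a single kfree().
 */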
1674
1675 static void rbd_obj_request_destroy(struct kref *kref)
1676 {
1677         struct rbd_obj_request *obj_request;
1678
1679         obj_request = container_of(kref, struct rbd_obj_request, kref);
1680
1681         dout("%s: obj %p\n", __func__, obj_request);
1682
1683         rbd_assert(obj_request->img_request == NULL);
1684         rbd_assert(obj_request->which == BAD_WHICH);
1685
1686         if (obj_request->osd_req)
1687                 rbd_osd_req_destroy(obj_request->osd_req);
1688
1689         rbd_assert(obj_request_type_valid(obj_request->type));
1690         switch (obj_request->type) {
1691         case OBJ_REQUEST_NODATA:
1692                 break;          /* Nothing to do */
1693         case OBJ_REQUEST_BIO:
1694                 if (obj_request->bio_list)
1695                         bio_chain_put(obj_request->bio_list);
1696                 break;
1697         case OBJ_REQUEST_PAGES:
1698                 if (obj_request->pages)
1699                         ceph_release_page_vector(obj_request->pages,
1700                                                 obj_request->page_count);
1701                 break;
1702         }
1703
1704         kfree(obj_request);
1705 }
1706
1707 /*
1708  * Caller is responsible for filling in the list of object requests
1709  * that comprises the image request, and the Linux request pointer
1710  * (if there is one).
1711  */
1712 static struct rbd_img_request *rbd_img_request_create(
1713                                         struct rbd_device *rbd_dev,
1714                                         u64 offset, u64 length,
1715                                         bool write_request,
1716                                         bool child_request)
1717 {
1718         struct rbd_img_request *img_request;
1719
1720         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1721         if (!img_request)
1722                 return NULL;
1723
1724         if (write_request) {
1725                 down_read(&rbd_dev->header_rwsem);
1726                 ceph_get_snap_context(rbd_dev->header.snapc);
1727                 up_read(&rbd_dev->header_rwsem);
1728         }
1729
1730         img_request->rq = NULL;
1731         img_request->rbd_dev = rbd_dev;
1732         img_request->offset = offset;
1733         img_request->length = length;
1734         img_request->flags = 0;
1735         if (write_request) {
1736                 img_request_write_set(img_request);
1737                 img_request->snapc = rbd_dev->header.snapc;
1738         } else {
1739                 img_request->snap_id = rbd_dev->spec->snap_id;
1740         }
1741         if (child_request)
1742                 img_request_child_set(img_request);
1743         if (rbd_dev->parent_spec)
1744                 img_request_layered_set(img_request);
1745         spin_lock_init(&img_request->completion_lock);
1746         img_request->next_completion = 0;
1747         img_request->callback = NULL;
1748         img_request->result = 0;
1749         img_request->obj_request_count = 0;
1750         INIT_LIST_HEAD(&img_request->obj_requests);
1751         kref_init(&img_request->kref);
1752
1753         rbd_img_request_get(img_request);       /* Avoid a warning */
1754         rbd_img_request_put(img_request);       /* TEMPORARY */
1755
1756         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1757                 write_request ? "write" : "read", offset, length,
1758                 img_request);
1759
1760         return img_request;
1761 }
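/*
 * Typical life cycle of an image request (condensed from
 * rbd_request_fn() below; error handling omitted):
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request, false);
 *	img_request->rq = rq;
 *	rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
 *	rbd_img_request_submit(img_request);
 *
 * The request is eventually released via rbd_img_request_put().
 */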
1762
1763 static void rbd_img_request_destroy(struct kref *kref)
1764 {
1765         struct rbd_img_request *img_request;
1766         struct rbd_obj_request *obj_request;
1767         struct rbd_obj_request *next_obj_request;
1768
1769         img_request = container_of(kref, struct rbd_img_request, kref);
1770
1771         dout("%s: img %p\n", __func__, img_request);
1772
1773         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1774                 rbd_img_obj_request_del(img_request, obj_request);
1775         rbd_assert(img_request->obj_request_count == 0);
1776
1777         if (img_request_write_test(img_request))
1778                 ceph_put_snap_context(img_request->snapc);
1779
1780         if (img_request_child_test(img_request))
1781                 rbd_obj_request_put(img_request->obj_request);
1782
1783         kfree(img_request);
1784 }
1785
1786 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1787 {
1788         struct rbd_img_request *img_request;
1789         unsigned int xferred;
1790         int result;
1791         bool more;
1792
1793         rbd_assert(obj_request_img_data_test(obj_request));
1794         img_request = obj_request->img_request;
1795
1796         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1797         xferred = (unsigned int)obj_request->xferred;
1798         result = obj_request->result;
1799         if (result) {
1800                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1801
1802                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1803                         img_request_write_test(img_request) ? "write" : "read",
1804                         obj_request->length, obj_request->img_offset,
1805                         obj_request->offset);
1806                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1807                         result, xferred);
1808                 if (!img_request->result)
1809                         img_request->result = result;
1810         }
1811
1812         /* Image object requests don't own their page array */
1813
1814         if (obj_request->type == OBJ_REQUEST_PAGES) {
1815                 obj_request->pages = NULL;
1816                 obj_request->page_count = 0;
1817         }
1818
1819         if (img_request_child_test(img_request)) {
1820                 rbd_assert(img_request->obj_request != NULL);
1821                 more = obj_request->which < img_request->obj_request_count - 1;
1822         } else {
1823                 rbd_assert(img_request->rq != NULL);
1824                 more = blk_end_request(img_request->rq, result, xferred);
1825         }
1826
1827         return more;
1828 }
1829
1830 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1831 {
1832         struct rbd_img_request *img_request;
1833         u32 which = obj_request->which;
1834         bool more = true;
1835
1836         rbd_assert(obj_request_img_data_test(obj_request));
1837         img_request = obj_request->img_request;
1838
1839         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1840         rbd_assert(img_request != NULL);
1841         rbd_assert(img_request->obj_request_count > 0);
1842         rbd_assert(which != BAD_WHICH);
1843         rbd_assert(which < img_request->obj_request_count);
1844         rbd_assert(which >= img_request->next_completion);
1845
1846         spin_lock_irq(&img_request->completion_lock);
1847         if (which != img_request->next_completion)
1848                 goto out;
1849
1850         for_each_obj_request_from(img_request, obj_request) {
1851                 rbd_assert(more);
1852                 rbd_assert(which < img_request->obj_request_count);
1853
1854                 if (!obj_request_done_test(obj_request))
1855                         break;
1856                 more = rbd_img_obj_end_request(obj_request);
1857                 which++;
1858         }
1859
1860         rbd_assert(more ^ (which == img_request->obj_request_count));
1861         img_request->next_completion = which;
1862 out:
1863         spin_unlock_irq(&img_request->completion_lock);
1864
1865         if (!more)
1866                 rbd_img_request_complete(img_request);
1867 }
1868
1869 /*
1870  * Split up an image request into one or more object requests, each
1871  * to a different object.  The "type" parameter indicates whether
1872  * "data_desc" is the pointer to the head of a list of bio
1873  * structures, or the base of a page array.  In either case this
1874  * function assumes data_desc describes memory sufficient to hold
1875  * all data described by the image request.
1876  */
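/*
 * A worked example, assuming the default 4 MB objects (obj_order 22):
 * a 6 MB write starting at image offset 3 MB spans three objects and
 * therefore becomes three object requests:
 *
 *	object 0:  offset 3 MB, length 1 MB
 *	object 1:  offset 0,    length 4 MB
 *	object 2:  offset 0,    length 1 MB
 */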
1877 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1878                                         enum obj_request_type type,
1879                                         void *data_desc)
1880 {
1881         struct rbd_device *rbd_dev = img_request->rbd_dev;
1882         struct rbd_obj_request *obj_request = NULL;
1883         struct rbd_obj_request *next_obj_request;
1884         bool write_request = img_request_write_test(img_request);
1885         struct bio *bio_list;
1886         unsigned int bio_offset = 0;
1887         struct page **pages;
1888         u64 img_offset;
1889         u64 resid;
1890         u16 opcode;
1891
1892         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1893                 (int)type, data_desc);
1894
1895         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1896         img_offset = img_request->offset;
1897         resid = img_request->length;
1898         rbd_assert(resid > 0);
1899
1900         if (type == OBJ_REQUEST_BIO) {
1901                 bio_list = data_desc;
1902                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1903         } else {
1904                 rbd_assert(type == OBJ_REQUEST_PAGES);
1905                 pages = data_desc;
1906         }
1907
1908         while (resid) {
1909                 struct ceph_osd_request *osd_req;
1910                 const char *object_name;
1911                 u64 offset;
1912                 u64 length;
1913
1914                 object_name = rbd_segment_name(rbd_dev, img_offset);
1915                 if (!object_name)
1916                         goto out_unwind;
1917                 offset = rbd_segment_offset(rbd_dev, img_offset);
1918                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1919                 obj_request = rbd_obj_request_create(object_name,
1920                                                 offset, length, type);
1921                 kfree(object_name);     /* object request has its own copy */
1922                 if (!obj_request)
1923                         goto out_unwind;
1924
1925                 if (type == OBJ_REQUEST_BIO) {
1926                         unsigned int clone_size;
1927
1928                         rbd_assert(length <= (u64)UINT_MAX);
1929                         clone_size = (unsigned int)length;
1930                         obj_request->bio_list =
1931                                         bio_chain_clone_range(&bio_list,
1932                                                                 &bio_offset,
1933                                                                 clone_size,
1934                                                                 GFP_ATOMIC);
1935                         if (!obj_request->bio_list)
1936                                 goto out_partial;
1937                 } else {
1938                         unsigned int page_count;
1939
1940                         obj_request->pages = pages;
1941                         page_count = (u32)calc_pages_for(offset, length);
1942                         obj_request->page_count = page_count;
1943                         if ((offset + length) & ~PAGE_MASK)
1944                                 page_count--;   /* more on last page */
1945                         pages += page_count;
1946                 }
1947
1948                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1949                                                 obj_request);
1950                 if (!osd_req)
1951                         goto out_partial;
1952                 obj_request->osd_req = osd_req;
1953                 obj_request->callback = rbd_img_obj_callback;
1954
1955                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1956                                                 0, 0);
1957                 if (type == OBJ_REQUEST_BIO)
1958                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1959                                         obj_request->bio_list, length);
1960                 else
1961                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1962                                         obj_request->pages, length,
1963                                         offset & ~PAGE_MASK, false, false);
1964
1965                 if (write_request)
1966                         rbd_osd_req_format_write(obj_request);
1967                 else
1968                         rbd_osd_req_format_read(obj_request);
1969
1970                 obj_request->img_offset = img_offset;
1971                 rbd_img_obj_request_add(img_request, obj_request);
1972
1973                 img_offset += length;
1974                 resid -= length;
1975         }
1976
1977         return 0;
1978
1979 out_partial:
1980         rbd_obj_request_put(obj_request);
1981 out_unwind:
1982         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1983                 rbd_obj_request_put(obj_request);
1984
1985         return -ENOMEM;
1986 }
1987
1988 static void
1989 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1990 {
1991         struct rbd_img_request *img_request;
1992         struct rbd_device *rbd_dev;
1993         u64 length;
1994         u32 page_count;
1995
1996         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1997         rbd_assert(obj_request_img_data_test(obj_request));
1998         img_request = obj_request->img_request;
1999         rbd_assert(img_request);
2000
2001         rbd_dev = img_request->rbd_dev;
2002         rbd_assert(rbd_dev);
2003         length = (u64)1 << rbd_dev->header.obj_order;
2004         page_count = (u32)calc_pages_for(0, length);
2005
2006         rbd_assert(obj_request->copyup_pages);
2007         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2008         obj_request->copyup_pages = NULL;
2009
2010         /*
2011          * We want the transfer count to reflect the size of the
2012          * original write request.  There is no such thing as a
2013          * successful short write, so if the request was successful
2014          * we can just set it to the originally-requested length.
2015          */
2016         if (!obj_request->result)
2017                 obj_request->xferred = obj_request->length;
2018
2019         /* Finish up with the normal image object callback */
2020
2021         rbd_img_obj_callback(obj_request);
2022 }
2023
2024 static void
2025 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2026 {
2027         struct rbd_obj_request *orig_request;
2028         struct ceph_osd_request *osd_req;
2029         struct ceph_osd_client *osdc;
2030         struct rbd_device *rbd_dev;
2031         struct page **pages;
2032         int result;
2033         u64 obj_size;
2034         u64 xferred;
2035
2036         rbd_assert(img_request_child_test(img_request));
2037
2038         /* First get what we need from the image request */
2039
2040         pages = img_request->copyup_pages;
2041         rbd_assert(pages != NULL);
2042         img_request->copyup_pages = NULL;
2043
2044         orig_request = img_request->obj_request;
2045         rbd_assert(orig_request != NULL);
2046         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2047         result = img_request->result;
2048         obj_size = img_request->length;
2049         xferred = img_request->xferred;
2050
2051         rbd_dev = img_request->rbd_dev;
2052         rbd_assert(rbd_dev);
2053         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2054
2055         rbd_img_request_put(img_request);
2056
2057         if (result)
2058                 goto out_err;
2059
2060         /* Allocate the new copyup osd request for the original request */
2061
2062         result = -ENOMEM;
2063         rbd_assert(!orig_request->osd_req);
2064         osd_req = rbd_osd_req_create_copyup(orig_request);
2065         if (!osd_req)
2066                 goto out_err;
2067         orig_request->osd_req = osd_req;
2068         orig_request->copyup_pages = pages;
2069
2070         /* Initialize the copyup op */
2071
2072         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2073         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2074                                                 false, false);
2075
2076         /* Then the original write request op */
2077
2078         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2079                                         orig_request->offset,
2080                                         orig_request->length, 0, 0);
2081         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2082                                         orig_request->length);
2083
2084         rbd_osd_req_format_write(orig_request);
2085
2086         /* All set, send it off. */
2087
2088         orig_request->callback = rbd_img_obj_copyup_callback;
2089         osdc = &rbd_dev->rbd_client->client->osdc;
2090         result = rbd_obj_request_submit(osdc, orig_request);
2091         if (!result)
2092                 return;
2093 out_err:
2094         /* Record the error code and complete the request */
2095
2096         orig_request->result = result;
2097         orig_request->xferred = 0;
2098         obj_request_done_set(orig_request);
2099         rbd_obj_request_complete(orig_request);
2100 }
2101
2102 /*
2103  * Read from the parent image the range of data that covers the
2104  * entire target of the given object request.  This is used for
2105  * satisfying a layered image write request when the target of an
2106  * object request from the image request does not exist.
2107  *
2108  * A page array big enough to hold the returned data is allocated
2109  * and supplied to rbd_img_request_fill() as the "data descriptor."
2110  * When the read completes, this page array will be transferred to
2111  * the original object request for the copyup operation.
2112  *
2113  * If an error occurs, record it as the result of the original
2114  * object request and mark it done so it gets completed.
2115  */
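/*
 * In outline, a layered write to a nonexistent object proceeds as
 * follows (a summary of the code below and its callbacks):
 *
 *	1. Read the full object range from the parent into a page
 *	   array (this function).
 *	2. On completion, build a two-op copyup request carrying the
 *	   parent data and the original write
 *	   (rbd_img_obj_parent_read_full_callback()).
 *	3. When the copyup write finishes, complete the original
 *	   object request (rbd_img_obj_copyup_callback()).
 */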
2116 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2117 {
2118         struct rbd_img_request *img_request = NULL;
2119         struct rbd_img_request *parent_request = NULL;
2120         struct rbd_device *rbd_dev;
2121         u64 img_offset;
2122         u64 length;
2123         struct page **pages = NULL;
2124         u32 page_count;
2125         int result;
2126
2127         rbd_assert(obj_request_img_data_test(obj_request));
2128         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2129
2130         img_request = obj_request->img_request;
2131         rbd_assert(img_request != NULL);
2132         rbd_dev = img_request->rbd_dev;
2133         rbd_assert(rbd_dev->parent != NULL);
2134
2135         /*
2136          * First things first.  The original osd request is of no
2137          * use to us any more; we'll need a new one that can hold
2138          * the two ops in a copyup request.  We'll get that later,
2139          * but for now we can release the old one.
2140          */
2141         rbd_osd_req_destroy(obj_request->osd_req);
2142         obj_request->osd_req = NULL;
2143
2144         /*
2145          * Determine the byte range covered by the object in the
2146          * child image to which the original request was to be sent.
2147          */
2148         img_offset = obj_request->img_offset - obj_request->offset;
2149         length = (u64)1 << rbd_dev->header.obj_order;
2150
2151         /*
2152          * There is no defined parent data beyond the parent
2153          * overlap, so limit what we read at that boundary if
2154          * necessary.
2155          */
2156         if (img_offset + length > rbd_dev->parent_overlap) {
2157                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2158                 length = rbd_dev->parent_overlap - img_offset;
2159         }
2160
2161         /*
2162          * Allocate a page array big enough to receive the data read
2163          * from the parent.
2164          */
2165         page_count = (u32)calc_pages_for(0, length);
2166         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2167         if (IS_ERR(pages)) {
2168                 result = PTR_ERR(pages);
2169                 pages = NULL;
2170                 goto out_err;
2171         }
2172
2173         result = -ENOMEM;
2174         parent_request = rbd_img_request_create(rbd_dev->parent,
2175                                                 img_offset, length,
2176                                                 false, true);
2177         if (!parent_request)
2178                 goto out_err;
2179         rbd_obj_request_get(obj_request);
2180         parent_request->obj_request = obj_request;
2181
2182         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2183         if (result)
2184                 goto out_err;
2185         parent_request->copyup_pages = pages;
2186
2187         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2188         result = rbd_img_request_submit(parent_request);
2189         if (!result)
2190                 return 0;
2191
2192         parent_request->copyup_pages = NULL;
2193         parent_request->obj_request = NULL;
2194         rbd_obj_request_put(obj_request);
2195 out_err:
2196         if (pages)
2197                 ceph_release_page_vector(pages, page_count);
2198         if (parent_request)
2199                 rbd_img_request_put(parent_request);
2200         obj_request->result = result;
2201         obj_request->xferred = 0;
2202         obj_request_done_set(obj_request);
2203
2204         return result;
2205 }
2206
2207 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2208 {
2209         struct rbd_obj_request *orig_request;
2210         int result;
2211
2212         rbd_assert(!obj_request_img_data_test(obj_request));
2213
2214         /*
2215          * All we need from the object request is the original
2216          * request and the result of the STAT op.  Grab those, then
2217          * we're done with the request.
2218          */
2219         orig_request = obj_request->obj_request;
2220         obj_request->obj_request = NULL;
2221         rbd_assert(orig_request);
2222         rbd_assert(orig_request->img_request);
2223
2224         result = obj_request->result;
2225         obj_request->result = 0;
2226
2227         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2228                 obj_request, orig_request, result,
2229                 obj_request->xferred, obj_request->length);
2230         rbd_obj_request_put(obj_request);
2234
2235         /*
2236          * Our only purpose here is to determine whether the object
2237          * exists, and we don't want to treat the non-existence as
2238          * an error.  If something else comes back, transfer the
2239          * error to the original request and complete it now.
2240          */
2241         if (!result) {
2242                 obj_request_existence_set(orig_request, true);
2243         } else if (result == -ENOENT) {
2244                 obj_request_existence_set(orig_request, false);
2245         } else {
2246                 orig_request->result = result;
2247                 goto out;
2248         }
2249
2250         /*
2251          * Resubmit the original request now that we have recorded
2252          * whether the target object exists.
2253          */
2254         orig_request->result = rbd_img_obj_request_submit(orig_request);
2255 out:
2256         if (orig_request->result)
2257                 rbd_obj_request_complete(orig_request);
2258         rbd_obj_request_put(orig_request);
2259 }
2260
2261 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2262 {
2263         struct rbd_obj_request *stat_request;
2264         struct rbd_device *rbd_dev;
2265         struct ceph_osd_client *osdc;
2266         struct page **pages = NULL;
2267         u32 page_count;
2268         size_t size;
2269         int ret;
2270
2271         /*
2272          * The response data for a STAT call consists of:
2273          *     le64 length;
2274          *     struct {
2275          *         le32 tv_sec;
2276          *         le32 tv_nsec;
2277          *     } mtime;
2278          */
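        /*
         * Expressed as a packed C struct, purely for illustration
         * (the driver just sizes and copies raw bytes), that reply
         * would be:
         *
         *	struct stat_reply {
         *		__le64 length;
         *		struct { __le32 tv_sec; __le32 tv_nsec; } mtime;
         *	} __attribute__ ((packed));
         */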
2279         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2280         page_count = (u32)calc_pages_for(0, size);
2281         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2282         if (IS_ERR(pages))
2283                 return PTR_ERR(pages);
2284
2285         ret = -ENOMEM;
2286         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2287                                                         OBJ_REQUEST_PAGES);
2288         if (!stat_request)
2289                 goto out;
2290
2291         rbd_obj_request_get(obj_request);
2292         stat_request->obj_request = obj_request;
2293         stat_request->pages = pages;
2294         stat_request->page_count = page_count;
2295
2296         rbd_assert(obj_request->img_request);
2297         rbd_dev = obj_request->img_request->rbd_dev;
2298         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2299                                                 stat_request);
2300         if (!stat_request->osd_req)
2301                 goto out;
2302         stat_request->callback = rbd_img_obj_exists_callback;
2303
2304         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2305         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2306                                         false, false);
2307         rbd_osd_req_format_read(stat_request);
2308
2309         osdc = &rbd_dev->rbd_client->client->osdc;
2310         ret = rbd_obj_request_submit(osdc, stat_request);
2311 out:
2312         if (ret)
2313                 rbd_obj_request_put(obj_request);
2314
2315         return ret;
2316 }
2317
2318 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2319 {
2320         struct rbd_img_request *img_request;
2321         struct rbd_device *rbd_dev;
2322         bool known;
2323
2324         rbd_assert(obj_request_img_data_test(obj_request));
2325
2326         img_request = obj_request->img_request;
2327         rbd_assert(img_request);
2328         rbd_dev = img_request->rbd_dev;
2329
2330         /*
2331          * Only writes to layered images need special handling.
2332          * Reads and non-layered writes are simple object requests.
2333          * Layered writes that start beyond the end of the overlap
2334          * with the parent have no parent data, so they too are
2335          * simple object requests.  Finally, if the target object is
2336          * known to already exist, its parent data has already been
2337          * copied, so a write to the object can also be handled as a
2338          * simple object request.
2339          */
2340         if (!img_request_write_test(img_request) ||
2341                 !img_request_layered_test(img_request) ||
2342                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2343                 ((known = obj_request_known_test(obj_request)) &&
2344                         obj_request_exists_test(obj_request))) {
2345
2346                 struct rbd_device *rbd_dev;
2347                 struct ceph_osd_client *osdc;
2348
2349                 rbd_dev = obj_request->img_request->rbd_dev;
2350                 osdc = &rbd_dev->rbd_client->client->osdc;
2351
2352                 return rbd_obj_request_submit(osdc, obj_request);
2353         }
2354
2355         /*
2356          * It's a layered write.  The target object might exist but
2357          * we may not know that yet.  If we know it doesn't exist,
2358          * start by reading the data for the full target object from
2359          * the parent so we can use it for a copyup to the target.
2360          */
2361         if (known)
2362                 return rbd_img_obj_parent_read_full(obj_request);
2363
2364         /* We don't know whether the target exists.  Go find out. */
2365
2366         return rbd_img_obj_exists_submit(obj_request);
2367 }
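/*
 * The dispatch above, restated as a table:
 *
 *	read, non-layered write, write beyond
 *	the parent overlap, or target object
 *	known to exist			->  plain object request
 *	target known not to exist	->  parent read, then copyup
 *	target existence unknown	->  STAT first, then resubmit
 */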
2368
2369 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2370 {
2371         struct rbd_obj_request *obj_request;
2372         struct rbd_obj_request *next_obj_request;
2373
2374         dout("%s: img %p\n", __func__, img_request);
2375         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2376                 int ret;
2377
2378                 ret = rbd_img_obj_request_submit(obj_request);
2379                 if (ret)
2380                         return ret;
2381         }
2382
2383         return 0;
2384 }
2385
2386 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2387 {
2388         struct rbd_obj_request *obj_request;
2389         struct rbd_device *rbd_dev;
2390         u64 obj_end;
2391
2392         rbd_assert(img_request_child_test(img_request));
2393
2394         obj_request = img_request->obj_request;
2395         rbd_assert(obj_request);
2396         rbd_assert(obj_request->img_request);
2397
2398         obj_request->result = img_request->result;
2399         if (obj_request->result)
2400                 goto out;
2401
2402         /*
2403          * We need to zero anything beyond the parent overlap
2404          * boundary.  Since rbd_img_obj_request_read_callback()
2405          * will zero anything beyond the end of a short read, an
2406          * easy way to do this is to pretend the data from the
2407          * parent came up short--ending at the overlap boundary.
2408          */
2409         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2410         obj_end = obj_request->img_offset + obj_request->length;
2411         rbd_dev = obj_request->img_request->rbd_dev;
2412         if (obj_end > rbd_dev->parent_overlap) {
2413                 u64 xferred = 0;
2414
2415                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2416                         xferred = rbd_dev->parent_overlap -
2417                                         obj_request->img_offset;
2418
2419                 obj_request->xferred = min(img_request->xferred, xferred);
2420         } else {
2421                 obj_request->xferred = img_request->xferred;
2422         }
2423 out:
2424         rbd_img_obj_request_read_callback(obj_request);
2425         rbd_obj_request_complete(obj_request);
2426 }
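/*
 * Example of the clipping above (hypothetical numbers):  with a
 * parent overlap of 4 MB, a child read of 2 MB at image offset 3 MB
 * is backed by parent data only up to the 4 MB boundary.  xferred is
 * therefore capped at 1 MB, and rbd_img_obj_request_read_callback()
 * zeroes the remaining 1 MB as if the read had come up short.
 */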
2427
2428 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2429 {
2430         struct rbd_device *rbd_dev;
2431         struct rbd_img_request *img_request;
2432         int result;
2433
2434         rbd_assert(obj_request_img_data_test(obj_request));
2435         rbd_assert(obj_request->img_request != NULL);
2436         rbd_assert(obj_request->result == (s32) -ENOENT);
2437         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2438
2439         rbd_dev = obj_request->img_request->rbd_dev;
2440         rbd_assert(rbd_dev->parent != NULL);
2442         img_request = rbd_img_request_create(rbd_dev->parent,
2443                                                 obj_request->img_offset,
2444                                                 obj_request->length,
2445                                                 false, true);
2446         result = -ENOMEM;
2447         if (!img_request)
2448                 goto out_err;
2449
2450         rbd_obj_request_get(obj_request);
2451         img_request->obj_request = obj_request;
2452
2453         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2454                                         obj_request->bio_list);
2455         if (result)
2456                 goto out_err;
2457
2458         img_request->callback = rbd_img_parent_read_callback;
2459         result = rbd_img_request_submit(img_request);
2460         if (result)
2461                 goto out_err;
2462
2463         return;
2464 out_err:
2465         if (img_request)
2466                 rbd_img_request_put(img_request);
2467         obj_request->result = result;
2468         obj_request->xferred = 0;
2469         obj_request_done_set(obj_request);
2470 }
2471
2472 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2473 {
2474         struct rbd_obj_request *obj_request;
2475         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2476         int ret;
2477
2478         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2479                                                         OBJ_REQUEST_NODATA);
2480         if (!obj_request)
2481                 return -ENOMEM;
2482
2483         ret = -ENOMEM;
2484         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2485         if (!obj_request->osd_req)
2486                 goto out;
2487         obj_request->callback = rbd_obj_request_put;
2488
2489         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2490                                         notify_id, 0, 0);
2491         rbd_osd_req_format_read(obj_request);
2492
2493         ret = rbd_obj_request_submit(osdc, obj_request);
2494 out:
2495         if (ret)
2496                 rbd_obj_request_put(obj_request);
2497
2498         return ret;
2499 }
2500
2501 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2502 {
2503         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2504
2505         if (!rbd_dev)
2506                 return;
2507
2508         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2509                 rbd_dev->header_name, (unsigned long long)notify_id,
2510                 (unsigned int)opcode);
2511         (void)rbd_dev_refresh(rbd_dev);
2512
2513         rbd_obj_notify_ack(rbd_dev, notify_id);
2514 }
2515
2516 /*
2517  * Request sync osd watch/unwatch.  The value of "start" determines
2518  * whether a watch request is being initiated or torn down.
2519  */
2520 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2521 {
2522         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2523         struct rbd_obj_request *obj_request;
2524         int ret;
2525
2526         rbd_assert(start ^ !!rbd_dev->watch_event);
2527         rbd_assert(start ^ !!rbd_dev->watch_request);
2528
2529         if (start) {
2530                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2531                                                 &rbd_dev->watch_event);
2532                 if (ret < 0)
2533                         return ret;
2534                 rbd_assert(rbd_dev->watch_event != NULL);
2535         }
2536
2537         ret = -ENOMEM;
2538         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2539                                                         OBJ_REQUEST_NODATA);
2540         if (!obj_request)
2541                 goto out_cancel;
2542
2543         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2544         if (!obj_request->osd_req)
2545                 goto out_cancel;
2546
2547         if (start)
2548                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2549         else
2550                 ceph_osdc_unregister_linger_request(osdc,
2551                                         rbd_dev->watch_request->osd_req);
2552
2553         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2554                                 rbd_dev->watch_event->cookie, 0, start);
2555         rbd_osd_req_format_write(obj_request);
2556
2557         ret = rbd_obj_request_submit(osdc, obj_request);
2558         if (ret)
2559                 goto out_cancel;
2560         ret = rbd_obj_request_wait(obj_request);
2561         if (ret)
2562                 goto out_cancel;
2563         ret = obj_request->result;
2564         if (ret)
2565                 goto out_cancel;
2566
2567         /*
2568          * A watch request is set to linger, so the underlying osd
2569          * request won't go away until we unregister it.  We retain
2570          * a pointer to the object request during that time (in
2571          * rbd_dev->watch_request), so we'll keep a reference to
2572          * it.  We'll drop that reference (below) after we've
2573          * unregistered it.
2574          */
2575         if (start) {
2576                 rbd_dev->watch_request = obj_request;
2577
2578                 return 0;
2579         }
2580
2581         /* We have successfully torn down the watch request */
2582
2583         rbd_obj_request_put(rbd_dev->watch_request);
2584         rbd_dev->watch_request = NULL;
2585 out_cancel:
2586         /* Cancel the event if we're tearing down, or on error */
2587         ceph_osdc_cancel_event(rbd_dev->watch_event);
2588         rbd_dev->watch_event = NULL;
2589         if (obj_request)
2590                 rbd_obj_request_put(obj_request);
2591
2592         return ret;
2593 }
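/*
 * Illustrative usage (hypothetical call sites; the real ones live in
 * the device setup and teardown paths):
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	start watching
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	tear it down
 *
 * Notifications on the header object arriving in between are
 * delivered to rbd_watch_cb() above.
 */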
2594
2595 /*
2596  * Synchronous osd object method call.  Returns the number of bytes
2597  * returned in the outbound buffer, or a negative error code.
2598  */
2599 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2600                              const char *object_name,
2601                              const char *class_name,
2602                              const char *method_name,
2603                              const void *outbound,
2604                              size_t outbound_size,
2605                              void *inbound,
2606                              size_t inbound_size)
2607 {
2608         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2609         struct rbd_obj_request *obj_request;
2610         struct page **pages;
2611         u32 page_count;
2612         int ret;
2613
2614         /*
2615          * Method calls are ultimately read operations.  The result
2616          * should be placed into the inbound buffer provided.  They
2617          * also supply outbound data--parameters for the object
2618          * method.  Currently if this is present it will be a
2619          * snapshot id.
2620          */
2621         page_count = (u32)calc_pages_for(0, inbound_size);
2622         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2623         if (IS_ERR(pages))
2624                 return PTR_ERR(pages);
2625
2626         ret = -ENOMEM;
2627         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2628                                                         OBJ_REQUEST_PAGES);
2629         if (!obj_request)
2630                 goto out;
2631
2632         obj_request->pages = pages;
2633         obj_request->page_count = page_count;
2634
2635         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2636         if (!obj_request->osd_req)
2637                 goto out;
2638
2639         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2640                                         class_name, method_name);
2641         if (outbound_size) {
2642                 struct ceph_pagelist *pagelist;
2643
2644                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2645                 if (!pagelist)
2646                         goto out;
2647
2648                 ceph_pagelist_init(pagelist);
2649                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2650                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2651                                                 pagelist);
2652         }
2653         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2654                                         obj_request->pages, inbound_size,
2655                                         0, false, false);
2656         rbd_osd_req_format_read(obj_request);
2657
2658         ret = rbd_obj_request_submit(osdc, obj_request);
2659         if (ret)
2660                 goto out;
2661         ret = rbd_obj_request_wait(obj_request);
2662         if (ret)
2663                 goto out;
2664
2665         ret = obj_request->result;
2666         if (ret < 0)
2667                 goto out;
2668
2669         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2670         ret = (int)obj_request->xferred;
2671         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2672 out:
2673         if (obj_request)
2674                 rbd_obj_request_put(obj_request);
2675         else
2676                 ceph_release_page_vector(pages, page_count);
2677
2678         return ret;
2679 }
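/*
 * Sketch of a typical method call, modeled on the version-2 image
 * "get_size" class method (the buffer layout here is illustrative):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 */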
2680
2681 static void rbd_request_fn(struct request_queue *q)
2682                 __releases(q->queue_lock) __acquires(q->queue_lock)
2683 {
2684         struct rbd_device *rbd_dev = q->queuedata;
2685         bool read_only = rbd_dev->mapping.read_only;
2686         struct request *rq;
2687         int result;
2688
2689         while ((rq = blk_fetch_request(q))) {
2690                 bool write_request = rq_data_dir(rq) == WRITE;
2691                 struct rbd_img_request *img_request;
2692                 u64 offset;
2693                 u64 length;
2694
2695                 /* Ignore any non-FS requests that filter through. */
2696
2697                 if (rq->cmd_type != REQ_TYPE_FS) {
2698                         dout("%s: non-fs request type %d\n", __func__,
2699                                 (int) rq->cmd_type);
2700                         __blk_end_request_all(rq, 0);
2701                         continue;
2702                 }
2703
2704                 /* Ignore/skip any zero-length requests */
2705
2706                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2707                 length = (u64) blk_rq_bytes(rq);
2708
2709                 if (!length) {
2710                         dout("%s: zero-length request\n", __func__);
2711                         __blk_end_request_all(rq, 0);
2712                         continue;
2713                 }
2714
2715                 spin_unlock_irq(q->queue_lock);
2716
2717                 /* Disallow writes to a read-only device */
2718
2719                 if (write_request) {
2720                         result = -EROFS;
2721                         if (read_only)
2722                                 goto end_request;
2723                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2724                 }
2725
2726                 /*
2727                  * Quit early if the mapped snapshot no longer
2728                  * exists.  It's still possible the snapshot will
2729                  * have disappeared by the time our request arrives
2730                  * at the osd, but there's no sense in sending it if
2731                  * we already know.
2732                  */
2733                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2734                         dout("request for non-existent snapshot\n");
2735                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2736                         result = -ENXIO;
2737                         goto end_request;
2738                 }
2739
2740                 result = -EINVAL;
2741                 if (offset && length > U64_MAX - offset + 1) {
2742                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2743                                 offset, length);
2744                         goto end_request;       /* Shouldn't happen */
2745                 }
2746
2747                 result = -ENOMEM;
2748                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2749                                                         write_request, false);
2750                 if (!img_request)
2751                         goto end_request;
2752
2753                 img_request->rq = rq;
2754
2755                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2756                                                 rq->bio);
2757                 if (!result)
2758                         result = rbd_img_request_submit(img_request);
2759                 if (result)
2760                         rbd_img_request_put(img_request);
2761 end_request:
2762                 spin_lock_irq(q->queue_lock);
2763                 if (result < 0) {
2764                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2765                                 write_request ? "write" : "read",
2766                                 length, offset, result);
2767
2768                         __blk_end_request_all(rq, result);
2769                 }
2770         }
2771 }
2772
2773 /*
2774  * A queue callback.  Makes sure that we don't create a bio that spans
2775  * multiple osd objects.  One exception would be single-page bios,
2776  * which we handle later in bio_chain_clone_range().
2777  */
2778 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2779                           struct bio_vec *bvec)
2780 {
2781         struct rbd_device *rbd_dev = q->queuedata;
2782         sector_t sector_offset;
2783         sector_t sectors_per_obj;
2784         sector_t obj_sector_offset;
2785         int ret;
2786
2787         /*
2788          * Convert the partition-relative bio start sector to an
2789          * offset relative to the enclosing device, then find how
2790          * far into its rbd object that sector falls.
2791          */
2792         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2793         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2794         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2795
2796         /*
2797          * Compute the number of bytes from that offset to the end
2798          * of the object.  Account for what's already used by the bio.
2799          */
2800         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2801         if (ret > bmd->bi_size)
2802                 ret -= bmd->bi_size;
2803         else
2804                 ret = 0;
2805
2806         /*
2807          * Don't send back more than was asked for.  And if the bio
2808          * was empty, let the whole thing through because:  "Note
2809          * that a block device *must* allow a single page to be
2810          * added to an empty bio."
2811          */
2812         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2813         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2814                 ret = (int) bvec->bv_len;
2815
2816         return ret;
2817 }
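/*
 * Worked example (hypothetical numbers):  with obj_order 22 (4 MB
 * objects), sectors_per_obj is 8192.  A bio whose device-relative
 * start sector is 8000 sits 8000 sectors into its object, leaving
 * 192 sectors (96 KB) to the object boundary, so a merge that would
 * carry the bio past that point is refused.
 */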
2818
2819 static void rbd_free_disk(struct rbd_device *rbd_dev)
2820 {
2821         struct gendisk *disk = rbd_dev->disk;
2822
2823         if (!disk)
2824                 return;
2825
2826         rbd_dev->disk = NULL;
2827         if (disk->flags & GENHD_FL_UP) {
2828                 del_gendisk(disk);
2829                 if (disk->queue)
2830                         blk_cleanup_queue(disk->queue);
2831         }
2832         put_disk(disk);
2833 }
2834
2835 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2836                                 const char *object_name,
2837                                 u64 offset, u64 length, void *buf)
2839 {
2840         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2841         struct rbd_obj_request *obj_request;
2842         struct page **pages = NULL;
2843         u32 page_count;
2844         size_t size;
2845         int ret;
2846
2847         page_count = (u32) calc_pages_for(offset, length);
2848         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2849         if (IS_ERR(pages))
2850                 return PTR_ERR(pages);
2851
2852         ret = -ENOMEM;
2853         obj_request = rbd_obj_request_create(object_name, offset, length,
2854                                                         OBJ_REQUEST_PAGES);
2855         if (!obj_request)
2856                 goto out;
2857
2858         obj_request->pages = pages;
2859         obj_request->page_count = page_count;
2860
2861         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2862         if (!obj_request->osd_req)
2863                 goto out;
2864
2865         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2866                                         offset, length, 0, 0);
2867         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2868                                         obj_request->pages,
2869                                         obj_request->length,
2870                                         obj_request->offset & ~PAGE_MASK,
2871                                         false, false);
2872         rbd_osd_req_format_read(obj_request);
2873
2874         ret = rbd_obj_request_submit(osdc, obj_request);
2875         if (ret)
2876                 goto out;
2877         ret = rbd_obj_request_wait(obj_request);
2878         if (ret)
2879                 goto out;
2880
2881         ret = obj_request->result;
2882         if (ret < 0)
2883                 goto out;
2884
2885         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2886         size = (size_t) obj_request->xferred;
2887         ceph_copy_from_page_vector(pages, buf, 0, size);
2888         rbd_assert(size <= (size_t)INT_MAX);
2889         ret = (int)size;
2890 out:
2891         if (obj_request)
2892                 rbd_obj_request_put(obj_request);
2893         else
2894                 ceph_release_page_vector(pages, page_count);
2895
2896         return ret;
2897 }
2898
2899 /*
2900  * Read the complete header for the given rbd device.
2901  *
2902  * Returns a pointer to a dynamically-allocated buffer containing
2903  * the complete and validated header, including its snapshot ids
2904  * and snapshot names.  The caller is responsible for freeing the
2905  * returned buffer with kfree().
2906  *
2907  * Returns a pointer-coded errno if a failure occurs.
2908  */
2909 static struct rbd_image_header_ondisk *
2910 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2911 {
2912         struct rbd_image_header_ondisk *ondisk = NULL;
2913         u32 snap_count = 0;
2914         u64 names_size = 0;
2915         u32 want_count;
2916         int ret;
2917
2918         /*
2919          * The complete header will include an array of its 64-bit
2920          * snapshot ids, followed by the names of those snapshots as
2921          * a contiguous block of NUL-terminated strings.  Note that
2922          * the number of snapshots could change by the time we read
2923          * it in, in which case we re-read it.
2924          */
2925         do {
2926                 size_t size;
2927
2928                 kfree(ondisk);
2929
2930                 size = sizeof (*ondisk);
2931                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2932                 size += names_size;
2933                 ondisk = kmalloc(size, GFP_KERNEL);
2934                 if (!ondisk)
2935                         return ERR_PTR(-ENOMEM);
2936
2937                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2938                                        0, size, ondisk);
2939                 if (ret < 0)
2940                         goto out_err;
2941                 if ((size_t)ret < size) {
2942                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2943                                 size, ret);
2944                         ret = -ENXIO;
2945                         goto out_err;
2946                 }
2947                 if (!rbd_dev_ondisk_valid(ondisk)) {
2948                         ret = -ENXIO;
2949                         rbd_warn(rbd_dev, "invalid header");
2950                         goto out_err;
2951                 }
2952
2953                 names_size = le64_to_cpu(ondisk->snap_names_len);
2954                 want_count = snap_count;
2955                 snap_count = le32_to_cpu(ondisk->snap_count);
2956         } while (snap_count != want_count);
2957
2958         return ondisk;
2959
2960 out_err:
2961         kfree(ondisk);
2962
2963         return ERR_PTR(ret);
2964 }
2965
2966 /*
2967  * Re-read the on-disk header and fill in the in-memory version.
2968  */
2969 static int rbd_read_header(struct rbd_device *rbd_dev,
2970                            struct rbd_image_header *header)
2971 {
2972         struct rbd_image_header_ondisk *ondisk;
2973         int ret;
2974
2975         ondisk = rbd_dev_v1_header_read(rbd_dev);
2976         if (IS_ERR(ondisk))
2977                 return PTR_ERR(ondisk);
2978         ret = rbd_header_from_disk(header, ondisk);
2979         kfree(ondisk);
2980
2981         return ret;
2982 }
2983
2984 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2985 {
2986         struct rbd_snap *snap;
2987         struct rbd_snap *next;
2988
2989         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
2990                 list_del(&snap->node);
2991                 rbd_snap_destroy(snap);
2992         }
2993 }
2994
2995 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2996 {
2997         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2998                 return;
2999
3000         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3001                 sector_t size;
3002
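                /* mapping.size is in bytes; set_capacity() takes 512-byte sectors */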
3003                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3004                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3005                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3006                 set_capacity(rbd_dev->disk, size);
3007         }
3008 }
3009
3010 /*
3011  * only read the first part of the ondisk header, without the snaps info
3012  */
3013 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3014 {
3015         int ret;
3016         struct rbd_image_header h;
3017
3018         ret = rbd_read_header(rbd_dev, &h);
3019         if (ret < 0)
3020                 return ret;
3021
3022         down_write(&rbd_dev->header_rwsem);
3023
3024         /* Update image size, and check for resize of mapped image */
3025         rbd_dev->header.image_size = h.image_size;
3026         rbd_update_mapping_size(rbd_dev);
3027
3028         /* rbd_dev->header.object_prefix shouldn't change */
3029         kfree(rbd_dev->header.snap_sizes);
3030         kfree(rbd_dev->header.snap_names);
3031         /* osd requests may still refer to snapc */
3032         ceph_put_snap_context(rbd_dev->header.snapc);
3033
3035         rbd_dev->header.snapc = h.snapc;
3036         rbd_dev->header.snap_names = h.snap_names;
3037         rbd_dev->header.snap_sizes = h.snap_sizes;
3038         /* Free the extra copy of the object prefix */
3039         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3040                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3041         kfree(h.object_prefix);
3042
3043         ret = rbd_dev_snaps_update(rbd_dev);
3044
3045         up_write(&rbd_dev->header_rwsem);
3046
3047         return ret;
3048 }
3049
3050 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3051 {
3052         u64 image_size;
3053         int ret;
3054
3055         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3056         image_size = rbd_dev->header.image_size;
3057         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3058         if (rbd_dev->image_format == 1)
3059                 ret = rbd_dev_v1_refresh(rbd_dev);
3060         else
3061                 ret = rbd_dev_v2_refresh(rbd_dev);
3062         mutex_unlock(&ctl_mutex);
3063         if (ret)
3064                 rbd_warn(rbd_dev, "got notification but failed to "
3065                            "update snaps: %d", ret);
3066         if (image_size != rbd_dev->header.image_size)
3067                 revalidate_disk(rbd_dev->disk);
3068
3069         return ret;
3070 }
3071
3072 static int rbd_init_disk(struct rbd_device *rbd_dev)
3073 {
3074         struct gendisk *disk;
3075         struct request_queue *q;
3076         u64 segment_size;
3077
3078         /* create gendisk info */
3079         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3080         if (!disk)
3081                 return -ENOMEM;
3082
3083         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3084                  rbd_dev->dev_id);
3085         disk->major = rbd_dev->major;
3086         disk->first_minor = 0;
3087         disk->fops = &rbd_bd_ops;
3088         disk->private_data = rbd_dev;
3089
3090         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3091         if (!q)
3092                 goto out_disk;
3093
3094         /* We use the default size, but let's be explicit about it. */
3095         blk_queue_physical_block_size(q, SECTOR_SIZE);
3096
3097         /* set io sizes to object size */
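        /*
         * With the common 4 MiB object size (obj_order 22), for
         * example, this caps requests at 8192 sectors and aligns the
         * reported I/O hints with object boundaries.
         */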
3098         segment_size = rbd_obj_bytes(&rbd_dev->header);
3099         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3100         blk_queue_max_segment_size(q, segment_size);
3101         blk_queue_io_min(q, segment_size);
3102         blk_queue_io_opt(q, segment_size);
3103
3104         blk_queue_merge_bvec(q, rbd_merge_bvec);
3105         disk->queue = q;
3106
3107         q->queuedata = rbd_dev;
3108
3109         rbd_dev->disk = disk;
3110
3111         return 0;
3112 out_disk:
3113         put_disk(disk);
3114
3115         return -ENOMEM;
3116 }
3117
3118 /*
3119  * sysfs
3120  */
3121
3122 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3123 {
3124         return container_of(dev, struct rbd_device, dev);
3125 }
3126
3127 static ssize_t rbd_size_show(struct device *dev,
3128                              struct device_attribute *attr, char *buf)
3129 {
3130         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3131
3132         return sprintf(buf, "%llu\n",
3133                 (unsigned long long)rbd_dev->mapping.size);
3134 }
3135
3136 /*
3137  * Note this shows the features for whatever's mapped, which is not
3138  * necessarily the base image.
3139  */
3140 static ssize_t rbd_features_show(struct device *dev,
3141                              struct device_attribute *attr, char *buf)
3142 {
3143         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3144
3145         return sprintf(buf, "0x%016llx\n",
3146                         (unsigned long long)rbd_dev->mapping.features);
3147 }
3148
3149 static ssize_t rbd_major_show(struct device *dev,
3150                               struct device_attribute *attr, char *buf)
3151 {
3152         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3153
3154         if (rbd_dev->major)
3155                 return sprintf(buf, "%d\n", rbd_dev->major);
3156
3157         return sprintf(buf, "(none)\n");
3159 }
3160
3161 static ssize_t rbd_client_id_show(struct device *dev,
3162                                   struct device_attribute *attr, char *buf)
3163 {
3164         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3165
3166         return sprintf(buf, "client%lld\n",
3167                         ceph_client_id(rbd_dev->rbd_client->client));
3168 }
3169
3170 static ssize_t rbd_pool_show(struct device *dev,
3171                              struct device_attribute *attr, char *buf)
3172 {
3173         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3174
3175         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3176 }
3177
3178 static ssize_t rbd_pool_id_show(struct device *dev,
3179                              struct device_attribute *attr, char *buf)
3180 {
3181         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3182
3183         return sprintf(buf, "%llu\n",
3184                         (unsigned long long) rbd_dev->spec->pool_id);
3185 }
3186
3187 static ssize_t rbd_name_show(struct device *dev,
3188                              struct device_attribute *attr, char *buf)
3189 {
3190         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3191
3192         if (rbd_dev->spec->image_name)
3193                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3194
3195         return sprintf(buf, "(unknown)\n");
3196 }
3197
3198 static ssize_t rbd_image_id_show(struct device *dev,
3199                              struct device_attribute *attr, char *buf)
3200 {
3201         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3202
3203         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3204 }
3205
3206 /*
3207  * Shows the name of the currently-mapped snapshot (or
3208  * RBD_SNAP_HEAD_NAME for the base image).
3209  */
3210 static ssize_t rbd_snap_show(struct device *dev,
3211                              struct device_attribute *attr,
3212                              char *buf)
3213 {
3214         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3215
3216         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3217 }
3218
3219 /*
3220  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3221  * for the parent image.  If there is no parent, simply shows
3222  * "(no parent image)".
3223  */
3224 static ssize_t rbd_parent_show(struct device *dev,
3225                              struct device_attribute *attr,
3226                              char *buf)
3227 {
3228         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3229         struct rbd_spec *spec = rbd_dev->parent_spec;
3230         int count;
3231         char *bufp = buf;
3232
3233         if (!spec)
3234                 return sprintf(buf, "(no parent image)\n");
3235
3236         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3237                         (unsigned long long) spec->pool_id, spec->pool_name);
3238         if (count < 0)
3239                 return count;
3240         bufp += count;
3241
3242         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3243                         spec->image_name ? spec->image_name : "(unknown)");
3244         if (count < 0)
3245                 return count;
3246         bufp += count;
3247
3248         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3249                         (unsigned long long) spec->snap_id, spec->snap_name);
3250         if (count < 0)
3251                 return count;
3252         bufp += count;
3253
3254         count = sprintf(bufp, "overlap %llu\n", (unsigned long long)rbd_dev->parent_overlap);
3255         if (count < 0)
3256                 return count;
3257         bufp += count;
3258
3259         return (ssize_t) (bufp - buf);
3260 }
3261
3262 static ssize_t rbd_image_refresh(struct device *dev,
3263                                  struct device_attribute *attr,
3264                                  const char *buf,
3265                                  size_t size)
3266 {
3267         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3268         int ret;
3269
3270         ret = rbd_dev_refresh(rbd_dev);
3271
3272         return ret < 0 ? ret : size;
3273 }
3274
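/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is write-only and triggers a re-read of the image header.
 */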
3275 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3276 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3277 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3278 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3279 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3280 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3281 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3282 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3283 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3284 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3285 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3286
3287 static struct attribute *rbd_attrs[] = {
3288         &dev_attr_size.attr,
3289         &dev_attr_features.attr,
3290         &dev_attr_major.attr,
3291         &dev_attr_client_id.attr,
3292         &dev_attr_pool.attr,
3293         &dev_attr_pool_id.attr,
3294         &dev_attr_name.attr,
3295         &dev_attr_image_id.attr,
3296         &dev_attr_current_snap.attr,
3297         &dev_attr_parent.attr,
3298         &dev_attr_refresh.attr,
3299         NULL
3300 };
3301
3302 static struct attribute_group rbd_attr_group = {
3303         .attrs = rbd_attrs,
3304 };
3305
3306 static const struct attribute_group *rbd_attr_groups[] = {
3307         &rbd_attr_group,
3308         NULL
3309 };
3310
3311 static void rbd_sysfs_dev_release(struct device *dev)
3312 {
3313 }
3314
3315 static struct device_type rbd_device_type = {
3316         .name           = "rbd",
3317         .groups         = rbd_attr_groups,
3318         .release        = rbd_sysfs_dev_release,
3319 };
3320
3321 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3322 {
3323         kref_get(&spec->kref);
3324
3325         return spec;
3326 }
3327
3328 static void rbd_spec_free(struct kref *kref);
3329 static void rbd_spec_put(struct rbd_spec *spec)
3330 {
3331         if (spec)
3332                 kref_put(&spec->kref, rbd_spec_free);
3333 }
3334
3335 static struct rbd_spec *rbd_spec_alloc(void)
3336 {
3337         struct rbd_spec *spec;
3338
3339         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3340         if (!spec)
3341                 return NULL;
3342         kref_init(&spec->kref);
3343
3344         return spec;
3345 }
3346
3347 static void rbd_spec_free(struct kref *kref)
3348 {
3349         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3350
3351         kfree(spec->pool_name);
3352         kfree(spec->image_id);
3353         kfree(spec->image_name);
3354         kfree(spec->snap_name);
3355         kfree(spec);
3356 }
3357
3358 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3359                                 struct rbd_spec *spec)
3360 {
3361         struct rbd_device *rbd_dev;
3362
3363         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3364         if (!rbd_dev)
3365                 return NULL;
3366
3367         spin_lock_init(&rbd_dev->lock);
3368         rbd_dev->flags = 0;
3369         INIT_LIST_HEAD(&rbd_dev->node);
3370         INIT_LIST_HEAD(&rbd_dev->snaps);
3371         init_rwsem(&rbd_dev->header_rwsem);
3372
3373         rbd_dev->spec = spec;
3374         rbd_dev->rbd_client = rbdc;
3375
3376         /* Initialize the layout used for all rbd requests */
3377
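        /*
         * A stripe unit equal to the object size with a stripe count
         * of one means each object holds a single contiguous chunk
         * of the image (i.e., no fancy striping).
         */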
3378         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3379         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3380         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3381         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3382
3383         return rbd_dev;
3384 }
3385
3386 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3387 {
3388         rbd_put_client(rbd_dev->rbd_client);
3389         rbd_spec_put(rbd_dev->spec);
3390         kfree(rbd_dev);
3391 }
3392
3393 static void rbd_snap_destroy(struct rbd_snap *snap)
3394 {
3395         kfree(snap->name);
3396         kfree(snap);
3397 }
3398
3399 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3400                                                 const char *snap_name,
3401                                                 u64 snap_id, u64 snap_size,
3402                                                 u64 snap_features)
3403 {
3404         struct rbd_snap *snap;
3405
3406         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3407         if (!snap)
3408                 return ERR_PTR(-ENOMEM);
3409
3410         snap->name = snap_name;
3411         snap->id = snap_id;
3412         snap->size = snap_size;
3413         snap->features = snap_features;
3414
3415         return snap;
3416 }
3417
3418 /*
3419  * Returns a dynamically-allocated snapshot name if successful, or a
3420  * pointer-coded error otherwise.
3421  */
3422 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3423                 u64 *snap_size, u64 *snap_features)
3424 {
3425         const char *snap_name;
3426         int i;
3427
3428         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3429
3430         /* Skip over names until we find the one we are looking for */
3431
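        /* snap_names is a block of consecutive NUL-terminated strings */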
3432         snap_name = rbd_dev->header.snap_names;
3433         for (i = 0; i < which; i++)
3434                 snap_name += strlen(snap_name) + 1;
3435
3436         snap_name = kstrdup(snap_name, GFP_KERNEL);
3437         if (!snap_name)
3438                 return ERR_PTR(-ENOMEM);
3439
3440         *snap_size = rbd_dev->header.snap_sizes[which];
3441         *snap_features = 0;     /* No features for v1 */
3442
3443         return snap_name;
3444 }
3445
3446 /*
3447  * Get the size and object order for an image snapshot, or if
3448  * snap_id is CEPH_NOSNAP, gets this information for the base
3449  * image.
3450  */
3451 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3452                                 u8 *order, u64 *snap_size)
3453 {
3454         __le64 snapid = cpu_to_le64(snap_id);
3455         int ret;
3456         struct {
3457                 u8 order;
3458                 __le64 size;
3459         } __attribute__ ((packed)) size_buf = { 0 };
3460
3461         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3462                                 "rbd", "get_size",
3463                                 &snapid, sizeof (snapid),
3464                                 &size_buf, sizeof (size_buf));
3465         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3466         if (ret < 0)
3467                 return ret;
3468         if (ret < sizeof (size_buf))
3469                 return -ERANGE;
3470
3471         if (order)
3472                 *order = size_buf.order;
3473         *snap_size = le64_to_cpu(size_buf.size);
3474
3475         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3476                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3477                 (unsigned long long)*snap_size);
3478
3479         return 0;
3480 }
3481
3482 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3483 {
3484         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3485                                         &rbd_dev->header.obj_order,
3486                                         &rbd_dev->header.image_size);
3487 }
3488
3489 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3490 {
3491         void *reply_buf;
3492         int ret;
3493         void *p;
3494
3495         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3496         if (!reply_buf)
3497                 return -ENOMEM;
3498
3499         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3500                                 "rbd", "get_object_prefix", NULL, 0,
3501                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3502         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3503         if (ret < 0)
3504                 goto out;
3505
3506         p = reply_buf;
3507         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3508                                                 p + ret, NULL, GFP_NOIO);
3509         ret = 0;
3510
3511         if (IS_ERR(rbd_dev->header.object_prefix)) {
3512                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3513                 rbd_dev->header.object_prefix = NULL;
3514         } else {
3515                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3516         }
3517 out:
3518         kfree(reply_buf);
3519
3520         return ret;
3521 }
3522
3523 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3524                 u64 *snap_features)
3525 {
3526         __le64 snapid = cpu_to_le64(snap_id);
3527         struct {
3528                 __le64 features;
3529                 __le64 incompat;
3530         } __attribute__ ((packed)) features_buf = { 0 };
3531         u64 incompat;
3532         int ret;
3533
3534         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3535                                 "rbd", "get_features",
3536                                 &snapid, sizeof (snapid),
3537                                 &features_buf, sizeof (features_buf));
3538         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3539         if (ret < 0)
3540                 return ret;
3541         if (ret < sizeof (features_buf))
3542                 return -ERANGE;
3543
3544         incompat = le64_to_cpu(features_buf.incompat);
3545         if (incompat & ~RBD_FEATURES_SUPPORTED)
3546                 return -ENXIO;
3547
3548         *snap_features = le64_to_cpu(features_buf.features);
3549
3550         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3551                 (unsigned long long)snap_id,
3552                 (unsigned long long)*snap_features,
3553                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3554
3555         return 0;
3556 }
3557
3558 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3559 {
3560         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3561                                                 &rbd_dev->header.features);
3562 }
3563
3564 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3565 {
3566         struct rbd_spec *parent_spec;
3567         size_t size;
3568         void *reply_buf = NULL;
3569         __le64 snapid;
3570         void *p;
3571         void *end;
3572         char *image_id;
3573         u64 overlap;
3574         int ret;
3575
3576         parent_spec = rbd_spec_alloc();
3577         if (!parent_spec)
3578                 return -ENOMEM;
3579
3580         size = sizeof (__le64) +                                /* pool_id */
3581                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3582                 sizeof (__le64) +                               /* snap_id */
3583                 sizeof (__le64);                                /* overlap */
3584         reply_buf = kmalloc(size, GFP_KERNEL);
3585         if (!reply_buf) {
3586                 ret = -ENOMEM;
3587                 goto out_err;
3588         }
3589
3590         snapid = cpu_to_le64(CEPH_NOSNAP);
3591         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3592                                 "rbd", "get_parent",
3593                                 &snapid, sizeof (snapid),
3594                                 reply_buf, size);
3595         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3596         if (ret < 0)
3597                 goto out_err;
3598
3599         p = reply_buf;
3600         end = reply_buf + ret;
3601         ret = -ERANGE;
3602         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3603         if (parent_spec->pool_id == CEPH_NOPOOL)
3604                 goto out;       /* No parent?  No problem. */
3605
3606         /* The ceph file layout needs to fit pool id in 32 bits */
3607
3608         ret = -EIO;
3609         if (parent_spec->pool_id > (u64)U32_MAX) {
3610                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
3611                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3612                 goto out_err;
3613         }
3614
3615         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3616         if (IS_ERR(image_id)) {
3617                 ret = PTR_ERR(image_id);
3618                 goto out_err;
3619         }
3620         parent_spec->image_id = image_id;
3621         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3622         ceph_decode_64_safe(&p, end, overlap, out_err);
3623
3624         rbd_dev->parent_overlap = overlap;
3625         rbd_dev->parent_spec = parent_spec;
3626         parent_spec = NULL;     /* rbd_dev now owns this */
3627 out:
3628         ret = 0;
3629 out_err:
3630         kfree(reply_buf);
3631         rbd_spec_put(parent_spec);
3632
3633         return ret;
3634 }
3635
3636 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3637 {
3638         struct {
3639                 __le64 stripe_unit;
3640                 __le64 stripe_count;
3641         } __attribute__ ((packed)) striping_info_buf = { 0 };
3642         size_t size = sizeof (striping_info_buf);
3643         void *p;
3644         u64 obj_size;
3645         u64 stripe_unit;
3646         u64 stripe_count;
3647         int ret;
3648
3649         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3650                                 "rbd", "get_stripe_unit_count", NULL, 0,
3651                                 (char *)&striping_info_buf, size);
3652         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3653         if (ret < 0)
3654                 return ret;
3655         if (ret < size)
3656                 return -ERANGE;
3657
3658         /*
3659          * We don't actually support the "fancy striping" feature
3660          * (STRIPINGV2) yet, but if the striping sizes are the
3661          * defaults the behavior is the same as before.  So find
3662          * out, and only fail if the image has non-default values.
3663          */
3665         obj_size = (u64)1 << rbd_dev->header.obj_order;
3666         p = &striping_info_buf;
3667         stripe_unit = ceph_decode_64(&p);
3668         if (stripe_unit != obj_size) {
3669                 rbd_warn(rbd_dev, "unsupported stripe unit "
3670                                 "(got %llu want %llu)",
3671                                 stripe_unit, obj_size);
3672                 return -EINVAL;
3673         }
3674         stripe_count = ceph_decode_64(&p);
3675         if (stripe_count != 1) {
3676                 rbd_warn(rbd_dev, "unsupported stripe count "
3677                                 "(got %llu want 1)", stripe_count);
3678                 return -EINVAL;
3679         }
3680         rbd_dev->header.stripe_unit = stripe_unit;
3681         rbd_dev->header.stripe_count = stripe_count;
3682
3683         return 0;
3684 }
3685
3686 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3687 {
3688         size_t image_id_size;
3689         char *image_id;
3690         void *p;
3691         void *end;
3692         size_t size;
3693         void *reply_buf = NULL;
3694         size_t len = 0;
3695         char *image_name = NULL;
3696         int ret;
3697
3698         rbd_assert(!rbd_dev->spec->image_name);
3699
3700         len = strlen(rbd_dev->spec->image_id);
3701         image_id_size = sizeof (__le32) + len;
3702         image_id = kmalloc(image_id_size, GFP_KERNEL);
3703         if (!image_id)
3704                 return NULL;
3705
3706         p = image_id;
3707         end = image_id + image_id_size;
3708         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3709
3710         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3711         reply_buf = kmalloc(size, GFP_KERNEL);
3712         if (!reply_buf)
3713                 goto out;
3714
3715         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3716                                 "rbd", "dir_get_name",
3717                                 image_id, image_id_size,
3718                                 reply_buf, size);
3719         if (ret < 0)
3720                 goto out;
3721         p = reply_buf;
3722         end = reply_buf + ret;
3723
3724         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3725         if (IS_ERR(image_name))
3726                 image_name = NULL;
3727         else
3728                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3729 out:
3730         kfree(reply_buf);
3731         kfree(image_id);
3732
3733         return image_name;
3734 }
3735
3736 /*
3737  * When an rbd image has a parent image, it is identified by the
3738  * pool, image, and snapshot ids (not names).  This function fills
3739  * in the names for those ids.  (It's OK if we can't figure out the
3740  * name for an image id, but the pool and snapshot ids should always
3741  * exist and have names.)  All names in an rbd spec are dynamically
3742  * allocated.
3743  *
3744  * When an image being mapped (not a parent) is probed, we have the
3745  * pool name and pool id, image name and image id, and the snapshot
3746  * name.  The only thing we're missing is the snapshot id.
3747  *
3748  * The set of snapshots for an image is not known until they have
3749  * been read by rbd_dev_snaps_update(), so we can't completely fill
3750  * in this information until after that has been called.
3751  */
3752 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3753 {
3754         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3755         struct rbd_spec *spec = rbd_dev->spec;
3756         const char *pool_name;
3757         const char *image_name;
3758         const char *snap_name;
3759         int ret;
3760
3761         /*
3762          * An image being mapped will have the pool name (etc.), but
3763          * we need to look up the snapshot id.
3764          */
3765         if (spec->pool_name) {
3766                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3767                         struct rbd_snap *snap;
3768
3769                         snap = snap_by_name(rbd_dev, spec->snap_name);
3770                         if (!snap)
3771                                 return -ENOENT;
3772                         spec->snap_id = snap->id;
3773                 } else {
3774                         spec->snap_id = CEPH_NOSNAP;
3775                 }
3776
3777                 return 0;
3778         }
3779
3780         /* Get the pool name; we have to make our own copy of this */
3781
3782         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3783         if (!pool_name) {
3784                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3785                 return -EIO;
3786         }
3787         pool_name = kstrdup(pool_name, GFP_KERNEL);
3788         if (!pool_name)
3789                 return -ENOMEM;
3790
3791         /* Fetch the image name; tolerate failure here */
3792
3793         image_name = rbd_dev_image_name(rbd_dev);
3794         if (!image_name)
3795                 rbd_warn(rbd_dev, "unable to get image name");
3796
3797         /* Look up the snapshot name, and make a copy */
3798
3799         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3800         if (!snap_name) {
3801                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3802                 ret = -EIO;
3803                 goto out_err;
3804         }
3805         snap_name = kstrdup(snap_name, GFP_KERNEL);
3806         if (!snap_name) {
3807                 ret = -ENOMEM;
3808                 goto out_err;
3809         }
3810
3811         spec->pool_name = pool_name;
3812         spec->image_name = image_name;
3813         spec->snap_name = snap_name;
3814
3815         return 0;
3816 out_err:
3817         kfree(image_name);
3818         kfree(pool_name);
3819
3820         return ret;
3821 }
3822
3823 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3824 {
3825         size_t size;
3826         int ret;
3827         void *reply_buf;
3828         void *p;
3829         void *end;
3830         u64 seq;
3831         u32 snap_count;
3832         struct ceph_snap_context *snapc;
3833         u32 i;
3834
3835         /*
3836          * We'll need room for the seq value (maximum snapshot id),
3837          * snapshot count, and array of that many snapshot ids.
3838          * For now we have a fixed upper limit on the number we're
3839          * prepared to receive.
3840          */
3841         size = sizeof (__le64) + sizeof (__le32) +
3842                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3843         reply_buf = kzalloc(size, GFP_KERNEL);
3844         if (!reply_buf)
3845                 return -ENOMEM;
3846
3847         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3848                                 "rbd", "get_snapcontext", NULL, 0,
3849                                 reply_buf, size);
3850         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3851         if (ret < 0)
3852                 goto out;
3853
3854         p = reply_buf;
3855         end = reply_buf + ret;
3856         ret = -ERANGE;
3857         ceph_decode_64_safe(&p, end, seq, out);
3858         ceph_decode_32_safe(&p, end, snap_count, out);
3859
3860         /*
3861          * Make sure the reported number of snapshot ids wouldn't go
3862          * beyond the end of our buffer.  But before checking that,
3863          * make sure the computed size of the snapshot context we
3864          * allocate is representable in a size_t.
3865          */
3866         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3867                                  / sizeof (u64)) {
3868                 ret = -EINVAL;
3869                 goto out;
3870         }
3871         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3872                 goto out;
3873         ret = 0;
3874
3875         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3876         if (!snapc) {
3877                 ret = -ENOMEM;
3878                 goto out;
3879         }
3880         snapc->seq = seq;
3881         for (i = 0; i < snap_count; i++)
3882                 snapc->snaps[i] = ceph_decode_64(&p);
3883
        ceph_put_snap_context(rbd_dev->header.snapc);
3884         rbd_dev->header.snapc = snapc;
3885
3886         dout("  snap context seq = %llu, snap_count = %u\n",
3887                 (unsigned long long)seq, (unsigned int)snap_count);
3888 out:
3889         kfree(reply_buf);
3890
3891         return ret;
3892 }
3893
3894 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3895 {
3896         size_t size;
3897         void *reply_buf;
3898         __le64 snap_id;
3899         int ret;
3900         void *p;
3901         void *end;
3902         char *snap_name;
3903
3904         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3905         reply_buf = kmalloc(size, GFP_KERNEL);
3906         if (!reply_buf)
3907                 return ERR_PTR(-ENOMEM);
3908
3909         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3910         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3911         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3912                                 "rbd", "get_snapshot_name",
3913                                 &snap_id, sizeof (snap_id),
3914                                 reply_buf, size);
3915         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3916         if (ret < 0) {
3917                 snap_name = ERR_PTR(ret);
3918                 goto out;
3919         }
3920
3921         p = reply_buf;
3922         end = reply_buf + ret;
3923         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3924         if (IS_ERR(snap_name))
3925                 goto out;
3926
3927         dout("  snap_id 0x%016llx snap_name = %s\n",
3928                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3929 out:
3930         kfree(reply_buf);
3931
3932         return snap_name;
3933 }
3934
3935 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3936                 u64 *snap_size, u64 *snap_features)
3937 {
3938         u64 snap_id;
3939         u64 size;
3940         u64 features;
3941         const char *snap_name;
3942         int ret;
3943
3944         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3945         snap_id = rbd_dev->header.snapc->snaps[which];
3946         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3947         if (ret)
3948                 goto out_err;
3949
3950         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3951         if (ret)
3952                 goto out_err;
3953
3954         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3955         if (!IS_ERR(snap_name)) {
3956                 *snap_size = size;
3957                 *snap_features = features;
3958         }
3959
3960         return snap_name;
3961 out_err:
3962         return ERR_PTR(ret);
3963 }
3964
3965 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3966                 u64 *snap_size, u64 *snap_features)
3967 {
3968         if (rbd_dev->image_format == 1)
3969                 return rbd_dev_v1_snap_info(rbd_dev, which,
3970                                         snap_size, snap_features);
3971         if (rbd_dev->image_format == 2)
3972                 return rbd_dev_v2_snap_info(rbd_dev, which,
3973                                         snap_size, snap_features);
3974         return ERR_PTR(-EINVAL);
3975 }
3976
3977 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
3978 {
3979         int ret;
3980
3981         down_write(&rbd_dev->header_rwsem);
3982
3983         ret = rbd_dev_v2_image_size(rbd_dev);
3984         if (ret)
3985                 goto out;
3986         rbd_update_mapping_size(rbd_dev);
3987
3988         ret = rbd_dev_v2_snap_context(rbd_dev);
3989         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3990         if (ret)
3991                 goto out;
3992         ret = rbd_dev_snaps_update(rbd_dev);
3993         dout("rbd_dev_snaps_update returned %d\n", ret);
3996 out:
3997         up_write(&rbd_dev->header_rwsem);
3998
3999         return ret;
4000 }
4001
4002 /*
4003  * Scan the rbd device's current snapshot list and compare it to the
4004  * newly-received snapshot context.  Remove any existing snapshots
4005  * not present in the new snapshot context.  Add a new snapshot for
4006  * any snapshots in the snapshot context not in the current list.
4007  * And verify there are no changes to snapshots we already know
4008  * about.
4009  *
4010  * Assumes the snapshots in the snapshot context are sorted by
4011  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4012  * are also maintained in that order.)
4013  *
4014  * Note that any error that occurs while updating the snapshot list
4015  * aborts the update, and the entire list is cleared.  The snapshot
4016  * list becomes inconsistent at that point anyway, so it might as
4017  * well be empty.
4018  */
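/*
 * For example, if the device's list holds snapshot ids { 12, 7, 3 }
 * and the new context holds { 12, 5, 3 }, the walk below keeps 12,
 * removes 7, creates an entry for 5, and keeps 3.
 */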
4019 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4020 {
4021         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4022         const u32 snap_count = snapc->num_snaps;
4023         struct list_head *head = &rbd_dev->snaps;
4024         struct list_head *links = head->next;
4025         u32 index = 0;
4026         int ret = 0;
4027
4028         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4029         while (index < snap_count || links != head) {
4030                 u64 snap_id;
4031                 struct rbd_snap *snap;
4032                 const char *snap_name;
4033                 u64 snap_size = 0;
4034                 u64 snap_features = 0;
4035
4036                 snap_id = index < snap_count ? snapc->snaps[index]
4037                                              : CEPH_NOSNAP;
4038                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4039                                      : NULL;
4040                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4041
4042                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4043                         struct list_head *next = links->next;
4044
4045                         /*
4046                          * A previously-existing snapshot is not in
4047                          * the new snap context.
4048                          *
4049                          * If the now-missing snapshot is the one
4050                          * the image represents, clear its existence
4051                          * flag so we can avoid sending any more
4052                          * requests to it.
4053                          */
4054                         if (rbd_dev->spec->snap_id == snap->id)
4055                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4056                         dout("removing %ssnap id %llu\n",
4057                                 rbd_dev->spec->snap_id == snap->id ?
4058                                                         "mapped " : "",
4059                                 (unsigned long long)snap->id);
4060
4061                         list_del(&snap->node);
4062                         rbd_snap_destroy(snap);
4063
4064                         /* Done with this list entry; advance */
4065
4066                         links = next;
4067                         continue;
4068                 }
4069
4070                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4071                                         &snap_size, &snap_features);
4072                 if (IS_ERR(snap_name)) {
4073                         ret = PTR_ERR(snap_name);
4074                         dout("failed to get snap info, error %d\n", ret);
4075                         goto out_err;
4076                 }
4077
4078                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4079                         (unsigned long long)snap_id);
4080                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4081                         struct rbd_snap *new_snap;
4082
4083                         /* We haven't seen this snapshot before */
4084
4085                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4086                                         snap_id, snap_size, snap_features);
4087                         if (IS_ERR(new_snap)) {
4088                                 ret = PTR_ERR(new_snap);
4089                                 dout("  failed to add dev, error %d\n", ret);
4090                                 goto out_err;
4091                         }
4092
4093                         /* New goes before existing, or at end of list */
4094
4095                         dout("  added dev%s\n", snap ? "" : " at end");
4096                         if (snap)
4097                                 list_add_tail(&new_snap->node, &snap->node);
4098                         else
4099                                 list_add_tail(&new_snap->node, head);
4100                 } else {
4101                         /* Already have this one */
4102
4103                         dout("  already present\n");
4104
4105                         rbd_assert(snap->size == snap_size);
4106                         rbd_assert(!strcmp(snap->name, snap_name));
4107                         rbd_assert(snap->features == snap_features);
                        kfree(snap_name);       /* snap keeps its own copy */
4108
4109                         /* Done with this list entry; advance */
4110
4111                         links = links->next;
4112                 }
4113
4114                 /* Advance to the next entry in the snapshot context */
4115
4116                 index++;
4117         }
4118         dout("%s: done\n", __func__);
4119
4120         return 0;
4121 out_err:
4122         rbd_remove_all_snaps(rbd_dev);
4123
4124         return ret;
4125 }
4126
4127 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4128 {
4129         struct device *dev;
4130         int ret;
4131
4132         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4133
4134         dev = &rbd_dev->dev;
4135         dev->bus = &rbd_bus_type;
4136         dev->type = &rbd_device_type;
4137         dev->parent = &rbd_root_dev;
4138         dev->release = rbd_dev_device_release;
4139         dev_set_name(dev, "%d", rbd_dev->dev_id);
4140         ret = device_register(dev);
4141
4142         mutex_unlock(&ctl_mutex);
4143
4144         return ret;
4145 }
4146
4147 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4148 {
4149         device_unregister(&rbd_dev->dev);
4150 }
4151
4152 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4153
4154 /*
4155  * Get a unique rbd identifier for the given new rbd_dev, and add
4156  * the rbd_dev to the global list.  The minimum rbd id is 1.
4157  */
4158 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4159 {
4160         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4161
4162         spin_lock(&rbd_dev_list_lock);
4163         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4164         spin_unlock(&rbd_dev_list_lock);
4165         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4166                 (unsigned long long) rbd_dev->dev_id);
4167 }
4168
4169 /*
4170  * Remove an rbd_dev from the global list, and record that its
4171  * identifier is no longer in use.
4172  */
4173 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4174 {
4175         struct list_head *tmp;
4176         int rbd_id = rbd_dev->dev_id;
4177         int max_id;
4178
4179         rbd_assert(rbd_id > 0);
4180
4181         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4182                 (unsigned long long) rbd_dev->dev_id);
4183         spin_lock(&rbd_dev_list_lock);
4184         list_del_init(&rbd_dev->node);
4185
4186         /*
4187          * If the id being "put" is not the current maximum, there
4188          * is nothing special we need to do.
4189          */
4190         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4191                 spin_unlock(&rbd_dev_list_lock);
4192                 return;
4193         }
4194
4195         /*
4196          * We need to update the current maximum id.  Search the
4197          * list to find out what it is.  We're more likely to find
4198          * the maximum at the end, so search the list backward.
4199          */
4200         max_id = 0;
4201         list_for_each_prev(tmp, &rbd_dev_list) {
4202                 struct rbd_device *rbd_dev;
4203
4204                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4205                 if (rbd_dev->dev_id > max_id)
4206                         max_id = rbd_dev->dev_id;
4207         }
4208         spin_unlock(&rbd_dev_list_lock);
4209
4210         /*
4211          * The max id could have been updated by rbd_dev_id_get(), in
4212          * which case it now accurately reflects the new maximum.
4213          * Be careful not to overwrite the maximum value in that
4214          * case.
4215          */
4216         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4217         dout("  max dev id has been reset\n");
4218 }
4219
4220 /*
4221  * Skips over white space at *buf, and updates *buf to point to the
4222  * first found non-space character (if any). Returns the length of
4223  * the token (string of non-white space characters) found.  Note
4224  * that *buf must be terminated with '\0'.
4225  */
4226 static inline size_t next_token(const char **buf)
4227 {
4228         /*
4229          * These are the characters that produce nonzero for
4230          * isspace() in the "C" and "POSIX" locales.
4231          */
4232         const char *spaces = " \f\n\r\t\v";
4233
4234         *buf += strspn(*buf, spaces);   /* Find start of token */
4235
4236         return strcspn(*buf, spaces);   /* Return token length */
4237 }
4238
4239 /*
4240  * Finds the next token in *buf, and if the provided token buffer is
4241  * big enough, copies the found token into it.  The result, if
4242  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4243  * must be terminated with '\0' on entry.
4244  *
4245  * Returns the length of the token found (not including the '\0').
4246  * Return value will be 0 if no token is found, and it will be >=
4247  * token_size if the token would not fit.
4248  *
4249  * The *buf pointer will be updated to point beyond the end of the
4250  * found token.  Note that this occurs even if the token buffer is
4251  * too small to hold it.
4252  */
4253 static inline size_t copy_token(const char **buf,
4254                                 char *token,
4255                                 size_t token_size)
4256 {
4257         size_t len;
4258
4259         len = next_token(buf);
4260         if (len < token_size) {
4261                 memcpy(token, *buf, len);
4262                 *(token + len) = '\0';
4263         }
4264         *buf += len;
4265
4266         return len;
4267 }
4268
4269 /*
4270  * Finds the next token in *buf, dynamically allocates a buffer big
4271  * enough to hold a copy of it, and copies the token into the new
4272  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4273  * that a duplicate buffer is created even for a zero-length token.
4274  *
4275  * Returns a pointer to the newly-allocated duplicate, or a null
4276  * pointer if memory for the duplicate was not available.  If
4277  * the lenp argument is a non-null pointer, the length of the token
4278  * (not including the '\0') is returned in *lenp.
4279  *
4280  * If successful, the *buf pointer will be updated to point beyond
4281  * the end of the found token.
4282  *
4283  * Note: uses GFP_KERNEL for allocation.
4284  */
4285 static inline char *dup_token(const char **buf, size_t *lenp)
4286 {
4287         char *dup;
4288         size_t len;
4289
4290         len = next_token(buf);
4291         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4292         if (!dup)
4293                 return NULL;
4294         *(dup + len) = '\0';
4295         *buf += len;
4296
4297         if (lenp)
4298                 *lenp = len;
4299
4300         return dup;
4301 }
4302
4303 /*
4304  * Parse the options provided for an "rbd add" (i.e., rbd image
4305  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4306  * and the data written is passed here via a NUL-terminated buffer.
4307  * Returns 0 if successful or an error code otherwise.
4308  *
4309  * The information extracted from these options is recorded in
4310  * the other parameters which return dynamically-allocated
4311  * structures:
4312  *  ceph_opts
4313  *      The address of a pointer that will refer to a ceph options
4314  *      structure.  Caller must release the returned pointer using
4315  *      ceph_destroy_options() when it is no longer needed.
4316  *  rbd_opts
4317  *      Address of an rbd options pointer.  Fully initialized by
4318  *      this function; caller must release with kfree().
4319  *  spec
4320  *      Address of an rbd image specification pointer.  Fully
4321  *      initialized by this function based on parsed options.
4322  *      Caller must release with rbd_spec_put().
4323  *
4324  * The options passed take this form:
4325  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4326  * where:
4327  *  <mon_addrs>
4328  *      A comma-separated list of one or more monitor addresses.
4329  *      A monitor address is an ip address, optionally followed
4330  *      by a port number (separated by a colon).
4331  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4332  *  <options>
4333  *      A comma-separated list of ceph and/or rbd options.
4334  *  <pool_name>
4335  *      The name of the rados pool containing the rbd image.
4336  *  <image_name>
4337  *      The name of the image in that pool to map.
4338  *  <snap_name>
4339  *      An optional snapshot name.  If provided, the mapping will
4340  *      present data from the image at the time that snapshot was
4341  *      created.  The image head is used if no snapshot name is
4342  *      provided.  Snapshot mappings are always read-only.
4343  */
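/*
 * A hypothetical invocation (the monitor address, options, and names
 * below are made up):
 *
 *   # echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *
 * maps the head of image "myimage" from pool "rbd".
 */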
4344 static int rbd_add_parse_args(const char *buf,
4345                                 struct ceph_options **ceph_opts,
4346                                 struct rbd_options **opts,
4347                                 struct rbd_spec **rbd_spec)
4348 {
4349         size_t len;
4350         char *options;
4351         const char *mon_addrs;
4352         char *snap_name;
4353         size_t mon_addrs_size;
4354         struct rbd_spec *spec = NULL;
4355         struct rbd_options *rbd_opts = NULL;
4356         struct ceph_options *copts;
4357         int ret;
4358
4359         /* The first four tokens are required */
4360
4361         len = next_token(&buf);
4362         if (!len) {
4363                 rbd_warn(NULL, "no monitor address(es) provided");
4364                 return -EINVAL;
4365         }
4366         mon_addrs = buf;
4367         mon_addrs_size = len + 1;
4368         buf += len;
4369
4370         ret = -EINVAL;
4371         options = dup_token(&buf, NULL);
4372         if (!options)
4373                 return -ENOMEM;
4374         if (!*options) {
4375                 rbd_warn(NULL, "no options provided");
4376                 goto out_err;
4377         }
4378
4379         spec = rbd_spec_alloc();
4380         if (!spec)
4381                 goto out_mem;
4382
4383         spec->pool_name = dup_token(&buf, NULL);
4384         if (!spec->pool_name)
4385                 goto out_mem;
4386         if (!*spec->pool_name) {
4387                 rbd_warn(NULL, "no pool name provided");
4388                 goto out_err;
4389         }
4390
4391         spec->image_name = dup_token(&buf, NULL);
4392         if (!spec->image_name)
4393                 goto out_mem;
4394         if (!*spec->image_name) {
4395                 rbd_warn(NULL, "no image name provided");
4396                 goto out_err;
4397         }
4398
4399         /*
4400          * Snapshot name is optional; default is to use "-"
4401          * (indicating the head/no snapshot).
4402          */
4403         len = next_token(&buf);
4404         if (!len) {
4405                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4406                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4407         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4408                 ret = -ENAMETOOLONG;
4409                 goto out_err;
4410         }
4411         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4412         if (!snap_name)
4413                 goto out_mem;
4414         *(snap_name + len) = '\0';
4415         spec->snap_name = snap_name;
4416
4417         /* Initialize all rbd options to the defaults */
4418
4419         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4420         if (!rbd_opts)
4421                 goto out_mem;
4422
4423         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4424
4425         copts = ceph_parse_options(options, mon_addrs,
4426                                         mon_addrs + mon_addrs_size - 1,
4427                                         parse_rbd_opts_token, rbd_opts);
4428         if (IS_ERR(copts)) {
4429                 ret = PTR_ERR(copts);
4430                 goto out_err;
4431         }
4432         kfree(options);
4433
4434         *ceph_opts = copts;
4435         *opts = rbd_opts;
4436         *rbd_spec = spec;
4437
4438         return 0;
4439 out_mem:
4440         ret = -ENOMEM;
4441 out_err:
4442         kfree(rbd_opts);
4443         rbd_spec_put(spec);
4444         kfree(options);
4445
4446         return ret;
4447 }
4448
4449 /*
4450  * An rbd format 2 image has a unique identifier, distinct from the
4451  * name given to it by the user.  Internally, that identifier is
4452  * what's used to specify the names of objects related to the image.
4453  *
4454  * A special "rbd id" object is used to map an rbd image name to its
4455  * id.  If that object doesn't exist, then there is no v2 rbd image
4456  * with the supplied name.
4457  *
4458  * This function will record the given rbd_dev's image_id field if
4459  * it can be determined, and in that case will return 0.  If any
4460  * errors occur a negative errno will be returned and the rbd_dev's
4461  * image_id field will be unchanged (and should be NULL).
4462  */
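/*
 * For example, an image named "myimage" has its id stored in an
 * object named RBD_ID_PREFIX "myimage" (see the sprintf() below).
 */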
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        char *image_id;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.  We
         * do still need to set the image format though.
         */
        if (rbd_dev->spec->image_id) {
                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

                return 0;
        }

        /*
         * First, see if the format 2 image id object exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* If it doesn't exist we'll assume it's a format 1 image */

        ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id", NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
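        /*
         * On success the reply is a ceph-encoded string: a __le32
         * length followed by that many bytes of image id (which is
         * why "size" above reserves sizeof (__le32) extra bytes);
         * ceph_extract_encoded_string() below turns it into a
         * NUL-terminated copy.
         */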
        if (ret == -ENOENT) {
                image_id = kstrdup("", GFP_KERNEL);
                ret = image_id ? 0 : -ENOMEM;
                if (!ret)
                        rbd_dev->image_format = 1;
        } else if (ret > sizeof (__le32)) {
                void *p = response;

                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
                ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
                ret = -EINVAL;
        }

        if (!ret) {
                rbd_dev->spec->image_id = image_id;
                dout("image_id is %s\n", image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}

/* Undo whatever state changes are made by v1 or v2 image probe */

static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
        struct rbd_image_header *header;

        rbd_dev_remove_parent(rbd_dev);
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;

        /* Free dynamic fields from the header, then zero it out */

        header = &rbd_dev->header;
        ceph_put_snap_context(header->snapc);
        kfree(header->snap_sizes);
        kfree(header->snap_names);
        kfree(header->object_prefix);
        memset(header, 0, sizeof (*header));
}

static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
        int ret;

        /* Populate rbd image metadata */

        ret = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (ret < 0)
                goto out_err;

        /* Version 1 images have no parent (no layering) */

        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;

        dout("discovered version 1 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;

out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        return ret;
}

static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
                goto out_err;

        /* Get the object prefix (a.k.a. block_name) for the image */

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;

        /* Get and check the features for the image */

        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;

        /* If the image supports layering, get the parent info */

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto out_err;

                /*
                 * Don't print a warning for parent images.  We can
                 * tell we're probing a parent at this point because
                 * we won't know its pool name yet (just its pool id).
                 */
                if (rbd_dev->spec->pool_name)
                        rbd_warn(rbd_dev, "WARNING: kernel layering "
                                        "is EXPERIMENTAL!");
        }

        /* If the image supports fancy striping, get its parameters */

        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
                ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }

        /* crypto and compression type aren't (yet) supported for v2 images */

        rbd_dev->header.crypt_type = 0;
        rbd_dev->header.comp_type = 0;

        /* Get the snapshot context for the image */

        ret = rbd_dev_v2_snap_context(rbd_dev);
        if (ret)
                goto out_err;

        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;
out_err:
        rbd_dev->parent_overlap = 0;
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}

static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        struct rbd_spec *parent_spec;
        struct rbd_client *rbdc;
        int ret;

        if (!rbd_dev->parent_spec)
                return 0;
        /*
         * We need to pass a reference to the client and the parent
         * spec when creating the parent rbd_dev.  Images related by
         * parent/child relationships always share both.
         */
        parent_spec = rbd_spec_get(rbd_dev->parent_spec);
        rbdc = __rbd_get_client(rbd_dev->rbd_client);

        ret = -ENOMEM;
        parent = rbd_dev_create(rbdc, parent_spec);
        if (!parent)
                goto out_err;

        ret = rbd_dev_image_probe(parent);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;

        return 0;
out_err:
        if (parent) {
                rbd_spec_put(rbd_dev->parent_spec);
                kfree(rbd_dev->header_name);
                rbd_dev_destroy(parent);
        } else {
                rbd_put_client(rbdc);
                rbd_spec_put(parent_spec);
        }

        return ret;
}
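
/*
 * Note that rbd_dev_probe_parent() and rbd_dev_image_probe() are
 * mutually recursive: probing an image probes its parent, which in
 * turn probes *its* parent, and so on until an image with no
 * parent_spec is reached.  rbd_dev_remove_parent() unwinds the
 * resulting chain.
 */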

static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                return ret;

        /* Generate a unique id: find the highest existing id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_disk;

        /* Everything's ready.  Announce the disk to the world. */

        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
        rbd_dev_mapping_clear(rbd_dev);

        return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
        struct rbd_spec *spec = rbd_dev->spec;
        size_t size;

        /* Record the header object name for this rbd image. */

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
        else
                size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;

        if (rbd_dev->image_format == 1)
                sprintf(rbd_dev->header_name, "%s%s",
                        spec->image_name, RBD_SUFFIX);
        else
                sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, spec->image_id);
        return 0;
}

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        int ret;

        rbd_remove_all_snaps(rbd_dev);
        rbd_dev_unprobe(rbd_dev);
        ret = rbd_dev_header_watch_sync(rbd_dev, 0);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
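
/*
 * The error labels in rbd_dev_image_probe() unwind in the reverse
 * order of setup (snapshots, v1/v2 probe results, watch request,
 * header name, image format and id), so a failed probe leaves the
 * rbd_dev as it was on entry.
 */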
static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
{
        int ret;
        int tmp;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;
        rbd_assert(rbd_dev->spec->image_id);
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto out_header_name;

        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_probe(rbd_dev);
        else
                ret = rbd_dev_v2_probe(rbd_dev);
        if (ret)
                goto err_out_watch;

        ret = rbd_dev_snaps_update(rbd_dev);
        if (ret)
                goto err_out_probe;

        ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_snaps;

        ret = rbd_dev_probe_parent(rbd_dev);
        if (!ret)
                return 0;

err_out_snaps:
        rbd_remove_all_snaps(rbd_dev);
err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
        if (tmp)
                rbd_warn(rbd_dev, "unable to tear down watch request");
out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        dout("probe failed, returning %d\n", ret);

        return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;

        /* The ceph file layout needs to fit pool id in 32 bits */

        if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)",
                                (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rbd_dev->mapping.read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rc = rbd_dev_image_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;

        rc = rbd_dev_device_setup(rbd_dev);
        if (!rc)
                return count;

        /* rbd_dev_image_release() destroys rbd_dev; don't do it twice */
        rbd_dev_image_release(rbd_dev);
        goto err_out_module;
err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t)rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_clear_mapping(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
        rbd_dev_mapping_clear(rbd_dev);
}

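/*
 * Tear down rbd_dev's chain of parent images, deepest ancestor
 * first.  E.g. for rbd_dev -> P1 -> P2, P2 is released before P1,
 * so no image is ever released while it still has a child.
 */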
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id;
        unsigned long ul;
        int ret;

        ret = strict_strtoul(buf, 10, &ul);
        if (ret)
                return ret;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
        ret = count;
        rbd_bus_del_dev(rbd_dev);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
done:
        mutex_unlock(&ctl_mutex);

        return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
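
/*
 * Usage sketch (shell; the monitor address and names are made-up
 * examples -- see Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *      # echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *      # echo 0 > /sys/bus/rbd/remove
 */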
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");

                return -EINVAL;
        }
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");