/*
 * drivers/block/rbd.c
 * (browsed via Pileus Git, ~andy/linux; commit subject:
 *  "rbd: don't leak rbd_req on synchronous requests")
 */
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have these defined elsewhere too */

#define U32_MAX	((u32) (~0U))
#define U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Snapshot sysfs device names carry this prefix; the length limit
 * leaves room for it within NAME_MAX */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

/* "Snapshot" name used when the image head (no snapshot) is mapped */
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	  1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	  (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Mappings are writable unless read_only is requested as an option */
#define RBD_READ_ONLY_DEFAULT		false
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;	/* NUL-terminated copy of the on-disk prefix */
	u64 features;		/* RBD_FEATURE_* bits */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* packed NUL-terminated snapshot names */
	u64 *snap_sizes;	/* per-snapshot image sizes */

	u64 obj_version;
};
118
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;	/* may be NULL (see above) */

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;		/* shared between parent and child */
};
156
/* User-settable mapping options (filled in by parse_rbd_opts_token()) */
struct rbd_options {
	bool	read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;	/* entry in rbd_client_list */
};
169
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this request has completed */
	s32 rc;		/* completion result code */
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;		/* requests in the collection */
	int			num_done;	/* how many have completed */
	struct kref		kref;
	struct rbd_req_status	status[0];	/* one entry per request */
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;	/* index into coll->status[] */
	struct rbd_req_coll	*coll;
};
200
/* In-core state for a single snapshot of an image */
struct rbd_snap {
	struct	device		dev;	/* sysfs device for this snapshot */
	const char		*name;
	u64			size;	/* image size at this snapshot */
	struct list_head	node;	/* entry in rbd_device->snaps */
	u64			id;
	u64			features;
};

/* Describes what (image head or snapshot) is currently mapped */
struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};
215
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;	/* possibly shared client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	atomic_t		exists;		/* set once mapping is established */
	struct rbd_spec		*spec;		/* identity of the mapped image */

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	struct rbd_spec		*parent_spec;	/* non-null for layered images */
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by ctl_mutex */
};
260
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for snapshot and sysfs management */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

/* sysfs bus attributes: /sys/bus/rbd/{add,remove}, writable by root only */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

/* Nothing to do when the root device's last reference is dropped */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent sysfs device of all rbd devices */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
299
/*
 * Emit a warning prefixed with the most specific identification
 * available for the device: the disk name if the gendisk exists,
 * else the image name, else the image id, else the raw rbd_dev
 * pointer.  A null rbd_dev is allowed.
 */
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
326
#ifdef RBD_DEBUG
/*
 * BUG() if the asserted expression is false.  Compiles to a no-op
 * unless RBD_DEBUG is defined (see top of file).
 */
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

/* Forward declarations: header refresh (generic, and format 2 specific) */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
342
/*
 * Block device open method: refuse writes to read-only mappings and
 * take a device reference for the lifetime of the open.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	/* A read-only mapping cannot be opened for writing */
	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	/* NOTE(review): read_only is tested before ctl_mutex is taken —
	 * presumably safe because the flag is fixed at map time; confirm. */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);	/* dropped in rbd_release() */
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}
358
/*
 * Block device release method: drop the open count and the device
 * reference taken in rbd_open().
 */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}
371
/* Block device operations: rbd implements only open and release */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
377
378 /*
379  * Initialize an rbd client instance.
380  * We own *ceph_opts.
381  */
382 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
383 {
384         struct rbd_client *rbdc;
385         int ret = -ENOMEM;
386
387         dout("rbd_client_create\n");
388         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
389         if (!rbdc)
390                 goto out_opt;
391
392         kref_init(&rbdc->kref);
393         INIT_LIST_HEAD(&rbdc->node);
394
395         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
396
397         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
398         if (IS_ERR(rbdc->client))
399                 goto out_mutex;
400         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
401
402         ret = ceph_open_session(rbdc->client);
403         if (ret < 0)
404                 goto out_err;
405
406         spin_lock(&rbd_client_list_lock);
407         list_add_tail(&rbdc->node, &rbd_client_list);
408         spin_unlock(&rbd_client_list_lock);
409
410         mutex_unlock(&ctl_mutex);
411
412         dout("rbd_client_create created %p\n", rbdc);
413         return rbdc;
414
415 out_err:
416         ceph_destroy_client(rbdc->client);
417 out_mutex:
418         mutex_unlock(&ctl_mutex);
419         kfree(rbdc);
420 out_opt:
421         if (ceph_opts)
422                 ceph_destroy_options(ceph_opts);
423         return ERR_PTR(ret);
424 }
425
426 /*
427  * Find a ceph client with specific addr and configuration.  If
428  * found, bump its reference count.
429  */
430 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
431 {
432         struct rbd_client *client_node;
433         bool found = false;
434
435         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
436                 return NULL;
437
438         spin_lock(&rbd_client_list_lock);
439         list_for_each_entry(client_node, &rbd_client_list, node) {
440                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
441                         kref_get(&client_node->kref);
442                         found = true;
443                         break;
444                 }
445         }
446         spin_unlock(&rbd_client_list_lock);
447
448         return found ? client_node : NULL;
449 }
450
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument, those
 * between Opt_last_int and Opt_last_string a string argument, and
 * those between Opt_last_string and Opt_last_bool are Boolean flags.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
475
/*
 * Parse a single mount option token into *private (an rbd_options).
 * Only the Boolean read_only/read_write options currently exist; the
 * int/string branches are scaffolding for future option types.
 * Returns 0 on success, negative errno on a bad token or argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* match_token() succeeded, so this can't happen */
		rbd_assert(false);
		break;
	}
	return 0;
}
516
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Always consumes ceph_opts (either handed to
 * the new client or destroyed when reusing an existing one).
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (!rbdc)
		return rbd_client_create(ceph_opts);

	/* Reusing an existing client; the options are no longer needed */
	ceph_destroy_options(ceph_opts);

	return rbdc;
}
533
/*
 * Destroy ceph client.  Called via kref_put() when the last reference
 * is dropped.  Takes rbd_client_list_lock itself to unlink the client
 * from the global list, so the caller must NOT hold that lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
551
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.  A null rbdc is allowed (no-op).
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
561
/*
 * Destroy requests collection.  Called via kref_put() when the last
 * reference to the collection is dropped.
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
573
574 static bool rbd_image_format_valid(u32 image_format)
575 {
576         return image_format == 1 || image_format == 2;
577 }
578
/*
 * Sanity-check an on-disk (format 1) image header: magic text, a
 * reasonable object order, and snapshot metadata whose in-memory
 * representation fits in a size_t.  Returns false if untrustworthy.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
617
618 /*
619  * Create a new header structure, translate header format from the on-disk
620  * header.
621  */
622 static int rbd_header_from_disk(struct rbd_image_header *header,
623                                  struct rbd_image_header_ondisk *ondisk)
624 {
625         u32 snap_count;
626         size_t len;
627         size_t size;
628         u32 i;
629
630         memset(header, 0, sizeof (*header));
631
632         snap_count = le32_to_cpu(ondisk->snap_count);
633
634         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
635         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
636         if (!header->object_prefix)
637                 return -ENOMEM;
638         memcpy(header->object_prefix, ondisk->object_prefix, len);
639         header->object_prefix[len] = '\0';
640
641         if (snap_count) {
642                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
643
644                 /* Save a copy of the snapshot names */
645
646                 if (snap_names_len > (u64) SIZE_MAX)
647                         return -EIO;
648                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
649                 if (!header->snap_names)
650                         goto out_err;
651                 /*
652                  * Note that rbd_dev_v1_header_read() guarantees
653                  * the ondisk buffer we're working with has
654                  * snap_names_len bytes beyond the end of the
655                  * snapshot id array, this memcpy() is safe.
656                  */
657                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
658                         snap_names_len);
659
660                 /* Record each snapshot's size */
661
662                 size = snap_count * sizeof (*header->snap_sizes);
663                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
664                 if (!header->snap_sizes)
665                         goto out_err;
666                 for (i = 0; i < snap_count; i++)
667                         header->snap_sizes[i] =
668                                 le64_to_cpu(ondisk->snaps[i].image_size);
669         } else {
670                 WARN_ON(ondisk->snap_names_len);
671                 header->snap_names = NULL;
672                 header->snap_sizes = NULL;
673         }
674
675         header->features = 0;   /* No features support in v1 images */
676         header->obj_order = ondisk->options.order;
677         header->crypt_type = ondisk->options.crypt_type;
678         header->comp_type = ondisk->options.comp_type;
679
680         /* Allocate and fill in the snapshot context */
681
682         header->image_size = le64_to_cpu(ondisk->image_size);
683         size = sizeof (struct ceph_snap_context);
684         size += snap_count * sizeof (header->snapc->snaps[0]);
685         header->snapc = kzalloc(size, GFP_KERNEL);
686         if (!header->snapc)
687                 goto out_err;
688
689         atomic_set(&header->snapc->nref, 1);
690         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
691         header->snapc->num_snaps = snap_count;
692         for (i = 0; i < snap_count; i++)
693                 header->snapc->snaps[i] =
694                         le64_to_cpu(ondisk->snaps[i].id);
695
696         return 0;
697
698 out_err:
699         kfree(header->snap_sizes);
700         header->snap_sizes = NULL;
701         kfree(header->snap_names);
702         header->snap_names = NULL;
703         kfree(header->object_prefix);
704         header->object_prefix = NULL;
705
706         return -ENOMEM;
707 }
708
709 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
710 {
711         struct rbd_snap *snap;
712
713         if (snap_id == CEPH_NOSNAP)
714                 return RBD_SNAP_HEAD_NAME;
715
716         list_for_each_entry(snap, &rbd_dev->snaps, node)
717                 if (snap_id == snap->id)
718                         return snap->name;
719
720         return NULL;
721 }
722
723 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
724 {
725
726         struct rbd_snap *snap;
727
728         list_for_each_entry(snap, &rbd_dev->snaps, node) {
729                 if (!strcmp(snap_name, snap->name)) {
730                         rbd_dev->spec->snap_id = snap->id;
731                         rbd_dev->mapping.size = snap->size;
732                         rbd_dev->mapping.features = snap->features;
733
734                         return 0;
735                 }
736         }
737
738         return -ENOENT;
739 }
740
741 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
742 {
743         int ret;
744
745         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
746                     sizeof (RBD_SNAP_HEAD_NAME))) {
747                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
748                 rbd_dev->mapping.size = rbd_dev->header.image_size;
749                 rbd_dev->mapping.features = rbd_dev->header.features;
750                 ret = 0;
751         } else {
752                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
753                 if (ret < 0)
754                         goto done;
755                 rbd_dev->mapping.read_only = true;
756         }
757         atomic_set(&rbd_dev->exists, 1);
758 done:
759         return ret;
760 }
761
/*
 * Free all dynamically-allocated parts of an in-memory image header
 * and clear the pointers so a repeat call is harmless.
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);	/* drops a reference */
	header->snapc = NULL;
}
773
774 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
775 {
776         char *name;
777         u64 segment;
778         int ret;
779
780         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
781         if (!name)
782                 return NULL;
783         segment = offset >> rbd_dev->header.obj_order;
784         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
785                         rbd_dev->header.object_prefix, segment);
786         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
787                 pr_err("error formatting segment name for #%llu (%d)\n",
788                         segment, ret);
789                 kfree(name);
790                 name = NULL;
791         }
792
793         return name;
794 }
795
796 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
797 {
798         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
799
800         return offset & (segment_size - 1);
801 }
802
803 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
804                                 u64 offset, u64 length)
805 {
806         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
807
808         offset &= segment_size - 1;
809
810         rbd_assert(length <= U64_MAX - offset);
811         if (offset + length > segment_size)
812                 length = segment_size - offset;
813
814         return length;
815 }
816
817 static int rbd_get_num_segments(struct rbd_image_header *header,
818                                 u64 ofs, u64 len)
819 {
820         u64 start_seg;
821         u64 end_seg;
822
823         if (!len)
824                 return 0;
825         if (len - 1 > U64_MAX - ofs)
826                 return -ERANGE;
827
828         start_seg = ofs >> header->obj_order;
829         end_seg = (ofs + len - 1) >> header->obj_order;
830
831         return end_seg - start_seg + 1;
832 }
833
834 /*
835  * returns the size of an object in the image
836  */
837 static u64 rbd_obj_bytes(struct rbd_image_header *header)
838 {
839         return 1 << header->obj_order;
840 }
841
842 /*
843  * bio helpers
844  */
845
846 static void bio_chain_put(struct bio *chain)
847 {
848         struct bio *tmp;
849
850         while (chain) {
851                 tmp = chain;
852                 chain = chain->bi_next;
853                 bio_put(tmp);
854         }
855 }
856
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte position of current segment within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* Zero from start_ofs (or the segment start,
				 * whichever comes later) to segment end */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
883
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.  Returns the
 * clone, or NULL on bad arguments or allocation failure.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;	/* byte offset of the range within segment idx */

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;	/* segments spanned by the range */

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		/* resid is what remains of len inside the final segment */
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
964
965 /*
966  * Clone a portion of a bio chain, starting at the given byte offset
967  * into the first bio in the source chain and continuing for the
968  * number of bytes indicated.  The result is another bio chain of
969  * exactly the given length, or a null pointer on error.
970  *
971  * The bio_src and offset parameters are both in-out.  On entry they
972  * refer to the first source bio and the offset into that bio where
973  * the start of data to be cloned is located.
974  *
975  * On return, bio_src is updated to refer to the bio in the source
976  * chain that contains first un-cloned byte, and *offset will
977  * contain the offset of that byte within that bio.
978  */
979 static struct bio *bio_chain_clone_range(struct bio **bio_src,
980                                         unsigned int *offset,
981                                         unsigned int len,
982                                         gfp_t gfpmask)
983 {
984         struct bio *bi = *bio_src;
985         unsigned int off = *offset;
986         struct bio *chain = NULL;
987         struct bio **end;
988
989         /* Build up a chain of clone bios up to the limit */
990
991         if (!bi || off >= bi->bi_size || !len)
992                 return NULL;            /* Nothing to clone */
993
994         end = &chain;
995         while (len) {
996                 unsigned int bi_size;
997                 struct bio *bio;
998
999                 if (!bi) {
1000                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1001                         goto out_err;   /* EINVAL; ran out of bio's */
1002                 }
1003                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1004                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1005                 if (!bio)
1006                         goto out_err;   /* ENOMEM */
1007
1008                 *end = bio;
1009                 end = &bio->bi_next;
1010
1011                 off += bi_size;
1012                 if (off == bi->bi_size) {
1013                         bi = bi->bi_next;
1014                         off = 0;
1015                 }
1016                 len -= bi_size;
1017         }
1018         *bio_src = bi;
1019         *offset = off;
1020
1021         return chain;
1022 out_err:
1023         bio_chain_put(chain);
1024
1025         return NULL;
1026 }
1027
1028 static struct ceph_osd_req_op *rbd_create_rw_op(int opcode, u32 payload_len)
1029 {
1030         struct ceph_osd_req_op *op;
1031
1032         op = kzalloc(sizeof (*op), GFP_NOIO);
1033         if (!op)
1034                 return NULL;
1035         /*
1036          * op extent offset and length will be set later on
1037          * after ceph_calc_file_object_mapping().
1038          */
1039         op->op = opcode;
1040         op->payload_len = payload_len;
1041
1042         return op;
1043 }
1044
/* Free an op allocated by rbd_create_rw_op(); kfree(NULL) is a no-op. */
static void rbd_destroy_op(struct ceph_osd_req_op *op)
{
	kfree(op);
}
1049
/*
 * Record the completion status of sub-request @index in @coll, then
 * complete, in order, any leading run of finished sub-requests of
 * the block request @rq.
 *
 * @rq may be NULL (nothing to complete).  With no collection at all
 * the whole request is ended in a single call.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   s32 ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, (int)ret, (unsigned long long)len);

	if (!rq)
		return;

	if (!coll) {
		/* Single-segment request: complete it directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* The queue lock serializes updates to the collection */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	/* Extend [min, max) over the contiguous run of done segments */
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, (int)coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* Drop the reference taken when this segment was issued */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
1087
/* Complete @rbd_req's slot in its collection; see rbd_coll_end_req_index(). */
static void rbd_coll_end_req(struct rbd_request *rbd_req,
			     s32 ret, u64 len)
{
	rbd_coll_end_req_index(rbd_req->rq,
				rbd_req->coll, rbd_req->coll_index,
				ret, len);
}
1095
/*
 * Send ceph osd request
 *
 * Builds and submits a single OSD request against @object_name for
 * the byte range [ofs, ofs+len).  Data travels via @bio or @pages
 * as supplied by the caller.
 *
 * If @rbd_cb is non-NULL the request is asynchronous: an rbd_request
 * is allocated and attached through r_priv, and the callback is
 * responsible for all cleanup.  If @rbd_cb is NULL the request is
 * synchronous: we wait for completion, optionally report the object
 * version through @ver, and release the OSD request here — no
 * rbd_request is allocated, so none can leak.
 *
 * If @linger_req is non-NULL the request is set up to linger with
 * the osd client and a pointer to it is returned via *@linger_req.
 *
 * Returns a negative errno on failure; for synchronous requests the
 * non-negative result of ceph_osdc_wait_request() (callers such as
 * rbd_req_sync_op treat a positive value as a byte count).
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *op,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *,
					 struct ceph_msg *),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct rbd_request *rbd_req = NULL;
	struct timespec mtime = CURRENT_TIME;
	int ret;

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
		object_name, (unsigned long long) ofs,
		(unsigned long long) len, coll, coll_index);

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
	if (!osd_req)
		return -ENOMEM;

	osd_req->r_flags = flags;
	osd_req->r_pages = pages;
	if (bio) {
		osd_req->r_bio = bio;
		/* Reference dropped in done_err or by the completion path */
		bio_get(osd_req->r_bio);
	}

	if (rbd_cb) {
		/*
		 * Asynchronous request: package up everything the
		 * completion callback (rbd_req_cb) will need.
		 */
		ret = -ENOMEM;
		rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
		if (!rbd_req)
			goto done_osd_req;

		rbd_req->rq = rq;
		rbd_req->bio = bio;
		rbd_req->pages = pages;
		rbd_req->len = len;
		rbd_req->coll = coll;
		rbd_req->coll_index = coll ? coll_index : 0;
	}

	osd_req->r_callback = rbd_cb;
	osd_req->r_priv = rbd_req;

	/*
	 * NOTE(review): strncpy() does not NUL-terminate when
	 * object_name exactly fills r_oid, and the strlen() below
	 * relies on termination — verify object names are always
	 * shorter than sizeof(r_oid).
	 */
	strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
	osd_req->r_oid_len = strlen(osd_req->r_oid);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	if (op->op == CEPH_OSD_OP_READ || op->op == CEPH_OSD_OP_WRITE) {
		/* Extent fields were deferred by rbd_create_rw_op() */
		op->extent.offset = ofs;
		op->extent.length = len;
		if (op->op == CEPH_OSD_OP_WRITE)
			op->payload_len = len;
	}
	osd_req->r_num_pages = calc_pages_for(ofs, len);
	osd_req->r_page_alignment = ofs & ~PAGE_MASK;

	ceph_osdc_build_request(osd_req, ofs, len, 1, op,
				snapc, snapid, &mtime);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, osd_req);
		*linger_req = osd_req;
	}

	ret = ceph_osdc_start_request(osdc, osd_req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* Synchronous: wait here and release the request ourselves */
		u64 version;

		ret = ceph_osdc_wait_request(osdc, osd_req);
		version = le64_to_cpu(osd_req->r_reassert_version.version);
		if (ver)
			*ver = version;
		dout("reassert_ver=%llu\n", (unsigned long long) version);
		ceph_osdc_put_request(osd_req);
	}
	return ret;

done_err:
	if (bio)
		bio_chain_put(osd_req->r_bio);
	kfree(rbd_req);		/* NULL for synchronous requests */
done_osd_req:
	ceph_osdc_put_request(osd_req);

	return ret;
}
1202
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous requests issued by rbd_do_op().
 * Parses the reply, normalizes ENOENT and short reads by zero-filling
 * the bio chain, completes this request's slot in its collection, and
 * frees everything rbd_do_request() attached to the request.
 */
static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
{
	struct rbd_request *rbd_req = osd_req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	/* The op array immediately follows the reply head */
	op = (void *)(replyhead + 1);
	rc = (s32)le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == (s32)-ENOENT && read_op) {
		/* Reading a nonexistent object yields zeroes, not an error */
		zero_bio_chain(rbd_req->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < rbd_req->len) {
		/* Short read: zero the unread tail, report the full length */
		zero_bio_chain(rbd_req->bio, bytes);
		bytes = rbd_req->len;
	}

	rbd_coll_end_req(rbd_req, rc, bytes);

	/* Drop the reference taken with bio_get() at submission time */
	if (rbd_req->bio)
		bio_chain_put(rbd_req->bio);

	ceph_osdc_put_request(osd_req);
	kfree(rbd_req);
}
1242
/*
 * Completion callback for requests whose result needs no processing;
 * it only drops the OSD request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	ceph_osdc_put_request(osd_req);
}
1248
/*
 * Do a synchronous ceph osd operation
 *
 * Data is exchanged through a temporary page vector sized for
 * @inbound_size bytes.  For read-flagged operations the result is
 * copied back into @inbound (when non-NULL).
 *
 * Returns a negative errno on failure; on success, the non-negative
 * request result (for reads, the number of bytes transferred).
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   int flags,
			   struct ceph_osd_req_op *op,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(op != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	/* A NULL callback makes rbd_do_request() synchronous */
	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  object_name, ofs, inbound_size, NULL,
			  pages, num_pages,
			  flags,
			  op,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	/* On a successful read, ret is the byte count to copy back */
	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1290
1291 /*
1292  * Do an asynchronous ceph osd operation
1293  */
1294 static int rbd_do_op(struct request *rq,
1295                      struct rbd_device *rbd_dev,
1296                      struct ceph_snap_context *snapc,
1297                      u64 ofs, u64 len,
1298                      struct bio *bio,
1299                      struct rbd_req_coll *coll,
1300                      int coll_index)
1301 {
1302         char *seg_name;
1303         u64 seg_ofs;
1304         u64 seg_len;
1305         int ret;
1306         struct ceph_osd_req_op *op;
1307         u32 payload_len;
1308         int opcode;
1309         int flags;
1310         u64 snapid;
1311
1312         seg_name = rbd_segment_name(rbd_dev, ofs);
1313         if (!seg_name)
1314                 return -ENOMEM;
1315         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1316         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1317
1318         if (rq_data_dir(rq) == WRITE) {
1319                 opcode = CEPH_OSD_OP_WRITE;
1320                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1321                 snapid = CEPH_NOSNAP;
1322                 payload_len = seg_len;
1323         } else {
1324                 opcode = CEPH_OSD_OP_READ;
1325                 flags = CEPH_OSD_FLAG_READ;
1326                 rbd_assert(!snapc);
1327                 snapid = rbd_dev->spec->snap_id;
1328                 payload_len = 0;
1329         }
1330
1331         ret = -ENOMEM;
1332         op = rbd_create_rw_op(opcode, payload_len);
1333         if (!op)
1334                 goto done;
1335
1336         /* we've taken care of segment sizes earlier when we
1337            cloned the bios. We should never have a segment
1338            truncated at this point */
1339         rbd_assert(seg_len == len);
1340
1341         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1342                              seg_name, seg_ofs, seg_len,
1343                              bio,
1344                              NULL, 0,
1345                              flags,
1346                              op,
1347                              coll, coll_index,
1348                              rbd_req_cb, 0, NULL);
1349         if (ret < 0)
1350                 rbd_coll_end_req_index(rq, coll, coll_index,
1351                                         (s32)ret, seg_len);
1352         rbd_destroy_op(op);
1353 done:
1354         kfree(seg_name);
1355         return ret;
1356 }
1357
1358 /*
1359  * Request sync osd read
1360  */
1361 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1362                           const char *object_name,
1363                           u64 ofs, u64 len,
1364                           char *buf,
1365                           u64 *ver)
1366 {
1367         struct ceph_osd_req_op *op;
1368         int ret;
1369
1370         op = rbd_create_rw_op(CEPH_OSD_OP_READ, 0);
1371         if (!op)
1372                 return -ENOMEM;
1373
1374         ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
1375                                op, object_name, ofs, len, buf, NULL, ver);
1376         rbd_destroy_op(op);
1377
1378         return ret;
1379 }
1380
1381 /*
1382  * Request sync osd watch
1383  */
1384 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1385                                    u64 ver,
1386                                    u64 notify_id)
1387 {
1388         struct ceph_osd_req_op *op;
1389         int ret;
1390
1391         op = rbd_create_rw_op(CEPH_OSD_OP_NOTIFY_ACK, 0);
1392         if (!op)
1393                 return -ENOMEM;
1394
1395         op->watch.ver = cpu_to_le64(ver);
1396         op->watch.cookie = notify_id;
1397         op->watch.flag = 0;
1398
1399         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1400                           rbd_dev->header_name, 0, 0, NULL,
1401                           NULL, 0,
1402                           CEPH_OSD_FLAG_READ,
1403                           op,
1404                           NULL, 0,
1405                           rbd_simple_req_cb, 0, NULL);
1406
1407         rbd_destroy_op(op);
1408         return ret;
1409 }
1410
1411 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1412 {
1413         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1414         u64 hver;
1415         int rc;
1416
1417         if (!rbd_dev)
1418                 return;
1419
1420         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1421                 rbd_dev->header_name, (unsigned long long) notify_id,
1422                 (unsigned int) opcode);
1423         rc = rbd_dev_refresh(rbd_dev, &hver);
1424         if (rc)
1425                 rbd_warn(rbd_dev, "got notification but failed to "
1426                            " update snaps: %d\n", rc);
1427
1428         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1429 }
1430
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_req_op *op;
	struct ceph_osd_request **linger_req = NULL;
	__le64 version = 0;
	int ret;

	op = rbd_create_rw_op(CEPH_OSD_OP_WATCH, 0);
	if (!op)
		return -ENOMEM;

	if (start) {
		struct ceph_osd_client *osdc;

		/* Register the event before sending the watch request */
		osdc = &rbd_dev->rbd_client->client->osdc;
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			goto done;
		version = cpu_to_le64(rbd_dev->header.obj_version);
		/* Make the request linger so notifications keep arriving */
		linger_req = &rbd_dev->watch_request;
	}

	/*
	 * NOTE(review): for teardown (!start) this dereferences
	 * rbd_dev->watch_event — callers must guarantee a watch was
	 * previously established.
	 */
	op->watch.ver = version;
	op->watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	op->watch.flag = (u8) start ? 1 : 0;

	ret = rbd_req_sync_op(rbd_dev,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      op, rbd_dev->header_name,
			      0, 0, NULL, linger_req, NULL);

	/* Tear down the event on unwatch, or if starting the watch failed */
	if (!start || ret < 0) {
		ceph_osdc_cancel_event(rbd_dev->watch_event);
		rbd_dev->watch_event = NULL;
	}
done:
	rbd_destroy_op(op);

	return ret;
}
1476
1477 /*
1478  * Synchronous osd object method call
1479  */
1480 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1481                              const char *object_name,
1482                              const char *class_name,
1483                              const char *method_name,
1484                              const char *outbound,
1485                              size_t outbound_size,
1486                              char *inbound,
1487                              size_t inbound_size,
1488                              u64 *ver)
1489 {
1490         struct ceph_osd_req_op *op;
1491         int class_name_len = strlen(class_name);
1492         int method_name_len = strlen(method_name);
1493         int payload_size;
1494         int ret;
1495
1496         /*
1497          * Any input parameters required by the method we're calling
1498          * will be sent along with the class and method names as
1499          * part of the message payload.  That data and its size are
1500          * supplied via the indata and indata_len fields (named from
1501          * the perspective of the server side) in the OSD request
1502          * operation.
1503          */
1504         payload_size = class_name_len + method_name_len + outbound_size;
1505         op = rbd_create_rw_op(CEPH_OSD_OP_CALL, payload_size);
1506         if (!op)
1507                 return -ENOMEM;
1508
1509         op->cls.class_name = class_name;
1510         op->cls.class_len = (__u8) class_name_len;
1511         op->cls.method_name = method_name;
1512         op->cls.method_len = (__u8) method_name_len;
1513         op->cls.argc = 0;
1514         op->cls.indata = outbound;
1515         op->cls.indata_len = outbound_size;
1516
1517         ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
1518                                object_name, 0, inbound_size, inbound,
1519                                NULL, ver);
1520
1521         rbd_destroy_op(op);
1522
1523         dout("cls_exec returned %d\n", ret);
1524         return ret;
1525 }
1526
1527 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1528 {
1529         struct rbd_req_coll *coll =
1530                         kzalloc(sizeof(struct rbd_req_coll) +
1531                                 sizeof(struct rbd_req_status) * num_reqs,
1532                                 GFP_ATOMIC);
1533
1534         if (!coll)
1535                 return NULL;
1536         coll->total = num_reqs;
1537         kref_init(&coll->kref);
1538         return coll;
1539 }
1540
/*
 * Split block request @rq, covering @size bytes starting at device
 * offset @ofs, into per-object segments and submit one osd request
 * for each.  Completions are coordinated through a shared
 * rbd_req_coll so the block request is ended in order.
 */
static int rbd_dev_do_request(struct request *rq,
				struct rbd_device *rbd_dev,
				struct ceph_snap_context *snapc,
				u64 ofs, unsigned int size,
				struct bio *bio_chain)
{
	int num_segs;
	struct rbd_req_coll *coll;
	unsigned int bio_offset;
	int cur_seg = 0;

	dout("%s 0x%x bytes at 0x%llx\n",
		rq_data_dir(rq) == WRITE ? "write" : "read",
		size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

	num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
	if (num_segs <= 0)
		return num_segs;

	coll = rbd_alloc_coll(num_segs);
	if (!coll)
		return -ENOMEM;

	bio_offset = 0;
	do {
		/* Each iteration covers at most one rbd object */
		u64 limit = rbd_segment_length(rbd_dev, ofs, size);
		unsigned int clone_size;
		struct bio *bio_clone;

		BUG_ON(limit > (u64)UINT_MAX);
		clone_size = (unsigned int)limit;
		dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);

		/* One collection ref per segment; dropped on its completion */
		kref_get(&coll->kref);

		/* Pass a cloned bio chain via an osd request */

		bio_clone = bio_chain_clone_range(&bio_chain,
					&bio_offset, clone_size,
					GFP_ATOMIC);
		if (bio_clone)
			(void)rbd_do_op(rq, rbd_dev, snapc,
					ofs, clone_size,
					bio_clone, coll, cur_seg);
		else
			/* Clone failed: complete this segment with ENOMEM */
			rbd_coll_end_req_index(rq, coll, cur_seg,
						(s32)-ENOMEM,
						clone_size);
		size -= clone_size;
		ofs += clone_size;

		cur_seg++;
	} while (size > 0);
	/* Drop the allocation reference taken by rbd_alloc_coll() */
	kref_put(&coll->kref, rbd_coll_release);

	return 0;
}
1598
/*
 * block device queue callback
 *
 * Entered with q->queue_lock held; the lock is dropped while each
 * fetched request is processed and reacquired before ending it or
 * fetching the next.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;

	while ((rq = blk_fetch_request(q))) {
		struct ceph_snap_context *snapc = NULL;
		unsigned int size = 0;
		int result;

		dout("fetched request\n");

		/* Filter out block requests we don't understand */

		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}
		spin_unlock_irq(q->queue_lock);

		/* Write requests need a reference to the snapshot context */

		if (rq_data_dir(rq) == WRITE) {
			result = -EROFS;
			if (read_only) /* Can't write to a read-only device */
				goto out_end_request;

			/*
			 * Note that each osd request will take its
			 * own reference to the snapshot context
			 * supplied.  The reference we take here
			 * just guarantees the one we provide stays
			 * valid.
			 */
			down_read(&rbd_dev->header_rwsem);
			snapc = ceph_get_snap_context(rbd_dev->header.snapc);
			up_read(&rbd_dev->header_rwsem);
			rbd_assert(snapc != NULL);
		} else if (!atomic_read(&rbd_dev->exists)) {
			/* The mapped snapshot disappeared underneath us */
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			dout("request for non-existent snapshot");
			result = -ENXIO;
			goto out_end_request;
		}

		size = blk_rq_bytes(rq);
		result = rbd_dev_do_request(rq, rbd_dev, snapc,
				blk_rq_pos(rq) * SECTOR_SIZE,
				size, rq->bio);
out_end_request:
		if (snapc)
			ceph_put_snap_context(snapc);
		spin_lock_irq(q->queue_lock);
		/*
		 * On success the per-segment completion machinery ends
		 * the request; empty or failed requests are ended here.
		 */
		if (!size || result < 0)
			__blk_end_request_all(rq, result);
	}
}
1660
1661 /*
1662  * a queue callback. Makes sure that we don't create a bio that spans across
1663  * multiple osd objects. One exception would be with a single page bios,
1664  * which we handle later at bio_chain_clone_range()
1665  */
1666 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1667                           struct bio_vec *bvec)
1668 {
1669         struct rbd_device *rbd_dev = q->queuedata;
1670         sector_t sector_offset;
1671         sector_t sectors_per_obj;
1672         sector_t obj_sector_offset;
1673         int ret;
1674
1675         /*
1676          * Find how far into its rbd object the partition-relative
1677          * bio start sector is to offset relative to the enclosing
1678          * device.
1679          */
1680         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1681         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1682         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1683
1684         /*
1685          * Compute the number of bytes from that offset to the end
1686          * of the object.  Account for what's already used by the bio.
1687          */
1688         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1689         if (ret > bmd->bi_size)
1690                 ret -= bmd->bi_size;
1691         else
1692                 ret = 0;
1693
1694         /*
1695          * Don't send back more than was asked for.  And if the bio
1696          * was empty, let the whole thing through because:  "Note
1697          * that a block device *must* allow a single page to be
1698          * added to an empty bio."
1699          */
1700         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1701         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1702                 ret = (int) bvec->bv_len;
1703
1704         return ret;
1705 }
1706
1707 static void rbd_free_disk(struct rbd_device *rbd_dev)
1708 {
1709         struct gendisk *disk = rbd_dev->disk;
1710
1711         if (!disk)
1712                 return;
1713
1714         if (disk->flags & GENHD_FL_UP)
1715                 del_gendisk(disk);
1716         if (disk->queue)
1717                 blk_cleanup_queue(disk->queue);
1718         put_disk(disk);
1719 }
1720
1721 /*
1722  * Read the complete header for the given rbd device.
1723  *
1724  * Returns a pointer to a dynamically-allocated buffer containing
1725  * the complete and validated header.  Caller can pass the address
1726  * of a variable that will be filled in with the version of the
1727  * header object at the time it was read.
1728  *
1729  * Returns a pointer-coded errno if a failure occurs.
1730  */
1731 static struct rbd_image_header_ondisk *
1732 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1733 {
1734         struct rbd_image_header_ondisk *ondisk = NULL;
1735         u32 snap_count = 0;
1736         u64 names_size = 0;
1737         u32 want_count;
1738         int ret;
1739
1740         /*
1741          * The complete header will include an array of its 64-bit
1742          * snapshot ids, followed by the names of those snapshots as
1743          * a contiguous block of NUL-terminated strings.  Note that
1744          * the number of snapshots could change by the time we read
1745          * it in, in which case we re-read it.
1746          */
1747         do {
1748                 size_t size;
1749
1750                 kfree(ondisk);
1751
1752                 size = sizeof (*ondisk);
1753                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1754                 size += names_size;
1755                 ondisk = kmalloc(size, GFP_KERNEL);
1756                 if (!ondisk)
1757                         return ERR_PTR(-ENOMEM);
1758
1759                 ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
1760                                        0, size,
1761                                        (char *) ondisk, version);
1762
1763                 if (ret < 0)
1764                         goto out_err;
1765                 if (WARN_ON((size_t) ret < size)) {
1766                         ret = -ENXIO;
1767                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
1768                                 size, ret);
1769                         goto out_err;
1770                 }
1771                 if (!rbd_dev_ondisk_valid(ondisk)) {
1772                         ret = -ENXIO;
1773                         rbd_warn(rbd_dev, "invalid header");
1774                         goto out_err;
1775                 }
1776
1777                 names_size = le64_to_cpu(ondisk->snap_names_len);
1778                 want_count = snap_count;
1779                 snap_count = le32_to_cpu(ondisk->snap_count);
1780         } while (snap_count != want_count);
1781
1782         return ondisk;
1783
1784 out_err:
1785         kfree(ondisk);
1786
1787         return ERR_PTR(ret);
1788 }
1789
/*
 * Re-read the on-disk header and parse it into @header; on success
 * the header's object version is recorded as well.  Returns 0 or a
 * negative errno.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	/* The raw on-disk copy is no longer needed once parsed */
	kfree(ondisk);

	return ret;
}
1810
1811 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1812 {
1813         struct rbd_snap *snap;
1814         struct rbd_snap *next;
1815
1816         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1817                 rbd_remove_snap_dev(snap);
1818 }
1819
1820 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1821 {
1822         sector_t size;
1823
1824         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1825                 return;
1826
1827         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1828         dout("setting size to %llu sectors", (unsigned long long) size);
1829         rbd_dev->mapping.size = (u64) size;
1830         set_capacity(rbd_dev->disk, size);
1831 }
1832
1833 /*
1834  * only read the first part of the ondisk header, without the snaps info
1835  */
1836 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1837 {
1838         int ret;
1839         struct rbd_image_header h;
1840
1841         ret = rbd_read_header(rbd_dev, &h);
1842         if (ret < 0)
1843                 return ret;
1844
1845         down_write(&rbd_dev->header_rwsem);
1846
1847         /* Update image size, and check for resize of mapped image */
1848         rbd_dev->header.image_size = h.image_size;
1849         rbd_update_mapping_size(rbd_dev);
1850
1851         /* rbd_dev->header.object_prefix shouldn't change */
1852         kfree(rbd_dev->header.snap_sizes);
1853         kfree(rbd_dev->header.snap_names);
1854         /* osd requests may still refer to snapc */
1855         ceph_put_snap_context(rbd_dev->header.snapc);
1856
1857         if (hver)
1858                 *hver = h.obj_version;
1859         rbd_dev->header.obj_version = h.obj_version;
1860         rbd_dev->header.image_size = h.image_size;
1861         rbd_dev->header.snapc = h.snapc;
1862         rbd_dev->header.snap_names = h.snap_names;
1863         rbd_dev->header.snap_sizes = h.snap_sizes;
1864         /* Free the extra copy of the object prefix */
1865         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1866         kfree(h.object_prefix);
1867
1868         ret = rbd_dev_snaps_update(rbd_dev);
1869         if (!ret)
1870                 ret = rbd_dev_snaps_register(rbd_dev);
1871
1872         up_write(&rbd_dev->header_rwsem);
1873
1874         return ret;
1875 }
1876
1877 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1878 {
1879         int ret;
1880
1881         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1882         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1883         if (rbd_dev->image_format == 1)
1884                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1885         else
1886                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1887         mutex_unlock(&ctl_mutex);
1888
1889         return ret;
1890 }
1891
1892 static int rbd_init_disk(struct rbd_device *rbd_dev)
1893 {
1894         struct gendisk *disk;
1895         struct request_queue *q;
1896         u64 segment_size;
1897
1898         /* create gendisk info */
1899         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1900         if (!disk)
1901                 return -ENOMEM;
1902
1903         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1904                  rbd_dev->dev_id);
1905         disk->major = rbd_dev->major;
1906         disk->first_minor = 0;
1907         disk->fops = &rbd_bd_ops;
1908         disk->private_data = rbd_dev;
1909
1910         /* init rq */
1911         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1912         if (!q)
1913                 goto out_disk;
1914
1915         /* We use the default size, but let's be explicit about it. */
1916         blk_queue_physical_block_size(q, SECTOR_SIZE);
1917
1918         /* set io sizes to object size */
1919         segment_size = rbd_obj_bytes(&rbd_dev->header);
1920         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1921         blk_queue_max_segment_size(q, segment_size);
1922         blk_queue_io_min(q, segment_size);
1923         blk_queue_io_opt(q, segment_size);
1924
1925         blk_queue_merge_bvec(q, rbd_merge_bvec);
1926         disk->queue = q;
1927
1928         q->queuedata = rbd_dev;
1929
1930         rbd_dev->disk = disk;
1931
1932         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1933
1934         return 0;
1935 out_disk:
1936         put_disk(disk);
1937
1938         return -ENOMEM;
1939 }
1940
1941 /*
1942   sysfs
1943 */
1944
/* Map a struct device embedded in an rbd_device back to its container. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
1949
1950 static ssize_t rbd_size_show(struct device *dev,
1951                              struct device_attribute *attr, char *buf)
1952 {
1953         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1954         sector_t size;
1955
1956         down_read(&rbd_dev->header_rwsem);
1957         size = get_capacity(rbd_dev->disk);
1958         up_read(&rbd_dev->header_rwsem);
1959
1960         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1961 }
1962
1963 /*
1964  * Note this shows the features for whatever's mapped, which is not
1965  * necessarily the base image.
1966  */
1967 static ssize_t rbd_features_show(struct device *dev,
1968                              struct device_attribute *attr, char *buf)
1969 {
1970         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1971
1972         return sprintf(buf, "0x%016llx\n",
1973                         (unsigned long long) rbd_dev->mapping.features);
1974 }
1975
1976 static ssize_t rbd_major_show(struct device *dev,
1977                               struct device_attribute *attr, char *buf)
1978 {
1979         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1980
1981         return sprintf(buf, "%d\n", rbd_dev->major);
1982 }
1983
/* Show the ceph client id used to talk to the cluster. */
static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "client%lld\n",
                        ceph_client_id(rbd_dev->rbd_client->client));
}
1992
1993 static ssize_t rbd_pool_show(struct device *dev,
1994                              struct device_attribute *attr, char *buf)
1995 {
1996         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1997
1998         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1999 }
2000
2001 static ssize_t rbd_pool_id_show(struct device *dev,
2002                              struct device_attribute *attr, char *buf)
2003 {
2004         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2005
2006         return sprintf(buf, "%llu\n",
2007                 (unsigned long long) rbd_dev->spec->pool_id);
2008 }
2009
2010 static ssize_t rbd_name_show(struct device *dev,
2011                              struct device_attribute *attr, char *buf)
2012 {
2013         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2014
2015         if (rbd_dev->spec->image_name)
2016                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2017
2018         return sprintf(buf, "(unknown)\n");
2019 }
2020
2021 static ssize_t rbd_image_id_show(struct device *dev,
2022                              struct device_attribute *attr, char *buf)
2023 {
2024         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2025
2026         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2027 }
2028
2029 /*
2030  * Shows the name of the currently-mapped snapshot (or
2031  * RBD_SNAP_HEAD_NAME for the base image).
2032  */
2033 static ssize_t rbd_snap_show(struct device *dev,
2034                              struct device_attribute *attr,
2035                              char *buf)
2036 {
2037         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2038
2039         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2040 }
2041
2042 /*
2043  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2044  * for the parent image.  If there is no parent, simply shows
2045  * "(no parent image)".
2046  */
2047 static ssize_t rbd_parent_show(struct device *dev,
2048                              struct device_attribute *attr,
2049                              char *buf)
2050 {
2051         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2052         struct rbd_spec *spec = rbd_dev->parent_spec;
2053         int count;
2054         char *bufp = buf;
2055
2056         if (!spec)
2057                 return sprintf(buf, "(no parent image)\n");
2058
2059         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2060                         (unsigned long long) spec->pool_id, spec->pool_name);
2061         if (count < 0)
2062                 return count;
2063         bufp += count;
2064
2065         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2066                         spec->image_name ? spec->image_name : "(unknown)");
2067         if (count < 0)
2068                 return count;
2069         bufp += count;
2070
2071         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2072                         (unsigned long long) spec->snap_id, spec->snap_name);
2073         if (count < 0)
2074                 return count;
2075         bufp += count;
2076
2077         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2078         if (count < 0)
2079                 return count;
2080         bufp += count;
2081
2082         return (ssize_t) (bufp - buf);
2083 }
2084
2085 static ssize_t rbd_image_refresh(struct device *dev,
2086                                  struct device_attribute *attr,
2087                                  const char *buf,
2088                                  size_t size)
2089 {
2090         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2091         int ret;
2092
2093         ret = rbd_dev_refresh(rbd_dev, NULL);
2094
2095         return ret < 0 ? ret : size;
2096 }
2097
/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is write-only and triggers a header re-read.  See
 * Documentation/ABI/testing/sysfs-bus-rbd for the ABI description.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2109
/* NULL-terminated list of attributes exported for each mapped device */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_parent.attr,
        &dev_attr_refresh.attr,
        NULL
};
2124
/* Group wrapping the device attributes for the driver core */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

/* NULL-terminated group list hooked up via rbd_device_type below */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
2133
/*
 * Intentionally empty release: the struct device here is embedded in
 * an rbd_device whose lifetime is managed elsewhere (it is kfree'd in
 * rbd_dev_destroy()), so there is nothing to free when the driver
 * core drops its last reference.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2137
/* Device type for mapped rbd devices; wires up the sysfs attributes */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
2143
2144
2145 /*
2146   sysfs - snapshots
2147 */
2148
2149 static ssize_t rbd_snap_size_show(struct device *dev,
2150                                   struct device_attribute *attr,
2151                                   char *buf)
2152 {
2153         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2154
2155         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2156 }
2157
2158 static ssize_t rbd_snap_id_show(struct device *dev,
2159                                 struct device_attribute *attr,
2160                                 char *buf)
2161 {
2162         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2163
2164         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2165 }
2166
2167 static ssize_t rbd_snap_features_show(struct device *dev,
2168                                 struct device_attribute *attr,
2169                                 char *buf)
2170 {
2171         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2172
2173         return sprintf(buf, "0x%016llx\n",
2174                         (unsigned long long) snap->features);
2175 }
2176
/* Per-snapshot sysfs attributes (all read-only) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated list of attributes exported for each snapshot device */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2191
/*
 * Release callback for snapshot devices: frees the snapshot name and
 * the rbd_snap itself once the driver core drops the last reference.
 */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}
2198
/* NULL-terminated group list for snapshot devices */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/* Device type for snapshot devices; setting it marks a snap "registered" */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2208
/* Take a reference on a spec; returns the spec for call chaining. */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
        kref_get(&spec->kref);

        return spec;
}
2215
static void rbd_spec_free(struct kref *kref);
/* Drop a reference on a spec (NULL-safe); frees it on the last put. */
static void rbd_spec_put(struct rbd_spec *spec)
{
        if (spec)
                kref_put(&spec->kref, rbd_spec_free);
}
2222
2223 static struct rbd_spec *rbd_spec_alloc(void)
2224 {
2225         struct rbd_spec *spec;
2226
2227         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2228         if (!spec)
2229                 return NULL;
2230         kref_init(&spec->kref);
2231
2232         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2233
2234         return spec;
2235 }
2236
2237 static void rbd_spec_free(struct kref *kref)
2238 {
2239         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2240
2241         kfree(spec->pool_name);
2242         kfree(spec->image_id);
2243         kfree(spec->image_name);
2244         kfree(spec->snap_name);
2245         kfree(spec);
2246 }
2247
/*
 * Allocate and initialize a new rbd_device.  The new device takes
 * ownership of the caller's references on both rbdc and spec (they
 * are dropped in rbd_dev_destroy()).  Returns NULL on allocation
 * failure.
 */
struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                                struct rbd_spec *spec)
{
        struct rbd_device *rbd_dev;

        rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                return NULL;

        spin_lock_init(&rbd_dev->lock);
        atomic_set(&rbd_dev->exists, 0);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);

        rbd_dev->spec = spec;
        rbd_dev->rbd_client = rbdc;

        /* Initialize the layout used for all rbd requests */

        rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
        rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        /* pool id must fit in 32 bits here; callers are expected to check */
        rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

        return rbd_dev;
}
2275
2276 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2277 {
2278         rbd_spec_put(rbd_dev->parent_spec);
2279         kfree(rbd_dev->header_name);
2280         rbd_put_client(rbd_dev->rbd_client);
2281         rbd_spec_put(rbd_dev->spec);
2282         kfree(rbd_dev);
2283 }
2284
/*
 * A snapshot counts as registered once its device type has been set
 * (done in rbd_register_snap_dev()).  The assertion verifies that
 * this marker and the driver core's own registration state agree.
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
        bool ret = snap->dev.type == &rbd_snap_device_type;
        bool reg = device_is_registered(&snap->dev);

        /* !ret ^ reg is true exactly when ret == reg */
        rbd_assert(!ret ^ reg);

        return ret;
}
2294
/*
 * Unlink a snapshot from its device's snaps list and, if its sysfs
 * device was registered, unregister it (the driver core will then
 * free it through rbd_snap_dev_release() when the last ref drops).
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}
2301
2302 static int rbd_register_snap_dev(struct rbd_snap *snap,
2303                                   struct device *parent)
2304 {
2305         struct device *dev = &snap->dev;
2306         int ret;
2307
2308         dev->type = &rbd_snap_device_type;
2309         dev->parent = parent;
2310         dev->release = rbd_snap_dev_release;
2311         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2312         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2313
2314         ret = device_register(dev);
2315
2316         return ret;
2317 }
2318
2319 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2320                                                 const char *snap_name,
2321                                                 u64 snap_id, u64 snap_size,
2322                                                 u64 snap_features)
2323 {
2324         struct rbd_snap *snap;
2325         int ret;
2326
2327         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2328         if (!snap)
2329                 return ERR_PTR(-ENOMEM);
2330
2331         ret = -ENOMEM;
2332         snap->name = kstrdup(snap_name, GFP_KERNEL);
2333         if (!snap->name)
2334                 goto err;
2335
2336         snap->id = snap_id;
2337         snap->size = snap_size;
2338         snap->features = snap_features;
2339
2340         return snap;
2341
2342 err:
2343         kfree(snap->name);
2344         kfree(snap);
2345
2346         return ERR_PTR(ret);
2347 }
2348
2349 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2350                 u64 *snap_size, u64 *snap_features)
2351 {
2352         char *snap_name;
2353
2354         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2355
2356         *snap_size = rbd_dev->header.snap_sizes[which];
2357         *snap_features = 0;     /* No features for v1 */
2358
2359         /* Skip over names until we find the one we are looking for */
2360
2361         snap_name = rbd_dev->header.snap_names;
2362         while (which--)
2363                 snap_name += strlen(snap_name) + 1;
2364
2365         return snap_name;
2366 }
2367
2368 /*
2369  * Get the size and object order for an image snapshot, or if
2370  * snap_id is CEPH_NOSNAP, gets this information for the base
2371  * image.
2372  */
2373 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2374                                 u8 *order, u64 *snap_size)
2375 {
2376         __le64 snapid = cpu_to_le64(snap_id);
2377         int ret;
2378         struct {
2379                 u8 order;
2380                 __le64 size;
2381         } __attribute__ ((packed)) size_buf = { 0 };
2382
2383         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2384                                 "rbd", "get_size",
2385                                 (char *) &snapid, sizeof (snapid),
2386                                 (char *) &size_buf, sizeof (size_buf), NULL);
2387         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2388         if (ret < 0)
2389                 return ret;
2390
2391         *order = size_buf.order;
2392         *snap_size = le64_to_cpu(size_buf.size);
2393
2394         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2395                 (unsigned long long) snap_id, (unsigned int) *order,
2396                 (unsigned long long) *snap_size);
2397
2398         return 0;
2399 }
2400
/*
 * Fetch object order and size for the base image (CEPH_NOSNAP)
 * directly into the in-core header.
 */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
                                        &rbd_dev->header.obj_order,
                                        &rbd_dev->header.image_size);
}
2407
/*
 * Fetch the object name prefix for a v2 image and store a freshly
 * allocated copy in rbd_dev->header.object_prefix.  Returns 0 on
 * success or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
        void *reply_buf;
        int ret;
        void *p;

        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
        ret = 0;    /* rbd_req_sync_exec() can return positive */

        /* Decode the length-prefixed string into its own allocation */
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
                                                p + RBD_OBJ_PREFIX_LEN_MAX,
                                                NULL, GFP_NOIO);

        /* NOTE(review): GFP_NOIO here but GFP_KERNEL above -- confirm intent */
        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
                rbd_dev->header.object_prefix = NULL;
        } else {
                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }

out:
        kfree(reply_buf);

        return ret;
}
2444
2445 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2446                 u64 *snap_features)
2447 {
2448         __le64 snapid = cpu_to_le64(snap_id);
2449         struct {
2450                 __le64 features;
2451                 __le64 incompat;
2452         } features_buf = { 0 };
2453         u64 incompat;
2454         int ret;
2455
2456         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2457                                 "rbd", "get_features",
2458                                 (char *) &snapid, sizeof (snapid),
2459                                 (char *) &features_buf, sizeof (features_buf),
2460                                 NULL);
2461         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2462         if (ret < 0)
2463                 return ret;
2464
2465         incompat = le64_to_cpu(features_buf.incompat);
2466         if (incompat & ~RBD_FEATURES_ALL)
2467                 return -ENXIO;
2468
2469         *snap_features = le64_to_cpu(features_buf.features);
2470
2471         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2472                 (unsigned long long) snap_id,
2473                 (unsigned long long) *snap_features,
2474                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2475
2476         return 0;
2477 }
2478
/* Fetch the base image's feature bits into the in-core header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                                                &rbd_dev->header.features);
}
2484
2485 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2486 {
2487         struct rbd_spec *parent_spec;
2488         size_t size;
2489         void *reply_buf = NULL;
2490         __le64 snapid;
2491         void *p;
2492         void *end;
2493         char *image_id;
2494         u64 overlap;
2495         int ret;
2496
2497         parent_spec = rbd_spec_alloc();
2498         if (!parent_spec)
2499                 return -ENOMEM;
2500
2501         size = sizeof (__le64) +                                /* pool_id */
2502                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2503                 sizeof (__le64) +                               /* snap_id */
2504                 sizeof (__le64);                                /* overlap */
2505         reply_buf = kmalloc(size, GFP_KERNEL);
2506         if (!reply_buf) {
2507                 ret = -ENOMEM;
2508                 goto out_err;
2509         }
2510
2511         snapid = cpu_to_le64(CEPH_NOSNAP);
2512         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2513                                 "rbd", "get_parent",
2514                                 (char *) &snapid, sizeof (snapid),
2515                                 (char *) reply_buf, size, NULL);
2516         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2517         if (ret < 0)
2518                 goto out_err;
2519
2520         ret = -ERANGE;
2521         p = reply_buf;
2522         end = (char *) reply_buf + size;
2523         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2524         if (parent_spec->pool_id == CEPH_NOPOOL)
2525                 goto out;       /* No parent?  No problem. */
2526
2527         /* The ceph file layout needs to fit pool id in 32 bits */
2528
2529         ret = -EIO;
2530         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2531                 goto out;
2532
2533         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2534         if (IS_ERR(image_id)) {
2535                 ret = PTR_ERR(image_id);
2536                 goto out_err;
2537         }
2538         parent_spec->image_id = image_id;
2539         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2540         ceph_decode_64_safe(&p, end, overlap, out_err);
2541
2542         rbd_dev->parent_overlap = overlap;
2543         rbd_dev->parent_spec = parent_spec;
2544         parent_spec = NULL;     /* rbd_dev now owns this */
2545 out:
2546         ret = 0;
2547 out_err:
2548         kfree(reply_buf);
2549         rbd_spec_put(parent_spec);
2550
2551         return ret;
2552 }
2553
2554 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2555 {
2556         size_t image_id_size;
2557         char *image_id;
2558         void *p;
2559         void *end;
2560         size_t size;
2561         void *reply_buf = NULL;
2562         size_t len = 0;
2563         char *image_name = NULL;
2564         int ret;
2565
2566         rbd_assert(!rbd_dev->spec->image_name);
2567
2568         len = strlen(rbd_dev->spec->image_id);
2569         image_id_size = sizeof (__le32) + len;
2570         image_id = kmalloc(image_id_size, GFP_KERNEL);
2571         if (!image_id)
2572                 return NULL;
2573
2574         p = image_id;
2575         end = (char *) image_id + image_id_size;
2576         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2577
2578         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2579         reply_buf = kmalloc(size, GFP_KERNEL);
2580         if (!reply_buf)
2581                 goto out;
2582
2583         ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2584                                 "rbd", "dir_get_name",
2585                                 image_id, image_id_size,
2586                                 (char *) reply_buf, size, NULL);
2587         if (ret < 0)
2588                 goto out;
2589         p = reply_buf;
2590         end = (char *) reply_buf + size;
2591         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2592         if (IS_ERR(image_name))
2593                 image_name = NULL;
2594         else
2595                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2596 out:
2597         kfree(reply_buf);
2598         kfree(image_id);
2599
2600         return image_name;
2601 }
2602
2603 /*
2604  * When a parent image gets probed, we only have the pool, image,
2605  * and snapshot ids but not the names of any of them.  This call
2606  * is made later to fill in those names.  It has to be done after
2607  * rbd_dev_snaps_update() has completed because some of the
2608  * information (in particular, snapshot name) is not available
2609  * until then.
2610  */
2611 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2612 {
2613         struct ceph_osd_client *osdc;
2614         const char *name;
2615         void *reply_buf = NULL;
2616         int ret;
2617
2618         if (rbd_dev->spec->pool_name)
2619                 return 0;       /* Already have the names */
2620
2621         /* Look up the pool name */
2622
2623         osdc = &rbd_dev->rbd_client->client->osdc;
2624         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2625         if (!name) {
2626                 rbd_warn(rbd_dev, "there is no pool with id %llu",
2627                         rbd_dev->spec->pool_id);        /* Really a BUG() */
2628                 return -EIO;
2629         }
2630
2631         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2632         if (!rbd_dev->spec->pool_name)
2633                 return -ENOMEM;
2634
2635         /* Fetch the image name; tolerate failure here */
2636
2637         name = rbd_dev_image_name(rbd_dev);
2638         if (name)
2639                 rbd_dev->spec->image_name = (char *) name;
2640         else
2641                 rbd_warn(rbd_dev, "unable to get image name");
2642
2643         /* Look up the snapshot name. */
2644
2645         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2646         if (!name) {
2647                 rbd_warn(rbd_dev, "no snapshot with id %llu",
2648                         rbd_dev->spec->snap_id);        /* Really a BUG() */
2649                 ret = -EIO;
2650                 goto out_err;
2651         }
2652         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2653         if(!rbd_dev->spec->snap_name)
2654                 goto out_err;
2655
2656         return 0;
2657 out_err:
2658         kfree(reply_buf);
2659         kfree(rbd_dev->spec->pool_name);
2660         rbd_dev->spec->pool_name = NULL;
2661
2662         return ret;
2663 }
2664
2665 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2666 {
2667         size_t size;
2668         int ret;
2669         void *reply_buf;
2670         void *p;
2671         void *end;
2672         u64 seq;
2673         u32 snap_count;
2674         struct ceph_snap_context *snapc;
2675         u32 i;
2676
2677         /*
2678          * We'll need room for the seq value (maximum snapshot id),
2679          * snapshot count, and array of that many snapshot ids.
2680          * For now we have a fixed upper limit on the number we're
2681          * prepared to receive.
2682          */
2683         size = sizeof (__le64) + sizeof (__le32) +
2684                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2685         reply_buf = kzalloc(size, GFP_KERNEL);
2686         if (!reply_buf)
2687                 return -ENOMEM;
2688
2689         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2690                                 "rbd", "get_snapcontext",
2691                                 NULL, 0,
2692                                 reply_buf, size, ver);
2693         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2694         if (ret < 0)
2695                 goto out;
2696
2697         ret = -ERANGE;
2698         p = reply_buf;
2699         end = (char *) reply_buf + size;
2700         ceph_decode_64_safe(&p, end, seq, out);
2701         ceph_decode_32_safe(&p, end, snap_count, out);
2702
2703         /*
2704          * Make sure the reported number of snapshot ids wouldn't go
2705          * beyond the end of our buffer.  But before checking that,
2706          * make sure the computed size of the snapshot context we
2707          * allocate is representable in a size_t.
2708          */
2709         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2710                                  / sizeof (u64)) {
2711                 ret = -EINVAL;
2712                 goto out;
2713         }
2714         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2715                 goto out;
2716
2717         size = sizeof (struct ceph_snap_context) +
2718                                 snap_count * sizeof (snapc->snaps[0]);
2719         snapc = kmalloc(size, GFP_KERNEL);
2720         if (!snapc) {
2721                 ret = -ENOMEM;
2722                 goto out;
2723         }
2724
2725         atomic_set(&snapc->nref, 1);
2726         snapc->seq = seq;
2727         snapc->num_snaps = snap_count;
2728         for (i = 0; i < snap_count; i++)
2729                 snapc->snaps[i] = ceph_decode_64(&p);
2730
2731         rbd_dev->header.snapc = snapc;
2732
2733         dout("  snap context seq = %llu, snap_count = %u\n",
2734                 (unsigned long long) seq, (unsigned int) snap_count);
2735
2736 out:
2737         kfree(reply_buf);
2738
2739         return 0;
2740 }
2741
/*
 * Fetch from the server the name of the snapshot at position "which"
 * in the rbd device's snapshot context.  Returns a dynamically
 * allocated, NUL-terminated copy of the name (caller must free it),
 * or an ERR_PTR() on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	/* Reply is a ceph-encoded string: a __le32 length prefix plus data */
	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	/* The snapshot id is sent over the wire in little-endian form */
	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	/* ceph_extract_encoded_string() allocates the returned name */
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout("  snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}
2784
2785 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2786                 u64 *snap_size, u64 *snap_features)
2787 {
2788         __le64 snap_id;
2789         u8 order;
2790         int ret;
2791
2792         snap_id = rbd_dev->header.snapc->snaps[which];
2793         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2794         if (ret)
2795                 return ERR_PTR(ret);
2796         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2797         if (ret)
2798                 return ERR_PTR(ret);
2799
2800         return rbd_dev_v2_snap_name(rbd_dev, which);
2801 }
2802
2803 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2804                 u64 *snap_size, u64 *snap_features)
2805 {
2806         if (rbd_dev->image_format == 1)
2807                 return rbd_dev_v1_snap_info(rbd_dev, which,
2808                                         snap_size, snap_features);
2809         if (rbd_dev->image_format == 2)
2810                 return rbd_dev_v2_snap_info(rbd_dev, which,
2811                                         snap_size, snap_features);
2812         return ERR_PTR(-EINVAL);
2813 }
2814
2815 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2816 {
2817         int ret;
2818         __u8 obj_order;
2819
2820         down_write(&rbd_dev->header_rwsem);
2821
2822         /* Grab old order first, to see if it changes */
2823
2824         obj_order = rbd_dev->header.obj_order,
2825         ret = rbd_dev_v2_image_size(rbd_dev);
2826         if (ret)
2827                 goto out;
2828         if (rbd_dev->header.obj_order != obj_order) {
2829                 ret = -EIO;
2830                 goto out;
2831         }
2832         rbd_update_mapping_size(rbd_dev);
2833
2834         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2835         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2836         if (ret)
2837                 goto out;
2838         ret = rbd_dev_snaps_update(rbd_dev);
2839         dout("rbd_dev_snaps_update returned %d\n", ret);
2840         if (ret)
2841                 goto out;
2842         ret = rbd_dev_snaps_register(rbd_dev);
2843         dout("rbd_dev_snaps_register returned %d\n", ret);
2844 out:
2845         up_write(&rbd_dev->header_rwsem);
2846
2847         return ret;
2848 }
2849
2850 /*
2851  * Scan the rbd device's current snapshot list and compare it to the
2852  * newly-received snapshot context.  Remove any existing snapshots
2853  * not present in the new snapshot context.  Add a new snapshot for
2854  * any snaphots in the snapshot context not in the current list.
2855  * And verify there are no changes to snapshots we already know
2856  * about.
2857  *
2858  * Assumes the snapshots in the snapshot context are sorted by
2859  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2860  * are also maintained in that order.)
2861  */
2862 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2863 {
2864         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2865         const u32 snap_count = snapc->num_snaps;
2866         struct list_head *head = &rbd_dev->snaps;
2867         struct list_head *links = head->next;
2868         u32 index = 0;
2869
2870         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2871         while (index < snap_count || links != head) {
2872                 u64 snap_id;
2873                 struct rbd_snap *snap;
2874                 char *snap_name;
2875                 u64 snap_size = 0;
2876                 u64 snap_features = 0;
2877
2878                 snap_id = index < snap_count ? snapc->snaps[index]
2879                                              : CEPH_NOSNAP;
2880                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2881                                      : NULL;
2882                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2883
2884                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2885                         struct list_head *next = links->next;
2886
2887                         /* Existing snapshot not in the new snap context */
2888
2889                         if (rbd_dev->spec->snap_id == snap->id)
2890                                 atomic_set(&rbd_dev->exists, 0);
2891                         rbd_remove_snap_dev(snap);
2892                         dout("%ssnap id %llu has been removed\n",
2893                                 rbd_dev->spec->snap_id == snap->id ?
2894                                                         "mapped " : "",
2895                                 (unsigned long long) snap->id);
2896
2897                         /* Done with this list entry; advance */
2898
2899                         links = next;
2900                         continue;
2901                 }
2902
2903                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2904                                         &snap_size, &snap_features);
2905                 if (IS_ERR(snap_name))
2906                         return PTR_ERR(snap_name);
2907
2908                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2909                         (unsigned long long) snap_id);
2910                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2911                         struct rbd_snap *new_snap;
2912
2913                         /* We haven't seen this snapshot before */
2914
2915                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2916                                         snap_id, snap_size, snap_features);
2917                         if (IS_ERR(new_snap)) {
2918                                 int err = PTR_ERR(new_snap);
2919
2920                                 dout("  failed to add dev, error %d\n", err);
2921
2922                                 return err;
2923                         }
2924
2925                         /* New goes before existing, or at end of list */
2926
2927                         dout("  added dev%s\n", snap ? "" : " at end\n");
2928                         if (snap)
2929                                 list_add_tail(&new_snap->node, &snap->node);
2930                         else
2931                                 list_add_tail(&new_snap->node, head);
2932                 } else {
2933                         /* Already have this one */
2934
2935                         dout("  already present\n");
2936
2937                         rbd_assert(snap->size == snap_size);
2938                         rbd_assert(!strcmp(snap->name, snap_name));
2939                         rbd_assert(snap->features == snap_features);
2940
2941                         /* Done with this list entry; advance */
2942
2943                         links = links->next;
2944                 }
2945
2946                 /* Advance to the next entry in the snapshot context */
2947
2948                 index++;
2949         }
2950         dout("%s: done\n", __func__);
2951
2952         return 0;
2953 }
2954
2955 /*
2956  * Scan the list of snapshots and register the devices for any that
2957  * have not already been registered.
2958  */
2959 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2960 {
2961         struct rbd_snap *snap;
2962         int ret = 0;
2963
2964         dout("%s called\n", __func__);
2965         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2966                 return -EIO;
2967
2968         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2969                 if (!rbd_snap_registered(snap)) {
2970                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2971                         if (ret < 0)
2972                                 break;
2973                 }
2974         }
2975         dout("%s: returning %d\n", __func__, ret);
2976
2977         return ret;
2978 }
2979
2980 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2981 {
2982         struct device *dev;
2983         int ret;
2984
2985         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2986
2987         dev = &rbd_dev->dev;
2988         dev->bus = &rbd_bus_type;
2989         dev->type = &rbd_device_type;
2990         dev->parent = &rbd_root_dev;
2991         dev->release = rbd_dev_release;
2992         dev_set_name(dev, "%d", rbd_dev->dev_id);
2993         ret = device_register(dev);
2994
2995         mutex_unlock(&ctl_mutex);
2996
2997         return ret;
2998 }
2999
/*
 * Remove the rbd device from sysfs; final cleanup happens in the
 * device's release callback (rbd_dev_release).
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
3004
3005 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
3006 {
3007         int ret, rc;
3008
3009         do {
3010                 ret = rbd_req_sync_watch(rbd_dev, 1);
3011                 if (ret == -ERANGE) {
3012                         rc = rbd_dev_refresh(rbd_dev, NULL);
3013                         if (rc < 0)
3014                                 return rc;
3015                 }
3016         } while (ret == -ERANGE);
3017
3018         return ret;
3019 }
3020
/* Highest device id handed out so far; see rbd_dev_id_get()/_put() */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3022
3023 /*
3024  * Get a unique rbd identifier for the given new rbd_dev, and add
3025  * the rbd_dev to the global list.  The minimum rbd id is 1.
3026  */
3027 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3028 {
3029         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3030
3031         spin_lock(&rbd_dev_list_lock);
3032         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3033         spin_unlock(&rbd_dev_list_lock);
3034         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3035                 (unsigned long long) rbd_dev->dev_id);
3036 }
3037
3038 /*
3039  * Remove an rbd_dev from the global list, and record that its
3040  * identifier is no longer in use.
3041  */
3042 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3043 {
3044         struct list_head *tmp;
3045         int rbd_id = rbd_dev->dev_id;
3046         int max_id;
3047
3048         rbd_assert(rbd_id > 0);
3049
3050         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3051                 (unsigned long long) rbd_dev->dev_id);
3052         spin_lock(&rbd_dev_list_lock);
3053         list_del_init(&rbd_dev->node);
3054
3055         /*
3056          * If the id being "put" is not the current maximum, there
3057          * is nothing special we need to do.
3058          */
3059         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3060                 spin_unlock(&rbd_dev_list_lock);
3061                 return;
3062         }
3063
3064         /*
3065          * We need to update the current maximum id.  Search the
3066          * list to find out what it is.  We're more likely to find
3067          * the maximum at the end, so search the list backward.
3068          */
3069         max_id = 0;
3070         list_for_each_prev(tmp, &rbd_dev_list) {
3071                 struct rbd_device *rbd_dev;
3072
3073                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3074                 if (rbd_dev->dev_id > max_id)
3075                         max_id = rbd_dev->dev_id;
3076         }
3077         spin_unlock(&rbd_dev_list_lock);
3078
3079         /*
3080          * The max id could have been updated by rbd_dev_id_get(), in
3081          * which case it now accurately reflects the new maximum.
3082          * Be careful not to overwrite the maximum value in that
3083          * case.
3084          */
3085         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3086         dout("  max dev id has been reset\n");
3087 }
3088
3089 /*
3090  * Skips over white space at *buf, and updates *buf to point to the
3091  * first found non-space character (if any). Returns the length of
3092  * the token (string of non-white space characters) found.  Note
3093  * that *buf must be terminated with '\0'.
3094  */
3095 static inline size_t next_token(const char **buf)
3096 {
3097         /*
3098         * These are the characters that produce nonzero for
3099         * isspace() in the "C" and "POSIX" locales.
3100         */
3101         const char *spaces = " \f\n\r\t\v";
3102
3103         *buf += strspn(*buf, spaces);   /* Find start of token */
3104
3105         return strcspn(*buf, spaces);   /* Return token length */
3106 }
3107
3108 /*
3109  * Finds the next token in *buf, and if the provided token buffer is
3110  * big enough, copies the found token into it.  The result, if
3111  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3112  * must be terminated with '\0' on entry.
3113  *
3114  * Returns the length of the token found (not including the '\0').
3115  * Return value will be 0 if no token is found, and it will be >=
3116  * token_size if the token would not fit.
3117  *
3118  * The *buf pointer will be updated to point beyond the end of the
3119  * found token.  Note that this occurs even if the token buffer is
3120  * too small to hold it.
3121  */
3122 static inline size_t copy_token(const char **buf,
3123                                 char *token,
3124                                 size_t token_size)
3125 {
3126         size_t len;
3127
3128         len = next_token(buf);
3129         if (len < token_size) {
3130                 memcpy(token, *buf, len);
3131                 *(token + len) = '\0';
3132         }
3133         *buf += len;
3134
3135         return len;
3136 }
3137
3138 /*
3139  * Finds the next token in *buf, dynamically allocates a buffer big
3140  * enough to hold a copy of it, and copies the token into the new
3141  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3142  * that a duplicate buffer is created even for a zero-length token.
3143  *
3144  * Returns a pointer to the newly-allocated duplicate, or a null
3145  * pointer if memory for the duplicate was not available.  If
3146  * the lenp argument is a non-null pointer, the length of the token
3147  * (not including the '\0') is returned in *lenp.
3148  *
3149  * If successful, the *buf pointer will be updated to point beyond
3150  * the end of the found token.
3151  *
3152  * Note: uses GFP_KERNEL for allocation.
3153  */
3154 static inline char *dup_token(const char **buf, size_t *lenp)
3155 {
3156         char *dup;
3157         size_t len;
3158
3159         len = next_token(buf);
3160         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3161         if (!dup)
3162                 return NULL;
3163         *(dup + len) = '\0';
3164         *buf += len;
3165
3166         if (lenp)
3167                 *lenp = len;
3168
3169         return dup;
3170 }
3171
3172 /*
3173  * Parse the options provided for an "rbd add" (i.e., rbd image
3174  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3175  * and the data written is passed here via a NUL-terminated buffer.
3176  * Returns 0 if successful or an error code otherwise.
3177  *
3178  * The information extracted from these options is recorded in
3179  * the other parameters which return dynamically-allocated
3180  * structures:
3181  *  ceph_opts
3182  *      The address of a pointer that will refer to a ceph options
3183  *      structure.  Caller must release the returned pointer using
3184  *      ceph_destroy_options() when it is no longer needed.
3185  *  rbd_opts
3186  *      Address of an rbd options pointer.  Fully initialized by
3187  *      this function; caller must release with kfree().
3188  *  spec
3189  *      Address of an rbd image specification pointer.  Fully
3190  *      initialized by this function based on parsed options.
3191  *      Caller must release with rbd_spec_put().
3192  *
3193  * The options passed take this form:
3194  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3195  * where:
3196  *  <mon_addrs>
3197  *      A comma-separated list of one or more monitor addresses.
3198  *      A monitor address is an ip address, optionally followed
3199  *      by a port number (separated by a colon).
3200  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3201  *  <options>
3202  *      A comma-separated list of ceph and/or rbd options.
3203  *  <pool_name>
3204  *      The name of the rados pool containing the rbd image.
3205  *  <image_name>
3206  *      The name of the image in that pool to map.
3207  *  <snap_id>
3208  *      An optional snapshot id.  If provided, the mapping will
3209  *      present data from the image at the time that snapshot was
3210  *      created.  The image head is used if no snapshot id is
3211  *      provided.  Snapshot mappings are always read-only.
3212  */
3213 static int rbd_add_parse_args(const char *buf,
3214                                 struct ceph_options **ceph_opts,
3215                                 struct rbd_options **opts,
3216                                 struct rbd_spec **rbd_spec)
3217 {
3218         size_t len;
3219         char *options;
3220         const char *mon_addrs;
3221         size_t mon_addrs_size;
3222         struct rbd_spec *spec = NULL;
3223         struct rbd_options *rbd_opts = NULL;
3224         struct ceph_options *copts;
3225         int ret;
3226
3227         /* The first four tokens are required */
3228
3229         len = next_token(&buf);
3230         if (!len) {
3231                 rbd_warn(NULL, "no monitor address(es) provided");
3232                 return -EINVAL;
3233         }
3234         mon_addrs = buf;
3235         mon_addrs_size = len + 1;
3236         buf += len;
3237
3238         ret = -EINVAL;
3239         options = dup_token(&buf, NULL);
3240         if (!options)
3241                 return -ENOMEM;
3242         if (!*options) {
3243                 rbd_warn(NULL, "no options provided");
3244                 goto out_err;
3245         }
3246
3247         spec = rbd_spec_alloc();
3248         if (!spec)
3249                 goto out_mem;
3250
3251         spec->pool_name = dup_token(&buf, NULL);
3252         if (!spec->pool_name)
3253                 goto out_mem;
3254         if (!*spec->pool_name) {
3255                 rbd_warn(NULL, "no pool name provided");
3256                 goto out_err;
3257         }
3258
3259         spec->image_name = dup_token(&buf, NULL);
3260         if (!spec->image_name)
3261                 goto out_mem;
3262         if (!*spec->image_name) {
3263                 rbd_warn(NULL, "no image name provided");
3264                 goto out_err;
3265         }
3266
3267         /*
3268          * Snapshot name is optional; default is to use "-"
3269          * (indicating the head/no snapshot).
3270          */
3271         len = next_token(&buf);
3272         if (!len) {
3273                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3274                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3275         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3276                 ret = -ENAMETOOLONG;
3277                 goto out_err;
3278         }
3279         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3280         if (!spec->snap_name)
3281                 goto out_mem;
3282         *(spec->snap_name + len) = '\0';
3283
3284         /* Initialize all rbd options to the defaults */
3285
3286         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3287         if (!rbd_opts)
3288                 goto out_mem;
3289
3290         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3291
3292         copts = ceph_parse_options(options, mon_addrs,
3293                                         mon_addrs + mon_addrs_size - 1,
3294                                         parse_rbd_opts_token, rbd_opts);
3295         if (IS_ERR(copts)) {
3296                 ret = PTR_ERR(copts);
3297                 goto out_err;
3298         }
3299         kfree(options);
3300
3301         *ceph_opts = copts;
3302         *opts = rbd_opts;
3303         *rbd_spec = spec;
3304
3305         return 0;
3306 out_mem:
3307         ret = -ENOMEM;
3308 out_err:
3309         kfree(rbd_opts);
3310         rbd_spec_put(spec);
3311         kfree(options);
3312
3313         return ret;
3314 }
3315
3316 /*
3317  * An rbd format 2 image has a unique identifier, distinct from the
3318  * name given to it by the user.  Internally, that identifier is
3319  * what's used to specify the names of objects related to the image.
3320  *
3321  * A special "rbd id" object is used to map an rbd image name to its
3322  * id.  If that object doesn't exist, then there is no v2 rbd image
3323  * with the supplied name.
3324  *
3325  * This function will record the given rbd_dev's image_id field if
3326  * it can be determined, and in that case will return 0.  If any
3327  * errors occur a negative errno will be returned and the rbd_dev's
3328  * image_id field will be unchanged (and should be NULL).
3329  */
3330 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3331 {
3332         int ret;
3333         size_t size;
3334         char *object_name;
3335         void *response;
3336         void *p;
3337
3338         /*
3339          * When probing a parent image, the image id is already
3340          * known (and the image name likely is not).  There's no
3341          * need to fetch the image id again in this case.
3342          */
3343         if (rbd_dev->spec->image_id)
3344                 return 0;
3345
3346         /*
3347          * First, see if the format 2 image id file exists, and if
3348          * so, get the image's persistent id from it.
3349          */
3350         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3351         object_name = kmalloc(size, GFP_NOIO);
3352         if (!object_name)
3353                 return -ENOMEM;
3354         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3355         dout("rbd id object name is %s\n", object_name);
3356
3357         /* Response will be an encoded string, which includes a length */
3358
3359         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3360         response = kzalloc(size, GFP_NOIO);
3361         if (!response) {
3362                 ret = -ENOMEM;
3363                 goto out;
3364         }
3365
3366         ret = rbd_req_sync_exec(rbd_dev, object_name,
3367                                 "rbd", "get_id",
3368                                 NULL, 0,
3369                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3370         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3371         if (ret < 0)
3372                 goto out;
3373         ret = 0;    /* rbd_req_sync_exec() can return positive */
3374
3375         p = response;
3376         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3377                                                 p + RBD_IMAGE_ID_LEN_MAX,
3378                                                 NULL, GFP_NOIO);
3379         if (IS_ERR(rbd_dev->spec->image_id)) {
3380                 ret = PTR_ERR(rbd_dev->spec->image_id);
3381                 rbd_dev->spec->image_id = NULL;
3382         } else {
3383                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3384         }
3385 out:
3386         kfree(response);
3387         kfree(object_name);
3388
3389         return ret;
3390 }
3391
/*
 * Probe a format 1 ("v1") rbd image: record an empty image id,
 * construct the header object name, and read the on-disk header
 * into rbd_dev->header.  On failure all allocations made here are
 * undone and a negative errno is returned.
 */
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Version 1 images have no id; empty string is used */

	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
	if (!rbd_dev->spec->image_id)
		return -ENOMEM;

	/* Record the header object name for this rbd image. */

	/* sizeof includes the NUL, so this covers name + suffix + NUL */
	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s",
		rbd_dev->spec->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	rbd_dev->image_format = 1;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}
3440
/*
 * Probe a format 2 ("v2") rbd image: construct the header object
 * name from the (already known) image id, then fetch the image
 * size, object prefix, features, optional parent info, and snapshot
 * context from the server.  On failure everything gathered here is
 * torn down and a negative errno is returned.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the and check features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	/* Undo everything above, in reverse order of acquisition */
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
3513
/*
 * Second phase of probing: turn the already-populated rbd_dev into a
 * live block device -- update snapshots, pick a device id, register
 * the block device, the gendisk, and the sysfs device, set up the
 * header watch, and finally announce the disk.  Error handling
 * changes character partway through: once rbd_bus_add_dev() has
 * succeeded, cleanup is delegated to the sysfs release path via
 * rbd_bus_del_dev() rather than unwound by hand.
 */
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	/* register_blkdev(0, ...) dynamically allocates a major number */
	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

	ret = rbd_init_watch_dev(rbd_dev);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;
err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}
3595
3596 /*
3597  * Probe for the existence of the header object for the given rbd
3598  * device.  For format 2 images this includes determining the image
3599  * id.
3600  */
3601 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3602 {
3603         int ret;
3604
3605         /*
3606          * Get the id from the image id object.  If it's not a
3607          * format 2 image, we'll get ENOENT back, and we'll assume
3608          * it's a format 1 image.
3609          */
3610         ret = rbd_dev_image_id(rbd_dev);
3611         if (ret)
3612                 ret = rbd_dev_v1_probe(rbd_dev);
3613         else
3614                 ret = rbd_dev_v2_probe(rbd_dev);
3615         if (ret) {
3616                 dout("probe failed, returning %d\n", ret);
3617
3618                 return ret;
3619         }
3620
3621         ret = rbd_dev_probe_finish(rbd_dev);
3622         if (ret)
3623                 rbd_header_free(&rbd_dev->header);
3624
3625         return ret;
3626 }
3627
3628 static ssize_t rbd_add(struct bus_type *bus,
3629                        const char *buf,
3630                        size_t count)
3631 {
3632         struct rbd_device *rbd_dev = NULL;
3633         struct ceph_options *ceph_opts = NULL;
3634         struct rbd_options *rbd_opts = NULL;
3635         struct rbd_spec *spec = NULL;
3636         struct rbd_client *rbdc;
3637         struct ceph_osd_client *osdc;
3638         int rc = -ENOMEM;
3639
3640         if (!try_module_get(THIS_MODULE))
3641                 return -ENODEV;
3642
3643         /* parse add command */
3644         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3645         if (rc < 0)
3646                 goto err_out_module;
3647
3648         rbdc = rbd_get_client(ceph_opts);
3649         if (IS_ERR(rbdc)) {
3650                 rc = PTR_ERR(rbdc);
3651                 goto err_out_args;
3652         }
3653         ceph_opts = NULL;       /* rbd_dev client now owns this */
3654
3655         /* pick the pool */
3656         osdc = &rbdc->client->osdc;
3657         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3658         if (rc < 0)
3659                 goto err_out_client;
3660         spec->pool_id = (u64) rc;
3661
3662         /* The ceph file layout needs to fit pool id in 32 bits */
3663
3664         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
3665                 rc = -EIO;
3666                 goto err_out_client;
3667         }
3668
3669         rbd_dev = rbd_dev_create(rbdc, spec);
3670         if (!rbd_dev)
3671                 goto err_out_client;
3672         rbdc = NULL;            /* rbd_dev now owns this */
3673         spec = NULL;            /* rbd_dev now owns this */
3674
3675         rbd_dev->mapping.read_only = rbd_opts->read_only;
3676         kfree(rbd_opts);
3677         rbd_opts = NULL;        /* done with this */
3678
3679         rc = rbd_dev_probe(rbd_dev);
3680         if (rc < 0)
3681                 goto err_out_rbd_dev;
3682
3683         return count;
3684 err_out_rbd_dev:
3685         rbd_dev_destroy(rbd_dev);
3686 err_out_client:
3687         rbd_put_client(rbdc);
3688 err_out_args:
3689         if (ceph_opts)
3690                 ceph_destroy_options(ceph_opts);
3691         kfree(rbd_opts);
3692         rbd_spec_put(spec);
3693 err_out_module:
3694         module_put(THIS_MODULE);
3695
3696         dout("Error adding device %s\n", buf);
3697
3698         return (ssize_t) rc;
3699 }
3700
3701 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3702 {
3703         struct list_head *tmp;
3704         struct rbd_device *rbd_dev;
3705
3706         spin_lock(&rbd_dev_list_lock);
3707         list_for_each(tmp, &rbd_dev_list) {
3708                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3709                 if (rbd_dev->dev_id == dev_id) {
3710                         spin_unlock(&rbd_dev_list_lock);
3711                         return rbd_dev;
3712                 }
3713         }
3714         spin_unlock(&rbd_dev_list_lock);
3715         return NULL;
3716 }
3717
/*
 * Device-model release callback for an rbd device: tear everything
 * down in reverse order of setup.  Invoked by the driver core when
 * the device's last reference is dropped (via rbd_bus_del_dev()).
 */
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        /* stop the lingering watch request before dropping the event */
        if (rbd_dev->watch_request) {
                struct ceph_client *client = rbd_dev->rbd_client->client;

                ceph_osdc_unregister_linger_request(&client->osdc,
                                                    rbd_dev->watch_request);
        }
        /* second argument 0 tears the watch down rather than arming it */
        if (rbd_dev->watch_event)
                rbd_req_sync_watch(rbd_dev, 0);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
        rbd_dev_destroy(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}
3746
3747 static ssize_t rbd_remove(struct bus_type *bus,
3748                           const char *buf,
3749                           size_t count)
3750 {
3751         struct rbd_device *rbd_dev = NULL;
3752         int target_id, rc;
3753         unsigned long ul;
3754         int ret = count;
3755
3756         rc = strict_strtoul(buf, 10, &ul);
3757         if (rc)
3758                 return rc;
3759
3760         /* convert to int; abort if we lost anything in the conversion */
3761         target_id = (int) ul;
3762         if (target_id != ul)
3763                 return -EINVAL;
3764
3765         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3766
3767         rbd_dev = __rbd_get_dev(target_id);
3768         if (!rbd_dev) {
3769                 ret = -ENOENT;
3770                 goto done;
3771         }
3772
3773         if (rbd_dev->open_count) {
3774                 ret = -EBUSY;
3775                 goto done;
3776         }
3777
3778         rbd_remove_all_snaps(rbd_dev);
3779         rbd_bus_del_dev(rbd_dev);
3780
3781 done:
3782         mutex_unlock(&ctl_mutex);
3783
3784         return ret;
3785 }
3786
3787 /*
3788  * create control files in sysfs
3789  * /sys/bus/rbd/...
3790  */
3791 static int rbd_sysfs_init(void)
3792 {
3793         int ret;
3794
3795         ret = device_register(&rbd_root_dev);
3796         if (ret < 0)
3797                 return ret;
3798
3799         ret = bus_register(&rbd_bus_type);
3800         if (ret < 0)
3801                 device_unregister(&rbd_root_dev);
3802
3803         return ret;
3804 }
3805
/*
 * Remove the sysfs control files: unregister the bus first, then the
 * root device, i.e. the reverse of rbd_sysfs_init().
 */
static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
3811
3812 int __init rbd_init(void)
3813 {
3814         int rc;
3815
3816         rc = rbd_sysfs_init();
3817         if (rc)
3818                 return rc;
3819         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3820         return 0;
3821 }
3822
/*
 * Module exit: tear down the sysfs interface.  By the time this runs
 * the module refcounting guarantees no mapped devices remain.
 */
void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
3827
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");