]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
Merge branch 'testing' of github.com:ceph/ceph-client into v3.8-rc5-testing
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have these defined elsewhere */
56
57 #define U8_MAX  ((u8)   (~0U))
58 #define U16_MAX ((u16)  (~0U))
59 #define U32_MAX ((u32)  (~0U))
60 #define U64_MAX ((u64)  (~0ULL))
61
62 #define RBD_DRV_NAME "rbd"
63 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
64
65 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
66
67 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
68 #define RBD_MAX_SNAP_NAME_LEN   \
69                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
70
71 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
72
73 #define RBD_SNAP_HEAD_NAME      "-"
74
75 /* This allows a single page to hold an image name sent by OSD */
76 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
77 #define RBD_IMAGE_ID_LEN_MAX    64
78
79 #define RBD_OBJ_PREFIX_LEN_MAX  64
80
81 /* Feature bits */
82
83 #define RBD_FEATURE_LAYERING      1
84
85 /* Features supported by this (client software) implementation. */
86
87 #define RBD_FEATURES_ALL          (0)
88
89 /*
90  * An RBD device name will be "rbd#", where the "rbd" comes from
91  * RBD_DRV_NAME above, and # is a unique integer identifier.
92  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93  * enough to hold all possible device names.
94  */
95 #define DEV_NAME_LEN            32
96 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
97
98 /*
99  * block device image metadata (in-memory version)
100  */
101 struct rbd_image_header {
102         /* These four fields never change for a given rbd image */
103         char *object_prefix;
104         u64 features;
105         __u8 obj_order;
106         __u8 crypt_type;
107         __u8 comp_type;
108
109         /* The remaining fields need to be updated occasionally */
110         u64 image_size;
111         struct ceph_snap_context *snapc;
112         char *snap_names;
113         u64 *snap_sizes;
114
115         u64 obj_version;
116 };
117
118 /*
119  * An rbd image specification.
120  *
121  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122  * identify an image.  Each rbd_dev structure includes a pointer to
123  * an rbd_spec structure that encapsulates this identity.
124  *
125  * Each of the id's in an rbd_spec has an associated name.  For a
126  * user-mapped image, the names are supplied and the id's associated
127  * with them are looked up.  For a layered image, a parent image is
128  * defined by the tuple, and the names are looked up.
129  *
130  * An rbd_dev structure contains a parent_spec pointer which is
131  * non-null if the image it represents is a child in a layered
132  * image.  This pointer will refer to the rbd_spec structure used
133  * by the parent rbd_dev for its own identity (i.e., the structure
134  * is shared between the parent and child).
135  *
136  * Since these structures are populated once, during the discovery
137  * phase of image construction, they are effectively immutable so
138  * we make no effort to synchronize access to them.
139  *
140  * Note that code herein does not assume the image name is known (it
141  * could be a null pointer).
142  */
143 struct rbd_spec {
144         u64             pool_id;
145         char            *pool_name;
146
147         char            *image_id;
148         char            *image_name;
149
150         u64             snap_id;
151         char            *snap_name;
152
153         struct kref     kref;
154 };
155
156 /*
157  * an instance of the client.  multiple devices may share an rbd client.
158  */
159 struct rbd_client {
160         struct ceph_client      *client;
161         struct kref             kref;
162         struct list_head        node;
163 };
164
165 /*
166  * a request completion status
167  */
168 struct rbd_req_status {
169         int done;
170         s32 rc;
171         u64 bytes;
172 };
173
174 /*
175  * a collection of requests
176  */
177 struct rbd_req_coll {
178         int                     total;
179         int                     num_done;
180         struct kref             kref;
181         struct rbd_req_status   status[0];
182 };
183
184 /*
185  * a single io request
186  */
187 struct rbd_request {
188         struct request          *rq;            /* blk layer request */
189         struct bio              *bio;           /* cloned bio */
190         struct page             **pages;        /* list of used pages */
191         u64                     len;
192         int                     coll_index;
193         struct rbd_req_coll     *coll;
194 };
195
196 struct rbd_snap {
197         struct  device          dev;
198         const char              *name;
199         u64                     size;
200         struct list_head        node;
201         u64                     id;
202         u64                     features;
203 };
204
205 struct rbd_mapping {
206         u64                     size;
207         u64                     features;
208         bool                    read_only;
209 };
210
211 /*
212  * a single device
213  */
214 struct rbd_device {
215         int                     dev_id;         /* blkdev unique id */
216
217         int                     major;          /* blkdev assigned major */
218         struct gendisk          *disk;          /* blkdev's gendisk and rq */
219
220         u32                     image_format;   /* Either 1 or 2 */
221         struct rbd_client       *rbd_client;
222
223         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
224
225         spinlock_t              lock;           /* queue lock */
226
227         struct rbd_image_header header;
228         atomic_t                exists;
229         struct rbd_spec         *spec;
230
231         char                    *header_name;
232
233         struct ceph_file_layout layout;
234
235         struct ceph_osd_event   *watch_event;
236         struct ceph_osd_request *watch_request;
237
238         struct rbd_spec         *parent_spec;
239         u64                     parent_overlap;
240
241         /* protects updating the header */
242         struct rw_semaphore     header_rwsem;
243
244         struct rbd_mapping      mapping;
245
246         struct list_head        node;
247
248         /* list of snapshots */
249         struct list_head        snaps;
250
251         /* sysfs related */
252         struct device           dev;
253         unsigned long           open_count;
254 };
255
256 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
257
258 static LIST_HEAD(rbd_dev_list);    /* devices */
259 static DEFINE_SPINLOCK(rbd_dev_list_lock);
260
261 static LIST_HEAD(rbd_client_list);              /* clients */
262 static DEFINE_SPINLOCK(rbd_client_list_lock);
263
264 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
265 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
266
267 static void rbd_dev_release(struct device *dev);
268 static void rbd_remove_snap_dev(struct rbd_snap *snap);
269
270 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
271                        size_t count);
272 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
273                           size_t count);
274
275 static struct bus_attribute rbd_bus_attrs[] = {
276         __ATTR(add, S_IWUSR, NULL, rbd_add),
277         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
278         __ATTR_NULL
279 };
280
281 static struct bus_type rbd_bus_type = {
282         .name           = "rbd",
283         .bus_attrs      = rbd_bus_attrs,
284 };
285
286 static void rbd_root_dev_release(struct device *dev)
287 {
288 }
289
290 static struct device rbd_root_dev = {
291         .init_name =    "rbd",
292         .release =      rbd_root_dev_release,
293 };
294
295 static __printf(2, 3)
296 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
297 {
298         struct va_format vaf;
299         va_list args;
300
301         va_start(args, fmt);
302         vaf.fmt = fmt;
303         vaf.va = &args;
304
305         if (!rbd_dev)
306                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
307         else if (rbd_dev->disk)
308                 printk(KERN_WARNING "%s: %s: %pV\n",
309                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
310         else if (rbd_dev->spec && rbd_dev->spec->image_name)
311                 printk(KERN_WARNING "%s: image %s: %pV\n",
312                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
313         else if (rbd_dev->spec && rbd_dev->spec->image_id)
314                 printk(KERN_WARNING "%s: id %s: %pV\n",
315                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
316         else    /* punt */
317                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
318                         RBD_DRV_NAME, rbd_dev, &vaf);
319         va_end(args);
320 }
321
322 #ifdef RBD_DEBUG
323 #define rbd_assert(expr)                                                \
324                 if (unlikely(!(expr))) {                                \
325                         printk(KERN_ERR "\nAssertion failure in %s() "  \
326                                                 "at line %d:\n\n"       \
327                                         "\trbd_assert(%s);\n\n",        \
328                                         __func__, __LINE__, #expr);     \
329                         BUG();                                          \
330                 }
331 #else /* !RBD_DEBUG */
332 #  define rbd_assert(expr)      ((void) 0)
333 #endif /* !RBD_DEBUG */
334
335 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
336 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
337
338 static int rbd_open(struct block_device *bdev, fmode_t mode)
339 {
340         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
341
342         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
343                 return -EROFS;
344
345         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
346         (void) get_device(&rbd_dev->dev);
347         set_device_ro(bdev, rbd_dev->mapping.read_only);
348         rbd_dev->open_count++;
349         mutex_unlock(&ctl_mutex);
350
351         return 0;
352 }
353
354 static int rbd_release(struct gendisk *disk, fmode_t mode)
355 {
356         struct rbd_device *rbd_dev = disk->private_data;
357
358         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
359         rbd_assert(rbd_dev->open_count > 0);
360         rbd_dev->open_count--;
361         put_device(&rbd_dev->dev);
362         mutex_unlock(&ctl_mutex);
363
364         return 0;
365 }
366
367 static const struct block_device_operations rbd_bd_ops = {
368         .owner                  = THIS_MODULE,
369         .open                   = rbd_open,
370         .release                = rbd_release,
371 };
372
373 /*
374  * Initialize an rbd client instance.
375  * We own *ceph_opts.
376  */
377 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
378 {
379         struct rbd_client *rbdc;
380         int ret = -ENOMEM;
381
382         dout("rbd_client_create\n");
383         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
384         if (!rbdc)
385                 goto out_opt;
386
387         kref_init(&rbdc->kref);
388         INIT_LIST_HEAD(&rbdc->node);
389
390         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
391
392         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
393         if (IS_ERR(rbdc->client))
394                 goto out_mutex;
395         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
396
397         ret = ceph_open_session(rbdc->client);
398         if (ret < 0)
399                 goto out_err;
400
401         spin_lock(&rbd_client_list_lock);
402         list_add_tail(&rbdc->node, &rbd_client_list);
403         spin_unlock(&rbd_client_list_lock);
404
405         mutex_unlock(&ctl_mutex);
406
407         dout("rbd_client_create created %p\n", rbdc);
408         return rbdc;
409
410 out_err:
411         ceph_destroy_client(rbdc->client);
412 out_mutex:
413         mutex_unlock(&ctl_mutex);
414         kfree(rbdc);
415 out_opt:
416         if (ceph_opts)
417                 ceph_destroy_options(ceph_opts);
418         return ERR_PTR(ret);
419 }
420
421 /*
422  * Find a ceph client with specific addr and configuration.  If
423  * found, bump its reference count.
424  */
425 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
426 {
427         struct rbd_client *client_node;
428         bool found = false;
429
430         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
431                 return NULL;
432
433         spin_lock(&rbd_client_list_lock);
434         list_for_each_entry(client_node, &rbd_client_list, node) {
435                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
436                         kref_get(&client_node->kref);
437                         found = true;
438                         break;
439                 }
440         }
441         spin_unlock(&rbd_client_list_lock);
442
443         return found ? client_node : NULL;
444 }
445
446 /*
447  * mount options
448  */
449 enum {
450         Opt_last_int,
451         /* int args above */
452         Opt_last_string,
453         /* string args above */
454         Opt_read_only,
455         Opt_read_write,
456         /* Boolean args above */
457         Opt_last_bool,
458 };
459
460 static match_table_t rbd_opts_tokens = {
461         /* int args above */
462         /* string args above */
463         {Opt_read_only, "read_only"},
464         {Opt_read_only, "ro"},          /* Alternate spelling */
465         {Opt_read_write, "read_write"},
466         {Opt_read_write, "rw"},         /* Alternate spelling */
467         /* Boolean args above */
468         {-1, NULL}
469 };
470
471 struct rbd_options {
472         bool    read_only;
473 };
474
475 #define RBD_READ_ONLY_DEFAULT   false
476
477 static int parse_rbd_opts_token(char *c, void *private)
478 {
479         struct rbd_options *rbd_opts = private;
480         substring_t argstr[MAX_OPT_ARGS];
481         int token, intval, ret;
482
483         token = match_token(c, rbd_opts_tokens, argstr);
484         if (token < 0)
485                 return -EINVAL;
486
487         if (token < Opt_last_int) {
488                 ret = match_int(&argstr[0], &intval);
489                 if (ret < 0) {
490                         pr_err("bad mount option arg (not int) "
491                                "at '%s'\n", c);
492                         return ret;
493                 }
494                 dout("got int token %d val %d\n", token, intval);
495         } else if (token > Opt_last_int && token < Opt_last_string) {
496                 dout("got string token %d val %s\n", token,
497                      argstr[0].from);
498         } else if (token > Opt_last_string && token < Opt_last_bool) {
499                 dout("got Boolean token %d\n", token);
500         } else {
501                 dout("got token %d\n", token);
502         }
503
504         switch (token) {
505         case Opt_read_only:
506                 rbd_opts->read_only = true;
507                 break;
508         case Opt_read_write:
509                 rbd_opts->read_only = false;
510                 break;
511         default:
512                 rbd_assert(false);
513                 break;
514         }
515         return 0;
516 }
517
518 /*
519  * Get a ceph client with specific addr and configuration, if one does
520  * not exist create it.
521  */
522 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
523 {
524         struct rbd_client *rbdc;
525
526         rbdc = rbd_client_find(ceph_opts);
527         if (rbdc)       /* using an existing client */
528                 ceph_destroy_options(ceph_opts);
529         else
530                 rbdc = rbd_client_create(ceph_opts);
531
532         return rbdc;
533 }
534
535 /*
536  * Destroy ceph client
537  *
538  * Caller must hold rbd_client_list_lock.
539  */
540 static void rbd_client_release(struct kref *kref)
541 {
542         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
543
544         dout("rbd_release_client %p\n", rbdc);
545         spin_lock(&rbd_client_list_lock);
546         list_del(&rbdc->node);
547         spin_unlock(&rbd_client_list_lock);
548
549         ceph_destroy_client(rbdc->client);
550         kfree(rbdc);
551 }
552
553 /*
554  * Drop reference to ceph client node. If it's not referenced anymore, release
555  * it.
556  */
557 static void rbd_put_client(struct rbd_client *rbdc)
558 {
559         if (rbdc)
560                 kref_put(&rbdc->kref, rbd_client_release);
561 }
562
563 /*
564  * Destroy requests collection
565  */
566 static void rbd_coll_release(struct kref *kref)
567 {
568         struct rbd_req_coll *coll =
569                 container_of(kref, struct rbd_req_coll, kref);
570
571         dout("rbd_coll_release %p\n", coll);
572         kfree(coll);
573 }
574
575 static bool rbd_image_format_valid(u32 image_format)
576 {
577         return image_format == 1 || image_format == 2;
578 }
579
580 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
581 {
582         size_t size;
583         u32 snap_count;
584
585         /* The header has to start with the magic rbd header text */
586         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
587                 return false;
588
589         /* The bio layer requires at least sector-sized I/O */
590
591         if (ondisk->options.order < SECTOR_SHIFT)
592                 return false;
593
594         /* If we use u64 in a few spots we may be able to loosen this */
595
596         if (ondisk->options.order > 8 * sizeof (int) - 1)
597                 return false;
598
599         /*
600          * The size of a snapshot header has to fit in a size_t, and
601          * that limits the number of snapshots.
602          */
603         snap_count = le32_to_cpu(ondisk->snap_count);
604         size = SIZE_MAX - sizeof (struct ceph_snap_context);
605         if (snap_count > size / sizeof (__le64))
606                 return false;
607
608         /*
609          * Not only that, but the size of the entire the snapshot
610          * header must also be representable in a size_t.
611          */
612         size -= snap_count * sizeof (__le64);
613         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
614                 return false;
615
616         return true;
617 }
618
619 /*
620  * Create a new header structure, translate header format from the on-disk
621  * header.
622  */
623 static int rbd_header_from_disk(struct rbd_image_header *header,
624                                  struct rbd_image_header_ondisk *ondisk)
625 {
626         u32 snap_count;
627         size_t len;
628         size_t size;
629         u32 i;
630
631         memset(header, 0, sizeof (*header));
632
633         snap_count = le32_to_cpu(ondisk->snap_count);
634
635         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
636         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
637         if (!header->object_prefix)
638                 return -ENOMEM;
639         memcpy(header->object_prefix, ondisk->object_prefix, len);
640         header->object_prefix[len] = '\0';
641
642         if (snap_count) {
643                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
644
645                 /* Save a copy of the snapshot names */
646
647                 if (snap_names_len > (u64) SIZE_MAX)
648                         return -EIO;
649                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
650                 if (!header->snap_names)
651                         goto out_err;
652                 /*
653                  * Note that rbd_dev_v1_header_read() guarantees
654                  * the ondisk buffer we're working with has
655                  * snap_names_len bytes beyond the end of the
656                  * snapshot id array, this memcpy() is safe.
657                  */
658                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
659                         snap_names_len);
660
661                 /* Record each snapshot's size */
662
663                 size = snap_count * sizeof (*header->snap_sizes);
664                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
665                 if (!header->snap_sizes)
666                         goto out_err;
667                 for (i = 0; i < snap_count; i++)
668                         header->snap_sizes[i] =
669                                 le64_to_cpu(ondisk->snaps[i].image_size);
670         } else {
671                 WARN_ON(ondisk->snap_names_len);
672                 header->snap_names = NULL;
673                 header->snap_sizes = NULL;
674         }
675
676         header->features = 0;   /* No features support in v1 images */
677         header->obj_order = ondisk->options.order;
678         header->crypt_type = ondisk->options.crypt_type;
679         header->comp_type = ondisk->options.comp_type;
680
681         /* Allocate and fill in the snapshot context */
682
683         header->image_size = le64_to_cpu(ondisk->image_size);
684         size = sizeof (struct ceph_snap_context);
685         size += snap_count * sizeof (header->snapc->snaps[0]);
686         header->snapc = kzalloc(size, GFP_KERNEL);
687         if (!header->snapc)
688                 goto out_err;
689
690         atomic_set(&header->snapc->nref, 1);
691         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
692         header->snapc->num_snaps = snap_count;
693         for (i = 0; i < snap_count; i++)
694                 header->snapc->snaps[i] =
695                         le64_to_cpu(ondisk->snaps[i].id);
696
697         return 0;
698
699 out_err:
700         kfree(header->snap_sizes);
701         header->snap_sizes = NULL;
702         kfree(header->snap_names);
703         header->snap_names = NULL;
704         kfree(header->object_prefix);
705         header->object_prefix = NULL;
706
707         return -ENOMEM;
708 }
709
710 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
711 {
712         struct rbd_snap *snap;
713
714         if (snap_id == CEPH_NOSNAP)
715                 return RBD_SNAP_HEAD_NAME;
716
717         list_for_each_entry(snap, &rbd_dev->snaps, node)
718                 if (snap_id == snap->id)
719                         return snap->name;
720
721         return NULL;
722 }
723
724 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
725 {
726
727         struct rbd_snap *snap;
728
729         list_for_each_entry(snap, &rbd_dev->snaps, node) {
730                 if (!strcmp(snap_name, snap->name)) {
731                         rbd_dev->spec->snap_id = snap->id;
732                         rbd_dev->mapping.size = snap->size;
733                         rbd_dev->mapping.features = snap->features;
734
735                         return 0;
736                 }
737         }
738
739         return -ENOENT;
740 }
741
742 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
743 {
744         int ret;
745
746         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
747                     sizeof (RBD_SNAP_HEAD_NAME))) {
748                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
749                 rbd_dev->mapping.size = rbd_dev->header.image_size;
750                 rbd_dev->mapping.features = rbd_dev->header.features;
751                 ret = 0;
752         } else {
753                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
754                 if (ret < 0)
755                         goto done;
756                 rbd_dev->mapping.read_only = true;
757         }
758         atomic_set(&rbd_dev->exists, 1);
759 done:
760         return ret;
761 }
762
763 static void rbd_header_free(struct rbd_image_header *header)
764 {
765         kfree(header->object_prefix);
766         header->object_prefix = NULL;
767         kfree(header->snap_sizes);
768         header->snap_sizes = NULL;
769         kfree(header->snap_names);
770         header->snap_names = NULL;
771         ceph_put_snap_context(header->snapc);
772         header->snapc = NULL;
773 }
774
775 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
776 {
777         char *name;
778         u64 segment;
779         int ret;
780
781         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
782         if (!name)
783                 return NULL;
784         segment = offset >> rbd_dev->header.obj_order;
785         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
786                         rbd_dev->header.object_prefix, segment);
787         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
788                 pr_err("error formatting segment name for #%llu (%d)\n",
789                         segment, ret);
790                 kfree(name);
791                 name = NULL;
792         }
793
794         return name;
795 }
796
797 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
798 {
799         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
800
801         return offset & (segment_size - 1);
802 }
803
804 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
805                                 u64 offset, u64 length)
806 {
807         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
808
809         offset &= segment_size - 1;
810
811         rbd_assert(length <= U64_MAX - offset);
812         if (offset + length > segment_size)
813                 length = segment_size - offset;
814
815         return length;
816 }
817
818 static int rbd_get_num_segments(struct rbd_image_header *header,
819                                 u64 ofs, u64 len)
820 {
821         u64 start_seg;
822         u64 end_seg;
823         u64 result;
824
825         if (!len)
826                 return 0;
827         if (len - 1 > U64_MAX - ofs)
828                 return -ERANGE;
829
830         start_seg = ofs >> header->obj_order;
831         end_seg = (ofs + len - 1) >> header->obj_order;
832
833         result = end_seg - start_seg + 1;
834         if (result > (u64) INT_MAX)
835                 return -ERANGE;
836
837         return (int) result;
838 }
839
840 /*
841  * returns the size of an object in the image
842  */
843 static u64 rbd_obj_bytes(struct rbd_image_header *header)
844 {
845         return 1 << header->obj_order;
846 }
847
848 /*
849  * bio helpers
850  */
851
852 static void bio_chain_put(struct bio *chain)
853 {
854         struct bio *tmp;
855
856         while (chain) {
857                 tmp = chain;
858                 chain = chain->bi_next;
859                 bio_put(tmp);
860         }
861 }
862
863 /*
864  * zeros a bio chain, starting at specific offset
865  */
866 static void zero_bio_chain(struct bio *chain, int start_ofs)
867 {
868         struct bio_vec *bv;
869         unsigned long flags;
870         void *buf;
871         int i;
872         int pos = 0;
873
874         while (chain) {
875                 bio_for_each_segment(bv, chain, i) {
876                         if (pos + bv->bv_len > start_ofs) {
877                                 int remainder = max(start_ofs - pos, 0);
878                                 buf = bvec_kmap_irq(bv, &flags);
879                                 memset(buf + remainder, 0,
880                                        bv->bv_len - remainder);
881                                 bvec_kunmap_irq(buf, &flags);
882                         }
883                         pos += bv->bv_len;
884                 }
885
886                 chain = chain->bi_next;
887         }
888 }
889
890 /*
891  * Clone a portion of a bio, starting at the given byte offset
892  * and continuing for the number of bytes indicated.
893  */
894 static struct bio *bio_clone_range(struct bio *bio_src,
895                                         unsigned int offset,
896                                         unsigned int len,
897                                         gfp_t gfpmask)
898 {
899         struct bio_vec *bv;
900         unsigned int resid;
901         unsigned short idx;
902         unsigned int voff;
903         unsigned short end_idx;
904         unsigned short vcnt;
905         struct bio *bio;
906
907         /* Handle the easy case for the caller */
908
909         if (!offset && len == bio_src->bi_size)
910                 return bio_clone(bio_src, gfpmask);
911
912         if (WARN_ON_ONCE(!len))
913                 return NULL;
914         if (WARN_ON_ONCE(len > bio_src->bi_size))
915                 return NULL;
916         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
917                 return NULL;
918
919         /* Find first affected segment... */
920
921         resid = offset;
922         __bio_for_each_segment(bv, bio_src, idx, 0) {
923                 if (resid < bv->bv_len)
924                         break;
925                 resid -= bv->bv_len;
926         }
927         voff = resid;
928
929         /* ...and the last affected segment */
930
931         resid += len;
932         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
933                 if (resid <= bv->bv_len)
934                         break;
935                 resid -= bv->bv_len;
936         }
937         vcnt = end_idx - idx + 1;
938
939         /* Build the clone */
940
941         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
942         if (!bio)
943                 return NULL;    /* ENOMEM */
944
945         bio->bi_bdev = bio_src->bi_bdev;
946         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
947         bio->bi_rw = bio_src->bi_rw;
948         bio->bi_flags |= 1 << BIO_CLONED;
949
950         /*
951          * Copy over our part of the bio_vec, then update the first
952          * and last (or only) entries.
953          */
954         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
955                         vcnt * sizeof (struct bio_vec));
956         bio->bi_io_vec[0].bv_offset += voff;
957         if (vcnt > 1) {
958                 bio->bi_io_vec[0].bv_len -= voff;
959                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
960         } else {
961                 bio->bi_io_vec[0].bv_len = len;
962         }
963
964         bio->bi_vcnt = vcnt;
965         bio->bi_size = len;
966         bio->bi_idx = 0;
967
968         return bio;
969 }
970
971 /*
972  * Clone a portion of a bio chain, starting at the given byte offset
973  * into the first bio in the source chain and continuing for the
974  * number of bytes indicated.  The result is another bio chain of
975  * exactly the given length, or a null pointer on error.
976  *
977  * The bio_src and offset parameters are both in-out.  On entry they
978  * refer to the first source bio and the offset into that bio where
979  * the start of data to be cloned is located.
980  *
981  * On return, bio_src is updated to refer to the bio in the source
982  * chain that contains first un-cloned byte, and *offset will
983  * contain the offset of that byte within that bio.
984  */
985 static struct bio *bio_chain_clone_range(struct bio **bio_src,
986                                         unsigned int *offset,
987                                         unsigned int len,
988                                         gfp_t gfpmask)
989 {
990         struct bio *bi = *bio_src;
991         unsigned int off = *offset;
992         struct bio *chain = NULL;
993         struct bio **end;
994
995         /* Build up a chain of clone bios up to the limit */
996
997         if (!bi || off >= bi->bi_size || !len)
998                 return NULL;            /* Nothing to clone */
999
1000         end = &chain;
1001         while (len) {
1002                 unsigned int bi_size;
1003                 struct bio *bio;
1004
1005                 if (!bi) {
1006                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1007                         goto out_err;   /* EINVAL; ran out of bio's */
1008                 }
1009                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1010                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1011                 if (!bio)
1012                         goto out_err;   /* ENOMEM */
1013
1014                 *end = bio;
1015                 end = &bio->bi_next;
1016
1017                 off += bi_size;
1018                 if (off == bi->bi_size) {
1019                         bi = bi->bi_next;
1020                         off = 0;
1021                 }
1022                 len -= bi_size;
1023         }
1024         *bio_src = bi;
1025         *offset = off;
1026
1027         return chain;
1028 out_err:
1029         bio_chain_put(chain);
1030
1031         return NULL;
1032 }
1033
1034 struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1035 {
1036         struct ceph_osd_req_op *op;
1037         va_list args;
1038         size_t size;
1039
1040         op = kzalloc(sizeof (*op), GFP_NOIO);
1041         if (!op)
1042                 return NULL;
1043         op->op = opcode;
1044         va_start(args, opcode);
1045         switch (opcode) {
1046         case CEPH_OSD_OP_READ:
1047         case CEPH_OSD_OP_WRITE:
1048                 /* rbd_osd_req_op_create(READ, offset, length) */
1049                 /* rbd_osd_req_op_create(WRITE, offset, length) */
1050                 op->extent.offset = va_arg(args, u64);
1051                 op->extent.length = va_arg(args, u64);
1052                 if (opcode == CEPH_OSD_OP_WRITE)
1053                         op->payload_len = op->extent.length;
1054                 break;
1055         case CEPH_OSD_OP_CALL:
1056                 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1057                 op->cls.class_name = va_arg(args, char *);
1058                 size = strlen(op->cls.class_name);
1059                 rbd_assert(size <= (size_t) U8_MAX);
1060                 op->cls.class_len = size;
1061                 op->payload_len = size;
1062
1063                 op->cls.method_name = va_arg(args, char *);
1064                 size = strlen(op->cls.method_name);
1065                 rbd_assert(size <= (size_t) U8_MAX);
1066                 op->cls.method_len = size;
1067                 op->payload_len += size;
1068
1069                 op->cls.argc = 0;
1070                 op->cls.indata = va_arg(args, void *);
1071                 size = va_arg(args, size_t);
1072                 rbd_assert(size <= (size_t) U32_MAX);
1073                 op->cls.indata_len = (u32) size;
1074                 op->payload_len += size;
1075                 break;
1076         case CEPH_OSD_OP_NOTIFY_ACK:
1077         case CEPH_OSD_OP_WATCH:
1078                 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1079                 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1080                 op->watch.cookie = va_arg(args, u64);
1081                 op->watch.ver = va_arg(args, u64);
1082                 op->watch.ver = cpu_to_le64(op->watch.ver);
1083                 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1084                         op->watch.flag = (u8) 1;
1085                 break;
1086         default:
1087                 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1088                 kfree(op);
1089                 op = NULL;
1090                 break;
1091         }
1092         va_end(args);
1093
1094         return op;
1095 }
1096
1097 static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
1098 {
1099         kfree(op);
1100 }
1101
1102 static void rbd_coll_end_req_index(struct request *rq,
1103                                    struct rbd_req_coll *coll,
1104                                    int index,
1105                                    s32 ret, u64 len)
1106 {
1107         struct request_queue *q;
1108         int min, max, i;
1109
1110         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1111              coll, index, (int)ret, (unsigned long long)len);
1112
1113         if (!rq)
1114                 return;
1115
1116         if (!coll) {
1117                 blk_end_request(rq, ret, len);
1118                 return;
1119         }
1120
1121         q = rq->q;
1122
1123         spin_lock_irq(q->queue_lock);
1124         coll->status[index].done = 1;
1125         coll->status[index].rc = ret;
1126         coll->status[index].bytes = len;
1127         max = min = coll->num_done;
1128         while (max < coll->total && coll->status[max].done)
1129                 max++;
1130
1131         for (i = min; i<max; i++) {
1132                 __blk_end_request(rq, (int)coll->status[i].rc,
1133                                   coll->status[i].bytes);
1134                 coll->num_done++;
1135                 kref_put(&coll->kref, rbd_coll_release);
1136         }
1137         spin_unlock_irq(q->queue_lock);
1138 }
1139
1140 static void rbd_coll_end_req(struct rbd_request *rbd_req,
1141                              s32 ret, u64 len)
1142 {
1143         rbd_coll_end_req_index(rbd_req->rq,
1144                                 rbd_req->coll, rbd_req->coll_index,
1145                                 ret, len);
1146 }
1147
1148 /*
1149  * Send ceph osd request
1150  */
1151 static int rbd_do_request(struct request *rq,
1152                           struct rbd_device *rbd_dev,
1153                           struct ceph_snap_context *snapc,
1154                           u64 snapid,
1155                           const char *object_name, u64 ofs, u64 len,
1156                           struct bio *bio,
1157                           struct page **pages,
1158                           int num_pages,
1159                           int flags,
1160                           struct ceph_osd_req_op *op,
1161                           struct rbd_req_coll *coll,
1162                           int coll_index,
1163                           void (*rbd_cb)(struct ceph_osd_request *,
1164                                          struct ceph_msg *),
1165                           u64 *ver)
1166 {
1167         struct ceph_osd_client *osdc;
1168         struct ceph_osd_request *osd_req;
1169         struct rbd_request *rbd_req = NULL;
1170         struct timespec mtime = CURRENT_TIME;
1171         int ret;
1172
1173         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1174                 object_name, (unsigned long long) ofs,
1175                 (unsigned long long) len, coll, coll_index);
1176
1177         osdc = &rbd_dev->rbd_client->client->osdc;
1178         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
1179         if (!osd_req)
1180                 return -ENOMEM;
1181
1182         osd_req->r_flags = flags;
1183         osd_req->r_pages = pages;
1184         if (bio) {
1185                 osd_req->r_bio = bio;
1186                 bio_get(osd_req->r_bio);
1187         }
1188
1189         if (coll) {
1190                 ret = -ENOMEM;
1191                 rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
1192                 if (!rbd_req)
1193                         goto done_osd_req;
1194
1195                 rbd_req->rq = rq;
1196                 rbd_req->bio = bio;
1197                 rbd_req->pages = pages;
1198                 rbd_req->len = len;
1199                 rbd_req->coll = coll;
1200                 rbd_req->coll_index = coll_index;
1201         }
1202
1203         osd_req->r_callback = rbd_cb;
1204         osd_req->r_priv = rbd_req;
1205
1206         strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
1207         osd_req->r_oid_len = strlen(osd_req->r_oid);
1208
1209         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1210         osd_req->r_num_pages = calc_pages_for(ofs, len);
1211         osd_req->r_page_alignment = ofs & ~PAGE_MASK;
1212
1213         ceph_osdc_build_request(osd_req, ofs, len, 1, op,
1214                                 snapc, snapid, &mtime);
1215
1216         if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
1217                 ceph_osdc_set_request_linger(osdc, osd_req);
1218                 rbd_dev->watch_request = osd_req;
1219         }
1220
1221         ret = ceph_osdc_start_request(osdc, osd_req, false);
1222         if (ret < 0)
1223                 goto done_err;
1224
1225         if (!rbd_cb) {
1226                 u64 version;
1227
1228                 ret = ceph_osdc_wait_request(osdc, osd_req);
1229                 version = le64_to_cpu(osd_req->r_reassert_version.version);
1230                 if (ver)
1231                         *ver = version;
1232                 dout("reassert_ver=%llu\n", (unsigned long long) version);
1233                 ceph_osdc_put_request(osd_req);
1234         }
1235         return ret;
1236
1237 done_err:
1238         if (bio)
1239                 bio_chain_put(osd_req->r_bio);
1240         kfree(rbd_req);
1241 done_osd_req:
1242         ceph_osdc_put_request(osd_req);
1243
1244         return ret;
1245 }
1246
1247 /*
1248  * Ceph osd op callback
1249  */
1250 static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
1251 {
1252         struct rbd_request *rbd_req = osd_req->r_priv;
1253         struct ceph_osd_reply_head *replyhead;
1254         struct ceph_osd_op *op;
1255         s32 rc;
1256         u64 bytes;
1257         int read_op;
1258
1259         /* parse reply */
1260         replyhead = msg->front.iov_base;
1261         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1262         op = (void *)(replyhead + 1);
1263         rc = (s32)le32_to_cpu(replyhead->result);
1264         bytes = le64_to_cpu(op->extent.length);
1265         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1266
1267         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1268                 (unsigned long long) bytes, read_op, (int) rc);
1269
1270         if (rc == (s32)-ENOENT && read_op) {
1271                 zero_bio_chain(rbd_req->bio, 0);
1272                 rc = 0;
1273         } else if (rc == 0 && read_op && bytes < rbd_req->len) {
1274                 zero_bio_chain(rbd_req->bio, bytes);
1275                 bytes = rbd_req->len;
1276         }
1277
1278         rbd_coll_end_req(rbd_req, rc, bytes);
1279
1280         if (rbd_req->bio)
1281                 bio_chain_put(rbd_req->bio);
1282
1283         ceph_osdc_put_request(osd_req);
1284         kfree(rbd_req);
1285 }
1286
1287 static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
1288                                 struct ceph_msg *msg)
1289 {
1290         ceph_osdc_put_request(osd_req);
1291 }
1292
1293 /*
1294  * Do a synchronous ceph osd operation
1295  */
1296 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1297                            int flags,
1298                            struct ceph_osd_req_op *op,
1299                            const char *object_name,
1300                            u64 ofs, u64 inbound_size,
1301                            char *inbound,
1302                            u64 *ver)
1303 {
1304         int ret;
1305         struct page **pages;
1306         int num_pages;
1307
1308         rbd_assert(op != NULL);
1309
1310         num_pages = calc_pages_for(ofs, inbound_size);
1311         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1312         if (IS_ERR(pages))
1313                 return PTR_ERR(pages);
1314
1315         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1316                           object_name, ofs, inbound_size, NULL,
1317                           pages, num_pages,
1318                           flags,
1319                           op,
1320                           NULL, 0,
1321                           NULL,
1322                           ver);
1323         if (ret < 0)
1324                 goto done;
1325
1326         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1327                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1328
1329 done:
1330         ceph_release_page_vector(pages, num_pages);
1331         return ret;
1332 }
1333
1334 /*
1335  * Do an asynchronous ceph osd operation
1336  */
1337 static int rbd_do_op(struct request *rq,
1338                      struct rbd_device *rbd_dev,
1339                      struct ceph_snap_context *snapc,
1340                      u64 ofs, u64 len,
1341                      struct bio *bio,
1342                      struct rbd_req_coll *coll,
1343                      int coll_index)
1344 {
1345         const char *seg_name;
1346         u64 seg_ofs;
1347         u64 seg_len;
1348         int ret;
1349         struct ceph_osd_req_op *op;
1350         int opcode;
1351         int flags;
1352         u64 snapid;
1353
1354         seg_name = rbd_segment_name(rbd_dev, ofs);
1355         if (!seg_name)
1356                 return -ENOMEM;
1357         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1358         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1359
1360         if (rq_data_dir(rq) == WRITE) {
1361                 opcode = CEPH_OSD_OP_WRITE;
1362                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1363                 snapid = CEPH_NOSNAP;
1364         } else {
1365                 opcode = CEPH_OSD_OP_READ;
1366                 flags = CEPH_OSD_FLAG_READ;
1367                 rbd_assert(!snapc);
1368                 snapid = rbd_dev->spec->snap_id;
1369         }
1370
1371         ret = -ENOMEM;
1372         op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
1373         if (!op)
1374                 goto done;
1375
1376         /* we've taken care of segment sizes earlier when we
1377            cloned the bios. We should never have a segment
1378            truncated at this point */
1379         rbd_assert(seg_len == len);
1380
1381         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1382                              seg_name, seg_ofs, seg_len,
1383                              bio,
1384                              NULL, 0,
1385                              flags,
1386                              op,
1387                              coll, coll_index,
1388                              rbd_req_cb, NULL);
1389         if (ret < 0)
1390                 rbd_coll_end_req_index(rq, coll, coll_index,
1391                                         (s32)ret, seg_len);
1392         rbd_osd_req_op_destroy(op);
1393 done:
1394         kfree(seg_name);
1395         return ret;
1396 }
1397
1398 /*
1399  * Request sync osd read
1400  */
1401 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1402                           const char *object_name,
1403                           u64 ofs, u64 len,
1404                           char *buf,
1405                           u64 *ver)
1406 {
1407         struct ceph_osd_req_op *op;
1408         int ret;
1409
1410         op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
1411         if (!op)
1412                 return -ENOMEM;
1413
1414         ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
1415                                op, object_name, ofs, len, buf, ver);
1416         rbd_osd_req_op_destroy(op);
1417
1418         return ret;
1419 }
1420
1421 /*
1422  * Request sync osd watch
1423  */
1424 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1425                                    u64 ver,
1426                                    u64 notify_id)
1427 {
1428         struct ceph_osd_req_op *op;
1429         int ret;
1430
1431         op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1432         if (!op)
1433                 return -ENOMEM;
1434
1435         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1436                           rbd_dev->header_name, 0, 0, NULL,
1437                           NULL, 0,
1438                           CEPH_OSD_FLAG_READ,
1439                           op,
1440                           NULL, 0,
1441                           rbd_simple_req_cb, NULL);
1442
1443         rbd_osd_req_op_destroy(op);
1444
1445         return ret;
1446 }
1447
1448 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1449 {
1450         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1451         u64 hver;
1452         int rc;
1453
1454         if (!rbd_dev)
1455                 return;
1456
1457         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1458                 rbd_dev->header_name, (unsigned long long) notify_id,
1459                 (unsigned int) opcode);
1460         rc = rbd_dev_refresh(rbd_dev, &hver);
1461         if (rc)
1462                 rbd_warn(rbd_dev, "got notification but failed to "
1463                            " update snaps: %d\n", rc);
1464
1465         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1466 }
1467
1468 /*
1469  * Request sync osd watch/unwatch.  The value of "start" determines
1470  * whether a watch request is being initiated or torn down.
1471  */
1472 static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
1473 {
1474         struct ceph_osd_req_op *op;
1475         int ret = 0;
1476
1477         rbd_assert(start ^ !!rbd_dev->watch_event);
1478         rbd_assert(start ^ !!rbd_dev->watch_request);
1479
1480         if (start) {
1481                 struct ceph_osd_client *osdc;
1482
1483                 osdc = &rbd_dev->rbd_client->client->osdc;
1484                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
1485                                                 &rbd_dev->watch_event);
1486                 if (ret < 0)
1487                         return ret;
1488         }
1489
1490         op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1491                                 rbd_dev->watch_event->cookie,
1492                                 rbd_dev->header.obj_version, start);
1493         if (op)
1494                 ret = rbd_req_sync_op(rbd_dev,
1495                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1496                               op, rbd_dev->header_name,
1497                               0, 0, NULL, NULL);
1498
1499         /* Cancel the event if we're tearing down, or on error */
1500
1501         if (!start || !op || ret < 0) {
1502                 ceph_osdc_cancel_event(rbd_dev->watch_event);
1503                 rbd_dev->watch_event = NULL;
1504         }
1505         rbd_osd_req_op_destroy(op);
1506
1507         return ret;
1508 }
1509
1510 /*
1511  * Synchronous osd object method call
1512  */
1513 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1514                              const char *object_name,
1515                              const char *class_name,
1516                              const char *method_name,
1517                              const char *outbound,
1518                              size_t outbound_size,
1519                              char *inbound,
1520                              size_t inbound_size,
1521                              u64 *ver)
1522 {
1523         struct ceph_osd_req_op *op;
1524         int ret;
1525
1526         /*
1527          * Any input parameters required by the method we're calling
1528          * will be sent along with the class and method names as
1529          * part of the message payload.  That data and its size are
1530          * supplied via the indata and indata_len fields (named from
1531          * the perspective of the server side) in the OSD request
1532          * operation.
1533          */
1534         op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1535                                         method_name, outbound, outbound_size);
1536         if (!op)
1537                 return -ENOMEM;
1538
1539         ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
1540                                object_name, 0, inbound_size, inbound,
1541                                ver);
1542
1543         rbd_osd_req_op_destroy(op);
1544
1545         dout("cls_exec returned %d\n", ret);
1546         return ret;
1547 }
1548
1549 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1550 {
1551         struct rbd_req_coll *coll =
1552                         kzalloc(sizeof(struct rbd_req_coll) +
1553                                 sizeof(struct rbd_req_status) * num_reqs,
1554                                 GFP_ATOMIC);
1555
1556         if (!coll)
1557                 return NULL;
1558         coll->total = num_reqs;
1559         kref_init(&coll->kref);
1560         return coll;
1561 }
1562
1563 static int rbd_dev_do_request(struct request *rq,
1564                                 struct rbd_device *rbd_dev,
1565                                 struct ceph_snap_context *snapc,
1566                                 u64 ofs, unsigned int size,
1567                                 struct bio *bio_chain)
1568 {
1569         int num_segs;
1570         struct rbd_req_coll *coll;
1571         unsigned int bio_offset;
1572         int cur_seg = 0;
1573
1574         dout("%s 0x%x bytes at 0x%llx\n",
1575                 rq_data_dir(rq) == WRITE ? "write" : "read",
1576                 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1577
1578         num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1579         if (num_segs <= 0)
1580                 return num_segs;
1581
1582         coll = rbd_alloc_coll(num_segs);
1583         if (!coll)
1584                 return -ENOMEM;
1585
1586         bio_offset = 0;
1587         do {
1588                 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1589                 unsigned int clone_size;
1590                 struct bio *bio_clone;
1591
1592                 BUG_ON(limit > (u64)UINT_MAX);
1593                 clone_size = (unsigned int)limit;
1594                 dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
1595
1596                 kref_get(&coll->kref);
1597
1598                 /* Pass a cloned bio chain via an osd request */
1599
1600                 bio_clone = bio_chain_clone_range(&bio_chain,
1601                                         &bio_offset, clone_size,
1602                                         GFP_ATOMIC);
1603                 if (bio_clone)
1604                         (void)rbd_do_op(rq, rbd_dev, snapc,
1605                                         ofs, clone_size,
1606                                         bio_clone, coll, cur_seg);
1607                 else
1608                         rbd_coll_end_req_index(rq, coll, cur_seg,
1609                                                 (s32)-ENOMEM,
1610                                                 clone_size);
1611                 size -= clone_size;
1612                 ofs += clone_size;
1613
1614                 cur_seg++;
1615         } while (size > 0);
1616         kref_put(&coll->kref, rbd_coll_release);
1617
1618         return 0;
1619 }
1620
1621 /*
1622  * block device queue callback
1623  */
1624 static void rbd_rq_fn(struct request_queue *q)
1625 {
1626         struct rbd_device *rbd_dev = q->queuedata;
1627         bool read_only = rbd_dev->mapping.read_only;
1628         struct request *rq;
1629
1630         while ((rq = blk_fetch_request(q))) {
1631                 struct ceph_snap_context *snapc = NULL;
1632                 unsigned int size = 0;
1633                 int result;
1634
1635                 dout("fetched request\n");
1636
1637                 /* Filter out block requests we don't understand */
1638
1639                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1640                         __blk_end_request_all(rq, 0);
1641                         continue;
1642                 }
1643                 spin_unlock_irq(q->queue_lock);
1644
1645                 /* Write requests need a reference to the snapshot context */
1646
1647                 if (rq_data_dir(rq) == WRITE) {
1648                         result = -EROFS;
1649                         if (read_only) /* Can't write to a read-only device */
1650                                 goto out_end_request;
1651
1652                         /*
1653                          * Note that each osd request will take its
1654                          * own reference to the snapshot context
1655                          * supplied.  The reference we take here
1656                          * just guarantees the one we provide stays
1657                          * valid.
1658                          */
1659                         down_read(&rbd_dev->header_rwsem);
1660                         snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1661                         up_read(&rbd_dev->header_rwsem);
1662                         rbd_assert(snapc != NULL);
1663                 } else if (!atomic_read(&rbd_dev->exists)) {
1664                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1665                         dout("request for non-existent snapshot");
1666                         result = -ENXIO;
1667                         goto out_end_request;
1668                 }
1669
1670                 size = blk_rq_bytes(rq);
1671                 result = rbd_dev_do_request(rq, rbd_dev, snapc,
1672                                 blk_rq_pos(rq) * SECTOR_SIZE,
1673                                 size, rq->bio);
1674 out_end_request:
1675                 if (snapc)
1676                         ceph_put_snap_context(snapc);
1677                 spin_lock_irq(q->queue_lock);
1678                 if (!size || result < 0)
1679                         __blk_end_request_all(rq, result);
1680         }
1681 }
1682
1683 /*
1684  * a queue callback. Makes sure that we don't create a bio that spans across
1685  * multiple osd objects. One exception would be with a single page bios,
1686  * which we handle later at bio_chain_clone_range()
1687  */
1688 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1689                           struct bio_vec *bvec)
1690 {
1691         struct rbd_device *rbd_dev = q->queuedata;
1692         sector_t sector_offset;
1693         sector_t sectors_per_obj;
1694         sector_t obj_sector_offset;
1695         int ret;
1696
1697         /*
1698          * Find how far into its rbd object the partition-relative
1699          * bio start sector is to offset relative to the enclosing
1700          * device.
1701          */
1702         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1703         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1704         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1705
1706         /*
1707          * Compute the number of bytes from that offset to the end
1708          * of the object.  Account for what's already used by the bio.
1709          */
1710         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1711         if (ret > bmd->bi_size)
1712                 ret -= bmd->bi_size;
1713         else
1714                 ret = 0;
1715
1716         /*
1717          * Don't send back more than was asked for.  And if the bio
1718          * was empty, let the whole thing through because:  "Note
1719          * that a block device *must* allow a single page to be
1720          * added to an empty bio."
1721          */
1722         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1723         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1724                 ret = (int) bvec->bv_len;
1725
1726         return ret;
1727 }
1728
1729 static void rbd_free_disk(struct rbd_device *rbd_dev)
1730 {
1731         struct gendisk *disk = rbd_dev->disk;
1732
1733         if (!disk)
1734                 return;
1735
1736         if (disk->flags & GENHD_FL_UP)
1737                 del_gendisk(disk);
1738         if (disk->queue)
1739                 blk_cleanup_queue(disk->queue);
1740         put_disk(disk);
1741 }
1742
1743 /*
1744  * Read the complete header for the given rbd device.
1745  *
1746  * Returns a pointer to a dynamically-allocated buffer containing
1747  * the complete and validated header.  Caller can pass the address
1748  * of a variable that will be filled in with the version of the
1749  * header object at the time it was read.
1750  *
1751  * Returns a pointer-coded errno if a failure occurs.
1752  */
1753 static struct rbd_image_header_ondisk *
1754 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1755 {
1756         struct rbd_image_header_ondisk *ondisk = NULL;
1757         u32 snap_count = 0;
1758         u64 names_size = 0;
1759         u32 want_count;
1760         int ret;
1761
1762         /*
1763          * The complete header will include an array of its 64-bit
1764          * snapshot ids, followed by the names of those snapshots as
1765          * a contiguous block of NUL-terminated strings.  Note that
1766          * the number of snapshots could change by the time we read
1767          * it in, in which case we re-read it.
1768          */
1769         do {
1770                 size_t size;
1771
1772                 kfree(ondisk);
1773
1774                 size = sizeof (*ondisk);
1775                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1776                 size += names_size;
1777                 ondisk = kmalloc(size, GFP_KERNEL);
1778                 if (!ondisk)
1779                         return ERR_PTR(-ENOMEM);
1780
1781                 ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
1782                                        0, size,
1783                                        (char *) ondisk, version);
1784
1785                 if (ret < 0)
1786                         goto out_err;
1787                 if (WARN_ON((size_t) ret < size)) {
1788                         ret = -ENXIO;
1789                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
1790                                 size, ret);
1791                         goto out_err;
1792                 }
1793                 if (!rbd_dev_ondisk_valid(ondisk)) {
1794                         ret = -ENXIO;
1795                         rbd_warn(rbd_dev, "invalid header");
1796                         goto out_err;
1797                 }
1798
1799                 names_size = le64_to_cpu(ondisk->snap_names_len);
1800                 want_count = snap_count;
1801                 snap_count = le32_to_cpu(ondisk->snap_count);
1802         } while (snap_count != want_count);
1803
1804         return ondisk;
1805
1806 out_err:
1807         kfree(ondisk);
1808
1809         return ERR_PTR(ret);
1810 }
1811
1812 /*
1813  * reload the ondisk the header
1814  */
1815 static int rbd_read_header(struct rbd_device *rbd_dev,
1816                            struct rbd_image_header *header)
1817 {
1818         struct rbd_image_header_ondisk *ondisk;
1819         u64 ver = 0;
1820         int ret;
1821
1822         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1823         if (IS_ERR(ondisk))
1824                 return PTR_ERR(ondisk);
1825         ret = rbd_header_from_disk(header, ondisk);
1826         if (ret >= 0)
1827                 header->obj_version = ver;
1828         kfree(ondisk);
1829
1830         return ret;
1831 }
1832
1833 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1834 {
1835         struct rbd_snap *snap;
1836         struct rbd_snap *next;
1837
1838         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1839                 rbd_remove_snap_dev(snap);
1840 }
1841
1842 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1843 {
1844         sector_t size;
1845
1846         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1847                 return;
1848
1849         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1850         dout("setting size to %llu sectors", (unsigned long long) size);
1851         rbd_dev->mapping.size = (u64) size;
1852         set_capacity(rbd_dev->disk, size);
1853 }
1854
1855 /*
1856  * only read the first part of the ondisk header, without the snaps info
1857  */
1858 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1859 {
1860         int ret;
1861         struct rbd_image_header h;
1862
1863         ret = rbd_read_header(rbd_dev, &h);
1864         if (ret < 0)
1865                 return ret;
1866
1867         down_write(&rbd_dev->header_rwsem);
1868
1869         /* Update image size, and check for resize of mapped image */
1870         rbd_dev->header.image_size = h.image_size;
1871         rbd_update_mapping_size(rbd_dev);
1872
1873         /* rbd_dev->header.object_prefix shouldn't change */
1874         kfree(rbd_dev->header.snap_sizes);
1875         kfree(rbd_dev->header.snap_names);
1876         /* osd requests may still refer to snapc */
1877         ceph_put_snap_context(rbd_dev->header.snapc);
1878
1879         if (hver)
1880                 *hver = h.obj_version;
1881         rbd_dev->header.obj_version = h.obj_version;
1882         rbd_dev->header.image_size = h.image_size;
1883         rbd_dev->header.snapc = h.snapc;
1884         rbd_dev->header.snap_names = h.snap_names;
1885         rbd_dev->header.snap_sizes = h.snap_sizes;
1886         /* Free the extra copy of the object prefix */
1887         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1888         kfree(h.object_prefix);
1889
1890         ret = rbd_dev_snaps_update(rbd_dev);
1891         if (!ret)
1892                 ret = rbd_dev_snaps_register(rbd_dev);
1893
1894         up_write(&rbd_dev->header_rwsem);
1895
1896         return ret;
1897 }
1898
1899 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1900 {
1901         int ret;
1902
1903         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1904         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1905         if (rbd_dev->image_format == 1)
1906                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1907         else
1908                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1909         mutex_unlock(&ctl_mutex);
1910
1911         return ret;
1912 }
1913
1914 static int rbd_init_disk(struct rbd_device *rbd_dev)
1915 {
1916         struct gendisk *disk;
1917         struct request_queue *q;
1918         u64 segment_size;
1919
1920         /* create gendisk info */
1921         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1922         if (!disk)
1923                 return -ENOMEM;
1924
1925         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1926                  rbd_dev->dev_id);
1927         disk->major = rbd_dev->major;
1928         disk->first_minor = 0;
1929         disk->fops = &rbd_bd_ops;
1930         disk->private_data = rbd_dev;
1931
1932         /* init rq */
1933         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1934         if (!q)
1935                 goto out_disk;
1936
1937         /* We use the default size, but let's be explicit about it. */
1938         blk_queue_physical_block_size(q, SECTOR_SIZE);
1939
1940         /* set io sizes to object size */
1941         segment_size = rbd_obj_bytes(&rbd_dev->header);
1942         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1943         blk_queue_max_segment_size(q, segment_size);
1944         blk_queue_io_min(q, segment_size);
1945         blk_queue_io_opt(q, segment_size);
1946
1947         blk_queue_merge_bvec(q, rbd_merge_bvec);
1948         disk->queue = q;
1949
1950         q->queuedata = rbd_dev;
1951
1952         rbd_dev->disk = disk;
1953
1954         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1955
1956         return 0;
1957 out_disk:
1958         put_disk(disk);
1959
1960         return -ENOMEM;
1961 }
1962
1963 /*
1964   sysfs
1965 */
1966
1967 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1968 {
1969         return container_of(dev, struct rbd_device, dev);
1970 }
1971
1972 static ssize_t rbd_size_show(struct device *dev,
1973                              struct device_attribute *attr, char *buf)
1974 {
1975         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976         sector_t size;
1977
1978         down_read(&rbd_dev->header_rwsem);
1979         size = get_capacity(rbd_dev->disk);
1980         up_read(&rbd_dev->header_rwsem);
1981
1982         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1983 }
1984
1985 /*
1986  * Note this shows the features for whatever's mapped, which is not
1987  * necessarily the base image.
1988  */
1989 static ssize_t rbd_features_show(struct device *dev,
1990                              struct device_attribute *attr, char *buf)
1991 {
1992         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1993
1994         return sprintf(buf, "0x%016llx\n",
1995                         (unsigned long long) rbd_dev->mapping.features);
1996 }
1997
1998 static ssize_t rbd_major_show(struct device *dev,
1999                               struct device_attribute *attr, char *buf)
2000 {
2001         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2002
2003         return sprintf(buf, "%d\n", rbd_dev->major);
2004 }
2005
2006 static ssize_t rbd_client_id_show(struct device *dev,
2007                                   struct device_attribute *attr, char *buf)
2008 {
2009         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2010
2011         return sprintf(buf, "client%lld\n",
2012                         ceph_client_id(rbd_dev->rbd_client->client));
2013 }
2014
2015 static ssize_t rbd_pool_show(struct device *dev,
2016                              struct device_attribute *attr, char *buf)
2017 {
2018         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2019
2020         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2021 }
2022
2023 static ssize_t rbd_pool_id_show(struct device *dev,
2024                              struct device_attribute *attr, char *buf)
2025 {
2026         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2027
2028         return sprintf(buf, "%llu\n",
2029                 (unsigned long long) rbd_dev->spec->pool_id);
2030 }
2031
2032 static ssize_t rbd_name_show(struct device *dev,
2033                              struct device_attribute *attr, char *buf)
2034 {
2035         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2036
2037         if (rbd_dev->spec->image_name)
2038                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2039
2040         return sprintf(buf, "(unknown)\n");
2041 }
2042
2043 static ssize_t rbd_image_id_show(struct device *dev,
2044                              struct device_attribute *attr, char *buf)
2045 {
2046         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2047
2048         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2049 }
2050
2051 /*
2052  * Shows the name of the currently-mapped snapshot (or
2053  * RBD_SNAP_HEAD_NAME for the base image).
2054  */
2055 static ssize_t rbd_snap_show(struct device *dev,
2056                              struct device_attribute *attr,
2057                              char *buf)
2058 {
2059         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2060
2061         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2062 }
2063
2064 /*
2065  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2066  * for the parent image.  If there is no parent, simply shows
2067  * "(no parent image)".
2068  */
2069 static ssize_t rbd_parent_show(struct device *dev,
2070                              struct device_attribute *attr,
2071                              char *buf)
2072 {
2073         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2074         struct rbd_spec *spec = rbd_dev->parent_spec;
2075         int count;
2076         char *bufp = buf;
2077
2078         if (!spec)
2079                 return sprintf(buf, "(no parent image)\n");
2080
2081         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2082                         (unsigned long long) spec->pool_id, spec->pool_name);
2083         if (count < 0)
2084                 return count;
2085         bufp += count;
2086
2087         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2088                         spec->image_name ? spec->image_name : "(unknown)");
2089         if (count < 0)
2090                 return count;
2091         bufp += count;
2092
2093         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2094                         (unsigned long long) spec->snap_id, spec->snap_name);
2095         if (count < 0)
2096                 return count;
2097         bufp += count;
2098
2099         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2100         if (count < 0)
2101                 return count;
2102         bufp += count;
2103
2104         return (ssize_t) (bufp - buf);
2105 }
2106
2107 static ssize_t rbd_image_refresh(struct device *dev,
2108                                  struct device_attribute *attr,
2109                                  const char *buf,
2110                                  size_t size)
2111 {
2112         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2113         int ret;
2114
2115         ret = rbd_dev_refresh(rbd_dev, NULL);
2116
2117         return ret < 0 ? ret : size;
2118 }
2119
2120 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2121 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2122 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2123 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2124 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2125 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2126 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2127 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2128 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2129 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2130 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2131
2132 static struct attribute *rbd_attrs[] = {
2133         &dev_attr_size.attr,
2134         &dev_attr_features.attr,
2135         &dev_attr_major.attr,
2136         &dev_attr_client_id.attr,
2137         &dev_attr_pool.attr,
2138         &dev_attr_pool_id.attr,
2139         &dev_attr_name.attr,
2140         &dev_attr_image_id.attr,
2141         &dev_attr_current_snap.attr,
2142         &dev_attr_parent.attr,
2143         &dev_attr_refresh.attr,
2144         NULL
2145 };
2146
2147 static struct attribute_group rbd_attr_group = {
2148         .attrs = rbd_attrs,
2149 };
2150
2151 static const struct attribute_group *rbd_attr_groups[] = {
2152         &rbd_attr_group,
2153         NULL
2154 };
2155
2156 static void rbd_sysfs_dev_release(struct device *dev)
2157 {
2158 }
2159
2160 static struct device_type rbd_device_type = {
2161         .name           = "rbd",
2162         .groups         = rbd_attr_groups,
2163         .release        = rbd_sysfs_dev_release,
2164 };
2165
2166
2167 /*
2168   sysfs - snapshots
2169 */
2170
2171 static ssize_t rbd_snap_size_show(struct device *dev,
2172                                   struct device_attribute *attr,
2173                                   char *buf)
2174 {
2175         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2176
2177         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2178 }
2179
2180 static ssize_t rbd_snap_id_show(struct device *dev,
2181                                 struct device_attribute *attr,
2182                                 char *buf)
2183 {
2184         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2185
2186         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2187 }
2188
2189 static ssize_t rbd_snap_features_show(struct device *dev,
2190                                 struct device_attribute *attr,
2191                                 char *buf)
2192 {
2193         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2194
2195         return sprintf(buf, "0x%016llx\n",
2196                         (unsigned long long) snap->features);
2197 }
2198
2199 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2200 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2201 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2202
2203 static struct attribute *rbd_snap_attrs[] = {
2204         &dev_attr_snap_size.attr,
2205         &dev_attr_snap_id.attr,
2206         &dev_attr_snap_features.attr,
2207         NULL,
2208 };
2209
2210 static struct attribute_group rbd_snap_attr_group = {
2211         .attrs = rbd_snap_attrs,
2212 };
2213
2214 static void rbd_snap_dev_release(struct device *dev)
2215 {
2216         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2217         kfree(snap->name);
2218         kfree(snap);
2219 }
2220
2221 static const struct attribute_group *rbd_snap_attr_groups[] = {
2222         &rbd_snap_attr_group,
2223         NULL
2224 };
2225
2226 static struct device_type rbd_snap_device_type = {
2227         .groups         = rbd_snap_attr_groups,
2228         .release        = rbd_snap_dev_release,
2229 };
2230
2231 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2232 {
2233         kref_get(&spec->kref);
2234
2235         return spec;
2236 }
2237
2238 static void rbd_spec_free(struct kref *kref);
2239 static void rbd_spec_put(struct rbd_spec *spec)
2240 {
2241         if (spec)
2242                 kref_put(&spec->kref, rbd_spec_free);
2243 }
2244
2245 static struct rbd_spec *rbd_spec_alloc(void)
2246 {
2247         struct rbd_spec *spec;
2248
2249         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2250         if (!spec)
2251                 return NULL;
2252         kref_init(&spec->kref);
2253
2254         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2255
2256         return spec;
2257 }
2258
2259 static void rbd_spec_free(struct kref *kref)
2260 {
2261         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2262
2263         kfree(spec->pool_name);
2264         kfree(spec->image_id);
2265         kfree(spec->image_name);
2266         kfree(spec->snap_name);
2267         kfree(spec);
2268 }
2269
2270 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2271                                 struct rbd_spec *spec)
2272 {
2273         struct rbd_device *rbd_dev;
2274
2275         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2276         if (!rbd_dev)
2277                 return NULL;
2278
2279         spin_lock_init(&rbd_dev->lock);
2280         atomic_set(&rbd_dev->exists, 0);
2281         INIT_LIST_HEAD(&rbd_dev->node);
2282         INIT_LIST_HEAD(&rbd_dev->snaps);
2283         init_rwsem(&rbd_dev->header_rwsem);
2284
2285         rbd_dev->spec = spec;
2286         rbd_dev->rbd_client = rbdc;
2287
2288         /* Initialize the layout used for all rbd requests */
2289
2290         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2291         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2292         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2293         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2294
2295         return rbd_dev;
2296 }
2297
2298 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2299 {
2300         rbd_spec_put(rbd_dev->parent_spec);
2301         kfree(rbd_dev->header_name);
2302         rbd_put_client(rbd_dev->rbd_client);
2303         rbd_spec_put(rbd_dev->spec);
2304         kfree(rbd_dev);
2305 }
2306
2307 static bool rbd_snap_registered(struct rbd_snap *snap)
2308 {
2309         bool ret = snap->dev.type == &rbd_snap_device_type;
2310         bool reg = device_is_registered(&snap->dev);
2311
2312         rbd_assert(!ret ^ reg);
2313
2314         return ret;
2315 }
2316
2317 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2318 {
2319         list_del(&snap->node);
2320         if (device_is_registered(&snap->dev))
2321                 device_unregister(&snap->dev);
2322 }
2323
2324 static int rbd_register_snap_dev(struct rbd_snap *snap,
2325                                   struct device *parent)
2326 {
2327         struct device *dev = &snap->dev;
2328         int ret;
2329
2330         dev->type = &rbd_snap_device_type;
2331         dev->parent = parent;
2332         dev->release = rbd_snap_dev_release;
2333         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2334         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2335
2336         ret = device_register(dev);
2337
2338         return ret;
2339 }
2340
2341 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2342                                                 const char *snap_name,
2343                                                 u64 snap_id, u64 snap_size,
2344                                                 u64 snap_features)
2345 {
2346         struct rbd_snap *snap;
2347         int ret;
2348
2349         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2350         if (!snap)
2351                 return ERR_PTR(-ENOMEM);
2352
2353         ret = -ENOMEM;
2354         snap->name = kstrdup(snap_name, GFP_KERNEL);
2355         if (!snap->name)
2356                 goto err;
2357
2358         snap->id = snap_id;
2359         snap->size = snap_size;
2360         snap->features = snap_features;
2361
2362         return snap;
2363
2364 err:
2365         kfree(snap->name);
2366         kfree(snap);
2367
2368         return ERR_PTR(ret);
2369 }
2370
2371 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2372                 u64 *snap_size, u64 *snap_features)
2373 {
2374         char *snap_name;
2375
2376         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2377
2378         *snap_size = rbd_dev->header.snap_sizes[which];
2379         *snap_features = 0;     /* No features for v1 */
2380
2381         /* Skip over names until we find the one we are looking for */
2382
2383         snap_name = rbd_dev->header.snap_names;
2384         while (which--)
2385                 snap_name += strlen(snap_name) + 1;
2386
2387         return snap_name;
2388 }
2389
2390 /*
2391  * Get the size and object order for an image snapshot, or if
2392  * snap_id is CEPH_NOSNAP, gets this information for the base
2393  * image.
2394  */
2395 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2396                                 u8 *order, u64 *snap_size)
2397 {
2398         __le64 snapid = cpu_to_le64(snap_id);
2399         int ret;
2400         struct {
2401                 u8 order;
2402                 __le64 size;
2403         } __attribute__ ((packed)) size_buf = { 0 };
2404
2405         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2406                                 "rbd", "get_size",
2407                                 (char *) &snapid, sizeof (snapid),
2408                                 (char *) &size_buf, sizeof (size_buf), NULL);
2409         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2410         if (ret < 0)
2411                 return ret;
2412
2413         *order = size_buf.order;
2414         *snap_size = le64_to_cpu(size_buf.size);
2415
2416         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2417                 (unsigned long long) snap_id, (unsigned int) *order,
2418                 (unsigned long long) *snap_size);
2419
2420         return 0;
2421 }
2422
2423 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2424 {
2425         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2426                                         &rbd_dev->header.obj_order,
2427                                         &rbd_dev->header.image_size);
2428 }
2429
2430 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2431 {
2432         void *reply_buf;
2433         int ret;
2434         void *p;
2435
2436         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2437         if (!reply_buf)
2438                 return -ENOMEM;
2439
2440         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2441                                 "rbd", "get_object_prefix",
2442                                 NULL, 0,
2443                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2444         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2445         if (ret < 0)
2446                 goto out;
2447         ret = 0;    /* rbd_req_sync_exec() can return positive */
2448
2449         p = reply_buf;
2450         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2451                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2452                                                 NULL, GFP_NOIO);
2453
2454         if (IS_ERR(rbd_dev->header.object_prefix)) {
2455                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2456                 rbd_dev->header.object_prefix = NULL;
2457         } else {
2458                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2459         }
2460
2461 out:
2462         kfree(reply_buf);
2463
2464         return ret;
2465 }
2466
2467 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2468                 u64 *snap_features)
2469 {
2470         __le64 snapid = cpu_to_le64(snap_id);
2471         struct {
2472                 __le64 features;
2473                 __le64 incompat;
2474         } features_buf = { 0 };
2475         u64 incompat;
2476         int ret;
2477
2478         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2479                                 "rbd", "get_features",
2480                                 (char *) &snapid, sizeof (snapid),
2481                                 (char *) &features_buf, sizeof (features_buf),
2482                                 NULL);
2483         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2484         if (ret < 0)
2485                 return ret;
2486
2487         incompat = le64_to_cpu(features_buf.incompat);
2488         if (incompat & ~RBD_FEATURES_ALL)
2489                 return -ENXIO;
2490
2491         *snap_features = le64_to_cpu(features_buf.features);
2492
2493         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2494                 (unsigned long long) snap_id,
2495                 (unsigned long long) *snap_features,
2496                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2497
2498         return 0;
2499 }
2500
2501 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2502 {
2503         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2504                                                 &rbd_dev->header.features);
2505 }
2506
2507 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2508 {
2509         struct rbd_spec *parent_spec;
2510         size_t size;
2511         void *reply_buf = NULL;
2512         __le64 snapid;
2513         void *p;
2514         void *end;
2515         char *image_id;
2516         u64 overlap;
2517         int ret;
2518
2519         parent_spec = rbd_spec_alloc();
2520         if (!parent_spec)
2521                 return -ENOMEM;
2522
2523         size = sizeof (__le64) +                                /* pool_id */
2524                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2525                 sizeof (__le64) +                               /* snap_id */
2526                 sizeof (__le64);                                /* overlap */
2527         reply_buf = kmalloc(size, GFP_KERNEL);
2528         if (!reply_buf) {
2529                 ret = -ENOMEM;
2530                 goto out_err;
2531         }
2532
2533         snapid = cpu_to_le64(CEPH_NOSNAP);
2534         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2535                                 "rbd", "get_parent",
2536                                 (char *) &snapid, sizeof (snapid),
2537                                 (char *) reply_buf, size, NULL);
2538         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2539         if (ret < 0)
2540                 goto out_err;
2541
2542         ret = -ERANGE;
2543         p = reply_buf;
2544         end = (char *) reply_buf + size;
2545         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2546         if (parent_spec->pool_id == CEPH_NOPOOL)
2547                 goto out;       /* No parent?  No problem. */
2548
2549         /* The ceph file layout needs to fit pool id in 32 bits */
2550
2551         ret = -EIO;
2552         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2553                 goto out;
2554
2555         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2556         if (IS_ERR(image_id)) {
2557                 ret = PTR_ERR(image_id);
2558                 goto out_err;
2559         }
2560         parent_spec->image_id = image_id;
2561         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2562         ceph_decode_64_safe(&p, end, overlap, out_err);
2563
2564         rbd_dev->parent_overlap = overlap;
2565         rbd_dev->parent_spec = parent_spec;
2566         parent_spec = NULL;     /* rbd_dev now owns this */
2567 out:
2568         ret = 0;
2569 out_err:
2570         kfree(reply_buf);
2571         rbd_spec_put(parent_spec);
2572
2573         return ret;
2574 }
2575
2576 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2577 {
2578         size_t image_id_size;
2579         char *image_id;
2580         void *p;
2581         void *end;
2582         size_t size;
2583         void *reply_buf = NULL;
2584         size_t len = 0;
2585         char *image_name = NULL;
2586         int ret;
2587
2588         rbd_assert(!rbd_dev->spec->image_name);
2589
2590         len = strlen(rbd_dev->spec->image_id);
2591         image_id_size = sizeof (__le32) + len;
2592         image_id = kmalloc(image_id_size, GFP_KERNEL);
2593         if (!image_id)
2594                 return NULL;
2595
2596         p = image_id;
2597         end = (char *) image_id + image_id_size;
2598         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2599
2600         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2601         reply_buf = kmalloc(size, GFP_KERNEL);
2602         if (!reply_buf)
2603                 goto out;
2604
2605         ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2606                                 "rbd", "dir_get_name",
2607                                 image_id, image_id_size,
2608                                 (char *) reply_buf, size, NULL);
2609         if (ret < 0)
2610                 goto out;
2611         p = reply_buf;
2612         end = (char *) reply_buf + size;
2613         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2614         if (IS_ERR(image_name))
2615                 image_name = NULL;
2616         else
2617                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2618 out:
2619         kfree(reply_buf);
2620         kfree(image_id);
2621
2622         return image_name;
2623 }
2624
2625 /*
2626  * When a parent image gets probed, we only have the pool, image,
2627  * and snapshot ids but not the names of any of them.  This call
2628  * is made later to fill in those names.  It has to be done after
2629  * rbd_dev_snaps_update() has completed because some of the
2630  * information (in particular, snapshot name) is not available
2631  * until then.
2632  */
2633 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2634 {
2635         struct ceph_osd_client *osdc;
2636         const char *name;
2637         void *reply_buf = NULL;
2638         int ret;
2639
2640         if (rbd_dev->spec->pool_name)
2641                 return 0;       /* Already have the names */
2642
2643         /* Look up the pool name */
2644
2645         osdc = &rbd_dev->rbd_client->client->osdc;
2646         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2647         if (!name) {
2648                 rbd_warn(rbd_dev, "there is no pool with id %llu",
2649                         rbd_dev->spec->pool_id);        /* Really a BUG() */
2650                 return -EIO;
2651         }
2652
2653         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2654         if (!rbd_dev->spec->pool_name)
2655                 return -ENOMEM;
2656
2657         /* Fetch the image name; tolerate failure here */
2658
2659         name = rbd_dev_image_name(rbd_dev);
2660         if (name)
2661                 rbd_dev->spec->image_name = (char *) name;
2662         else
2663                 rbd_warn(rbd_dev, "unable to get image name");
2664
2665         /* Look up the snapshot name. */
2666
2667         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2668         if (!name) {
2669                 rbd_warn(rbd_dev, "no snapshot with id %llu",
2670                         rbd_dev->spec->snap_id);        /* Really a BUG() */
2671                 ret = -EIO;
2672                 goto out_err;
2673         }
2674         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2675         if(!rbd_dev->spec->snap_name)
2676                 goto out_err;
2677
2678         return 0;
2679 out_err:
2680         kfree(reply_buf);
2681         kfree(rbd_dev->spec->pool_name);
2682         rbd_dev->spec->pool_name = NULL;
2683
2684         return ret;
2685 }
2686
2687 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2688 {
2689         size_t size;
2690         int ret;
2691         void *reply_buf;
2692         void *p;
2693         void *end;
2694         u64 seq;
2695         u32 snap_count;
2696         struct ceph_snap_context *snapc;
2697         u32 i;
2698
2699         /*
2700          * We'll need room for the seq value (maximum snapshot id),
2701          * snapshot count, and array of that many snapshot ids.
2702          * For now we have a fixed upper limit on the number we're
2703          * prepared to receive.
2704          */
2705         size = sizeof (__le64) + sizeof (__le32) +
2706                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2707         reply_buf = kzalloc(size, GFP_KERNEL);
2708         if (!reply_buf)
2709                 return -ENOMEM;
2710
2711         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2712                                 "rbd", "get_snapcontext",
2713                                 NULL, 0,
2714                                 reply_buf, size, ver);
2715         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2716         if (ret < 0)
2717                 goto out;
2718
2719         ret = -ERANGE;
2720         p = reply_buf;
2721         end = (char *) reply_buf + size;
2722         ceph_decode_64_safe(&p, end, seq, out);
2723         ceph_decode_32_safe(&p, end, snap_count, out);
2724
2725         /*
2726          * Make sure the reported number of snapshot ids wouldn't go
2727          * beyond the end of our buffer.  But before checking that,
2728          * make sure the computed size of the snapshot context we
2729          * allocate is representable in a size_t.
2730          */
2731         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2732                                  / sizeof (u64)) {
2733                 ret = -EINVAL;
2734                 goto out;
2735         }
2736         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2737                 goto out;
2738
2739         size = sizeof (struct ceph_snap_context) +
2740                                 snap_count * sizeof (snapc->snaps[0]);
2741         snapc = kmalloc(size, GFP_KERNEL);
2742         if (!snapc) {
2743                 ret = -ENOMEM;
2744                 goto out;
2745         }
2746
2747         atomic_set(&snapc->nref, 1);
2748         snapc->seq = seq;
2749         snapc->num_snaps = snap_count;
2750         for (i = 0; i < snap_count; i++)
2751                 snapc->snaps[i] = ceph_decode_64(&p);
2752
2753         rbd_dev->header.snapc = snapc;
2754
2755         dout("  snap context seq = %llu, snap_count = %u\n",
2756                 (unsigned long long) seq, (unsigned int) snap_count);
2757
2758 out:
2759         kfree(reply_buf);
2760
2761         return 0;
2762 }
2763
2764 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2765 {
2766         size_t size;
2767         void *reply_buf;
2768         __le64 snap_id;
2769         int ret;
2770         void *p;
2771         void *end;
2772         char *snap_name;
2773
2774         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2775         reply_buf = kmalloc(size, GFP_KERNEL);
2776         if (!reply_buf)
2777                 return ERR_PTR(-ENOMEM);
2778
2779         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2780         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2781                                 "rbd", "get_snapshot_name",
2782                                 (char *) &snap_id, sizeof (snap_id),
2783                                 reply_buf, size, NULL);
2784         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2785         if (ret < 0)
2786                 goto out;
2787
2788         p = reply_buf;
2789         end = (char *) reply_buf + size;
2790         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2791         if (IS_ERR(snap_name)) {
2792                 ret = PTR_ERR(snap_name);
2793                 goto out;
2794         } else {
2795                 dout("  snap_id 0x%016llx snap_name = %s\n",
2796                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2797         }
2798         kfree(reply_buf);
2799
2800         return snap_name;
2801 out:
2802         kfree(reply_buf);
2803
2804         return ERR_PTR(ret);
2805 }
2806
2807 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2808                 u64 *snap_size, u64 *snap_features)
2809 {
2810         u64 snap_id;
2811         u8 order;
2812         int ret;
2813
2814         snap_id = rbd_dev->header.snapc->snaps[which];
2815         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2816         if (ret)
2817                 return ERR_PTR(ret);
2818         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2819         if (ret)
2820                 return ERR_PTR(ret);
2821
2822         return rbd_dev_v2_snap_name(rbd_dev, which);
2823 }
2824
2825 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2826                 u64 *snap_size, u64 *snap_features)
2827 {
2828         if (rbd_dev->image_format == 1)
2829                 return rbd_dev_v1_snap_info(rbd_dev, which,
2830                                         snap_size, snap_features);
2831         if (rbd_dev->image_format == 2)
2832                 return rbd_dev_v2_snap_info(rbd_dev, which,
2833                                         snap_size, snap_features);
2834         return ERR_PTR(-EINVAL);
2835 }
2836
2837 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2838 {
2839         int ret;
2840         __u8 obj_order;
2841
2842         down_write(&rbd_dev->header_rwsem);
2843
2844         /* Grab old order first, to see if it changes */
2845
2846         obj_order = rbd_dev->header.obj_order,
2847         ret = rbd_dev_v2_image_size(rbd_dev);
2848         if (ret)
2849                 goto out;
2850         if (rbd_dev->header.obj_order != obj_order) {
2851                 ret = -EIO;
2852                 goto out;
2853         }
2854         rbd_update_mapping_size(rbd_dev);
2855
2856         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2857         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2858         if (ret)
2859                 goto out;
2860         ret = rbd_dev_snaps_update(rbd_dev);
2861         dout("rbd_dev_snaps_update returned %d\n", ret);
2862         if (ret)
2863                 goto out;
2864         ret = rbd_dev_snaps_register(rbd_dev);
2865         dout("rbd_dev_snaps_register returned %d\n", ret);
2866 out:
2867         up_write(&rbd_dev->header_rwsem);
2868
2869         return ret;
2870 }
2871
2872 /*
2873  * Scan the rbd device's current snapshot list and compare it to the
2874  * newly-received snapshot context.  Remove any existing snapshots
2875  * not present in the new snapshot context.  Add a new snapshot for
2876  * any snaphots in the snapshot context not in the current list.
2877  * And verify there are no changes to snapshots we already know
2878  * about.
2879  *
2880  * Assumes the snapshots in the snapshot context are sorted by
2881  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2882  * are also maintained in that order.)
2883  */
2884 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2885 {
2886         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2887         const u32 snap_count = snapc->num_snaps;
2888         struct list_head *head = &rbd_dev->snaps;
2889         struct list_head *links = head->next;
2890         u32 index = 0;
2891
2892         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2893         while (index < snap_count || links != head) {
2894                 u64 snap_id;
2895                 struct rbd_snap *snap;
2896                 char *snap_name;
2897                 u64 snap_size = 0;
2898                 u64 snap_features = 0;
2899
2900                 snap_id = index < snap_count ? snapc->snaps[index]
2901                                              : CEPH_NOSNAP;
2902                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2903                                      : NULL;
2904                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2905
2906                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2907                         struct list_head *next = links->next;
2908
2909                         /* Existing snapshot not in the new snap context */
2910
2911                         if (rbd_dev->spec->snap_id == snap->id)
2912                                 atomic_set(&rbd_dev->exists, 0);
2913                         rbd_remove_snap_dev(snap);
2914                         dout("%ssnap id %llu has been removed\n",
2915                                 rbd_dev->spec->snap_id == snap->id ?
2916                                                         "mapped " : "",
2917                                 (unsigned long long) snap->id);
2918
2919                         /* Done with this list entry; advance */
2920
2921                         links = next;
2922                         continue;
2923                 }
2924
2925                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2926                                         &snap_size, &snap_features);
2927                 if (IS_ERR(snap_name))
2928                         return PTR_ERR(snap_name);
2929
2930                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2931                         (unsigned long long) snap_id);
2932                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2933                         struct rbd_snap *new_snap;
2934
2935                         /* We haven't seen this snapshot before */
2936
2937                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2938                                         snap_id, snap_size, snap_features);
2939                         if (IS_ERR(new_snap)) {
2940                                 int err = PTR_ERR(new_snap);
2941
2942                                 dout("  failed to add dev, error %d\n", err);
2943
2944                                 return err;
2945                         }
2946
2947                         /* New goes before existing, or at end of list */
2948
2949                         dout("  added dev%s\n", snap ? "" : " at end\n");
2950                         if (snap)
2951                                 list_add_tail(&new_snap->node, &snap->node);
2952                         else
2953                                 list_add_tail(&new_snap->node, head);
2954                 } else {
2955                         /* Already have this one */
2956
2957                         dout("  already present\n");
2958
2959                         rbd_assert(snap->size == snap_size);
2960                         rbd_assert(!strcmp(snap->name, snap_name));
2961                         rbd_assert(snap->features == snap_features);
2962
2963                         /* Done with this list entry; advance */
2964
2965                         links = links->next;
2966                 }
2967
2968                 /* Advance to the next entry in the snapshot context */
2969
2970                 index++;
2971         }
2972         dout("%s: done\n", __func__);
2973
2974         return 0;
2975 }
2976
2977 /*
2978  * Scan the list of snapshots and register the devices for any that
2979  * have not already been registered.
2980  */
2981 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2982 {
2983         struct rbd_snap *snap;
2984         int ret = 0;
2985
2986         dout("%s called\n", __func__);
2987         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2988                 return -EIO;
2989
2990         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2991                 if (!rbd_snap_registered(snap)) {
2992                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2993                         if (ret < 0)
2994                                 break;
2995                 }
2996         }
2997         dout("%s: returning %d\n", __func__, ret);
2998
2999         return ret;
3000 }
3001
3002 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3003 {
3004         struct device *dev;
3005         int ret;
3006
3007         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3008
3009         dev = &rbd_dev->dev;
3010         dev->bus = &rbd_bus_type;
3011         dev->type = &rbd_device_type;
3012         dev->parent = &rbd_root_dev;
3013         dev->release = rbd_dev_release;
3014         dev_set_name(dev, "%d", rbd_dev->dev_id);
3015         ret = device_register(dev);
3016
3017         mutex_unlock(&ctl_mutex);
3018
3019         return ret;
3020 }
3021
3022 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3023 {
3024         device_unregister(&rbd_dev->dev);
3025 }
3026
3027 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3028
3029 /*
3030  * Get a unique rbd identifier for the given new rbd_dev, and add
3031  * the rbd_dev to the global list.  The minimum rbd id is 1.
3032  */
3033 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3034 {
3035         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3036
3037         spin_lock(&rbd_dev_list_lock);
3038         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3039         spin_unlock(&rbd_dev_list_lock);
3040         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3041                 (unsigned long long) rbd_dev->dev_id);
3042 }
3043
3044 /*
3045  * Remove an rbd_dev from the global list, and record that its
3046  * identifier is no longer in use.
3047  */
3048 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3049 {
3050         struct list_head *tmp;
3051         int rbd_id = rbd_dev->dev_id;
3052         int max_id;
3053
3054         rbd_assert(rbd_id > 0);
3055
3056         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3057                 (unsigned long long) rbd_dev->dev_id);
3058         spin_lock(&rbd_dev_list_lock);
3059         list_del_init(&rbd_dev->node);
3060
3061         /*
3062          * If the id being "put" is not the current maximum, there
3063          * is nothing special we need to do.
3064          */
3065         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3066                 spin_unlock(&rbd_dev_list_lock);
3067                 return;
3068         }
3069
3070         /*
3071          * We need to update the current maximum id.  Search the
3072          * list to find out what it is.  We're more likely to find
3073          * the maximum at the end, so search the list backward.
3074          */
3075         max_id = 0;
3076         list_for_each_prev(tmp, &rbd_dev_list) {
3077                 struct rbd_device *rbd_dev;
3078
3079                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3080                 if (rbd_dev->dev_id > max_id)
3081                         max_id = rbd_dev->dev_id;
3082         }
3083         spin_unlock(&rbd_dev_list_lock);
3084
3085         /*
3086          * The max id could have been updated by rbd_dev_id_get(), in
3087          * which case it now accurately reflects the new maximum.
3088          * Be careful not to overwrite the maximum value in that
3089          * case.
3090          */
3091         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3092         dout("  max dev id has been reset\n");
3093 }
3094
3095 /*
3096  * Skips over white space at *buf, and updates *buf to point to the
3097  * first found non-space character (if any). Returns the length of
3098  * the token (string of non-white space characters) found.  Note
3099  * that *buf must be terminated with '\0'.
3100  */
3101 static inline size_t next_token(const char **buf)
3102 {
3103         /*
3104         * These are the characters that produce nonzero for
3105         * isspace() in the "C" and "POSIX" locales.
3106         */
3107         const char *spaces = " \f\n\r\t\v";
3108
3109         *buf += strspn(*buf, spaces);   /* Find start of token */
3110
3111         return strcspn(*buf, spaces);   /* Return token length */
3112 }
3113
3114 /*
3115  * Finds the next token in *buf, and if the provided token buffer is
3116  * big enough, copies the found token into it.  The result, if
3117  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3118  * must be terminated with '\0' on entry.
3119  *
3120  * Returns the length of the token found (not including the '\0').
3121  * Return value will be 0 if no token is found, and it will be >=
3122  * token_size if the token would not fit.
3123  *
3124  * The *buf pointer will be updated to point beyond the end of the
3125  * found token.  Note that this occurs even if the token buffer is
3126  * too small to hold it.
3127  */
3128 static inline size_t copy_token(const char **buf,
3129                                 char *token,
3130                                 size_t token_size)
3131 {
3132         size_t len;
3133
3134         len = next_token(buf);
3135         if (len < token_size) {
3136                 memcpy(token, *buf, len);
3137                 *(token + len) = '\0';
3138         }
3139         *buf += len;
3140
3141         return len;
3142 }
3143
3144 /*
3145  * Finds the next token in *buf, dynamically allocates a buffer big
3146  * enough to hold a copy of it, and copies the token into the new
3147  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3148  * that a duplicate buffer is created even for a zero-length token.
3149  *
3150  * Returns a pointer to the newly-allocated duplicate, or a null
3151  * pointer if memory for the duplicate was not available.  If
3152  * the lenp argument is a non-null pointer, the length of the token
3153  * (not including the '\0') is returned in *lenp.
3154  *
3155  * If successful, the *buf pointer will be updated to point beyond
3156  * the end of the found token.
3157  *
3158  * Note: uses GFP_KERNEL for allocation.
3159  */
3160 static inline char *dup_token(const char **buf, size_t *lenp)
3161 {
3162         char *dup;
3163         size_t len;
3164
3165         len = next_token(buf);
3166         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3167         if (!dup)
3168                 return NULL;
3169         *(dup + len) = '\0';
3170         *buf += len;
3171
3172         if (lenp)
3173                 *lenp = len;
3174
3175         return dup;
3176 }
3177
3178 /*
3179  * Parse the options provided for an "rbd add" (i.e., rbd image
3180  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3181  * and the data written is passed here via a NUL-terminated buffer.
3182  * Returns 0 if successful or an error code otherwise.
3183  *
3184  * The information extracted from these options is recorded in
3185  * the other parameters which return dynamically-allocated
3186  * structures:
3187  *  ceph_opts
3188  *      The address of a pointer that will refer to a ceph options
3189  *      structure.  Caller must release the returned pointer using
3190  *      ceph_destroy_options() when it is no longer needed.
3191  *  rbd_opts
3192  *      Address of an rbd options pointer.  Fully initialized by
3193  *      this function; caller must release with kfree().
3194  *  spec
3195  *      Address of an rbd image specification pointer.  Fully
3196  *      initialized by this function based on parsed options.
3197  *      Caller must release with rbd_spec_put().
3198  *
3199  * The options passed take this form:
3200  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3201  * where:
3202  *  <mon_addrs>
3203  *      A comma-separated list of one or more monitor addresses.
3204  *      A monitor address is an ip address, optionally followed
3205  *      by a port number (separated by a colon).
3206  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3207  *  <options>
3208  *      A comma-separated list of ceph and/or rbd options.
3209  *  <pool_name>
3210  *      The name of the rados pool containing the rbd image.
3211  *  <image_name>
3212  *      The name of the image in that pool to map.
3213  *  <snap_id>
3214  *      An optional snapshot id.  If provided, the mapping will
3215  *      present data from the image at the time that snapshot was
3216  *      created.  The image head is used if no snapshot id is
3217  *      provided.  Snapshot mappings are always read-only.
3218  */
3219 static int rbd_add_parse_args(const char *buf,
3220                                 struct ceph_options **ceph_opts,
3221                                 struct rbd_options **opts,
3222                                 struct rbd_spec **rbd_spec)
3223 {
3224         size_t len;
3225         char *options;
3226         const char *mon_addrs;
3227         size_t mon_addrs_size;
3228         struct rbd_spec *spec = NULL;
3229         struct rbd_options *rbd_opts = NULL;
3230         struct ceph_options *copts;
3231         int ret;
3232
3233         /* The first four tokens are required */
3234
3235         len = next_token(&buf);
3236         if (!len) {
3237                 rbd_warn(NULL, "no monitor address(es) provided");
3238                 return -EINVAL;
3239         }
3240         mon_addrs = buf;
3241         mon_addrs_size = len + 1;
3242         buf += len;
3243
3244         ret = -EINVAL;
3245         options = dup_token(&buf, NULL);
3246         if (!options)
3247                 return -ENOMEM;
3248         if (!*options) {
3249                 rbd_warn(NULL, "no options provided");
3250                 goto out_err;
3251         }
3252
3253         spec = rbd_spec_alloc();
3254         if (!spec)
3255                 goto out_mem;
3256
3257         spec->pool_name = dup_token(&buf, NULL);
3258         if (!spec->pool_name)
3259                 goto out_mem;
3260         if (!*spec->pool_name) {
3261                 rbd_warn(NULL, "no pool name provided");
3262                 goto out_err;
3263         }
3264
3265         spec->image_name = dup_token(&buf, NULL);
3266         if (!spec->image_name)
3267                 goto out_mem;
3268         if (!*spec->image_name) {
3269                 rbd_warn(NULL, "no image name provided");
3270                 goto out_err;
3271         }
3272
3273         /*
3274          * Snapshot name is optional; default is to use "-"
3275          * (indicating the head/no snapshot).
3276          */
3277         len = next_token(&buf);
3278         if (!len) {
3279                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3280                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3281         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3282                 ret = -ENAMETOOLONG;
3283                 goto out_err;
3284         }
3285         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3286         if (!spec->snap_name)
3287                 goto out_mem;
3288         *(spec->snap_name + len) = '\0';
3289
3290         /* Initialize all rbd options to the defaults */
3291
3292         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3293         if (!rbd_opts)
3294                 goto out_mem;
3295
3296         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3297
3298         copts = ceph_parse_options(options, mon_addrs,
3299                                         mon_addrs + mon_addrs_size - 1,
3300                                         parse_rbd_opts_token, rbd_opts);
3301         if (IS_ERR(copts)) {
3302                 ret = PTR_ERR(copts);
3303                 goto out_err;
3304         }
3305         kfree(options);
3306
3307         *ceph_opts = copts;
3308         *opts = rbd_opts;
3309         *rbd_spec = spec;
3310
3311         return 0;
3312 out_mem:
3313         ret = -ENOMEM;
3314 out_err:
3315         kfree(rbd_opts);
3316         rbd_spec_put(spec);
3317         kfree(options);
3318
3319         return ret;
3320 }
3321
3322 /*
3323  * An rbd format 2 image has a unique identifier, distinct from the
3324  * name given to it by the user.  Internally, that identifier is
3325  * what's used to specify the names of objects related to the image.
3326  *
3327  * A special "rbd id" object is used to map an rbd image name to its
3328  * id.  If that object doesn't exist, then there is no v2 rbd image
3329  * with the supplied name.
3330  *
3331  * This function will record the given rbd_dev's image_id field if
3332  * it can be determined, and in that case will return 0.  If any
3333  * errors occur a negative errno will be returned and the rbd_dev's
3334  * image_id field will be unchanged (and should be NULL).
3335  */
3336 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3337 {
3338         int ret;
3339         size_t size;
3340         char *object_name;
3341         void *response;
3342         void *p;
3343
3344         /*
3345          * When probing a parent image, the image id is already
3346          * known (and the image name likely is not).  There's no
3347          * need to fetch the image id again in this case.
3348          */
3349         if (rbd_dev->spec->image_id)
3350                 return 0;
3351
3352         /*
3353          * First, see if the format 2 image id file exists, and if
3354          * so, get the image's persistent id from it.
3355          */
3356         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3357         object_name = kmalloc(size, GFP_NOIO);
3358         if (!object_name)
3359                 return -ENOMEM;
3360         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3361         dout("rbd id object name is %s\n", object_name);
3362
3363         /* Response will be an encoded string, which includes a length */
3364
3365         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3366         response = kzalloc(size, GFP_NOIO);
3367         if (!response) {
3368                 ret = -ENOMEM;
3369                 goto out;
3370         }
3371
3372         ret = rbd_req_sync_exec(rbd_dev, object_name,
3373                                 "rbd", "get_id",
3374                                 NULL, 0,
3375                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3376         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3377         if (ret < 0)
3378                 goto out;
3379         ret = 0;    /* rbd_req_sync_exec() can return positive */
3380
3381         p = response;
3382         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3383                                                 p + RBD_IMAGE_ID_LEN_MAX,
3384                                                 NULL, GFP_NOIO);
3385         if (IS_ERR(rbd_dev->spec->image_id)) {
3386                 ret = PTR_ERR(rbd_dev->spec->image_id);
3387                 rbd_dev->spec->image_id = NULL;
3388         } else {
3389                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3390         }
3391 out:
3392         kfree(response);
3393         kfree(object_name);
3394
3395         return ret;
3396 }
3397
3398 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3399 {
3400         int ret;
3401         size_t size;
3402
3403         /* Version 1 images have no id; empty string is used */
3404
3405         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3406         if (!rbd_dev->spec->image_id)
3407                 return -ENOMEM;
3408
3409         /* Record the header object name for this rbd image. */
3410
3411         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3412         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3413         if (!rbd_dev->header_name) {
3414                 ret = -ENOMEM;
3415                 goto out_err;
3416         }
3417         sprintf(rbd_dev->header_name, "%s%s",
3418                 rbd_dev->spec->image_name, RBD_SUFFIX);
3419
3420         /* Populate rbd image metadata */
3421
3422         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3423         if (ret < 0)
3424                 goto out_err;
3425
3426         /* Version 1 images have no parent (no layering) */
3427
3428         rbd_dev->parent_spec = NULL;
3429         rbd_dev->parent_overlap = 0;
3430
3431         rbd_dev->image_format = 1;
3432
3433         dout("discovered version 1 image, header name is %s\n",
3434                 rbd_dev->header_name);
3435
3436         return 0;
3437
3438 out_err:
3439         kfree(rbd_dev->header_name);
3440         rbd_dev->header_name = NULL;
3441         kfree(rbd_dev->spec->image_id);
3442         rbd_dev->spec->image_id = NULL;
3443
3444         return ret;
3445 }
3446
3447 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3448 {
3449         size_t size;
3450         int ret;
3451         u64 ver = 0;
3452
3453         /*
3454          * Image id was filled in by the caller.  Record the header
3455          * object name for this rbd image.
3456          */
3457         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3458         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3459         if (!rbd_dev->header_name)
3460                 return -ENOMEM;
3461         sprintf(rbd_dev->header_name, "%s%s",
3462                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3463
3464         /* Get the size and object order for the image */
3465
3466         ret = rbd_dev_v2_image_size(rbd_dev);
3467         if (ret < 0)
3468                 goto out_err;
3469
3470         /* Get the object prefix (a.k.a. block_name) for the image */
3471
3472         ret = rbd_dev_v2_object_prefix(rbd_dev);
3473         if (ret < 0)
3474                 goto out_err;
3475
3476         /* Get the and check features for the image */
3477
3478         ret = rbd_dev_v2_features(rbd_dev);
3479         if (ret < 0)
3480                 goto out_err;
3481
3482         /* If the image supports layering, get the parent info */
3483
3484         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3485                 ret = rbd_dev_v2_parent_info(rbd_dev);
3486                 if (ret < 0)
3487                         goto out_err;
3488         }
3489
3490         /* crypto and compression type aren't (yet) supported for v2 images */
3491
3492         rbd_dev->header.crypt_type = 0;
3493         rbd_dev->header.comp_type = 0;
3494
3495         /* Get the snapshot context, plus the header version */
3496
3497         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3498         if (ret)
3499                 goto out_err;
3500         rbd_dev->header.obj_version = ver;
3501
3502         rbd_dev->image_format = 2;
3503
3504         dout("discovered version 2 image, header name is %s\n",
3505                 rbd_dev->header_name);
3506
3507         return 0;
3508 out_err:
3509         rbd_dev->parent_overlap = 0;
3510         rbd_spec_put(rbd_dev->parent_spec);
3511         rbd_dev->parent_spec = NULL;
3512         kfree(rbd_dev->header_name);
3513         rbd_dev->header_name = NULL;
3514         kfree(rbd_dev->header.object_prefix);
3515         rbd_dev->header.object_prefix = NULL;
3516
3517         return ret;
3518 }
3519
3520 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3521 {
3522         int ret;
3523
3524         /* no need to lock here, as rbd_dev is not registered yet */
3525         ret = rbd_dev_snaps_update(rbd_dev);
3526         if (ret)
3527                 return ret;
3528
3529         ret = rbd_dev_probe_update_spec(rbd_dev);
3530         if (ret)
3531                 goto err_out_snaps;
3532
3533         ret = rbd_dev_set_mapping(rbd_dev);
3534         if (ret)
3535                 goto err_out_snaps;
3536
3537         /* generate unique id: find highest unique id, add one */
3538         rbd_dev_id_get(rbd_dev);
3539
3540         /* Fill in the device name, now that we have its id. */
3541         BUILD_BUG_ON(DEV_NAME_LEN
3542                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3543         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3544
3545         /* Get our block major device number. */
3546
3547         ret = register_blkdev(0, rbd_dev->name);
3548         if (ret < 0)
3549                 goto err_out_id;
3550         rbd_dev->major = ret;
3551
3552         /* Set up the blkdev mapping. */
3553
3554         ret = rbd_init_disk(rbd_dev);
3555         if (ret)
3556                 goto err_out_blkdev;
3557
3558         ret = rbd_bus_add_dev(rbd_dev);
3559         if (ret)
3560                 goto err_out_disk;
3561
3562         /*
3563          * At this point cleanup in the event of an error is the job
3564          * of the sysfs code (initiated by rbd_bus_del_dev()).
3565          */
3566         down_write(&rbd_dev->header_rwsem);
3567         ret = rbd_dev_snaps_register(rbd_dev);
3568         up_write(&rbd_dev->header_rwsem);
3569         if (ret)
3570                 goto err_out_bus;
3571
3572         ret = rbd_req_sync_watch(rbd_dev, 1);
3573         if (ret)
3574                 goto err_out_bus;
3575
3576         /* Everything's ready.  Announce the disk to the world. */
3577
3578         add_disk(rbd_dev->disk);
3579
3580         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3581                 (unsigned long long) rbd_dev->mapping.size);
3582
3583         return ret;
3584 err_out_bus:
3585         /* this will also clean up rest of rbd_dev stuff */
3586
3587         rbd_bus_del_dev(rbd_dev);
3588
3589         return ret;
3590 err_out_disk:
3591         rbd_free_disk(rbd_dev);
3592 err_out_blkdev:
3593         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3594 err_out_id:
3595         rbd_dev_id_put(rbd_dev);
3596 err_out_snaps:
3597         rbd_remove_all_snaps(rbd_dev);
3598
3599         return ret;
3600 }
3601
3602 /*
3603  * Probe for the existence of the header object for the given rbd
3604  * device.  For format 2 images this includes determining the image
3605  * id.
3606  */
3607 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3608 {
3609         int ret;
3610
3611         /*
3612          * Get the id from the image id object.  If it's not a
3613          * format 2 image, we'll get ENOENT back, and we'll assume
3614          * it's a format 1 image.
3615          */
3616         ret = rbd_dev_image_id(rbd_dev);
3617         if (ret)
3618                 ret = rbd_dev_v1_probe(rbd_dev);
3619         else
3620                 ret = rbd_dev_v2_probe(rbd_dev);
3621         if (ret) {
3622                 dout("probe failed, returning %d\n", ret);
3623
3624                 return ret;
3625         }
3626
3627         ret = rbd_dev_probe_finish(rbd_dev);
3628         if (ret)
3629                 rbd_header_free(&rbd_dev->header);
3630
3631         return ret;
3632 }
3633
3634 static ssize_t rbd_add(struct bus_type *bus,
3635                        const char *buf,
3636                        size_t count)
3637 {
3638         struct rbd_device *rbd_dev = NULL;
3639         struct ceph_options *ceph_opts = NULL;
3640         struct rbd_options *rbd_opts = NULL;
3641         struct rbd_spec *spec = NULL;
3642         struct rbd_client *rbdc;
3643         struct ceph_osd_client *osdc;
3644         int rc = -ENOMEM;
3645
3646         if (!try_module_get(THIS_MODULE))
3647                 return -ENODEV;
3648
3649         /* parse add command */
3650         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3651         if (rc < 0)
3652                 goto err_out_module;
3653
3654         rbdc = rbd_get_client(ceph_opts);
3655         if (IS_ERR(rbdc)) {
3656                 rc = PTR_ERR(rbdc);
3657                 goto err_out_args;
3658         }
3659         ceph_opts = NULL;       /* rbd_dev client now owns this */
3660
3661         /* pick the pool */
3662         osdc = &rbdc->client->osdc;
3663         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3664         if (rc < 0)
3665                 goto err_out_client;
3666         spec->pool_id = (u64) rc;
3667
3668         /* The ceph file layout needs to fit pool id in 32 bits */
3669
3670         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
3671                 rc = -EIO;
3672                 goto err_out_client;
3673         }
3674
3675         rbd_dev = rbd_dev_create(rbdc, spec);
3676         if (!rbd_dev)
3677                 goto err_out_client;
3678         rbdc = NULL;            /* rbd_dev now owns this */
3679         spec = NULL;            /* rbd_dev now owns this */
3680
3681         rbd_dev->mapping.read_only = rbd_opts->read_only;
3682         kfree(rbd_opts);
3683         rbd_opts = NULL;        /* done with this */
3684
3685         rc = rbd_dev_probe(rbd_dev);
3686         if (rc < 0)
3687                 goto err_out_rbd_dev;
3688
3689         return count;
3690 err_out_rbd_dev:
3691         rbd_dev_destroy(rbd_dev);
3692 err_out_client:
3693         rbd_put_client(rbdc);
3694 err_out_args:
3695         if (ceph_opts)
3696                 ceph_destroy_options(ceph_opts);
3697         kfree(rbd_opts);
3698         rbd_spec_put(spec);
3699 err_out_module:
3700         module_put(THIS_MODULE);
3701
3702         dout("Error adding device %s\n", buf);
3703
3704         return (ssize_t) rc;
3705 }
3706
3707 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3708 {
3709         struct list_head *tmp;
3710         struct rbd_device *rbd_dev;
3711
3712         spin_lock(&rbd_dev_list_lock);
3713         list_for_each(tmp, &rbd_dev_list) {
3714                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3715                 if (rbd_dev->dev_id == dev_id) {
3716                         spin_unlock(&rbd_dev_list_lock);
3717                         return rbd_dev;
3718                 }
3719         }
3720         spin_unlock(&rbd_dev_list_lock);
3721         return NULL;
3722 }
3723
3724 static void rbd_dev_release(struct device *dev)
3725 {
3726         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3727
3728         if (rbd_dev->watch_request) {
3729                 struct ceph_client *client = rbd_dev->rbd_client->client;
3730
3731                 ceph_osdc_unregister_linger_request(&client->osdc,
3732                                                     rbd_dev->watch_request);
3733         }
3734         if (rbd_dev->watch_event)
3735                 rbd_req_sync_watch(rbd_dev, 0);
3736
3737         /* clean up and free blkdev */
3738         rbd_free_disk(rbd_dev);
3739         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3740
3741         /* release allocated disk header fields */
3742         rbd_header_free(&rbd_dev->header);
3743
3744         /* done with the id, and with the rbd_dev */
3745         rbd_dev_id_put(rbd_dev);
3746         rbd_assert(rbd_dev->rbd_client != NULL);
3747         rbd_dev_destroy(rbd_dev);
3748
3749         /* release module ref */
3750         module_put(THIS_MODULE);
3751 }
3752
3753 static ssize_t rbd_remove(struct bus_type *bus,
3754                           const char *buf,
3755                           size_t count)
3756 {
3757         struct rbd_device *rbd_dev = NULL;
3758         int target_id, rc;
3759         unsigned long ul;
3760         int ret = count;
3761
3762         rc = strict_strtoul(buf, 10, &ul);
3763         if (rc)
3764                 return rc;
3765
3766         /* convert to int; abort if we lost anything in the conversion */
3767         target_id = (int) ul;
3768         if (target_id != ul)
3769                 return -EINVAL;
3770
3771         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3772
3773         rbd_dev = __rbd_get_dev(target_id);
3774         if (!rbd_dev) {
3775                 ret = -ENOENT;
3776                 goto done;
3777         }
3778
3779         if (rbd_dev->open_count) {
3780                 ret = -EBUSY;
3781                 goto done;
3782         }
3783
3784         rbd_remove_all_snaps(rbd_dev);
3785         rbd_bus_del_dev(rbd_dev);
3786
3787 done:
3788         mutex_unlock(&ctl_mutex);
3789
3790         return ret;
3791 }
3792
3793 /*
3794  * create control files in sysfs
3795  * /sys/bus/rbd/...
3796  */
3797 static int rbd_sysfs_init(void)
3798 {
3799         int ret;
3800
3801         ret = device_register(&rbd_root_dev);
3802         if (ret < 0)
3803                 return ret;
3804
3805         ret = bus_register(&rbd_bus_type);
3806         if (ret < 0)
3807                 device_unregister(&rbd_root_dev);
3808
3809         return ret;
3810 }
3811
3812 static void rbd_sysfs_cleanup(void)
3813 {
3814         bus_unregister(&rbd_bus_type);
3815         device_unregister(&rbd_root_dev);
3816 }
3817
3818 int __init rbd_init(void)
3819 {
3820         int rc;
3821
3822         rc = rbd_sysfs_init();
3823         if (rc)
3824                 return rc;
3825         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3826         return 0;
3827 }
3828
3829 void __exit rbd_exit(void)
3830 {
3831         rbd_sysfs_cleanup();
3832 }
3833
3834 module_init(rbd_init);
3835 module_exit(rbd_exit);
3836
3837 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3838 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3839 MODULE_DESCRIPTION("rados block device");
3840
3841 /* following authorship retained from original osdblk.c */
3842 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3843
3844 MODULE_LICENSE("GPL");