]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
libceph: specify osd op by index in request
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Short and long driver names; the short one prefixes rbd_warn() output */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/*
 * Snapshot device names carry a fixed prefix, so the longest usable
 * snapshot name is NAME_MAX less the length of that prefix (the
 * "- 1" discounts the prefix's terminating NUL counted by sizeof).
 */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

/* Snapshot name denoting the image head, i.e. snapshot id CEPH_NOSNAP */
#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/*
 * Features supported by this (client software) implementation.
 * Note that the value is 0: RBD_FEATURE_LAYERING is defined above
 * but is not included in the supported set.
 */

#define RBD_FEATURES_ALL          (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
90
91 /*
92  * block device image metadata (in-memory version)
93  */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;    /* NUL-terminated prefix of data object names */
        u64 features;
        __u8 obj_order;         /* log2 of the size in bytes of an object */
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* copy of the on-disk snapshot name data */
        u64 *snap_sizes;        /* image size recorded for each snapshot */

        u64 obj_version;
};
110
111 /*
112  * An rbd image specification.
113  *
114  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
115  * identify an image.  Each rbd_dev structure includes a pointer to
116  * an rbd_spec structure that encapsulates this identity.
117  *
118  * Each of the id's in an rbd_spec has an associated name.  For a
119  * user-mapped image, the names are supplied and the id's associated
120  * with them are looked up.  For a layered image, a parent image is
121  * defined by the tuple, and the names are looked up.
122  *
123  * An rbd_dev structure contains a parent_spec pointer which is
124  * non-null if the image it represents is a child in a layered
125  * image.  This pointer will refer to the rbd_spec structure used
126  * by the parent rbd_dev for its own identity (i.e., the structure
127  * is shared between the parent and child).
128  *
129  * Since these structures are populated once, during the discovery
130  * phase of image construction, they are effectively immutable so
131  * we make no effort to synchronize access to them.
132  *
133  * Note that code herein does not assume the image name is known (it
134  * could be a null pointer).
135  */
struct rbd_spec {
        /* Pool identity: id plus its associated name */
        u64             pool_id;
        char            *pool_name;

        /* Image identity; image_name may be a null pointer */
        char            *image_id;
        char            *image_name;

        /* Snapshot identity; CEPH_NOSNAP/RBD_SNAP_HEAD_NAME mean the head */
        u64             snap_id;
        char            *snap_name;

        /* Reference count; the structure may be shared by parent and child */
        struct kref     kref;
};
148
149 /*
150  * an instance of the client.  multiple devices may share an rbd client.
151  */
struct rbd_client {
        struct ceph_client      *client;        /* from ceph_create_client() */
        struct kref             kref;           /* dropped via rbd_put_client() */
        struct list_head        node;           /* entry in rbd_client_list */
};
157
struct rbd_img_request;
/* Callback invoked with the image request it concerns */
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

/*
 * NOTE(review): presumably the "which" value of an object request that
 * is not on any image request's list -- confirm against the users.
 */
#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
/* Callback invoked with the object request it concerns */
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/*
 * Kind of data attached to an object request: none, a bio list, or
 * an array of pages (see the union in struct rbd_obj_request).
 */
enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
169
/*
 * A request directed at a single named object.  It may stand alone or
 * be one of the object requests linked onto an image request.
 */
struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */

        struct rbd_img_request  *img_request;
        struct list_head        links;          /* img_request->obj_requests */
        u32                     which;          /* posn image request list */

        /*
         * NOTE(review): type presumably selects which member of the
         * anonymous union below is meaningful -- confirm.
         */
        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;
        atomic_t                done;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;           /* reference count */
};
200
/*
 * A request against a byte range of an image.  It owns a list of
 * object requests (linked through rbd_obj_request.links).
 */
struct rbd_img_request {
        struct request          *rq;
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        bool                    write_request;  /* false for read */
        /* Which member is used depends on write_request */
        union {
                struct ceph_snap_context *snapc;        /* for writes */
                u64             snap_id;                /* for reads */
        };
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;           /* reference count */
};
220
/*
 * Iterators over the object requests of an image request (ireq),
 * using oreq as the cursor:
 *   for_each_obj_request()       walks the whole list, front to back
 *   for_each_obj_request_from()  continues forward from the current oreq
 *   for_each_obj_request_safe()  tolerates removal of oreq while
 *                                iterating; note that it walks the list
 *                                in REVERSE order (n is scratch storage)
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
227
/* One snapshot of an image, kept on the owning rbd_device's snaps list */
struct rbd_snap {
        struct  device          dev;
        const char              *name;          /* snapshot name */
        u64                     size;           /* copied to mapping.size */
        struct list_head        node;           /* entry in rbd_dev->snaps */
        u64                     id;             /* snapshot id */
        u64                     features;       /* copied to mapping.features */
};
236
/*
 * Properties of what is currently mapped: either the image head or
 * one of its snapshots (filled in by rbd_dev_set_mapping()).
 */
struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;      /* forced true for snapshots */
};
242
243 /*
244  * a single device
245  */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;         /* in-memory image metadata */
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;          /* identity of this image */

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        /* parent_spec is non-null only for a child in a layered image */
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;        /* head or snapshot mapped */

        /* NOTE(review): presumably the entry in rbd_dev_list -- confirm */
        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;          /* of struct rbd_snap, via node */

        /* sysfs related */
        struct device           dev;            /* pinned while the blkdev is open */
        unsigned long           open_count;     /* protected by lock */
};
287
288 /*
289  * Flag bits for rbd_dev->flags.  If atomicity is required,
290  * rbd_dev->lock is used to protect access.
291  *
292  * Currently, only the "removing" flag (which is coupled with the
293  * "open_count" field) requires atomic access.
294  */
/* Values are bit numbers, used with set_bit()/test_bit() on rbd_dev->flags */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
299
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

/* NOTE(review): rbd_dev_list_lock presumably guards rbd_dev_list -- confirm */
static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* rbd_client_list is walked and modified under rbd_client_list_lock */
static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations: snapshot list maintenance */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

/* Forward declarations: device teardown */
static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

/* Forward declarations: store handlers for the bus "add"/"remove" files */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
318
/*
 * Bus-level attributes.  Both are owner-writable only (S_IWUSR) and
 * have no show method; writes are handled by rbd_add()/rbd_remove().
 */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

/* The "rbd" bus, carrying the attributes above */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
329
/*
 * Release method for rbd_root_dev.  The device is a static object,
 * so there is nothing to free; the function is intentionally empty.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Statically allocated root device named "rbd" */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
338
339 static __printf(2, 3)
340 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
341 {
342         struct va_format vaf;
343         va_list args;
344
345         va_start(args, fmt);
346         vaf.fmt = fmt;
347         vaf.va = &args;
348
349         if (!rbd_dev)
350                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
351         else if (rbd_dev->disk)
352                 printk(KERN_WARNING "%s: %s: %pV\n",
353                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
354         else if (rbd_dev->spec && rbd_dev->spec->image_name)
355                 printk(KERN_WARNING "%s: image %s: %pV\n",
356                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
357         else if (rbd_dev->spec && rbd_dev->spec->image_id)
358                 printk(KERN_WARNING "%s: id %s: %pV\n",
359                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
360         else    /* punt */
361                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
362                         RBD_DRV_NAME, rbd_dev, &vaf);
363         va_end(args);
364 }
365
/*
 * rbd_assert(expr): when RBD_DEBUG is defined, log the failing
 * expression together with the function and line, then BUG() if expr
 * is false.  Without RBD_DEBUG it expands to a no-op expression and
 * expr is not evaluated.
 *
 * The debug form is wrapped in do { } while (0) so that it behaves as
 * a single statement: it requires a trailing semicolon (just like the
 * non-debug form) and cannot capture the "else" of an enclosing
 * unbraced if statement.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
378
/* Forward declarations of the header refresh routines */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
381
382 static int rbd_open(struct block_device *bdev, fmode_t mode)
383 {
384         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
385         bool removing = false;
386
387         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
388                 return -EROFS;
389
390         spin_lock_irq(&rbd_dev->lock);
391         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
392                 removing = true;
393         else
394                 rbd_dev->open_count++;
395         spin_unlock_irq(&rbd_dev->lock);
396         if (removing)
397                 return -ENOENT;
398
399         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
400         (void) get_device(&rbd_dev->dev);
401         set_device_ro(bdev, rbd_dev->mapping.read_only);
402         mutex_unlock(&ctl_mutex);
403
404         return 0;
405 }
406
407 static int rbd_release(struct gendisk *disk, fmode_t mode)
408 {
409         struct rbd_device *rbd_dev = disk->private_data;
410         unsigned long open_count_before;
411
412         spin_lock_irq(&rbd_dev->lock);
413         open_count_before = rbd_dev->open_count--;
414         spin_unlock_irq(&rbd_dev->lock);
415         rbd_assert(open_count_before > 0);
416
417         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
418         put_device(&rbd_dev->dev);
419         mutex_unlock(&ctl_mutex);
420
421         return 0;
422 }
423
/* Block device operations: only open and release are provided */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
429
430 /*
431  * Initialize an rbd client instance.
432  * We own *ceph_opts.
433  */
434 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
435 {
436         struct rbd_client *rbdc;
437         int ret = -ENOMEM;
438
439         dout("%s:\n", __func__);
440         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
441         if (!rbdc)
442                 goto out_opt;
443
444         kref_init(&rbdc->kref);
445         INIT_LIST_HEAD(&rbdc->node);
446
447         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
448
449         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
450         if (IS_ERR(rbdc->client))
451                 goto out_mutex;
452         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
453
454         ret = ceph_open_session(rbdc->client);
455         if (ret < 0)
456                 goto out_err;
457
458         spin_lock(&rbd_client_list_lock);
459         list_add_tail(&rbdc->node, &rbd_client_list);
460         spin_unlock(&rbd_client_list_lock);
461
462         mutex_unlock(&ctl_mutex);
463         dout("%s: rbdc %p\n", __func__, rbdc);
464
465         return rbdc;
466
467 out_err:
468         ceph_destroy_client(rbdc->client);
469 out_mutex:
470         mutex_unlock(&ctl_mutex);
471         kfree(rbdc);
472 out_opt:
473         if (ceph_opts)
474                 ceph_destroy_options(ceph_opts);
475         dout("%s: error %d\n", __func__, ret);
476
477         return ERR_PTR(ret);
478 }
479
480 /*
481  * Find a ceph client with specific addr and configuration.  If
482  * found, bump its reference count.
483  */
484 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
485 {
486         struct rbd_client *client_node;
487         bool found = false;
488
489         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
490                 return NULL;
491
492         spin_lock(&rbd_client_list_lock);
493         list_for_each_entry(client_node, &rbd_client_list, node) {
494                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
495                         kref_get(&client_node->kref);
496                         found = true;
497                         break;
498                 }
499         }
500         spin_unlock(&rbd_client_list_lock);
501
502         return found ? client_node : NULL;
503 }
504
/*
 * mount options
 *
 * Tokens are grouped by argument kind, each group terminated by an
 * Opt_last_* marker that parse_rbd_opts_token() uses for range checks.
 * The int and string groups are currently empty.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
518
/*
 * Option spellings recognized by match_token(); the final entry,
 * with token -1, terminates the table.
 */
static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
529
/* Parsed rbd-specific options, filled in by parse_rbd_opts_token() */
struct rbd_options {
        bool    read_only;
};

/* Default for rbd_options.read_only: mappings are writable */
#define RBD_READ_ONLY_DEFAULT   false
535
536 static int parse_rbd_opts_token(char *c, void *private)
537 {
538         struct rbd_options *rbd_opts = private;
539         substring_t argstr[MAX_OPT_ARGS];
540         int token, intval, ret;
541
542         token = match_token(c, rbd_opts_tokens, argstr);
543         if (token < 0)
544                 return -EINVAL;
545
546         if (token < Opt_last_int) {
547                 ret = match_int(&argstr[0], &intval);
548                 if (ret < 0) {
549                         pr_err("bad mount option arg (not int) "
550                                "at '%s'\n", c);
551                         return ret;
552                 }
553                 dout("got int token %d val %d\n", token, intval);
554         } else if (token > Opt_last_int && token < Opt_last_string) {
555                 dout("got string token %d val %s\n", token,
556                      argstr[0].from);
557         } else if (token > Opt_last_string && token < Opt_last_bool) {
558                 dout("got Boolean token %d\n", token);
559         } else {
560                 dout("got token %d\n", token);
561         }
562
563         switch (token) {
564         case Opt_read_only:
565                 rbd_opts->read_only = true;
566                 break;
567         case Opt_read_write:
568                 rbd_opts->read_only = false;
569                 break;
570         default:
571                 rbd_assert(false);
572                 break;
573         }
574         return 0;
575 }
576
/*
 * Get a ceph client matching the given options.  An existing client
 * is reused when rbd_client_find() locates one (a reference is taken
 * on it); otherwise a new one is created.  Either way ceph_opts is
 * consumed: it is destroyed here when an existing client is reused,
 * and handed to rbd_client_create() otherwise.  The result may be an
 * ERR_PTR() if a new client could not be created.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Reusing an existing client; our options are no longer needed */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
593
594 /*
595  * Destroy ceph client
596  *
597  * Caller must hold rbd_client_list_lock.
598  */
599 static void rbd_client_release(struct kref *kref)
600 {
601         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
602
603         dout("%s: rbdc %p\n", __func__, rbdc);
604         spin_lock(&rbd_client_list_lock);
605         list_del(&rbdc->node);
606         spin_unlock(&rbd_client_list_lock);
607
608         ceph_destroy_client(rbdc->client);
609         kfree(rbdc);
610 }
611
612 /*
613  * Drop reference to ceph client node. If it's not referenced anymore, release
614  * it.
615  */
616 static void rbd_put_client(struct rbd_client *rbdc)
617 {
618         if (rbdc)
619                 kref_put(&rbdc->kref, rbd_client_release);
620 }
621
622 static bool rbd_image_format_valid(u32 image_format)
623 {
624         return image_format == 1 || image_format == 2;
625 }
626
627 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
628 {
629         size_t size;
630         u32 snap_count;
631
632         /* The header has to start with the magic rbd header text */
633         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
634                 return false;
635
636         /* The bio layer requires at least sector-sized I/O */
637
638         if (ondisk->options.order < SECTOR_SHIFT)
639                 return false;
640
641         /* If we use u64 in a few spots we may be able to loosen this */
642
643         if (ondisk->options.order > 8 * sizeof (int) - 1)
644                 return false;
645
646         /*
647          * The size of a snapshot header has to fit in a size_t, and
648          * that limits the number of snapshots.
649          */
650         snap_count = le32_to_cpu(ondisk->snap_count);
651         size = SIZE_MAX - sizeof (struct ceph_snap_context);
652         if (snap_count > size / sizeof (__le64))
653                 return false;
654
655         /*
656          * Not only that, but the size of the entire the snapshot
657          * header must also be representable in a size_t.
658          */
659         size -= snap_count * sizeof (__le64);
660         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
661                 return false;
662
663         return true;
664 }
665
666 /*
667  * Create a new header structure, translate header format from the on-disk
668  * header.
669  */
670 static int rbd_header_from_disk(struct rbd_image_header *header,
671                                  struct rbd_image_header_ondisk *ondisk)
672 {
673         u32 snap_count;
674         size_t len;
675         size_t size;
676         u32 i;
677
678         memset(header, 0, sizeof (*header));
679
680         snap_count = le32_to_cpu(ondisk->snap_count);
681
682         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
683         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
684         if (!header->object_prefix)
685                 return -ENOMEM;
686         memcpy(header->object_prefix, ondisk->object_prefix, len);
687         header->object_prefix[len] = '\0';
688
689         if (snap_count) {
690                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
691
692                 /* Save a copy of the snapshot names */
693
694                 if (snap_names_len > (u64) SIZE_MAX)
695                         return -EIO;
696                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
697                 if (!header->snap_names)
698                         goto out_err;
699                 /*
700                  * Note that rbd_dev_v1_header_read() guarantees
701                  * the ondisk buffer we're working with has
702                  * snap_names_len bytes beyond the end of the
703                  * snapshot id array, this memcpy() is safe.
704                  */
705                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
706                         snap_names_len);
707
708                 /* Record each snapshot's size */
709
710                 size = snap_count * sizeof (*header->snap_sizes);
711                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
712                 if (!header->snap_sizes)
713                         goto out_err;
714                 for (i = 0; i < snap_count; i++)
715                         header->snap_sizes[i] =
716                                 le64_to_cpu(ondisk->snaps[i].image_size);
717         } else {
718                 WARN_ON(ondisk->snap_names_len);
719                 header->snap_names = NULL;
720                 header->snap_sizes = NULL;
721         }
722
723         header->features = 0;   /* No features support in v1 images */
724         header->obj_order = ondisk->options.order;
725         header->crypt_type = ondisk->options.crypt_type;
726         header->comp_type = ondisk->options.comp_type;
727
728         /* Allocate and fill in the snapshot context */
729
730         header->image_size = le64_to_cpu(ondisk->image_size);
731         size = sizeof (struct ceph_snap_context);
732         size += snap_count * sizeof (header->snapc->snaps[0]);
733         header->snapc = kzalloc(size, GFP_KERNEL);
734         if (!header->snapc)
735                 goto out_err;
736
737         atomic_set(&header->snapc->nref, 1);
738         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
739         header->snapc->num_snaps = snap_count;
740         for (i = 0; i < snap_count; i++)
741                 header->snapc->snaps[i] =
742                         le64_to_cpu(ondisk->snaps[i].id);
743
744         return 0;
745
746 out_err:
747         kfree(header->snap_sizes);
748         header->snap_sizes = NULL;
749         kfree(header->snap_names);
750         header->snap_names = NULL;
751         kfree(header->object_prefix);
752         header->object_prefix = NULL;
753
754         return -ENOMEM;
755 }
756
757 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
758 {
759         struct rbd_snap *snap;
760
761         if (snap_id == CEPH_NOSNAP)
762                 return RBD_SNAP_HEAD_NAME;
763
764         list_for_each_entry(snap, &rbd_dev->snaps, node)
765                 if (snap_id == snap->id)
766                         return snap->name;
767
768         return NULL;
769 }
770
771 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
772 {
773
774         struct rbd_snap *snap;
775
776         list_for_each_entry(snap, &rbd_dev->snaps, node) {
777                 if (!strcmp(snap_name, snap->name)) {
778                         rbd_dev->spec->snap_id = snap->id;
779                         rbd_dev->mapping.size = snap->size;
780                         rbd_dev->mapping.features = snap->features;
781
782                         return 0;
783                 }
784         }
785
786         return -ENOENT;
787 }
788
789 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
790 {
791         int ret;
792
793         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
794                     sizeof (RBD_SNAP_HEAD_NAME))) {
795                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
796                 rbd_dev->mapping.size = rbd_dev->header.image_size;
797                 rbd_dev->mapping.features = rbd_dev->header.features;
798                 ret = 0;
799         } else {
800                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
801                 if (ret < 0)
802                         goto done;
803                 rbd_dev->mapping.read_only = true;
804         }
805         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
806
807 done:
808         return ret;
809 }
810
811 static void rbd_header_free(struct rbd_image_header *header)
812 {
813         kfree(header->object_prefix);
814         header->object_prefix = NULL;
815         kfree(header->snap_sizes);
816         header->snap_sizes = NULL;
817         kfree(header->snap_names);
818         header->snap_names = NULL;
819         ceph_put_snap_context(header->snapc);
820         header->snapc = NULL;
821 }
822
823 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
824 {
825         char *name;
826         u64 segment;
827         int ret;
828
829         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
830         if (!name)
831                 return NULL;
832         segment = offset >> rbd_dev->header.obj_order;
833         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
834                         rbd_dev->header.object_prefix, segment);
835         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
836                 pr_err("error formatting segment name for #%llu (%d)\n",
837                         segment, ret);
838                 kfree(name);
839                 name = NULL;
840         }
841
842         return name;
843 }
844
845 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
846 {
847         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
848
849         return offset & (segment_size - 1);
850 }
851
852 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
853                                 u64 offset, u64 length)
854 {
855         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
856
857         offset &= segment_size - 1;
858
859         rbd_assert(length <= U64_MAX - offset);
860         if (offset + length > segment_size)
861                 length = segment_size - offset;
862
863         return length;
864 }
865
866 /*
867  * returns the size of an object in the image
868  */
869 static u64 rbd_obj_bytes(struct rbd_image_header *header)
870 {
871         return 1 << header->obj_order;
872 }
873
874 /*
875  * bio helpers
876  */
877
878 static void bio_chain_put(struct bio *chain)
879 {
880         struct bio *tmp;
881
882         while (chain) {
883                 tmp = chain;
884                 chain = chain->bi_next;
885                 bio_put(tmp);
886         }
887 }
888
889 /*
890  * zeros a bio chain, starting at specific offset
891  */
892 static void zero_bio_chain(struct bio *chain, int start_ofs)
893 {
894         struct bio_vec *bv;
895         unsigned long flags;
896         void *buf;
897         int i;
898         int pos = 0;
899
900         while (chain) {
901                 bio_for_each_segment(bv, chain, i) {
902                         if (pos + bv->bv_len > start_ofs) {
903                                 int remainder = max(start_ofs - pos, 0);
904                                 buf = bvec_kmap_irq(bv, &flags);
905                                 memset(buf + remainder, 0,
906                                        bv->bv_len - remainder);
907                                 bvec_kunmap_irq(buf, &flags);
908                         }
909                         pos += bv->bv_len;
910                 }
911
912                 chain = chain->bi_next;
913         }
914 }
915
916 /*
917  * Clone a portion of a bio, starting at the given byte offset
918  * and continuing for the number of bytes indicated.
919  */
920 static struct bio *bio_clone_range(struct bio *bio_src,
921                                         unsigned int offset,
922                                         unsigned int len,
923                                         gfp_t gfpmask)
924 {
925         struct bio_vec *bv;
926         unsigned int resid;
927         unsigned short idx;
928         unsigned int voff;
929         unsigned short end_idx;
930         unsigned short vcnt;
931         struct bio *bio;
932
933         /* Handle the easy case for the caller */
934
935         if (!offset && len == bio_src->bi_size)
936                 return bio_clone(bio_src, gfpmask);
937
938         if (WARN_ON_ONCE(!len))
939                 return NULL;
940         if (WARN_ON_ONCE(len > bio_src->bi_size))
941                 return NULL;
942         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
943                 return NULL;
944
945         /* Find first affected segment... */
946
947         resid = offset;
948         __bio_for_each_segment(bv, bio_src, idx, 0) {
949                 if (resid < bv->bv_len)
950                         break;
951                 resid -= bv->bv_len;
952         }
953         voff = resid;
954
955         /* ...and the last affected segment */
956
957         resid += len;
958         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
959                 if (resid <= bv->bv_len)
960                         break;
961                 resid -= bv->bv_len;
962         }
963         vcnt = end_idx - idx + 1;
964
965         /* Build the clone */
966
967         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
968         if (!bio)
969                 return NULL;    /* ENOMEM */
970
971         bio->bi_bdev = bio_src->bi_bdev;
972         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
973         bio->bi_rw = bio_src->bi_rw;
974         bio->bi_flags |= 1 << BIO_CLONED;
975
976         /*
977          * Copy over our part of the bio_vec, then update the first
978          * and last (or only) entries.
979          */
980         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
981                         vcnt * sizeof (struct bio_vec));
982         bio->bi_io_vec[0].bv_offset += voff;
983         if (vcnt > 1) {
984                 bio->bi_io_vec[0].bv_len -= voff;
985                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
986         } else {
987                 bio->bi_io_vec[0].bv_len = len;
988         }
989
990         bio->bi_vcnt = vcnt;
991         bio->bi_size = len;
992         bio->bi_idx = 0;
993
994         return bio;
995 }
996
997 /*
998  * Clone a portion of a bio chain, starting at the given byte offset
999  * into the first bio in the source chain and continuing for the
1000  * number of bytes indicated.  The result is another bio chain of
1001  * exactly the given length, or a null pointer on error.
1002  *
1003  * The bio_src and offset parameters are both in-out.  On entry they
1004  * refer to the first source bio and the offset into that bio where
1005  * the start of data to be cloned is located.
1006  *
1007  * On return, bio_src is updated to refer to the bio in the source
1008  * chain that contains first un-cloned byte, and *offset will
1009  * contain the offset of that byte within that bio.
1010  */
1011 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1012                                         unsigned int *offset,
1013                                         unsigned int len,
1014                                         gfp_t gfpmask)
1015 {
1016         struct bio *bi = *bio_src;
1017         unsigned int off = *offset;
1018         struct bio *chain = NULL;
1019         struct bio **end;
1020
1021         /* Build up a chain of clone bios up to the limit */
1022
1023         if (!bi || off >= bi->bi_size || !len)
1024                 return NULL;            /* Nothing to clone */
1025
1026         end = &chain;
1027         while (len) {
1028                 unsigned int bi_size;
1029                 struct bio *bio;
1030
1031                 if (!bi) {
1032                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1033                         goto out_err;   /* EINVAL; ran out of bio's */
1034                 }
1035                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1036                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1037                 if (!bio)
1038                         goto out_err;   /* ENOMEM */
1039
1040                 *end = bio;
1041                 end = &bio->bi_next;
1042
1043                 off += bi_size;
1044                 if (off == bi->bi_size) {
1045                         bi = bi->bi_next;
1046                         off = 0;
1047                 }
1048                 len -= bi_size;
1049         }
1050         *bio_src = bi;
1051         *offset = off;
1052
1053         return chain;
1054 out_err:
1055         bio_chain_put(chain);
1056
1057         return NULL;
1058 }
1059
1060 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1061 {
1062         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1063                 atomic_read(&obj_request->kref.refcount));
1064         kref_get(&obj_request->kref);
1065 }
1066
1067 static void rbd_obj_request_destroy(struct kref *kref);
1068 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1069 {
1070         rbd_assert(obj_request != NULL);
1071         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1072                 atomic_read(&obj_request->kref.refcount));
1073         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1074 }
1075
1076 static void rbd_img_request_get(struct rbd_img_request *img_request)
1077 {
1078         dout("%s: img %p (was %d)\n", __func__, img_request,
1079                 atomic_read(&img_request->kref.refcount));
1080         kref_get(&img_request->kref);
1081 }
1082
1083 static void rbd_img_request_destroy(struct kref *kref);
1084 static void rbd_img_request_put(struct rbd_img_request *img_request)
1085 {
1086         rbd_assert(img_request != NULL);
1087         dout("%s: img %p (was %d)\n", __func__, img_request,
1088                 atomic_read(&img_request->kref.refcount));
1089         kref_put(&img_request->kref, rbd_img_request_destroy);
1090 }
1091
1092 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1093                                         struct rbd_obj_request *obj_request)
1094 {
1095         rbd_assert(obj_request->img_request == NULL);
1096
1097         rbd_obj_request_get(obj_request);
1098         obj_request->img_request = img_request;
1099         obj_request->which = img_request->obj_request_count;
1100         rbd_assert(obj_request->which != BAD_WHICH);
1101         img_request->obj_request_count++;
1102         list_add_tail(&obj_request->links, &img_request->obj_requests);
1103         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1104                 obj_request->which);
1105 }
1106
1107 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1108                                         struct rbd_obj_request *obj_request)
1109 {
1110         rbd_assert(obj_request->which != BAD_WHICH);
1111
1112         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1113                 obj_request->which);
1114         list_del(&obj_request->links);
1115         rbd_assert(img_request->obj_request_count > 0);
1116         img_request->obj_request_count--;
1117         rbd_assert(obj_request->which == img_request->obj_request_count);
1118         obj_request->which = BAD_WHICH;
1119         rbd_assert(obj_request->img_request == img_request);
1120         obj_request->img_request = NULL;
1121         obj_request->callback = NULL;
1122         rbd_obj_request_put(obj_request);
1123 }
1124
1125 static bool obj_request_type_valid(enum obj_request_type type)
1126 {
1127         switch (type) {
1128         case OBJ_REQUEST_NODATA:
1129         case OBJ_REQUEST_BIO:
1130         case OBJ_REQUEST_PAGES:
1131                 return true;
1132         default:
1133                 return false;
1134         }
1135 }
1136
1137 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1138                                 struct rbd_obj_request *obj_request)
1139 {
1140         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1141
1142         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1143 }
1144
1145 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1146 {
1147         dout("%s: img %p\n", __func__, img_request);
1148         if (img_request->callback)
1149                 img_request->callback(img_request);
1150         else
1151                 rbd_img_request_put(img_request);
1152 }
1153
1154 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1155
1156 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1157 {
1158         dout("%s: obj %p\n", __func__, obj_request);
1159
1160         return wait_for_completion_interruptible(&obj_request->completion);
1161 }
1162
1163 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1164 {
1165         atomic_set(&obj_request->done, 0);
1166         smp_wmb();
1167 }
1168
1169 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1170 {
1171         int done;
1172
1173         done = atomic_inc_return(&obj_request->done);
1174         if (done > 1) {
1175                 struct rbd_img_request *img_request = obj_request->img_request;
1176                 struct rbd_device *rbd_dev;
1177
1178                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1179                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1180                         obj_request);
1181         }
1182 }
1183
1184 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1185 {
1186         smp_mb();
1187         return atomic_read(&obj_request->done) != 0;
1188 }
1189
1190 static void
1191 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1192 {
1193         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1194                 obj_request, obj_request->img_request, obj_request->result,
1195                 obj_request->xferred, obj_request->length);
1196         /*
1197          * ENOENT means a hole in the image.  We zero-fill the
1198          * entire length of the request.  A short read also implies
1199          * zero-fill to the end of the request.  Either way we
1200          * update the xferred count to indicate the whole request
1201          * was satisfied.
1202          */
1203         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1204         if (obj_request->result == -ENOENT) {
1205                 zero_bio_chain(obj_request->bio_list, 0);
1206                 obj_request->result = 0;
1207                 obj_request->xferred = obj_request->length;
1208         } else if (obj_request->xferred < obj_request->length &&
1209                         !obj_request->result) {
1210                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1211                 obj_request->xferred = obj_request->length;
1212         }
1213         obj_request_done_set(obj_request);
1214 }
1215
1216 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1217 {
1218         dout("%s: obj %p cb %p\n", __func__, obj_request,
1219                 obj_request->callback);
1220         if (obj_request->callback)
1221                 obj_request->callback(obj_request);
1222         else
1223                 complete_all(&obj_request->completion);
1224 }
1225
/*
 * Handler for osd ops that need no processing of their own when
 * they finish; the object request is simply marked done.
 */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1231
1232 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1233 {
1234         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1235                 obj_request->result, obj_request->xferred, obj_request->length);
1236         if (obj_request->img_request)
1237                 rbd_img_obj_request_read_callback(obj_request);
1238         else
1239                 obj_request_done_set(obj_request);
1240 }
1241
1242 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1243 {
1244         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1245                 obj_request->result, obj_request->length);
1246         /*
1247          * There is no such thing as a successful short write.
1248          * Our xferred value is the number of bytes transferred
1249          * back.  Set it to our originally-requested length.
1250          */
1251         obj_request->xferred = obj_request->length;
1252         obj_request_done_set(obj_request);
1253 }
1254
/*
 * Handler for a finished osd stat.  A simple stat call needs no
 * further work, so the request is only marked done.  More will be
 * done here when a stat is part of a write sequence for a layered
 * image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1264
1265 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1266                                 struct ceph_msg *msg)
1267 {
1268         struct rbd_obj_request *obj_request = osd_req->r_priv;
1269         u16 opcode;
1270
1271         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1272         rbd_assert(osd_req == obj_request->osd_req);
1273         rbd_assert(!!obj_request->img_request ^
1274                                 (obj_request->which == BAD_WHICH));
1275
1276         if (osd_req->r_result < 0)
1277                 obj_request->result = osd_req->r_result;
1278         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1279
1280         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1281
1282         /*
1283          * We support a 64-bit length, but ultimately it has to be
1284          * passed to blk_end_request(), which takes an unsigned int.
1285          */
1286         obj_request->xferred = osd_req->r_reply_op_len[0];
1287         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1288         opcode = osd_req->r_ops[0].op;
1289         switch (opcode) {
1290         case CEPH_OSD_OP_READ:
1291                 rbd_osd_read_callback(obj_request);
1292                 break;
1293         case CEPH_OSD_OP_WRITE:
1294                 rbd_osd_write_callback(obj_request);
1295                 break;
1296         case CEPH_OSD_OP_STAT:
1297                 rbd_osd_stat_callback(obj_request);
1298                 break;
1299         case CEPH_OSD_OP_CALL:
1300         case CEPH_OSD_OP_NOTIFY_ACK:
1301         case CEPH_OSD_OP_WATCH:
1302                 rbd_osd_trivial_callback(obj_request);
1303                 break;
1304         default:
1305                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1306                         obj_request->object_name, (unsigned short) opcode);
1307                 break;
1308         }
1309
1310         if (obj_request_done_test(obj_request))
1311                 rbd_obj_request_complete(obj_request);
1312 }
1313
1314 static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request,
1315                                         bool write_request)
1316 {
1317         struct rbd_img_request *img_request = obj_request->img_request;
1318         struct ceph_osd_request *osd_req = obj_request->osd_req;
1319         struct ceph_osd_data *osd_data = NULL;
1320         struct ceph_snap_context *snapc = NULL;
1321         u64 snap_id = CEPH_NOSNAP;
1322         struct timespec *mtime = NULL;
1323         struct timespec now;
1324
1325         rbd_assert(osd_req != NULL);
1326
1327         if (write_request) {
1328                 osd_data = &osd_req->r_data_out;
1329                 now = CURRENT_TIME;
1330                 mtime = &now;
1331                 if (img_request)
1332                         snapc = img_request->snapc;
1333         } else {
1334                 osd_data = &osd_req->r_data_in;
1335                 if (img_request)
1336                         snap_id = img_request->snap_id;
1337         }
1338         if (obj_request->type != OBJ_REQUEST_NODATA) {
1339                 /*
1340                  * If it has data, it's either a object class method
1341                  * call (cls) or it's an extent operation.
1342                  */
1343                 /* XXX This use of the ops array goes away in the next patch */
1344                 if (obj_request->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL)
1345                         osd_req_op_cls_response_data(obj_request->osd_req, 0,
1346                                                 osd_data);
1347                 else
1348                         osd_req_op_extent_osd_data(obj_request->osd_req, 0,
1349                                                 osd_data);
1350         }
1351         ceph_osdc_build_request(osd_req, obj_request->offset,
1352                         snapc, snap_id, mtime);
1353 }
1354
1355 static struct ceph_osd_request *rbd_osd_req_create(
1356                                         struct rbd_device *rbd_dev,
1357                                         bool write_request,
1358                                         struct rbd_obj_request *obj_request)
1359 {
1360         struct rbd_img_request *img_request = obj_request->img_request;
1361         struct ceph_snap_context *snapc = NULL;
1362         struct ceph_osd_client *osdc;
1363         struct ceph_osd_request *osd_req;
1364         struct ceph_osd_data *osd_data;
1365         u64 offset = obj_request->offset;
1366
1367         if (img_request) {
1368                 rbd_assert(img_request->write_request == write_request);
1369                 if (img_request->write_request)
1370                         snapc = img_request->snapc;
1371         }
1372
1373         /* Allocate and initialize the request, for the single op */
1374
1375         osdc = &rbd_dev->rbd_client->client->osdc;
1376         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1377         if (!osd_req)
1378                 return NULL;    /* ENOMEM */
1379         osd_data = write_request ? &osd_req->r_data_out : &osd_req->r_data_in;
1380
1381         rbd_assert(obj_request_type_valid(obj_request->type));
1382         switch (obj_request->type) {
1383         case OBJ_REQUEST_NODATA:
1384                 break;          /* Nothing to do */
1385         case OBJ_REQUEST_BIO:
1386                 rbd_assert(obj_request->bio_list != NULL);
1387                 ceph_osd_data_bio_init(osd_data, obj_request->bio_list,
1388                                         obj_request->length);
1389                 break;
1390         case OBJ_REQUEST_PAGES:
1391                 ceph_osd_data_pages_init(osd_data, obj_request->pages,
1392                                 obj_request->length, offset & ~PAGE_MASK,
1393                                 false, false);
1394                 break;
1395         }
1396
1397         if (write_request)
1398                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1399         else
1400                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1401
1402         osd_req->r_callback = rbd_osd_req_callback;
1403         osd_req->r_priv = obj_request;
1404
1405         osd_req->r_oid_len = strlen(obj_request->object_name);
1406         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1407         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1408
1409         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1410
1411         return osd_req;
1412 }
1413
/* Release an osd request by dropping a reference to it. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1418
1419 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1420
1421 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1422                                                 u64 offset, u64 length,
1423                                                 enum obj_request_type type)
1424 {
1425         struct rbd_obj_request *obj_request;
1426         size_t size;
1427         char *name;
1428
1429         rbd_assert(obj_request_type_valid(type));
1430
1431         size = strlen(object_name) + 1;
1432         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1433         if (!obj_request)
1434                 return NULL;
1435
1436         name = (char *)(obj_request + 1);
1437         obj_request->object_name = memcpy(name, object_name, size);
1438         obj_request->offset = offset;
1439         obj_request->length = length;
1440         obj_request->which = BAD_WHICH;
1441         obj_request->type = type;
1442         INIT_LIST_HEAD(&obj_request->links);
1443         obj_request_done_init(obj_request);
1444         init_completion(&obj_request->completion);
1445         kref_init(&obj_request->kref);
1446
1447         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1448                 offset, length, (int)type, obj_request);
1449
1450         return obj_request;
1451 }
1452
1453 static void rbd_obj_request_destroy(struct kref *kref)
1454 {
1455         struct rbd_obj_request *obj_request;
1456
1457         obj_request = container_of(kref, struct rbd_obj_request, kref);
1458
1459         dout("%s: obj %p\n", __func__, obj_request);
1460
1461         rbd_assert(obj_request->img_request == NULL);
1462         rbd_assert(obj_request->which == BAD_WHICH);
1463
1464         if (obj_request->osd_req)
1465                 rbd_osd_req_destroy(obj_request->osd_req);
1466
1467         rbd_assert(obj_request_type_valid(obj_request->type));
1468         switch (obj_request->type) {
1469         case OBJ_REQUEST_NODATA:
1470                 break;          /* Nothing to do */
1471         case OBJ_REQUEST_BIO:
1472                 if (obj_request->bio_list)
1473                         bio_chain_put(obj_request->bio_list);
1474                 break;
1475         case OBJ_REQUEST_PAGES:
1476                 if (obj_request->pages)
1477                         ceph_release_page_vector(obj_request->pages,
1478                                                 obj_request->page_count);
1479                 break;
1480         }
1481
1482         kfree(obj_request);
1483 }
1484
1485 /*
1486  * Caller is responsible for filling in the list of object requests
1487  * that comprises the image request, and the Linux request pointer
1488  * (if there is one).
1489  */
1490 static struct rbd_img_request *rbd_img_request_create(
1491                                         struct rbd_device *rbd_dev,
1492                                         u64 offset, u64 length,
1493                                         bool write_request)
1494 {
1495         struct rbd_img_request *img_request;
1496         struct ceph_snap_context *snapc = NULL;
1497
1498         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1499         if (!img_request)
1500                 return NULL;
1501
1502         if (write_request) {
1503                 down_read(&rbd_dev->header_rwsem);
1504                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1505                 up_read(&rbd_dev->header_rwsem);
1506                 if (WARN_ON(!snapc)) {
1507                         kfree(img_request);
1508                         return NULL;    /* Shouldn't happen */
1509                 }
1510         }
1511
1512         img_request->rq = NULL;
1513         img_request->rbd_dev = rbd_dev;
1514         img_request->offset = offset;
1515         img_request->length = length;
1516         img_request->write_request = write_request;
1517         if (write_request)
1518                 img_request->snapc = snapc;
1519         else
1520                 img_request->snap_id = rbd_dev->spec->snap_id;
1521         spin_lock_init(&img_request->completion_lock);
1522         img_request->next_completion = 0;
1523         img_request->callback = NULL;
1524         img_request->obj_request_count = 0;
1525         INIT_LIST_HEAD(&img_request->obj_requests);
1526         kref_init(&img_request->kref);
1527
1528         rbd_img_request_get(img_request);       /* Avoid a warning */
1529         rbd_img_request_put(img_request);       /* TEMPORARY */
1530
1531         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1532                 write_request ? "write" : "read", offset, length,
1533                 img_request);
1534
1535         return img_request;
1536 }
1537
1538 static void rbd_img_request_destroy(struct kref *kref)
1539 {
1540         struct rbd_img_request *img_request;
1541         struct rbd_obj_request *obj_request;
1542         struct rbd_obj_request *next_obj_request;
1543
1544         img_request = container_of(kref, struct rbd_img_request, kref);
1545
1546         dout("%s: img %p\n", __func__, img_request);
1547
1548         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1549                 rbd_img_obj_request_del(img_request, obj_request);
1550         rbd_assert(img_request->obj_request_count == 0);
1551
1552         if (img_request->write_request)
1553                 ceph_put_snap_context(img_request->snapc);
1554
1555         kfree(img_request);
1556 }
1557
1558 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1559                                         struct bio *bio_list)
1560 {
1561         struct rbd_device *rbd_dev = img_request->rbd_dev;
1562         struct rbd_obj_request *obj_request = NULL;
1563         struct rbd_obj_request *next_obj_request;
1564         bool write_request = img_request->write_request;
1565         unsigned int bio_offset;
1566         u64 image_offset;
1567         u64 resid;
1568         u16 opcode;
1569
1570         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1571
1572         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1573         bio_offset = 0;
1574         image_offset = img_request->offset;
1575         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1576         resid = img_request->length;
1577         rbd_assert(resid > 0);
1578         while (resid) {
1579                 const char *object_name;
1580                 unsigned int clone_size;
1581                 u64 offset;
1582                 u64 length;
1583
1584                 object_name = rbd_segment_name(rbd_dev, image_offset);
1585                 if (!object_name)
1586                         goto out_unwind;
1587                 offset = rbd_segment_offset(rbd_dev, image_offset);
1588                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1589                 obj_request = rbd_obj_request_create(object_name,
1590                                                 offset, length,
1591                                                 OBJ_REQUEST_BIO);
1592                 kfree(object_name);     /* object request has its own copy */
1593                 if (!obj_request)
1594                         goto out_unwind;
1595
1596                 rbd_assert(length <= (u64) UINT_MAX);
1597                 clone_size = (unsigned int) length;
1598                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1599                                                 &bio_offset, clone_size,
1600                                                 GFP_ATOMIC);
1601                 if (!obj_request->bio_list)
1602                         goto out_partial;
1603
1604                 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1605                                                 write_request, obj_request);
1606                 if (!obj_request->osd_req)
1607                         goto out_partial;
1608
1609                 osd_req_op_extent_init(obj_request->osd_req, 0,
1610                                         opcode, offset, length, 0, 0);
1611                 rbd_osd_req_format_op(obj_request, write_request);
1612
1613                 /* status and version are initially zero-filled */
1614
1615                 rbd_img_obj_request_add(img_request, obj_request);
1616
1617                 image_offset += length;
1618                 resid -= length;
1619         }
1620
1621         return 0;
1622
1623 out_partial:
1624         rbd_obj_request_put(obj_request);
1625 out_unwind:
1626         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1627                 rbd_obj_request_put(obj_request);
1628
1629         return -ENOMEM;
1630 }
1631
1632 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1633 {
1634         struct rbd_img_request *img_request;
1635         u32 which = obj_request->which;
1636         bool more = true;
1637
1638         img_request = obj_request->img_request;
1639
1640         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1641         rbd_assert(img_request != NULL);
1642         rbd_assert(img_request->rq != NULL);
1643         rbd_assert(img_request->obj_request_count > 0);
1644         rbd_assert(which != BAD_WHICH);
1645         rbd_assert(which < img_request->obj_request_count);
1646         rbd_assert(which >= img_request->next_completion);
1647
1648         spin_lock_irq(&img_request->completion_lock);
1649         if (which != img_request->next_completion)
1650                 goto out;
1651
1652         for_each_obj_request_from(img_request, obj_request) {
1653                 unsigned int xferred;
1654                 int result;
1655
1656                 rbd_assert(more);
1657                 rbd_assert(which < img_request->obj_request_count);
1658
1659                 if (!obj_request_done_test(obj_request))
1660                         break;
1661
1662                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1663                 xferred = (unsigned int) obj_request->xferred;
1664                 result = (int) obj_request->result;
1665                 if (result)
1666                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1667                                 img_request->write_request ? "write" : "read",
1668                                 result, xferred);
1669
1670                 more = blk_end_request(img_request->rq, result, xferred);
1671                 which++;
1672         }
1673
1674         rbd_assert(more ^ (which == img_request->obj_request_count));
1675         img_request->next_completion = which;
1676 out:
1677         spin_unlock_irq(&img_request->completion_lock);
1678
1679         if (!more)
1680                 rbd_img_request_complete(img_request);
1681 }
1682
1683 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1684 {
1685         struct rbd_device *rbd_dev = img_request->rbd_dev;
1686         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1687         struct rbd_obj_request *obj_request;
1688         struct rbd_obj_request *next_obj_request;
1689
1690         dout("%s: img %p\n", __func__, img_request);
1691         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1692                 int ret;
1693
1694                 obj_request->callback = rbd_img_obj_callback;
1695                 ret = rbd_obj_request_submit(osdc, obj_request);
1696                 if (ret)
1697                         return ret;
1698                 /*
1699                  * The image request has its own reference to each
1700                  * of its object requests, so we can safely drop the
1701                  * initial one here.
1702                  */
1703                 rbd_obj_request_put(obj_request);
1704         }
1705
1706         return 0;
1707 }
1708
1709 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1710                                    u64 ver, u64 notify_id)
1711 {
1712         struct rbd_obj_request *obj_request;
1713         struct ceph_osd_client *osdc;
1714         int ret;
1715
1716         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1717                                                         OBJ_REQUEST_NODATA);
1718         if (!obj_request)
1719                 return -ENOMEM;
1720
1721         ret = -ENOMEM;
1722         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1723         if (!obj_request->osd_req)
1724                 goto out;
1725
1726         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1727                                         notify_id, ver, 0);
1728         rbd_osd_req_format_op(obj_request, false);
1729
1730         osdc = &rbd_dev->rbd_client->client->osdc;
1731         obj_request->callback = rbd_obj_request_put;
1732         ret = rbd_obj_request_submit(osdc, obj_request);
1733 out:
1734         if (ret)
1735                 rbd_obj_request_put(obj_request);
1736
1737         return ret;
1738 }
1739
1740 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1741 {
1742         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1743         u64 hver;
1744         int rc;
1745
1746         if (!rbd_dev)
1747                 return;
1748
1749         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1750                 rbd_dev->header_name, (unsigned long long) notify_id,
1751                 (unsigned int) opcode);
1752         rc = rbd_dev_refresh(rbd_dev, &hver);
1753         if (rc)
1754                 rbd_warn(rbd_dev, "got notification but failed to "
1755                            " update snaps: %d\n", rc);
1756
1757         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1758 }
1759
1760 /*
1761  * Request sync osd watch/unwatch.  The value of "start" determines
1762  * whether a watch request is being initiated or torn down.
1763  */
1764 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1765 {
1766         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1767         struct rbd_obj_request *obj_request;
1768         int ret;
1769
1770         rbd_assert(start ^ !!rbd_dev->watch_event);
1771         rbd_assert(start ^ !!rbd_dev->watch_request);
1772
1773         if (start) {
1774                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1775                                                 &rbd_dev->watch_event);
1776                 if (ret < 0)
1777                         return ret;
1778                 rbd_assert(rbd_dev->watch_event != NULL);
1779         }
1780
1781         ret = -ENOMEM;
1782         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1783                                                         OBJ_REQUEST_NODATA);
1784         if (!obj_request)
1785                 goto out_cancel;
1786
1787         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1788         if (!obj_request->osd_req)
1789                 goto out_cancel;
1790
1791         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1792                                 rbd_dev->watch_event->cookie,
1793                                 rbd_dev->header.obj_version, start);
1794         rbd_osd_req_format_op(obj_request, true);
1795
1796         if (start)
1797                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1798         else
1799                 ceph_osdc_unregister_linger_request(osdc,
1800                                         rbd_dev->watch_request->osd_req);
1801         ret = rbd_obj_request_submit(osdc, obj_request);
1802         if (ret)
1803                 goto out_cancel;
1804         ret = rbd_obj_request_wait(obj_request);
1805         if (ret)
1806                 goto out_cancel;
1807         ret = obj_request->result;
1808         if (ret)
1809                 goto out_cancel;
1810
1811         /*
1812          * A watch request is set to linger, so the underlying osd
1813          * request won't go away until we unregister it.  We retain
1814          * a pointer to the object request during that time (in
1815          * rbd_dev->watch_request), so we'll keep a reference to
1816          * it.  We'll drop that reference (below) after we've
1817          * unregistered it.
1818          */
1819         if (start) {
1820                 rbd_dev->watch_request = obj_request;
1821
1822                 return 0;
1823         }
1824
1825         /* We have successfully torn down the watch request */
1826
1827         rbd_obj_request_put(rbd_dev->watch_request);
1828         rbd_dev->watch_request = NULL;
1829 out_cancel:
1830         /* Cancel the event if we're tearing down, or on error */
1831         ceph_osdc_cancel_event(rbd_dev->watch_event);
1832         rbd_dev->watch_event = NULL;
1833         if (obj_request)
1834                 rbd_obj_request_put(obj_request);
1835
1836         return ret;
1837 }
1838
1839 /*
1840  * Synchronous osd object method call
1841  */
1842 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1843                              const char *object_name,
1844                              const char *class_name,
1845                              const char *method_name,
1846                              const char *outbound,
1847                              size_t outbound_size,
1848                              char *inbound,
1849                              size_t inbound_size,
1850                              u64 *version)
1851 {
1852         struct rbd_obj_request *obj_request;
1853         struct ceph_osd_client *osdc;
1854         struct page **pages;
1855         u32 page_count;
1856         int ret;
1857
1858         /*
1859          * Method calls are ultimately read operations.  The result
1860          * should placed into the inbound buffer provided.  They
1861          * also supply outbound data--parameters for the object
1862          * method.  Currently if this is present it will be a
1863          * snapshot id.
1864          */
1865         page_count = (u32) calc_pages_for(0, inbound_size);
1866         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1867         if (IS_ERR(pages))
1868                 return PTR_ERR(pages);
1869
1870         ret = -ENOMEM;
1871         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1872                                                         OBJ_REQUEST_PAGES);
1873         if (!obj_request)
1874                 goto out;
1875
1876         obj_request->pages = pages;
1877         obj_request->page_count = page_count;
1878
1879         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1880         if (!obj_request->osd_req)
1881                 goto out;
1882
1883         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
1884                                         class_name, method_name,
1885                                         outbound, outbound_size);
1886         rbd_osd_req_format_op(obj_request, false);
1887
1888         osdc = &rbd_dev->rbd_client->client->osdc;
1889         ret = rbd_obj_request_submit(osdc, obj_request);
1890         if (ret)
1891                 goto out;
1892         ret = rbd_obj_request_wait(obj_request);
1893         if (ret)
1894                 goto out;
1895
1896         ret = obj_request->result;
1897         if (ret < 0)
1898                 goto out;
1899         ret = 0;
1900         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1901         if (version)
1902                 *version = obj_request->version;
1903 out:
1904         if (obj_request)
1905                 rbd_obj_request_put(obj_request);
1906         else
1907                 ceph_release_page_vector(pages, page_count);
1908
1909         return ret;
1910 }
1911
1912 static void rbd_request_fn(struct request_queue *q)
1913                 __releases(q->queue_lock) __acquires(q->queue_lock)
1914 {
1915         struct rbd_device *rbd_dev = q->queuedata;
1916         bool read_only = rbd_dev->mapping.read_only;
1917         struct request *rq;
1918         int result;
1919
1920         while ((rq = blk_fetch_request(q))) {
1921                 bool write_request = rq_data_dir(rq) == WRITE;
1922                 struct rbd_img_request *img_request;
1923                 u64 offset;
1924                 u64 length;
1925
1926                 /* Ignore any non-FS requests that filter through. */
1927
1928                 if (rq->cmd_type != REQ_TYPE_FS) {
1929                         dout("%s: non-fs request type %d\n", __func__,
1930                                 (int) rq->cmd_type);
1931                         __blk_end_request_all(rq, 0);
1932                         continue;
1933                 }
1934
1935                 /* Ignore/skip any zero-length requests */
1936
1937                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1938                 length = (u64) blk_rq_bytes(rq);
1939
1940                 if (!length) {
1941                         dout("%s: zero-length request\n", __func__);
1942                         __blk_end_request_all(rq, 0);
1943                         continue;
1944                 }
1945
1946                 spin_unlock_irq(q->queue_lock);
1947
1948                 /* Disallow writes to a read-only device */
1949
1950                 if (write_request) {
1951                         result = -EROFS;
1952                         if (read_only)
1953                                 goto end_request;
1954                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1955                 }
1956
1957                 /*
1958                  * Quit early if the mapped snapshot no longer
1959                  * exists.  It's still possible the snapshot will
1960                  * have disappeared by the time our request arrives
1961                  * at the osd, but there's no sense in sending it if
1962                  * we already know.
1963                  */
1964                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1965                         dout("request for non-existent snapshot");
1966                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1967                         result = -ENXIO;
1968                         goto end_request;
1969                 }
1970
1971                 result = -EINVAL;
1972                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1973                         goto end_request;       /* Shouldn't happen */
1974
1975                 result = -ENOMEM;
1976                 img_request = rbd_img_request_create(rbd_dev, offset, length,
1977                                                         write_request);
1978                 if (!img_request)
1979                         goto end_request;
1980
1981                 img_request->rq = rq;
1982
1983                 result = rbd_img_request_fill_bio(img_request, rq->bio);
1984                 if (!result)
1985                         result = rbd_img_request_submit(img_request);
1986                 if (result)
1987                         rbd_img_request_put(img_request);
1988 end_request:
1989                 spin_lock_irq(q->queue_lock);
1990                 if (result < 0) {
1991                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
1992                                 write_request ? "write" : "read", result);
1993                         __blk_end_request_all(rq, result);
1994                 }
1995         }
1996 }
1997
1998 /*
1999  * a queue callback. Makes sure that we don't create a bio that spans across
2000  * multiple osd objects. One exception would be with a single page bios,
2001  * which we handle later at bio_chain_clone_range()
2002  */
2003 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2004                           struct bio_vec *bvec)
2005 {
2006         struct rbd_device *rbd_dev = q->queuedata;
2007         sector_t sector_offset;
2008         sector_t sectors_per_obj;
2009         sector_t obj_sector_offset;
2010         int ret;
2011
2012         /*
2013          * Find how far into its rbd object the partition-relative
2014          * bio start sector is to offset relative to the enclosing
2015          * device.
2016          */
2017         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2018         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2019         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2020
2021         /*
2022          * Compute the number of bytes from that offset to the end
2023          * of the object.  Account for what's already used by the bio.
2024          */
2025         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2026         if (ret > bmd->bi_size)
2027                 ret -= bmd->bi_size;
2028         else
2029                 ret = 0;
2030
2031         /*
2032          * Don't send back more than was asked for.  And if the bio
2033          * was empty, let the whole thing through because:  "Note
2034          * that a block device *must* allow a single page to be
2035          * added to an empty bio."
2036          */
2037         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2038         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2039                 ret = (int) bvec->bv_len;
2040
2041         return ret;
2042 }
2043
2044 static void rbd_free_disk(struct rbd_device *rbd_dev)
2045 {
2046         struct gendisk *disk = rbd_dev->disk;
2047
2048         if (!disk)
2049                 return;
2050
2051         if (disk->flags & GENHD_FL_UP)
2052                 del_gendisk(disk);
2053         if (disk->queue)
2054                 blk_cleanup_queue(disk->queue);
2055         put_disk(disk);
2056 }
2057
2058 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2059                                 const char *object_name,
2060                                 u64 offset, u64 length,
2061                                 char *buf, u64 *version)
2062
2063 {
2064         struct rbd_obj_request *obj_request;
2065         struct ceph_osd_client *osdc;
2066         struct page **pages = NULL;
2067         u32 page_count;
2068         size_t size;
2069         int ret;
2070
2071         page_count = (u32) calc_pages_for(offset, length);
2072         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2073         if (IS_ERR(pages))
2074                 ret = PTR_ERR(pages);
2075
2076         ret = -ENOMEM;
2077         obj_request = rbd_obj_request_create(object_name, offset, length,
2078                                                         OBJ_REQUEST_PAGES);
2079         if (!obj_request)
2080                 goto out;
2081
2082         obj_request->pages = pages;
2083         obj_request->page_count = page_count;
2084
2085         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2086         if (!obj_request->osd_req)
2087                 goto out;
2088
2089         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2090                                         offset, length, 0, 0);
2091         rbd_osd_req_format_op(obj_request, false);
2092
2093         osdc = &rbd_dev->rbd_client->client->osdc;
2094         ret = rbd_obj_request_submit(osdc, obj_request);
2095         if (ret)
2096                 goto out;
2097         ret = rbd_obj_request_wait(obj_request);
2098         if (ret)
2099                 goto out;
2100
2101         ret = obj_request->result;
2102         if (ret < 0)
2103                 goto out;
2104
2105         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2106         size = (size_t) obj_request->xferred;
2107         ceph_copy_from_page_vector(pages, buf, 0, size);
2108         rbd_assert(size <= (size_t) INT_MAX);
2109         ret = (int) size;
2110         if (version)
2111                 *version = obj_request->version;
2112 out:
2113         if (obj_request)
2114                 rbd_obj_request_put(obj_request);
2115         else
2116                 ceph_release_page_vector(pages, page_count);
2117
2118         return ret;
2119 }
2120
2121 /*
2122  * Read the complete header for the given rbd device.
2123  *
2124  * Returns a pointer to a dynamically-allocated buffer containing
2125  * the complete and validated header.  Caller can pass the address
2126  * of a variable that will be filled in with the version of the
2127  * header object at the time it was read.
2128  *
2129  * Returns a pointer-coded errno if a failure occurs.
2130  */
2131 static struct rbd_image_header_ondisk *
2132 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2133 {
2134         struct rbd_image_header_ondisk *ondisk = NULL;
2135         u32 snap_count = 0;
2136         u64 names_size = 0;
2137         u32 want_count;
2138         int ret;
2139
2140         /*
2141          * The complete header will include an array of its 64-bit
2142          * snapshot ids, followed by the names of those snapshots as
2143          * a contiguous block of NUL-terminated strings.  Note that
2144          * the number of snapshots could change by the time we read
2145          * it in, in which case we re-read it.
2146          */
2147         do {
2148                 size_t size;
2149
2150                 kfree(ondisk);
2151
2152                 size = sizeof (*ondisk);
2153                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2154                 size += names_size;
2155                 ondisk = kmalloc(size, GFP_KERNEL);
2156                 if (!ondisk)
2157                         return ERR_PTR(-ENOMEM);
2158
2159                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2160                                        0, size,
2161                                        (char *) ondisk, version);
2162                 if (ret < 0)
2163                         goto out_err;
2164                 if (WARN_ON((size_t) ret < size)) {
2165                         ret = -ENXIO;
2166                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2167                                 size, ret);
2168                         goto out_err;
2169                 }
2170                 if (!rbd_dev_ondisk_valid(ondisk)) {
2171                         ret = -ENXIO;
2172                         rbd_warn(rbd_dev, "invalid header");
2173                         goto out_err;
2174                 }
2175
2176                 names_size = le64_to_cpu(ondisk->snap_names_len);
2177                 want_count = snap_count;
2178                 snap_count = le32_to_cpu(ondisk->snap_count);
2179         } while (snap_count != want_count);
2180
2181         return ondisk;
2182
2183 out_err:
2184         kfree(ondisk);
2185
2186         return ERR_PTR(ret);
2187 }
2188
2189 /*
2190  * reload the ondisk the header
2191  */
2192 static int rbd_read_header(struct rbd_device *rbd_dev,
2193                            struct rbd_image_header *header)
2194 {
2195         struct rbd_image_header_ondisk *ondisk;
2196         u64 ver = 0;
2197         int ret;
2198
2199         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2200         if (IS_ERR(ondisk))
2201                 return PTR_ERR(ondisk);
2202         ret = rbd_header_from_disk(header, ondisk);
2203         if (ret >= 0)
2204                 header->obj_version = ver;
2205         kfree(ondisk);
2206
2207         return ret;
2208 }
2209
2210 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2211 {
2212         struct rbd_snap *snap;
2213         struct rbd_snap *next;
2214
2215         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2216                 rbd_remove_snap_dev(snap);
2217 }
2218
2219 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2220 {
2221         sector_t size;
2222
2223         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2224                 return;
2225
2226         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2227         dout("setting size to %llu sectors", (unsigned long long) size);
2228         rbd_dev->mapping.size = (u64) size;
2229         set_capacity(rbd_dev->disk, size);
2230 }
2231
2232 /*
2233  * only read the first part of the ondisk header, without the snaps info
2234  */
2235 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2236 {
2237         int ret;
2238         struct rbd_image_header h;
2239
2240         ret = rbd_read_header(rbd_dev, &h);
2241         if (ret < 0)
2242                 return ret;
2243
2244         down_write(&rbd_dev->header_rwsem);
2245
2246         /* Update image size, and check for resize of mapped image */
2247         rbd_dev->header.image_size = h.image_size;
2248         rbd_update_mapping_size(rbd_dev);
2249
2250         /* rbd_dev->header.object_prefix shouldn't change */
2251         kfree(rbd_dev->header.snap_sizes);
2252         kfree(rbd_dev->header.snap_names);
2253         /* osd requests may still refer to snapc */
2254         ceph_put_snap_context(rbd_dev->header.snapc);
2255
2256         if (hver)
2257                 *hver = h.obj_version;
2258         rbd_dev->header.obj_version = h.obj_version;
2259         rbd_dev->header.image_size = h.image_size;
2260         rbd_dev->header.snapc = h.snapc;
2261         rbd_dev->header.snap_names = h.snap_names;
2262         rbd_dev->header.snap_sizes = h.snap_sizes;
2263         /* Free the extra copy of the object prefix */
2264         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2265         kfree(h.object_prefix);
2266
2267         ret = rbd_dev_snaps_update(rbd_dev);
2268         if (!ret)
2269                 ret = rbd_dev_snaps_register(rbd_dev);
2270
2271         up_write(&rbd_dev->header_rwsem);
2272
2273         return ret;
2274 }
2275
2276 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2277 {
2278         int ret;
2279
2280         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2281         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2282         if (rbd_dev->image_format == 1)
2283                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2284         else
2285                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2286         mutex_unlock(&ctl_mutex);
2287
2288         return ret;
2289 }
2290
2291 static int rbd_init_disk(struct rbd_device *rbd_dev)
2292 {
2293         struct gendisk *disk;
2294         struct request_queue *q;
2295         u64 segment_size;
2296
2297         /* create gendisk info */
2298         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2299         if (!disk)
2300                 return -ENOMEM;
2301
2302         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2303                  rbd_dev->dev_id);
2304         disk->major = rbd_dev->major;
2305         disk->first_minor = 0;
2306         disk->fops = &rbd_bd_ops;
2307         disk->private_data = rbd_dev;
2308
2309         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2310         if (!q)
2311                 goto out_disk;
2312
2313         /* We use the default size, but let's be explicit about it. */
2314         blk_queue_physical_block_size(q, SECTOR_SIZE);
2315
2316         /* set io sizes to object size */
2317         segment_size = rbd_obj_bytes(&rbd_dev->header);
2318         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2319         blk_queue_max_segment_size(q, segment_size);
2320         blk_queue_io_min(q, segment_size);
2321         blk_queue_io_opt(q, segment_size);
2322
2323         blk_queue_merge_bvec(q, rbd_merge_bvec);
2324         disk->queue = q;
2325
2326         q->queuedata = rbd_dev;
2327
2328         rbd_dev->disk = disk;
2329
2330         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2331
2332         return 0;
2333 out_disk:
2334         put_disk(disk);
2335
2336         return -ENOMEM;
2337 }
2338
2339 /*
2340   sysfs
2341 */
2342
2343 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2344 {
2345         return container_of(dev, struct rbd_device, dev);
2346 }
2347
2348 static ssize_t rbd_size_show(struct device *dev,
2349                              struct device_attribute *attr, char *buf)
2350 {
2351         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2352         sector_t size;
2353
2354         down_read(&rbd_dev->header_rwsem);
2355         size = get_capacity(rbd_dev->disk);
2356         up_read(&rbd_dev->header_rwsem);
2357
2358         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2359 }
2360
2361 /*
2362  * Note this shows the features for whatever's mapped, which is not
2363  * necessarily the base image.
2364  */
2365 static ssize_t rbd_features_show(struct device *dev,
2366                              struct device_attribute *attr, char *buf)
2367 {
2368         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2369
2370         return sprintf(buf, "0x%016llx\n",
2371                         (unsigned long long) rbd_dev->mapping.features);
2372 }
2373
2374 static ssize_t rbd_major_show(struct device *dev,
2375                               struct device_attribute *attr, char *buf)
2376 {
2377         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2378
2379         return sprintf(buf, "%d\n", rbd_dev->major);
2380 }
2381
2382 static ssize_t rbd_client_id_show(struct device *dev,
2383                                   struct device_attribute *attr, char *buf)
2384 {
2385         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2386
2387         return sprintf(buf, "client%lld\n",
2388                         ceph_client_id(rbd_dev->rbd_client->client));
2389 }
2390
2391 static ssize_t rbd_pool_show(struct device *dev,
2392                              struct device_attribute *attr, char *buf)
2393 {
2394         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2395
2396         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2397 }
2398
2399 static ssize_t rbd_pool_id_show(struct device *dev,
2400                              struct device_attribute *attr, char *buf)
2401 {
2402         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2403
2404         return sprintf(buf, "%llu\n",
2405                 (unsigned long long) rbd_dev->spec->pool_id);
2406 }
2407
2408 static ssize_t rbd_name_show(struct device *dev,
2409                              struct device_attribute *attr, char *buf)
2410 {
2411         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2412
2413         if (rbd_dev->spec->image_name)
2414                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2415
2416         return sprintf(buf, "(unknown)\n");
2417 }
2418
2419 static ssize_t rbd_image_id_show(struct device *dev,
2420                              struct device_attribute *attr, char *buf)
2421 {
2422         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2423
2424         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2425 }
2426
2427 /*
2428  * Shows the name of the currently-mapped snapshot (or
2429  * RBD_SNAP_HEAD_NAME for the base image).
2430  */
2431 static ssize_t rbd_snap_show(struct device *dev,
2432                              struct device_attribute *attr,
2433                              char *buf)
2434 {
2435         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2436
2437         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2438 }
2439
2440 /*
2441  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2442  * for the parent image.  If there is no parent, simply shows
2443  * "(no parent image)".
2444  */
2445 static ssize_t rbd_parent_show(struct device *dev,
2446                              struct device_attribute *attr,
2447                              char *buf)
2448 {
2449         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2450         struct rbd_spec *spec = rbd_dev->parent_spec;
2451         int count;
2452         char *bufp = buf;
2453
2454         if (!spec)
2455                 return sprintf(buf, "(no parent image)\n");
2456
2457         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2458                         (unsigned long long) spec->pool_id, spec->pool_name);
2459         if (count < 0)
2460                 return count;
2461         bufp += count;
2462
2463         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2464                         spec->image_name ? spec->image_name : "(unknown)");
2465         if (count < 0)
2466                 return count;
2467         bufp += count;
2468
2469         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2470                         (unsigned long long) spec->snap_id, spec->snap_name);
2471         if (count < 0)
2472                 return count;
2473         bufp += count;
2474
2475         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2476         if (count < 0)
2477                 return count;
2478         bufp += count;
2479
2480         return (ssize_t) (bufp - buf);
2481 }
2482
2483 static ssize_t rbd_image_refresh(struct device *dev,
2484                                  struct device_attribute *attr,
2485                                  const char *buf,
2486                                  size_t size)
2487 {
2488         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2489         int ret;
2490
2491         ret = rbd_dev_refresh(rbd_dev, NULL);
2492
2493         return ret < 0 ? ret : size;
2494 }
2495
2496 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2497 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2498 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2499 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2500 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2501 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2502 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2503 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2504 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2505 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2506 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2507
2508 static struct attribute *rbd_attrs[] = {
2509         &dev_attr_size.attr,
2510         &dev_attr_features.attr,
2511         &dev_attr_major.attr,
2512         &dev_attr_client_id.attr,
2513         &dev_attr_pool.attr,
2514         &dev_attr_pool_id.attr,
2515         &dev_attr_name.attr,
2516         &dev_attr_image_id.attr,
2517         &dev_attr_current_snap.attr,
2518         &dev_attr_parent.attr,
2519         &dev_attr_refresh.attr,
2520         NULL
2521 };
2522
2523 static struct attribute_group rbd_attr_group = {
2524         .attrs = rbd_attrs,
2525 };
2526
2527 static const struct attribute_group *rbd_attr_groups[] = {
2528         &rbd_attr_group,
2529         NULL
2530 };
2531
/* Release callback for the rbd device type; does nothing */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* intentionally empty */
}
2535
2536 static struct device_type rbd_device_type = {
2537         .name           = "rbd",
2538         .groups         = rbd_attr_groups,
2539         .release        = rbd_sysfs_dev_release,
2540 };
2541
2542
2543 /*
2544   sysfs - snapshots
2545 */
2546
2547 static ssize_t rbd_snap_size_show(struct device *dev,
2548                                   struct device_attribute *attr,
2549                                   char *buf)
2550 {
2551         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2552
2553         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2554 }
2555
2556 static ssize_t rbd_snap_id_show(struct device *dev,
2557                                 struct device_attribute *attr,
2558                                 char *buf)
2559 {
2560         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2561
2562         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2563 }
2564
2565 static ssize_t rbd_snap_features_show(struct device *dev,
2566                                 struct device_attribute *attr,
2567                                 char *buf)
2568 {
2569         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2570
2571         return sprintf(buf, "0x%016llx\n",
2572                         (unsigned long long) snap->features);
2573 }
2574
2575 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2576 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2577 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2578
2579 static struct attribute *rbd_snap_attrs[] = {
2580         &dev_attr_snap_size.attr,
2581         &dev_attr_snap_id.attr,
2582         &dev_attr_snap_features.attr,
2583         NULL,
2584 };
2585
2586 static struct attribute_group rbd_snap_attr_group = {
2587         .attrs = rbd_snap_attrs,
2588 };
2589
2590 static void rbd_snap_dev_release(struct device *dev)
2591 {
2592         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2593         kfree(snap->name);
2594         kfree(snap);
2595 }
2596
2597 static const struct attribute_group *rbd_snap_attr_groups[] = {
2598         &rbd_snap_attr_group,
2599         NULL
2600 };
2601
2602 static struct device_type rbd_snap_device_type = {
2603         .groups         = rbd_snap_attr_groups,
2604         .release        = rbd_snap_dev_release,
2605 };
2606
2607 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2608 {
2609         kref_get(&spec->kref);
2610
2611         return spec;
2612 }
2613
2614 static void rbd_spec_free(struct kref *kref);
2615 static void rbd_spec_put(struct rbd_spec *spec)
2616 {
2617         if (spec)
2618                 kref_put(&spec->kref, rbd_spec_free);
2619 }
2620
2621 static struct rbd_spec *rbd_spec_alloc(void)
2622 {
2623         struct rbd_spec *spec;
2624
2625         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2626         if (!spec)
2627                 return NULL;
2628         kref_init(&spec->kref);
2629
2630         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2631
2632         return spec;
2633 }
2634
2635 static void rbd_spec_free(struct kref *kref)
2636 {
2637         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2638
2639         kfree(spec->pool_name);
2640         kfree(spec->image_id);
2641         kfree(spec->image_name);
2642         kfree(spec->snap_name);
2643         kfree(spec);
2644 }
2645
2646 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2647                                 struct rbd_spec *spec)
2648 {
2649         struct rbd_device *rbd_dev;
2650
2651         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2652         if (!rbd_dev)
2653                 return NULL;
2654
2655         spin_lock_init(&rbd_dev->lock);
2656         rbd_dev->flags = 0;
2657         INIT_LIST_HEAD(&rbd_dev->node);
2658         INIT_LIST_HEAD(&rbd_dev->snaps);
2659         init_rwsem(&rbd_dev->header_rwsem);
2660
2661         rbd_dev->spec = spec;
2662         rbd_dev->rbd_client = rbdc;
2663
2664         /* Initialize the layout used for all rbd requests */
2665
2666         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2667         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2668         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2669         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2670
2671         return rbd_dev;
2672 }
2673
2674 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2675 {
2676         rbd_spec_put(rbd_dev->parent_spec);
2677         kfree(rbd_dev->header_name);
2678         rbd_put_client(rbd_dev->rbd_client);
2679         rbd_spec_put(rbd_dev->spec);
2680         kfree(rbd_dev);
2681 }
2682
2683 static bool rbd_snap_registered(struct rbd_snap *snap)
2684 {
2685         bool ret = snap->dev.type == &rbd_snap_device_type;
2686         bool reg = device_is_registered(&snap->dev);
2687
2688         rbd_assert(!ret ^ reg);
2689
2690         return ret;
2691 }
2692
2693 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2694 {
2695         list_del(&snap->node);
2696         if (device_is_registered(&snap->dev))
2697                 device_unregister(&snap->dev);
2698 }
2699
2700 static int rbd_register_snap_dev(struct rbd_snap *snap,
2701                                   struct device *parent)
2702 {
2703         struct device *dev = &snap->dev;
2704         int ret;
2705
2706         dev->type = &rbd_snap_device_type;
2707         dev->parent = parent;
2708         dev->release = rbd_snap_dev_release;
2709         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2710         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2711
2712         ret = device_register(dev);
2713
2714         return ret;
2715 }
2716
2717 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2718                                                 const char *snap_name,
2719                                                 u64 snap_id, u64 snap_size,
2720                                                 u64 snap_features)
2721 {
2722         struct rbd_snap *snap;
2723         int ret;
2724
2725         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2726         if (!snap)
2727                 return ERR_PTR(-ENOMEM);
2728
2729         ret = -ENOMEM;
2730         snap->name = kstrdup(snap_name, GFP_KERNEL);
2731         if (!snap->name)
2732                 goto err;
2733
2734         snap->id = snap_id;
2735         snap->size = snap_size;
2736         snap->features = snap_features;
2737
2738         return snap;
2739
2740 err:
2741         kfree(snap->name);
2742         kfree(snap);
2743
2744         return ERR_PTR(ret);
2745 }
2746
2747 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2748                 u64 *snap_size, u64 *snap_features)
2749 {
2750         char *snap_name;
2751
2752         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2753
2754         *snap_size = rbd_dev->header.snap_sizes[which];
2755         *snap_features = 0;     /* No features for v1 */
2756
2757         /* Skip over names until we find the one we are looking for */
2758
2759         snap_name = rbd_dev->header.snap_names;
2760         while (which--)
2761                 snap_name += strlen(snap_name) + 1;
2762
2763         return snap_name;
2764 }
2765
2766 /*
2767  * Get the size and object order for an image snapshot, or if
2768  * snap_id is CEPH_NOSNAP, gets this information for the base
2769  * image.
2770  */
2771 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2772                                 u8 *order, u64 *snap_size)
2773 {
2774         __le64 snapid = cpu_to_le64(snap_id);
2775         int ret;
2776         struct {
2777                 u8 order;
2778                 __le64 size;
2779         } __attribute__ ((packed)) size_buf = { 0 };
2780
2781         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2782                                 "rbd", "get_size",
2783                                 (char *) &snapid, sizeof (snapid),
2784                                 (char *) &size_buf, sizeof (size_buf), NULL);
2785         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2786         if (ret < 0)
2787                 return ret;
2788
2789         *order = size_buf.order;
2790         *snap_size = le64_to_cpu(size_buf.size);
2791
2792         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2793                 (unsigned long long) snap_id, (unsigned int) *order,
2794                 (unsigned long long) *snap_size);
2795
2796         return 0;
2797 }
2798
2799 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2800 {
2801         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2802                                         &rbd_dev->header.obj_order,
2803                                         &rbd_dev->header.image_size);
2804 }
2805
2806 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2807 {
2808         void *reply_buf;
2809         int ret;
2810         void *p;
2811
2812         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2813         if (!reply_buf)
2814                 return -ENOMEM;
2815
2816         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2817                                 "rbd", "get_object_prefix",
2818                                 NULL, 0,
2819                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2820         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2821         if (ret < 0)
2822                 goto out;
2823
2824         p = reply_buf;
2825         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2826                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2827                                                 NULL, GFP_NOIO);
2828
2829         if (IS_ERR(rbd_dev->header.object_prefix)) {
2830                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2831                 rbd_dev->header.object_prefix = NULL;
2832         } else {
2833                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2834         }
2835
2836 out:
2837         kfree(reply_buf);
2838
2839         return ret;
2840 }
2841
2842 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2843                 u64 *snap_features)
2844 {
2845         __le64 snapid = cpu_to_le64(snap_id);
2846         struct {
2847                 __le64 features;
2848                 __le64 incompat;
2849         } features_buf = { 0 };
2850         u64 incompat;
2851         int ret;
2852
2853         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2854                                 "rbd", "get_features",
2855                                 (char *) &snapid, sizeof (snapid),
2856                                 (char *) &features_buf, sizeof (features_buf),
2857                                 NULL);
2858         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2859         if (ret < 0)
2860                 return ret;
2861
2862         incompat = le64_to_cpu(features_buf.incompat);
2863         if (incompat & ~RBD_FEATURES_ALL)
2864                 return -ENXIO;
2865
2866         *snap_features = le64_to_cpu(features_buf.features);
2867
2868         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2869                 (unsigned long long) snap_id,
2870                 (unsigned long long) *snap_features,
2871                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2872
2873         return 0;
2874 }
2875
2876 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2877 {
2878         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2879                                                 &rbd_dev->header.features);
2880 }
2881
2882 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2883 {
2884         struct rbd_spec *parent_spec;
2885         size_t size;
2886         void *reply_buf = NULL;
2887         __le64 snapid;
2888         void *p;
2889         void *end;
2890         char *image_id;
2891         u64 overlap;
2892         int ret;
2893
2894         parent_spec = rbd_spec_alloc();
2895         if (!parent_spec)
2896                 return -ENOMEM;
2897
2898         size = sizeof (__le64) +                                /* pool_id */
2899                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2900                 sizeof (__le64) +                               /* snap_id */
2901                 sizeof (__le64);                                /* overlap */
2902         reply_buf = kmalloc(size, GFP_KERNEL);
2903         if (!reply_buf) {
2904                 ret = -ENOMEM;
2905                 goto out_err;
2906         }
2907
2908         snapid = cpu_to_le64(CEPH_NOSNAP);
2909         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2910                                 "rbd", "get_parent",
2911                                 (char *) &snapid, sizeof (snapid),
2912                                 (char *) reply_buf, size, NULL);
2913         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2914         if (ret < 0)
2915                 goto out_err;
2916
2917         ret = -ERANGE;
2918         p = reply_buf;
2919         end = (char *) reply_buf + size;
2920         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2921         if (parent_spec->pool_id == CEPH_NOPOOL)
2922                 goto out;       /* No parent?  No problem. */
2923
2924         /* The ceph file layout needs to fit pool id in 32 bits */
2925
2926         ret = -EIO;
2927         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2928                 goto out;
2929
2930         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2931         if (IS_ERR(image_id)) {
2932                 ret = PTR_ERR(image_id);
2933                 goto out_err;
2934         }
2935         parent_spec->image_id = image_id;
2936         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2937         ceph_decode_64_safe(&p, end, overlap, out_err);
2938
2939         rbd_dev->parent_overlap = overlap;
2940         rbd_dev->parent_spec = parent_spec;
2941         parent_spec = NULL;     /* rbd_dev now owns this */
2942 out:
2943         ret = 0;
2944 out_err:
2945         kfree(reply_buf);
2946         rbd_spec_put(parent_spec);
2947
2948         return ret;
2949 }
2950
2951 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2952 {
2953         size_t image_id_size;
2954         char *image_id;
2955         void *p;
2956         void *end;
2957         size_t size;
2958         void *reply_buf = NULL;
2959         size_t len = 0;
2960         char *image_name = NULL;
2961         int ret;
2962
2963         rbd_assert(!rbd_dev->spec->image_name);
2964
2965         len = strlen(rbd_dev->spec->image_id);
2966         image_id_size = sizeof (__le32) + len;
2967         image_id = kmalloc(image_id_size, GFP_KERNEL);
2968         if (!image_id)
2969                 return NULL;
2970
2971         p = image_id;
2972         end = (char *) image_id + image_id_size;
2973         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2974
2975         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2976         reply_buf = kmalloc(size, GFP_KERNEL);
2977         if (!reply_buf)
2978                 goto out;
2979
2980         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2981                                 "rbd", "dir_get_name",
2982                                 image_id, image_id_size,
2983                                 (char *) reply_buf, size, NULL);
2984         if (ret < 0)
2985                 goto out;
2986         p = reply_buf;
2987         end = (char *) reply_buf + size;
2988         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2989         if (IS_ERR(image_name))
2990                 image_name = NULL;
2991         else
2992                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2993 out:
2994         kfree(reply_buf);
2995         kfree(image_id);
2996
2997         return image_name;
2998 }
2999
3000 /*
3001  * When a parent image gets probed, we only have the pool, image,
3002  * and snapshot ids but not the names of any of them.  This call
3003  * is made later to fill in those names.  It has to be done after
3004  * rbd_dev_snaps_update() has completed because some of the
3005  * information (in particular, snapshot name) is not available
3006  * until then.
3007  */
3008 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3009 {
3010         struct ceph_osd_client *osdc;
3011         const char *name;
3012         void *reply_buf = NULL;
3013         int ret;
3014
3015         if (rbd_dev->spec->pool_name)
3016                 return 0;       /* Already have the names */
3017
3018         /* Look up the pool name */
3019
3020         osdc = &rbd_dev->rbd_client->client->osdc;
3021         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3022         if (!name) {
3023                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3024                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3025                 return -EIO;
3026         }
3027
3028         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3029         if (!rbd_dev->spec->pool_name)
3030                 return -ENOMEM;
3031
3032         /* Fetch the image name; tolerate failure here */
3033
3034         name = rbd_dev_image_name(rbd_dev);
3035         if (name)
3036                 rbd_dev->spec->image_name = (char *) name;
3037         else
3038                 rbd_warn(rbd_dev, "unable to get image name");
3039
3040         /* Look up the snapshot name. */
3041
3042         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3043         if (!name) {
3044                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3045                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3046                 ret = -EIO;
3047                 goto out_err;
3048         }
3049         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3050         if(!rbd_dev->spec->snap_name)
3051                 goto out_err;
3052
3053         return 0;
3054 out_err:
3055         kfree(reply_buf);
3056         kfree(rbd_dev->spec->pool_name);
3057         rbd_dev->spec->pool_name = NULL;
3058
3059         return ret;
3060 }
3061
3062 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3063 {
3064         size_t size;
3065         int ret;
3066         void *reply_buf;
3067         void *p;
3068         void *end;
3069         u64 seq;
3070         u32 snap_count;
3071         struct ceph_snap_context *snapc;
3072         u32 i;
3073
3074         /*
3075          * We'll need room for the seq value (maximum snapshot id),
3076          * snapshot count, and array of that many snapshot ids.
3077          * For now we have a fixed upper limit on the number we're
3078          * prepared to receive.
3079          */
3080         size = sizeof (__le64) + sizeof (__le32) +
3081                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3082         reply_buf = kzalloc(size, GFP_KERNEL);
3083         if (!reply_buf)
3084                 return -ENOMEM;
3085
3086         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3087                                 "rbd", "get_snapcontext",
3088                                 NULL, 0,
3089                                 reply_buf, size, ver);
3090         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3091         if (ret < 0)
3092                 goto out;
3093
3094         ret = -ERANGE;
3095         p = reply_buf;
3096         end = (char *) reply_buf + size;
3097         ceph_decode_64_safe(&p, end, seq, out);
3098         ceph_decode_32_safe(&p, end, snap_count, out);
3099
3100         /*
3101          * Make sure the reported number of snapshot ids wouldn't go
3102          * beyond the end of our buffer.  But before checking that,
3103          * make sure the computed size of the snapshot context we
3104          * allocate is representable in a size_t.
3105          */
3106         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3107                                  / sizeof (u64)) {
3108                 ret = -EINVAL;
3109                 goto out;
3110         }
3111         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3112                 goto out;
3113
3114         size = sizeof (struct ceph_snap_context) +
3115                                 snap_count * sizeof (snapc->snaps[0]);
3116         snapc = kmalloc(size, GFP_KERNEL);
3117         if (!snapc) {
3118                 ret = -ENOMEM;
3119                 goto out;
3120         }
3121
3122         atomic_set(&snapc->nref, 1);
3123         snapc->seq = seq;
3124         snapc->num_snaps = snap_count;
3125         for (i = 0; i < snap_count; i++)
3126                 snapc->snaps[i] = ceph_decode_64(&p);
3127
3128         rbd_dev->header.snapc = snapc;
3129
3130         dout("  snap context seq = %llu, snap_count = %u\n",
3131                 (unsigned long long) seq, (unsigned int) snap_count);
3132
3133 out:
3134         kfree(reply_buf);
3135
3136         return 0;
3137 }
3138
3139 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3140 {
3141         size_t size;
3142         void *reply_buf;
3143         __le64 snap_id;
3144         int ret;
3145         void *p;
3146         void *end;
3147         char *snap_name;
3148
3149         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3150         reply_buf = kmalloc(size, GFP_KERNEL);
3151         if (!reply_buf)
3152                 return ERR_PTR(-ENOMEM);
3153
3154         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3155         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3156                                 "rbd", "get_snapshot_name",
3157                                 (char *) &snap_id, sizeof (snap_id),
3158                                 reply_buf, size, NULL);
3159         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3160         if (ret < 0)
3161                 goto out;
3162
3163         p = reply_buf;
3164         end = (char *) reply_buf + size;
3165         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3166         if (IS_ERR(snap_name)) {
3167                 ret = PTR_ERR(snap_name);
3168                 goto out;
3169         } else {
3170                 dout("  snap_id 0x%016llx snap_name = %s\n",
3171                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3172         }
3173         kfree(reply_buf);
3174
3175         return snap_name;
3176 out:
3177         kfree(reply_buf);
3178
3179         return ERR_PTR(ret);
3180 }
3181
3182 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3183                 u64 *snap_size, u64 *snap_features)
3184 {
3185         u64 snap_id;
3186         u8 order;
3187         int ret;
3188
3189         snap_id = rbd_dev->header.snapc->snaps[which];
3190         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3191         if (ret)
3192                 return ERR_PTR(ret);
3193         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3194         if (ret)
3195                 return ERR_PTR(ret);
3196
3197         return rbd_dev_v2_snap_name(rbd_dev, which);
3198 }
3199
3200 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3201                 u64 *snap_size, u64 *snap_features)
3202 {
3203         if (rbd_dev->image_format == 1)
3204                 return rbd_dev_v1_snap_info(rbd_dev, which,
3205                                         snap_size, snap_features);
3206         if (rbd_dev->image_format == 2)
3207                 return rbd_dev_v2_snap_info(rbd_dev, which,
3208                                         snap_size, snap_features);
3209         return ERR_PTR(-EINVAL);
3210 }
3211
3212 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3213 {
3214         int ret;
3215         __u8 obj_order;
3216
3217         down_write(&rbd_dev->header_rwsem);
3218
3219         /* Grab old order first, to see if it changes */
3220
3221         obj_order = rbd_dev->header.obj_order,
3222         ret = rbd_dev_v2_image_size(rbd_dev);
3223         if (ret)
3224                 goto out;
3225         if (rbd_dev->header.obj_order != obj_order) {
3226                 ret = -EIO;
3227                 goto out;
3228         }
3229         rbd_update_mapping_size(rbd_dev);
3230
3231         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3232         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3233         if (ret)
3234                 goto out;
3235         ret = rbd_dev_snaps_update(rbd_dev);
3236         dout("rbd_dev_snaps_update returned %d\n", ret);
3237         if (ret)
3238                 goto out;
3239         ret = rbd_dev_snaps_register(rbd_dev);
3240         dout("rbd_dev_snaps_register returned %d\n", ret);
3241 out:
3242         up_write(&rbd_dev->header_rwsem);
3243
3244         return ret;
3245 }
3246
3247 /*
3248  * Scan the rbd device's current snapshot list and compare it to the
3249  * newly-received snapshot context.  Remove any existing snapshots
3250  * not present in the new snapshot context.  Add a new snapshot for
3251  * any snaphots in the snapshot context not in the current list.
3252  * And verify there are no changes to snapshots we already know
3253  * about.
3254  *
3255  * Assumes the snapshots in the snapshot context are sorted by
3256  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3257  * are also maintained in that order.)
3258  */
3259 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3260 {
3261         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3262         const u32 snap_count = snapc->num_snaps;
3263         struct list_head *head = &rbd_dev->snaps;
3264         struct list_head *links = head->next;
3265         u32 index = 0;
3266
3267         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3268         while (index < snap_count || links != head) {
3269                 u64 snap_id;
3270                 struct rbd_snap *snap;
3271                 char *snap_name;
3272                 u64 snap_size = 0;
3273                 u64 snap_features = 0;
3274
3275                 snap_id = index < snap_count ? snapc->snaps[index]
3276                                              : CEPH_NOSNAP;
3277                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3278                                      : NULL;
3279                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3280
3281                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3282                         struct list_head *next = links->next;
3283
3284                         /*
3285                          * A previously-existing snapshot is not in
3286                          * the new snap context.
3287                          *
3288                          * If the now missing snapshot is the one the
3289                          * image is mapped to, clear its exists flag
3290                          * so we can avoid sending any more requests
3291                          * to it.
3292                          */
3293                         if (rbd_dev->spec->snap_id == snap->id)
3294                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3295                         rbd_remove_snap_dev(snap);
3296                         dout("%ssnap id %llu has been removed\n",
3297                                 rbd_dev->spec->snap_id == snap->id ?
3298                                                         "mapped " : "",
3299                                 (unsigned long long) snap->id);
3300
3301                         /* Done with this list entry; advance */
3302
3303                         links = next;
3304                         continue;
3305                 }
3306
3307                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3308                                         &snap_size, &snap_features);
3309                 if (IS_ERR(snap_name))
3310                         return PTR_ERR(snap_name);
3311
3312                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3313                         (unsigned long long) snap_id);
3314                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3315                         struct rbd_snap *new_snap;
3316
3317                         /* We haven't seen this snapshot before */
3318
3319                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3320                                         snap_id, snap_size, snap_features);
3321                         if (IS_ERR(new_snap)) {
3322                                 int err = PTR_ERR(new_snap);
3323
3324                                 dout("  failed to add dev, error %d\n", err);
3325
3326                                 return err;
3327                         }
3328
3329                         /* New goes before existing, or at end of list */
3330
3331                         dout("  added dev%s\n", snap ? "" : " at end\n");
3332                         if (snap)
3333                                 list_add_tail(&new_snap->node, &snap->node);
3334                         else
3335                                 list_add_tail(&new_snap->node, head);
3336                 } else {
3337                         /* Already have this one */
3338
3339                         dout("  already present\n");
3340
3341                         rbd_assert(snap->size == snap_size);
3342                         rbd_assert(!strcmp(snap->name, snap_name));
3343                         rbd_assert(snap->features == snap_features);
3344
3345                         /* Done with this list entry; advance */
3346
3347                         links = links->next;
3348                 }
3349
3350                 /* Advance to the next entry in the snapshot context */
3351
3352                 index++;
3353         }
3354         dout("%s: done\n", __func__);
3355
3356         return 0;
3357 }
3358
3359 /*
3360  * Scan the list of snapshots and register the devices for any that
3361  * have not already been registered.
3362  */
3363 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3364 {
3365         struct rbd_snap *snap;
3366         int ret = 0;
3367
3368         dout("%s:\n", __func__);
3369         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3370                 return -EIO;
3371
3372         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3373                 if (!rbd_snap_registered(snap)) {
3374                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3375                         if (ret < 0)
3376                                 break;
3377                 }
3378         }
3379         dout("%s: returning %d\n", __func__, ret);
3380
3381         return ret;
3382 }
3383
3384 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3385 {
3386         struct device *dev;
3387         int ret;
3388
3389         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3390
3391         dev = &rbd_dev->dev;
3392         dev->bus = &rbd_bus_type;
3393         dev->type = &rbd_device_type;
3394         dev->parent = &rbd_root_dev;
3395         dev->release = rbd_dev_release;
3396         dev_set_name(dev, "%d", rbd_dev->dev_id);
3397         ret = device_register(dev);
3398
3399         mutex_unlock(&ctl_mutex);
3400
3401         return ret;
3402 }
3403
3404 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3405 {
3406         device_unregister(&rbd_dev->dev);
3407 }
3408
3409 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3410
3411 /*
3412  * Get a unique rbd identifier for the given new rbd_dev, and add
3413  * the rbd_dev to the global list.  The minimum rbd id is 1.
3414  */
3415 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3416 {
3417         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3418
3419         spin_lock(&rbd_dev_list_lock);
3420         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3421         spin_unlock(&rbd_dev_list_lock);
3422         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3423                 (unsigned long long) rbd_dev->dev_id);
3424 }
3425
3426 /*
3427  * Remove an rbd_dev from the global list, and record that its
3428  * identifier is no longer in use.
3429  */
3430 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3431 {
3432         struct list_head *tmp;
3433         int rbd_id = rbd_dev->dev_id;
3434         int max_id;
3435
3436         rbd_assert(rbd_id > 0);
3437
3438         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3439                 (unsigned long long) rbd_dev->dev_id);
3440         spin_lock(&rbd_dev_list_lock);
3441         list_del_init(&rbd_dev->node);
3442
3443         /*
3444          * If the id being "put" is not the current maximum, there
3445          * is nothing special we need to do.
3446          */
3447         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3448                 spin_unlock(&rbd_dev_list_lock);
3449                 return;
3450         }
3451
3452         /*
3453          * We need to update the current maximum id.  Search the
3454          * list to find out what it is.  We're more likely to find
3455          * the maximum at the end, so search the list backward.
3456          */
3457         max_id = 0;
3458         list_for_each_prev(tmp, &rbd_dev_list) {
3459                 struct rbd_device *rbd_dev;
3460
3461                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3462                 if (rbd_dev->dev_id > max_id)
3463                         max_id = rbd_dev->dev_id;
3464         }
3465         spin_unlock(&rbd_dev_list_lock);
3466
3467         /*
3468          * The max id could have been updated by rbd_dev_id_get(), in
3469          * which case it now accurately reflects the new maximum.
3470          * Be careful not to overwrite the maximum value in that
3471          * case.
3472          */
3473         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3474         dout("  max dev id has been reset\n");
3475 }
3476
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token that starts there, i.e.
 * the number of consecutive non-white-space characters.
 *
 * *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The set of characters for which isspace() is nonzero in the
	 * "C" and "POSIX" locales.
	 */
	const char *const whitespace = " \f\n\r\t\v";
	const char *token_start = *buf + strspn(*buf, whitespace);

	*buf = token_start;

	return strcspn(token_start, whitespace);
}
3495
/*
 * Locate the next token in *buf and, provided it fits, copy it into
 * the caller's token buffer with a terminating '\0'.  *buf must be
 * '\0'-terminated on entry.
 *
 * Returns the token length (excluding the '\0').  A return of 0
 * means no token was found; a return >= token_size means the token
 * did not fit and the token buffer was left untouched.
 *
 * In every case *buf is advanced to just past the token, even when
 * the token was too long to be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *token_start = *buf;

	*buf = token_start + token_len;

	if (token_len >= token_size)
		return token_len;	/* Too big; nothing copied */

	memcpy(token, token_start, token_len);
	token[token_len] = '\0';

	return token_len;
}
3525
3526 /*
3527  * Finds the next token in *buf, dynamically allocates a buffer big
3528  * enough to hold a copy of it, and copies the token into the new
3529  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3530  * that a duplicate buffer is created even for a zero-length token.
3531  *
3532  * Returns a pointer to the newly-allocated duplicate, or a null
3533  * pointer if memory for the duplicate was not available.  If
3534  * the lenp argument is a non-null pointer, the length of the token
3535  * (not including the '\0') is returned in *lenp.
3536  *
3537  * If successful, the *buf pointer will be updated to point beyond
3538  * the end of the found token.
3539  *
3540  * Note: uses GFP_KERNEL for allocation.
3541  */
3542 static inline char *dup_token(const char **buf, size_t *lenp)
3543 {
3544         char *dup;
3545         size_t len;
3546
3547         len = next_token(buf);
3548         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3549         if (!dup)
3550                 return NULL;
3551         *(dup + len) = '\0';
3552         *buf += len;
3553
3554         if (lenp)
3555                 *lenp = len;
3556
3557         return dup;
3558 }
3559
3560 /*
3561  * Parse the options provided for an "rbd add" (i.e., rbd image
3562  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3563  * and the data written is passed here via a NUL-terminated buffer.
3564  * Returns 0 if successful or an error code otherwise.
3565  *
3566  * The information extracted from these options is recorded in
3567  * the other parameters which return dynamically-allocated
3568  * structures:
3569  *  ceph_opts
3570  *      The address of a pointer that will refer to a ceph options
3571  *      structure.  Caller must release the returned pointer using
3572  *      ceph_destroy_options() when it is no longer needed.
3573  *  rbd_opts
3574  *      Address of an rbd options pointer.  Fully initialized by
3575  *      this function; caller must release with kfree().
3576  *  spec
3577  *      Address of an rbd image specification pointer.  Fully
3578  *      initialized by this function based on parsed options.
3579  *      Caller must release with rbd_spec_put().
3580  *
3581  * The options passed take this form:
3582  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3583  * where:
3584  *  <mon_addrs>
3585  *      A comma-separated list of one or more monitor addresses.
3586  *      A monitor address is an ip address, optionally followed
3587  *      by a port number (separated by a colon).
3588  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3589  *  <options>
3590  *      A comma-separated list of ceph and/or rbd options.
3591  *  <pool_name>
3592  *      The name of the rados pool containing the rbd image.
3593  *  <image_name>
3594  *      The name of the image in that pool to map.
3595  *  <snap_id>
3596  *      An optional snapshot id.  If provided, the mapping will
3597  *      present data from the image at the time that snapshot was
3598  *      created.  The image head is used if no snapshot id is
3599  *      provided.  Snapshot mappings are always read-only.
3600  */
3601 static int rbd_add_parse_args(const char *buf,
3602                                 struct ceph_options **ceph_opts,
3603                                 struct rbd_options **opts,
3604                                 struct rbd_spec **rbd_spec)
3605 {
3606         size_t len;
3607         char *options;
3608         const char *mon_addrs;
3609         size_t mon_addrs_size;
3610         struct rbd_spec *spec = NULL;
3611         struct rbd_options *rbd_opts = NULL;
3612         struct ceph_options *copts;
3613         int ret;
3614
3615         /* The first four tokens are required */
3616
3617         len = next_token(&buf);
3618         if (!len) {
3619                 rbd_warn(NULL, "no monitor address(es) provided");
3620                 return -EINVAL;
3621         }
3622         mon_addrs = buf;
3623         mon_addrs_size = len + 1;
3624         buf += len;
3625
3626         ret = -EINVAL;
3627         options = dup_token(&buf, NULL);
3628         if (!options)
3629                 return -ENOMEM;
3630         if (!*options) {
3631                 rbd_warn(NULL, "no options provided");
3632                 goto out_err;
3633         }
3634
3635         spec = rbd_spec_alloc();
3636         if (!spec)
3637                 goto out_mem;
3638
3639         spec->pool_name = dup_token(&buf, NULL);
3640         if (!spec->pool_name)
3641                 goto out_mem;
3642         if (!*spec->pool_name) {
3643                 rbd_warn(NULL, "no pool name provided");
3644                 goto out_err;
3645         }
3646
3647         spec->image_name = dup_token(&buf, NULL);
3648         if (!spec->image_name)
3649                 goto out_mem;
3650         if (!*spec->image_name) {
3651                 rbd_warn(NULL, "no image name provided");
3652                 goto out_err;
3653         }
3654
3655         /*
3656          * Snapshot name is optional; default is to use "-"
3657          * (indicating the head/no snapshot).
3658          */
3659         len = next_token(&buf);
3660         if (!len) {
3661                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3662                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3663         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3664                 ret = -ENAMETOOLONG;
3665                 goto out_err;
3666         }
3667         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3668         if (!spec->snap_name)
3669                 goto out_mem;
3670         *(spec->snap_name + len) = '\0';
3671
3672         /* Initialize all rbd options to the defaults */
3673
3674         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3675         if (!rbd_opts)
3676                 goto out_mem;
3677
3678         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3679
3680         copts = ceph_parse_options(options, mon_addrs,
3681                                         mon_addrs + mon_addrs_size - 1,
3682                                         parse_rbd_opts_token, rbd_opts);
3683         if (IS_ERR(copts)) {
3684                 ret = PTR_ERR(copts);
3685                 goto out_err;
3686         }
3687         kfree(options);
3688
3689         *ceph_opts = copts;
3690         *opts = rbd_opts;
3691         *rbd_spec = spec;
3692
3693         return 0;
3694 out_mem:
3695         ret = -ENOMEM;
3696 out_err:
3697         kfree(rbd_opts);
3698         rbd_spec_put(spec);
3699         kfree(options);
3700
3701         return ret;
3702 }
3703
3704 /*
3705  * An rbd format 2 image has a unique identifier, distinct from the
3706  * name given to it by the user.  Internally, that identifier is
3707  * what's used to specify the names of objects related to the image.
3708  *
3709  * A special "rbd id" object is used to map an rbd image name to its
3710  * id.  If that object doesn't exist, then there is no v2 rbd image
3711  * with the supplied name.
3712  *
3713  * This function will record the given rbd_dev's image_id field if
3714  * it can be determined, and in that case will return 0.  If any
3715  * errors occur a negative errno will be returned and the rbd_dev's
3716  * image_id field will be unchanged (and should be NULL).
3717  */
3718 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3719 {
3720         int ret;
3721         size_t size;
3722         char *object_name;
3723         void *response;
3724         void *p;
3725
3726         /*
3727          * When probing a parent image, the image id is already
3728          * known (and the image name likely is not).  There's no
3729          * need to fetch the image id again in this case.
3730          */
3731         if (rbd_dev->spec->image_id)
3732                 return 0;
3733
3734         /*
3735          * First, see if the format 2 image id file exists, and if
3736          * so, get the image's persistent id from it.
3737          */
3738         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3739         object_name = kmalloc(size, GFP_NOIO);
3740         if (!object_name)
3741                 return -ENOMEM;
3742         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3743         dout("rbd id object name is %s\n", object_name);
3744
3745         /* Response will be an encoded string, which includes a length */
3746
3747         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3748         response = kzalloc(size, GFP_NOIO);
3749         if (!response) {
3750                 ret = -ENOMEM;
3751                 goto out;
3752         }
3753
3754         ret = rbd_obj_method_sync(rbd_dev, object_name,
3755                                 "rbd", "get_id",
3756                                 NULL, 0,
3757                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3758         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3759         if (ret < 0)
3760                 goto out;
3761
3762         p = response;
3763         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3764                                                 p + RBD_IMAGE_ID_LEN_MAX,
3765                                                 NULL, GFP_NOIO);
3766         if (IS_ERR(rbd_dev->spec->image_id)) {
3767                 ret = PTR_ERR(rbd_dev->spec->image_id);
3768                 rbd_dev->spec->image_id = NULL;
3769         } else {
3770                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3771         }
3772 out:
3773         kfree(response);
3774         kfree(object_name);
3775
3776         return ret;
3777 }
3778
3779 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3780 {
3781         int ret;
3782         size_t size;
3783
3784         /* Version 1 images have no id; empty string is used */
3785
3786         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3787         if (!rbd_dev->spec->image_id)
3788                 return -ENOMEM;
3789
3790         /* Record the header object name for this rbd image. */
3791
3792         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3793         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3794         if (!rbd_dev->header_name) {
3795                 ret = -ENOMEM;
3796                 goto out_err;
3797         }
3798         sprintf(rbd_dev->header_name, "%s%s",
3799                 rbd_dev->spec->image_name, RBD_SUFFIX);
3800
3801         /* Populate rbd image metadata */
3802
3803         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3804         if (ret < 0)
3805                 goto out_err;
3806
3807         /* Version 1 images have no parent (no layering) */
3808
3809         rbd_dev->parent_spec = NULL;
3810         rbd_dev->parent_overlap = 0;
3811
3812         rbd_dev->image_format = 1;
3813
3814         dout("discovered version 1 image, header name is %s\n",
3815                 rbd_dev->header_name);
3816
3817         return 0;
3818
3819 out_err:
3820         kfree(rbd_dev->header_name);
3821         rbd_dev->header_name = NULL;
3822         kfree(rbd_dev->spec->image_id);
3823         rbd_dev->spec->image_id = NULL;
3824
3825         return ret;
3826 }
3827
3828 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3829 {
3830         size_t size;
3831         int ret;
3832         u64 ver = 0;
3833
3834         /*
3835          * Image id was filled in by the caller.  Record the header
3836          * object name for this rbd image.
3837          */
3838         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3839         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3840         if (!rbd_dev->header_name)
3841                 return -ENOMEM;
3842         sprintf(rbd_dev->header_name, "%s%s",
3843                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3844
3845         /* Get the size and object order for the image */
3846
3847         ret = rbd_dev_v2_image_size(rbd_dev);
3848         if (ret < 0)
3849                 goto out_err;
3850
3851         /* Get the object prefix (a.k.a. block_name) for the image */
3852
3853         ret = rbd_dev_v2_object_prefix(rbd_dev);
3854         if (ret < 0)
3855                 goto out_err;
3856
3857         /* Get the and check features for the image */
3858
3859         ret = rbd_dev_v2_features(rbd_dev);
3860         if (ret < 0)
3861                 goto out_err;
3862
3863         /* If the image supports layering, get the parent info */
3864
3865         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3866                 ret = rbd_dev_v2_parent_info(rbd_dev);
3867                 if (ret < 0)
3868                         goto out_err;
3869         }
3870
3871         /* crypto and compression type aren't (yet) supported for v2 images */
3872
3873         rbd_dev->header.crypt_type = 0;
3874         rbd_dev->header.comp_type = 0;
3875
3876         /* Get the snapshot context, plus the header version */
3877
3878         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3879         if (ret)
3880                 goto out_err;
3881         rbd_dev->header.obj_version = ver;
3882
3883         rbd_dev->image_format = 2;
3884
3885         dout("discovered version 2 image, header name is %s\n",
3886                 rbd_dev->header_name);
3887
3888         return 0;
3889 out_err:
3890         rbd_dev->parent_overlap = 0;
3891         rbd_spec_put(rbd_dev->parent_spec);
3892         rbd_dev->parent_spec = NULL;
3893         kfree(rbd_dev->header_name);
3894         rbd_dev->header_name = NULL;
3895         kfree(rbd_dev->header.object_prefix);
3896         rbd_dev->header.object_prefix = NULL;
3897
3898         return ret;
3899 }
3900
3901 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3902 {
3903         int ret;
3904
3905         /* no need to lock here, as rbd_dev is not registered yet */
3906         ret = rbd_dev_snaps_update(rbd_dev);
3907         if (ret)
3908                 return ret;
3909
3910         ret = rbd_dev_probe_update_spec(rbd_dev);
3911         if (ret)
3912                 goto err_out_snaps;
3913
3914         ret = rbd_dev_set_mapping(rbd_dev);
3915         if (ret)
3916                 goto err_out_snaps;
3917
3918         /* generate unique id: find highest unique id, add one */
3919         rbd_dev_id_get(rbd_dev);
3920
3921         /* Fill in the device name, now that we have its id. */
3922         BUILD_BUG_ON(DEV_NAME_LEN
3923                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3924         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3925
3926         /* Get our block major device number. */
3927
3928         ret = register_blkdev(0, rbd_dev->name);
3929         if (ret < 0)
3930                 goto err_out_id;
3931         rbd_dev->major = ret;
3932
3933         /* Set up the blkdev mapping. */
3934
3935         ret = rbd_init_disk(rbd_dev);
3936         if (ret)
3937                 goto err_out_blkdev;
3938
3939         ret = rbd_bus_add_dev(rbd_dev);
3940         if (ret)
3941                 goto err_out_disk;
3942
3943         /*
3944          * At this point cleanup in the event of an error is the job
3945          * of the sysfs code (initiated by rbd_bus_del_dev()).
3946          */
3947         down_write(&rbd_dev->header_rwsem);
3948         ret = rbd_dev_snaps_register(rbd_dev);
3949         up_write(&rbd_dev->header_rwsem);
3950         if (ret)
3951                 goto err_out_bus;
3952
3953         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3954         if (ret)
3955                 goto err_out_bus;
3956
3957         /* Everything's ready.  Announce the disk to the world. */
3958
3959         add_disk(rbd_dev->disk);
3960
3961         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3962                 (unsigned long long) rbd_dev->mapping.size);
3963
3964         return ret;
3965 err_out_bus:
3966         /* this will also clean up rest of rbd_dev stuff */
3967
3968         rbd_bus_del_dev(rbd_dev);
3969
3970         return ret;
3971 err_out_disk:
3972         rbd_free_disk(rbd_dev);
3973 err_out_blkdev:
3974         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3975 err_out_id:
3976         rbd_dev_id_put(rbd_dev);
3977 err_out_snaps:
3978         rbd_remove_all_snaps(rbd_dev);
3979
3980         return ret;
3981 }
3982
3983 /*
3984  * Probe for the existence of the header object for the given rbd
3985  * device.  For format 2 images this includes determining the image
3986  * id.
3987  */
3988 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3989 {
3990         int ret;
3991
3992         /*
3993          * Get the id from the image id object.  If it's not a
3994          * format 2 image, we'll get ENOENT back, and we'll assume
3995          * it's a format 1 image.
3996          */
3997         ret = rbd_dev_image_id(rbd_dev);
3998         if (ret)
3999                 ret = rbd_dev_v1_probe(rbd_dev);
4000         else
4001                 ret = rbd_dev_v2_probe(rbd_dev);
4002         if (ret) {
4003                 dout("probe failed, returning %d\n", ret);
4004
4005                 return ret;
4006         }
4007
4008         ret = rbd_dev_probe_finish(rbd_dev);
4009         if (ret)
4010                 rbd_header_free(&rbd_dev->header);
4011
4012         return ret;
4013 }
4014
4015 static ssize_t rbd_add(struct bus_type *bus,
4016                        const char *buf,
4017                        size_t count)
4018 {
4019         struct rbd_device *rbd_dev = NULL;
4020         struct ceph_options *ceph_opts = NULL;
4021         struct rbd_options *rbd_opts = NULL;
4022         struct rbd_spec *spec = NULL;
4023         struct rbd_client *rbdc;
4024         struct ceph_osd_client *osdc;
4025         int rc = -ENOMEM;
4026
4027         if (!try_module_get(THIS_MODULE))
4028                 return -ENODEV;
4029
4030         /* parse add command */
4031         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4032         if (rc < 0)
4033                 goto err_out_module;
4034
4035         rbdc = rbd_get_client(ceph_opts);
4036         if (IS_ERR(rbdc)) {
4037                 rc = PTR_ERR(rbdc);
4038                 goto err_out_args;
4039         }
4040         ceph_opts = NULL;       /* rbd_dev client now owns this */
4041
4042         /* pick the pool */
4043         osdc = &rbdc->client->osdc;
4044         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4045         if (rc < 0)
4046                 goto err_out_client;
4047         spec->pool_id = (u64) rc;
4048
4049         /* The ceph file layout needs to fit pool id in 32 bits */
4050
4051         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4052                 rc = -EIO;
4053                 goto err_out_client;
4054         }
4055
4056         rbd_dev = rbd_dev_create(rbdc, spec);
4057         if (!rbd_dev)
4058                 goto err_out_client;
4059         rbdc = NULL;            /* rbd_dev now owns this */
4060         spec = NULL;            /* rbd_dev now owns this */
4061
4062         rbd_dev->mapping.read_only = rbd_opts->read_only;
4063         kfree(rbd_opts);
4064         rbd_opts = NULL;        /* done with this */
4065
4066         rc = rbd_dev_probe(rbd_dev);
4067         if (rc < 0)
4068                 goto err_out_rbd_dev;
4069
4070         return count;
4071 err_out_rbd_dev:
4072         rbd_dev_destroy(rbd_dev);
4073 err_out_client:
4074         rbd_put_client(rbdc);
4075 err_out_args:
4076         if (ceph_opts)
4077                 ceph_destroy_options(ceph_opts);
4078         kfree(rbd_opts);
4079         rbd_spec_put(spec);
4080 err_out_module:
4081         module_put(THIS_MODULE);
4082
4083         dout("Error adding device %s\n", buf);
4084
4085         return (ssize_t) rc;
4086 }
4087
4088 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4089 {
4090         struct list_head *tmp;
4091         struct rbd_device *rbd_dev;
4092
4093         spin_lock(&rbd_dev_list_lock);
4094         list_for_each(tmp, &rbd_dev_list) {
4095                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4096                 if (rbd_dev->dev_id == dev_id) {
4097                         spin_unlock(&rbd_dev_list_lock);
4098                         return rbd_dev;
4099                 }
4100         }
4101         spin_unlock(&rbd_dev_list_lock);
4102         return NULL;
4103 }
4104
4105 static void rbd_dev_release(struct device *dev)
4106 {
4107         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4108
4109         if (rbd_dev->watch_event)
4110                 rbd_dev_header_watch_sync(rbd_dev, 0);
4111
4112         /* clean up and free blkdev */
4113         rbd_free_disk(rbd_dev);
4114         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4115
4116         /* release allocated disk header fields */
4117         rbd_header_free(&rbd_dev->header);
4118
4119         /* done with the id, and with the rbd_dev */
4120         rbd_dev_id_put(rbd_dev);
4121         rbd_assert(rbd_dev->rbd_client != NULL);
4122         rbd_dev_destroy(rbd_dev);
4123
4124         /* release module ref */
4125         module_put(THIS_MODULE);
4126 }
4127
4128 static ssize_t rbd_remove(struct bus_type *bus,
4129                           const char *buf,
4130                           size_t count)
4131 {
4132         struct rbd_device *rbd_dev = NULL;
4133         int target_id, rc;
4134         unsigned long ul;
4135         int ret = count;
4136
4137         rc = strict_strtoul(buf, 10, &ul);
4138         if (rc)
4139                 return rc;
4140
4141         /* convert to int; abort if we lost anything in the conversion */
4142         target_id = (int) ul;
4143         if (target_id != ul)
4144                 return -EINVAL;
4145
4146         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4147
4148         rbd_dev = __rbd_get_dev(target_id);
4149         if (!rbd_dev) {
4150                 ret = -ENOENT;
4151                 goto done;
4152         }
4153
4154         spin_lock_irq(&rbd_dev->lock);
4155         if (rbd_dev->open_count)
4156                 ret = -EBUSY;
4157         else
4158                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4159         spin_unlock_irq(&rbd_dev->lock);
4160         if (ret < 0)
4161                 goto done;
4162
4163         rbd_remove_all_snaps(rbd_dev);
4164         rbd_bus_del_dev(rbd_dev);
4165
4166 done:
4167         mutex_unlock(&ctl_mutex);
4168
4169         return ret;
4170 }
4171
4172 /*
4173  * create control files in sysfs
4174  * /sys/bus/rbd/...
4175  */
4176 static int rbd_sysfs_init(void)
4177 {
4178         int ret;
4179
4180         ret = device_register(&rbd_root_dev);
4181         if (ret < 0)
4182                 return ret;
4183
4184         ret = bus_register(&rbd_bus_type);
4185         if (ret < 0)
4186                 device_unregister(&rbd_root_dev);
4187
4188         return ret;
4189 }
4190
4191 static void rbd_sysfs_cleanup(void)
4192 {
4193         bus_unregister(&rbd_bus_type);
4194         device_unregister(&rbd_root_dev);
4195 }
4196
4197 static int __init rbd_init(void)
4198 {
4199         int rc;
4200
4201         if (!libceph_compatible(NULL)) {
4202                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4203
4204                 return -EINVAL;
4205         }
4206         rc = rbd_sysfs_init();
4207         if (rc)
4208                 return rc;
4209         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4210         return 0;
4211 }
4212
4213 static void __exit rbd_exit(void)
4214 {
4215         rbd_sysfs_cleanup();
4216 }
4217
4218 module_init(rbd_init);
4219 module_exit(rbd_exit);
4220
4221 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4222 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4223 MODULE_DESCRIPTION("rados block device");
4224
4225 /* following authorship retained from original osdblk.c */
4226 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4227
4228 MODULE_LICENSE("GPL");