3 rbd.c -- Export ceph rados objects as a Linux block device
6 based on drivers/block/osdblk.c:
8 Copyright 2009 Red Hat, Inc.
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
25 For usage instructions, please refer to:
27 Documentation/ABI/testing/sysfs-bus-rbd
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
45 #include "rbd_types.h"
47 #define RBD_DEBUG /* Activate rbd_assert() calls */
50 * The basic unit of block I/O is a sector. It is interpreted in a
51 * number of contexts in Linux (blk, bio, genhd), but the default is
52 * universally 512 bytes. These symbols are just slightly more
53 * meaningful than the bare numbers they represent.
55 #define SECTOR_SHIFT 9
56 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
58 #define RBD_DRV_NAME "rbd"
59 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
63 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
64 #define RBD_MAX_SNAP_NAME_LEN \
65 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_SNAP_HEAD_NAME "-"
71 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
73 /* This allows a single page to hold an image name sent by OSD */
74 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
75 #define RBD_IMAGE_ID_LEN_MAX 64
77 #define RBD_OBJ_PREFIX_LEN_MAX 64
81 #define RBD_FEATURE_LAYERING (1<<0)
82 #define RBD_FEATURE_STRIPINGV2 (1<<1)
83 #define RBD_FEATURES_ALL \
84 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
86 /* Features supported by this (client software) implementation. */
88 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
91 * An RBD device name will be "rbd#", where the "rbd" comes from
92 * RBD_DRV_NAME above, and # is a unique integer identifier.
93 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
94 * enough to hold all possible device names.
96 #define DEV_NAME_LEN 32
97 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
100 * block device image metadata (in-memory version)
102 struct rbd_image_header {
103 /* These four fields never change for a given rbd image */
110 /* The remaining fields need to be updated occasionally */
112 struct ceph_snap_context *snapc;
121 * An rbd image specification.
123 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
124 * identify an image. Each rbd_dev structure includes a pointer to
125 * an rbd_spec structure that encapsulates this identity.
127 * Each of the id's in an rbd_spec has an associated name. For a
128 * user-mapped image, the names are supplied and the id's associated
129 * with them are looked up. For a layered image, a parent image is
130 * defined by the tuple, and the names are looked up.
132 * An rbd_dev structure contains a parent_spec pointer which is
133 * non-null if the image it represents is a child in a layered
134 * image. This pointer will refer to the rbd_spec structure used
135 * by the parent rbd_dev for its own identity (i.e., the structure
136 * is shared between the parent and child).
138 * Since these structures are populated once, during the discovery
139 * phase of image construction, they are effectively immutable so
140 * we make no effort to synchronize access to them.
142 * Note that code herein does not assume the image name is known (it
143 * could be a null pointer).
147 const char *pool_name;
149 const char *image_id;
150 const char *image_name;
153 const char *snap_name;
159 * an instance of the client. multiple devices may share an rbd client.
162 struct ceph_client *client;
164 struct list_head node;
167 struct rbd_img_request;
168 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
170 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
172 struct rbd_obj_request;
173 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
175 enum obj_request_type {
176 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
180 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
181 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
182 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
183 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
186 struct rbd_obj_request {
187 const char *object_name;
188 u64 offset; /* object start byte */
189 u64 length; /* bytes from offset */
193 * An object request associated with an image will have its
194 * img_data flag set; a standalone object request will not.
196 * A standalone object request will have which == BAD_WHICH
197 * and a null obj_request pointer.
199 * An object request initiated in support of a layered image
200 * object (to check for its existence before a write) will
201 * have which == BAD_WHICH and a non-null obj_request pointer.
203 * Finally, an object request for rbd image data will have
204 * which != BAD_WHICH, and will have a non-null img_request
205 * pointer. The value of which will be in the range
206 * 0..(img_request->obj_request_count-1).
209 struct rbd_obj_request *obj_request; /* STAT op */
211 struct rbd_img_request *img_request;
213 /* links for img_request->obj_requests list */
214 struct list_head links;
217 u32 which; /* posn image request list */
219 enum obj_request_type type;
221 struct bio *bio_list;
227 struct page **copyup_pages;
229 struct ceph_osd_request *osd_req;
231 u64 xferred; /* bytes transferred */
234 rbd_obj_callback_t callback;
235 struct completion completion;
241 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
242 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
243 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
246 struct rbd_img_request {
247 struct rbd_device *rbd_dev;
248 u64 offset; /* starting image byte offset */
249 u64 length; /* byte count from offset */
252 u64 snap_id; /* for reads */
253 struct ceph_snap_context *snapc; /* for writes */
256 struct request *rq; /* block request */
257 struct rbd_obj_request *obj_request; /* obj req initiator */
259 struct page **copyup_pages;
260 spinlock_t completion_lock;/* protects next_completion */
262 rbd_img_callback_t callback;
263 u64 xferred;/* aggregate bytes transferred */
264 int result; /* first nonzero obj_request result */
266 u32 obj_request_count;
267 struct list_head obj_requests; /* rbd_obj_request structs */
272 #define for_each_obj_request(ireq, oreq) \
273 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
274 #define for_each_obj_request_from(ireq, oreq) \
275 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
276 #define for_each_obj_request_safe(ireq, oreq, n) \
277 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
289 int dev_id; /* blkdev unique id */
291 int major; /* blkdev assigned major */
292 struct gendisk *disk; /* blkdev's gendisk and rq */
294 u32 image_format; /* Either 1 or 2 */
295 struct rbd_client *rbd_client;
297 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
299 spinlock_t lock; /* queue, flags, open_count */
301 struct rbd_image_header header;
302 unsigned long flags; /* possibly lock protected */
303 struct rbd_spec *spec;
307 struct ceph_file_layout layout;
309 struct ceph_osd_event *watch_event;
310 struct rbd_obj_request *watch_request;
312 struct rbd_spec *parent_spec;
314 struct rbd_device *parent;
316 /* protects updating the header */
317 struct rw_semaphore header_rwsem;
319 struct rbd_mapping mapping;
321 struct list_head node;
325 unsigned long open_count; /* protected by lock */
329 * Flag bits for rbd_dev->flags. If atomicity is required,
330 * rbd_dev->lock is used to protect access.
332 * Currently, only the "removing" flag (which is coupled with the
333 * "open_count" field) requires atomic access.
336 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
337 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
340 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
342 static LIST_HEAD(rbd_dev_list); /* devices */
343 static DEFINE_SPINLOCK(rbd_dev_list_lock);
345 static LIST_HEAD(rbd_client_list); /* clients */
346 static DEFINE_SPINLOCK(rbd_client_list_lock);
348 /* Slab caches for frequently-allocated structures */
350 static struct kmem_cache *rbd_img_request_cache;
351 static struct kmem_cache *rbd_obj_request_cache;
352 static struct kmem_cache *rbd_segment_name_cache;
354 static int rbd_img_request_submit(struct rbd_img_request *img_request);
356 static void rbd_dev_device_release(struct device *dev);
358 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
360 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
362 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);
364 static struct bus_attribute rbd_bus_attrs[] = {
365 __ATTR(add, S_IWUSR, NULL, rbd_add),
366 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
370 static struct bus_type rbd_bus_type = {
372 .bus_attrs = rbd_bus_attrs,
375 static void rbd_root_dev_release(struct device *dev)
379 static struct device rbd_root_dev = {
381 .release = rbd_root_dev_release,
384 static __printf(2, 3)
385 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
387 struct va_format vaf;
395 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
396 else if (rbd_dev->disk)
397 printk(KERN_WARNING "%s: %s: %pV\n",
398 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
399 else if (rbd_dev->spec && rbd_dev->spec->image_name)
400 printk(KERN_WARNING "%s: image %s: %pV\n",
401 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
402 else if (rbd_dev->spec && rbd_dev->spec->image_id)
403 printk(KERN_WARNING "%s: id %s: %pV\n",
404 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
406 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
407 RBD_DRV_NAME, rbd_dev, &vaf);
412 #define rbd_assert(expr) \
413 if (unlikely(!(expr))) { \
414 printk(KERN_ERR "\nAssertion failure in %s() " \
416 "\trbd_assert(%s);\n\n", \
417 __func__, __LINE__, #expr); \
420 #else /* !RBD_DEBUG */
421 # define rbd_assert(expr) ((void) 0)
422 #endif /* !RBD_DEBUG */
424 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
425 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
426 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
428 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
429 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
430 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
432 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
433 u8 *order, u64 *snap_size);
434 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
436 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
438 static int rbd_open(struct block_device *bdev, fmode_t mode)
440 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
441 bool removing = false;
443 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
446 spin_lock_irq(&rbd_dev->lock);
447 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
450 rbd_dev->open_count++;
451 spin_unlock_irq(&rbd_dev->lock);
455 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
456 (void) get_device(&rbd_dev->dev);
457 set_device_ro(bdev, rbd_dev->mapping.read_only);
458 mutex_unlock(&ctl_mutex);
463 static int rbd_release(struct gendisk *disk, fmode_t mode)
465 struct rbd_device *rbd_dev = disk->private_data;
466 unsigned long open_count_before;
468 spin_lock_irq(&rbd_dev->lock);
469 open_count_before = rbd_dev->open_count--;
470 spin_unlock_irq(&rbd_dev->lock);
471 rbd_assert(open_count_before > 0);
473 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
474 put_device(&rbd_dev->dev);
475 mutex_unlock(&ctl_mutex);
480 static const struct block_device_operations rbd_bd_ops = {
481 .owner = THIS_MODULE,
483 .release = rbd_release,
487 * Initialize an rbd client instance.
490 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
492 struct rbd_client *rbdc;
495 dout("%s:\n", __func__);
496 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
500 kref_init(&rbdc->kref);
501 INIT_LIST_HEAD(&rbdc->node);
503 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
505 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
506 if (IS_ERR(rbdc->client))
508 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
510 ret = ceph_open_session(rbdc->client);
514 spin_lock(&rbd_client_list_lock);
515 list_add_tail(&rbdc->node, &rbd_client_list);
516 spin_unlock(&rbd_client_list_lock);
518 mutex_unlock(&ctl_mutex);
519 dout("%s: rbdc %p\n", __func__, rbdc);
524 ceph_destroy_client(rbdc->client);
526 mutex_unlock(&ctl_mutex);
530 ceph_destroy_options(ceph_opts);
531 dout("%s: error %d\n", __func__, ret);
536 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
538 kref_get(&rbdc->kref);
544 * Find a ceph client with specific addr and configuration. If
545 * found, bump its reference count.
547 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
549 struct rbd_client *client_node;
552 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
555 spin_lock(&rbd_client_list_lock);
556 list_for_each_entry(client_node, &rbd_client_list, node) {
557 if (!ceph_compare_options(ceph_opts, client_node->client)) {
558 __rbd_get_client(client_node);
564 spin_unlock(&rbd_client_list_lock);
566 return found ? client_node : NULL;
576 /* string args above */
579 /* Boolean args above */
583 static match_table_t rbd_opts_tokens = {
585 /* string args above */
586 {Opt_read_only, "read_only"},
587 {Opt_read_only, "ro"}, /* Alternate spelling */
588 {Opt_read_write, "read_write"},
589 {Opt_read_write, "rw"}, /* Alternate spelling */
590 /* Boolean args above */
598 #define RBD_READ_ONLY_DEFAULT false
600 static int parse_rbd_opts_token(char *c, void *private)
602 struct rbd_options *rbd_opts = private;
603 substring_t argstr[MAX_OPT_ARGS];
604 int token, intval, ret;
606 token = match_token(c, rbd_opts_tokens, argstr);
610 if (token < Opt_last_int) {
611 ret = match_int(&argstr[0], &intval);
613 pr_err("bad mount option arg (not int) "
617 dout("got int token %d val %d\n", token, intval);
618 } else if (token > Opt_last_int && token < Opt_last_string) {
619 dout("got string token %d val %s\n", token,
621 } else if (token > Opt_last_string && token < Opt_last_bool) {
622 dout("got Boolean token %d\n", token);
624 dout("got token %d\n", token);
629 rbd_opts->read_only = true;
632 rbd_opts->read_only = false;
642 * Get a ceph client with specific addr and configuration, if one does
643 * not exist create it.
645 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
647 struct rbd_client *rbdc;
649 rbdc = rbd_client_find(ceph_opts);
650 if (rbdc) /* using an existing client */
651 ceph_destroy_options(ceph_opts);
653 rbdc = rbd_client_create(ceph_opts);
659 * Destroy ceph client
661 * Caller must hold rbd_client_list_lock.
663 static void rbd_client_release(struct kref *kref)
665 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
667 dout("%s: rbdc %p\n", __func__, rbdc);
668 spin_lock(&rbd_client_list_lock);
669 list_del(&rbdc->node);
670 spin_unlock(&rbd_client_list_lock);
672 ceph_destroy_client(rbdc->client);
677 * Drop reference to ceph client node. If it's not referenced anymore, release
680 static void rbd_put_client(struct rbd_client *rbdc)
683 kref_put(&rbdc->kref, rbd_client_release);
686 static bool rbd_image_format_valid(u32 image_format)
688 return image_format == 1 || image_format == 2;
691 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
696 /* The header has to start with the magic rbd header text */
697 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
700 /* The bio layer requires at least sector-sized I/O */
702 if (ondisk->options.order < SECTOR_SHIFT)
705 /* If we use u64 in a few spots we may be able to loosen this */
707 if (ondisk->options.order > 8 * sizeof (int) - 1)
711 * The size of a snapshot header has to fit in a size_t, and
712 * that limits the number of snapshots.
714 snap_count = le32_to_cpu(ondisk->snap_count);
715 size = SIZE_MAX - sizeof (struct ceph_snap_context);
716 if (snap_count > size / sizeof (__le64))
720 * Not only that, but the size of the entire the snapshot
721 * header must also be representable in a size_t.
723 size -= snap_count * sizeof (__le64);
724 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
731 * Create a new header structure, translate header format from the on-disk
734 static int rbd_header_from_disk(struct rbd_image_header *header,
735 struct rbd_image_header_ondisk *ondisk)
742 memset(header, 0, sizeof (*header));
744 snap_count = le32_to_cpu(ondisk->snap_count);
746 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
747 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
748 if (!header->object_prefix)
750 memcpy(header->object_prefix, ondisk->object_prefix, len);
751 header->object_prefix[len] = '\0';
754 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
756 /* Save a copy of the snapshot names */
758 if (snap_names_len > (u64) SIZE_MAX)
760 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
761 if (!header->snap_names)
764 * Note that rbd_dev_v1_header_read() guarantees
765 * the ondisk buffer we're working with has
766 * snap_names_len bytes beyond the end of the
767 * snapshot id array, this memcpy() is safe.
769 memcpy(header->snap_names, &ondisk->snaps[snap_count],
772 /* Record each snapshot's size */
774 size = snap_count * sizeof (*header->snap_sizes);
775 header->snap_sizes = kmalloc(size, GFP_KERNEL);
776 if (!header->snap_sizes)
778 for (i = 0; i < snap_count; i++)
779 header->snap_sizes[i] =
780 le64_to_cpu(ondisk->snaps[i].image_size);
782 header->snap_names = NULL;
783 header->snap_sizes = NULL;
786 header->features = 0; /* No features support in v1 images */
787 header->obj_order = ondisk->options.order;
788 header->crypt_type = ondisk->options.crypt_type;
789 header->comp_type = ondisk->options.comp_type;
791 /* Allocate and fill in the snapshot context */
793 header->image_size = le64_to_cpu(ondisk->image_size);
795 header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
798 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
799 for (i = 0; i < snap_count; i++)
800 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
805 kfree(header->snap_sizes);
806 header->snap_sizes = NULL;
807 kfree(header->snap_names);
808 header->snap_names = NULL;
809 kfree(header->object_prefix);
810 header->object_prefix = NULL;
815 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
817 const char *snap_name;
819 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
821 /* Skip over names until we find the one we are looking for */
823 snap_name = rbd_dev->header.snap_names;
825 snap_name += strlen(snap_name) + 1;
827 return kstrdup(snap_name, GFP_KERNEL);
831 * Snapshot id comparison function for use with qsort()/bsearch().
832 * Note that result is for snapshots in *descending* order.
834 static int snapid_compare_reverse(const void *s1, const void *s2)
836 u64 snap_id1 = *(u64 *)s1;
837 u64 snap_id2 = *(u64 *)s2;
839 if (snap_id1 < snap_id2)
841 return snap_id1 == snap_id2 ? 0 : -1;
845 * Search a snapshot context to see if the given snapshot id is
848 * Returns the position of the snapshot id in the array if it's found,
849 * or BAD_SNAP_INDEX otherwise.
851 * Note: The snapshot array is in kept sorted (by the osd) in
852 * reverse order, highest snapshot id first.
854 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
856 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
859 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
860 sizeof (snap_id), snapid_compare_reverse);
862 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
865 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
870 which = rbd_dev_snap_index(rbd_dev, snap_id);
871 if (which == BAD_SNAP_INDEX)
874 return _rbd_dev_v1_snap_name(rbd_dev, which);
877 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
879 if (snap_id == CEPH_NOSNAP)
880 return RBD_SNAP_HEAD_NAME;
882 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
883 if (rbd_dev->image_format == 1)
884 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
886 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
889 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
892 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
893 if (snap_id == CEPH_NOSNAP) {
894 *snap_size = rbd_dev->header.image_size;
895 } else if (rbd_dev->image_format == 1) {
898 which = rbd_dev_snap_index(rbd_dev, snap_id);
899 if (which == BAD_SNAP_INDEX)
902 *snap_size = rbd_dev->header.snap_sizes[which];
907 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
916 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
919 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
920 if (snap_id == CEPH_NOSNAP) {
921 *snap_features = rbd_dev->header.features;
922 } else if (rbd_dev->image_format == 1) {
923 *snap_features = 0; /* No features for format 1 */
928 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
932 *snap_features = features;
937 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
939 u64 snap_id = rbd_dev->spec->snap_id;
944 ret = rbd_snap_size(rbd_dev, snap_id, &size);
947 ret = rbd_snap_features(rbd_dev, snap_id, &features);
951 rbd_dev->mapping.size = size;
952 rbd_dev->mapping.features = features;
957 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
959 rbd_dev->mapping.size = 0;
960 rbd_dev->mapping.features = 0;
963 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
969 name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
972 segment = offset >> rbd_dev->header.obj_order;
973 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
974 rbd_dev->header.object_prefix, segment);
975 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
976 pr_err("error formatting segment name for #%llu (%d)\n",
985 static void rbd_segment_name_free(const char *name)
987 /* The explicit cast here is needed to drop the const qualifier */
989 kmem_cache_free(rbd_segment_name_cache, (void *)name);
992 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
994 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
996 return offset & (segment_size - 1);
999 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1000 u64 offset, u64 length)
1002 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1004 offset &= segment_size - 1;
1006 rbd_assert(length <= U64_MAX - offset);
1007 if (offset + length > segment_size)
1008 length = segment_size - offset;
1014 * returns the size of an object in the image
1016 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1018 return 1 << header->obj_order;
1025 static void bio_chain_put(struct bio *chain)
1031 chain = chain->bi_next;
1037 * zeros a bio chain, starting at specific offset
1039 static void zero_bio_chain(struct bio *chain, int start_ofs)
1042 unsigned long flags;
1048 bio_for_each_segment(bv, chain, i) {
1049 if (pos + bv->bv_len > start_ofs) {
1050 int remainder = max(start_ofs - pos, 0);
1051 buf = bvec_kmap_irq(bv, &flags);
1052 memset(buf + remainder, 0,
1053 bv->bv_len - remainder);
1054 bvec_kunmap_irq(buf, &flags);
1059 chain = chain->bi_next;
1064 * similar to zero_bio_chain(), zeros data defined by a page array,
1065 * starting at the given byte offset from the start of the array and
1066 * continuing up to the given end offset. The pages array is
1067 * assumed to be big enough to hold all bytes up to the end.
1069 static void zero_pages(struct page **pages, u64 offset, u64 end)
1071 struct page **page = &pages[offset >> PAGE_SHIFT];
1073 rbd_assert(end > offset);
1074 rbd_assert(end - offset <= (u64)SIZE_MAX);
1075 while (offset < end) {
1078 unsigned long flags;
1081 page_offset = (size_t)(offset & ~PAGE_MASK);
1082 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1083 local_irq_save(flags);
1084 kaddr = kmap_atomic(*page);
1085 memset(kaddr + page_offset, 0, length);
1086 kunmap_atomic(kaddr);
1087 local_irq_restore(flags);
1095 * Clone a portion of a bio, starting at the given byte offset
1096 * and continuing for the number of bytes indicated.
1098 static struct bio *bio_clone_range(struct bio *bio_src,
1099 unsigned int offset,
1107 unsigned short end_idx;
1108 unsigned short vcnt;
1111 /* Handle the easy case for the caller */
1113 if (!offset && len == bio_src->bi_size)
1114 return bio_clone(bio_src, gfpmask);
1116 if (WARN_ON_ONCE(!len))
1118 if (WARN_ON_ONCE(len > bio_src->bi_size))
1120 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1123 /* Find first affected segment... */
1126 __bio_for_each_segment(bv, bio_src, idx, 0) {
1127 if (resid < bv->bv_len)
1129 resid -= bv->bv_len;
1133 /* ...and the last affected segment */
1136 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1137 if (resid <= bv->bv_len)
1139 resid -= bv->bv_len;
1141 vcnt = end_idx - idx + 1;
1143 /* Build the clone */
1145 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1147 return NULL; /* ENOMEM */
1149 bio->bi_bdev = bio_src->bi_bdev;
1150 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1151 bio->bi_rw = bio_src->bi_rw;
1152 bio->bi_flags |= 1 << BIO_CLONED;
1155 * Copy over our part of the bio_vec, then update the first
1156 * and last (or only) entries.
1158 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1159 vcnt * sizeof (struct bio_vec));
1160 bio->bi_io_vec[0].bv_offset += voff;
1162 bio->bi_io_vec[0].bv_len -= voff;
1163 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1165 bio->bi_io_vec[0].bv_len = len;
1168 bio->bi_vcnt = vcnt;
1176 * Clone a portion of a bio chain, starting at the given byte offset
1177 * into the first bio in the source chain and continuing for the
1178 * number of bytes indicated. The result is another bio chain of
1179 * exactly the given length, or a null pointer on error.
1181 * The bio_src and offset parameters are both in-out. On entry they
1182 * refer to the first source bio and the offset into that bio where
1183 * the start of data to be cloned is located.
1185 * On return, bio_src is updated to refer to the bio in the source
1186 * chain that contains first un-cloned byte, and *offset will
1187 * contain the offset of that byte within that bio.
1189 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1190 unsigned int *offset,
1194 struct bio *bi = *bio_src;
1195 unsigned int off = *offset;
1196 struct bio *chain = NULL;
1199 /* Build up a chain of clone bios up to the limit */
1201 if (!bi || off >= bi->bi_size || !len)
1202 return NULL; /* Nothing to clone */
1206 unsigned int bi_size;
1210 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1211 goto out_err; /* EINVAL; ran out of bio's */
1213 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1214 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1216 goto out_err; /* ENOMEM */
1219 end = &bio->bi_next;
1222 if (off == bi->bi_size) {
1233 bio_chain_put(chain);
1239 * The default/initial value for all object request flags is 0. For
1240 * each flag, once its value is set to 1 it is never reset to 0
1243 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1245 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1246 struct rbd_device *rbd_dev;
1248 rbd_dev = obj_request->img_request->rbd_dev;
1249 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1254 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1257 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1260 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1262 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1263 struct rbd_device *rbd_dev = NULL;
1265 if (obj_request_img_data_test(obj_request))
1266 rbd_dev = obj_request->img_request->rbd_dev;
1267 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1272 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1275 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1279 * This sets the KNOWN flag after (possibly) setting the EXISTS
1280 * flag. The latter is set based on the "exists" value provided.
1282 * Note that for our purposes once an object exists it never goes
1283 * away again. It's possible that the response from two existence
1284 * checks are separated by the creation of the target object, and
1285 * the first ("doesn't exist") response arrives *after* the second
1286 * ("does exist"). In that case we ignore the second one.
1288 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1292 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1293 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1297 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1300 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1303 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1306 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1309 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1311 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1312 atomic_read(&obj_request->kref.refcount));
1313 kref_get(&obj_request->kref);
1316 static void rbd_obj_request_destroy(struct kref *kref);
1317 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1319 rbd_assert(obj_request != NULL);
1320 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1321 atomic_read(&obj_request->kref.refcount));
1322 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1325 static void rbd_img_request_get(struct rbd_img_request *img_request)
1327 dout("%s: img %p (was %d)\n", __func__, img_request,
1328 atomic_read(&img_request->kref.refcount));
1329 kref_get(&img_request->kref);
1332 static void rbd_img_request_destroy(struct kref *kref);
1333 static void rbd_img_request_put(struct rbd_img_request *img_request)
1335 rbd_assert(img_request != NULL);
1336 dout("%s: img %p (was %d)\n", __func__, img_request,
1337 atomic_read(&img_request->kref.refcount));
1338 kref_put(&img_request->kref, rbd_img_request_destroy);
1341 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1342 struct rbd_obj_request *obj_request)
1344 rbd_assert(obj_request->img_request == NULL);
1346 /* Image request now owns object's original reference */
1347 obj_request->img_request = img_request;
1348 obj_request->which = img_request->obj_request_count;
1349 rbd_assert(!obj_request_img_data_test(obj_request));
1350 obj_request_img_data_set(obj_request);
1351 rbd_assert(obj_request->which != BAD_WHICH);
1352 img_request->obj_request_count++;
1353 list_add_tail(&obj_request->links, &img_request->obj_requests);
1354 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1355 obj_request->which);
1358 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1359 struct rbd_obj_request *obj_request)
1361 rbd_assert(obj_request->which != BAD_WHICH);
1363 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1364 obj_request->which);
1365 list_del(&obj_request->links);
1366 rbd_assert(img_request->obj_request_count > 0);
1367 img_request->obj_request_count--;
1368 rbd_assert(obj_request->which == img_request->obj_request_count);
1369 obj_request->which = BAD_WHICH;
1370 rbd_assert(obj_request_img_data_test(obj_request));
1371 rbd_assert(obj_request->img_request == img_request);
1372 obj_request->img_request = NULL;
1373 obj_request->callback = NULL;
1374 rbd_obj_request_put(obj_request);
1377 static bool obj_request_type_valid(enum obj_request_type type)
1380 case OBJ_REQUEST_NODATA:
1381 case OBJ_REQUEST_BIO:
1382 case OBJ_REQUEST_PAGES:
1389 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1390 struct rbd_obj_request *obj_request)
1392 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1394 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1397 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1400 dout("%s: img %p\n", __func__, img_request);
1403 * If no error occurred, compute the aggregate transfer
1404 * count for the image request. We could instead use
1405 * atomic64_cmpxchg() to update it as each object request
1406 * completes; not clear which way is better off hand.
1408 if (!img_request->result) {
1409 struct rbd_obj_request *obj_request;
1412 for_each_obj_request(img_request, obj_request)
1413 xferred += obj_request->xferred;
1414 img_request->xferred = xferred;
1417 if (img_request->callback)
1418 img_request->callback(img_request);
1420 rbd_img_request_put(img_request);
1423 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1425 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1427 dout("%s: obj %p\n", __func__, obj_request);
1429 return wait_for_completion_interruptible(&obj_request->completion);
1433 * The default/initial value for all image request flags is 0. Each
1434 * is conditionally set to 1 at image request initialization time
1435 * and currently never change thereafter.
1437 static void img_request_write_set(struct rbd_img_request *img_request)
1439 set_bit(IMG_REQ_WRITE, &img_request->flags);
1443 static bool img_request_write_test(struct rbd_img_request *img_request)
1446 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1449 static void img_request_child_set(struct rbd_img_request *img_request)
1451 set_bit(IMG_REQ_CHILD, &img_request->flags);
1455 static bool img_request_child_test(struct rbd_img_request *img_request)
1458 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1461 static void img_request_layered_set(struct rbd_img_request *img_request)
1463 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1467 static bool img_request_layered_test(struct rbd_img_request *img_request)
1470 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1474 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1476 u64 xferred = obj_request->xferred;
1477 u64 length = obj_request->length;
1479 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1480 obj_request, obj_request->img_request, obj_request->result,
1483 * ENOENT means a hole in the image. We zero-fill the
1484 * entire length of the request. A short read also implies
1485 * zero-fill to the end of the request. Either way we
1486 * update the xferred count to indicate the whole request
1489 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1490 if (obj_request->result == -ENOENT) {
1491 if (obj_request->type == OBJ_REQUEST_BIO)
1492 zero_bio_chain(obj_request->bio_list, 0);
1494 zero_pages(obj_request->pages, 0, length);
1495 obj_request->result = 0;
1496 obj_request->xferred = length;
1497 } else if (xferred < length && !obj_request->result) {
1498 if (obj_request->type == OBJ_REQUEST_BIO)
1499 zero_bio_chain(obj_request->bio_list, xferred);
1501 zero_pages(obj_request->pages, xferred, length);
1502 obj_request->xferred = length;
1504 obj_request_done_set(obj_request);
1507 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1509 dout("%s: obj %p cb %p\n", __func__, obj_request,
1510 obj_request->callback);
1511 if (obj_request->callback)
1512 obj_request->callback(obj_request);
1514 complete_all(&obj_request->completion);
1517 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1519 dout("%s: obj %p\n", __func__, obj_request);
1520 obj_request_done_set(obj_request);
1523 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1525 struct rbd_img_request *img_request = NULL;
1526 struct rbd_device *rbd_dev = NULL;
1527 bool layered = false;
1529 if (obj_request_img_data_test(obj_request)) {
1530 img_request = obj_request->img_request;
1531 layered = img_request && img_request_layered_test(img_request);
1532 rbd_dev = img_request->rbd_dev;
1535 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1536 obj_request, img_request, obj_request->result,
1537 obj_request->xferred, obj_request->length);
1538 if (layered && obj_request->result == -ENOENT &&
1539 obj_request->img_offset < rbd_dev->parent_overlap)
1540 rbd_img_parent_read(obj_request);
1541 else if (img_request)
1542 rbd_img_obj_request_read_callback(obj_request);
1544 obj_request_done_set(obj_request);
1547 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1549 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1550 obj_request->result, obj_request->length);
1552 * There is no such thing as a successful short write. Set
1553 * it to our originally-requested length.
1555 obj_request->xferred = obj_request->length;
1556 obj_request_done_set(obj_request);
1560 * For a simple stat call there's nothing to do. We'll do more if
1561 * this is part of a write sequence for a layered image.
1563 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1565 dout("%s: obj %p\n", __func__, obj_request);
1566 obj_request_done_set(obj_request);
1569 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1570 struct ceph_msg *msg)
1572 struct rbd_obj_request *obj_request = osd_req->r_priv;
1575 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1576 rbd_assert(osd_req == obj_request->osd_req);
1577 if (obj_request_img_data_test(obj_request)) {
1578 rbd_assert(obj_request->img_request);
1579 rbd_assert(obj_request->which != BAD_WHICH);
1581 rbd_assert(obj_request->which == BAD_WHICH);
1584 if (osd_req->r_result < 0)
1585 obj_request->result = osd_req->r_result;
1587 BUG_ON(osd_req->r_num_ops > 2);
1590 * We support a 64-bit length, but ultimately it has to be
1591 * passed to blk_end_request(), which takes an unsigned int.
1593 obj_request->xferred = osd_req->r_reply_op_len[0];
1594 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1595 opcode = osd_req->r_ops[0].op;
1597 case CEPH_OSD_OP_READ:
1598 rbd_osd_read_callback(obj_request);
1600 case CEPH_OSD_OP_WRITE:
1601 rbd_osd_write_callback(obj_request);
1603 case CEPH_OSD_OP_STAT:
1604 rbd_osd_stat_callback(obj_request);
1606 case CEPH_OSD_OP_CALL:
1607 case CEPH_OSD_OP_NOTIFY_ACK:
1608 case CEPH_OSD_OP_WATCH:
1609 rbd_osd_trivial_callback(obj_request);
1612 rbd_warn(NULL, "%s: unsupported op %hu\n",
1613 obj_request->object_name, (unsigned short) opcode);
1617 if (obj_request_done_test(obj_request))
1618 rbd_obj_request_complete(obj_request);
1621 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1623 struct rbd_img_request *img_request = obj_request->img_request;
1624 struct ceph_osd_request *osd_req = obj_request->osd_req;
1627 rbd_assert(osd_req != NULL);
1629 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1630 ceph_osdc_build_request(osd_req, obj_request->offset,
1631 NULL, snap_id, NULL);
1634 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1636 struct rbd_img_request *img_request = obj_request->img_request;
1637 struct ceph_osd_request *osd_req = obj_request->osd_req;
1638 struct ceph_snap_context *snapc;
1639 struct timespec mtime = CURRENT_TIME;
1641 rbd_assert(osd_req != NULL);
1643 snapc = img_request ? img_request->snapc : NULL;
1644 ceph_osdc_build_request(osd_req, obj_request->offset,
1645 snapc, CEPH_NOSNAP, &mtime);
1648 static struct ceph_osd_request *rbd_osd_req_create(
1649 struct rbd_device *rbd_dev,
1651 struct rbd_obj_request *obj_request)
1653 struct ceph_snap_context *snapc = NULL;
1654 struct ceph_osd_client *osdc;
1655 struct ceph_osd_request *osd_req;
1657 if (obj_request_img_data_test(obj_request)) {
1658 struct rbd_img_request *img_request = obj_request->img_request;
1660 rbd_assert(write_request ==
1661 img_request_write_test(img_request));
1663 snapc = img_request->snapc;
1666 /* Allocate and initialize the request, for the single op */
1668 osdc = &rbd_dev->rbd_client->client->osdc;
1669 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1671 return NULL; /* ENOMEM */
1674 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1676 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1678 osd_req->r_callback = rbd_osd_req_callback;
1679 osd_req->r_priv = obj_request;
1681 osd_req->r_oid_len = strlen(obj_request->object_name);
1682 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1683 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1685 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1691 * Create a copyup osd request based on the information in the
1692 * object request supplied. A copyup request has two osd ops,
1693 * a copyup method call, and a "normal" write request.
1695 static struct ceph_osd_request *
1696 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1698 struct rbd_img_request *img_request;
1699 struct ceph_snap_context *snapc;
1700 struct rbd_device *rbd_dev;
1701 struct ceph_osd_client *osdc;
1702 struct ceph_osd_request *osd_req;
1704 rbd_assert(obj_request_img_data_test(obj_request));
1705 img_request = obj_request->img_request;
1706 rbd_assert(img_request);
1707 rbd_assert(img_request_write_test(img_request));
1709 /* Allocate and initialize the request, for the two ops */
1711 snapc = img_request->snapc;
1712 rbd_dev = img_request->rbd_dev;
1713 osdc = &rbd_dev->rbd_client->client->osdc;
1714 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1716 return NULL; /* ENOMEM */
1718 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1719 osd_req->r_callback = rbd_osd_req_callback;
1720 osd_req->r_priv = obj_request;
1722 osd_req->r_oid_len = strlen(obj_request->object_name);
1723 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1724 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1726 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1732 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1734 ceph_osdc_put_request(osd_req);
1737 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1739 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1740 u64 offset, u64 length,
1741 enum obj_request_type type)
1743 struct rbd_obj_request *obj_request;
1747 rbd_assert(obj_request_type_valid(type));
1749 size = strlen(object_name) + 1;
1750 name = kmalloc(size, GFP_KERNEL);
1754 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1760 obj_request->object_name = memcpy(name, object_name, size);
1761 obj_request->offset = offset;
1762 obj_request->length = length;
1763 obj_request->flags = 0;
1764 obj_request->which = BAD_WHICH;
1765 obj_request->type = type;
1766 INIT_LIST_HEAD(&obj_request->links);
1767 init_completion(&obj_request->completion);
1768 kref_init(&obj_request->kref);
1770 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1771 offset, length, (int)type, obj_request);
1776 static void rbd_obj_request_destroy(struct kref *kref)
1778 struct rbd_obj_request *obj_request;
1780 obj_request = container_of(kref, struct rbd_obj_request, kref);
1782 dout("%s: obj %p\n", __func__, obj_request);
1784 rbd_assert(obj_request->img_request == NULL);
1785 rbd_assert(obj_request->which == BAD_WHICH);
1787 if (obj_request->osd_req)
1788 rbd_osd_req_destroy(obj_request->osd_req);
1790 rbd_assert(obj_request_type_valid(obj_request->type));
1791 switch (obj_request->type) {
1792 case OBJ_REQUEST_NODATA:
1793 break; /* Nothing to do */
1794 case OBJ_REQUEST_BIO:
1795 if (obj_request->bio_list)
1796 bio_chain_put(obj_request->bio_list);
1798 case OBJ_REQUEST_PAGES:
1799 if (obj_request->pages)
1800 ceph_release_page_vector(obj_request->pages,
1801 obj_request->page_count);
1805 kfree(obj_request->object_name);
1806 obj_request->object_name = NULL;
1807 kmem_cache_free(rbd_obj_request_cache, obj_request);
1811 * Caller is responsible for filling in the list of object requests
1812 * that comprises the image request, and the Linux request pointer
1813 * (if there is one).
1815 static struct rbd_img_request *rbd_img_request_create(
1816 struct rbd_device *rbd_dev,
1817 u64 offset, u64 length,
1821 struct rbd_img_request *img_request;
1823 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1827 if (write_request) {
1828 down_read(&rbd_dev->header_rwsem);
1829 ceph_get_snap_context(rbd_dev->header.snapc);
1830 up_read(&rbd_dev->header_rwsem);
1833 img_request->rq = NULL;
1834 img_request->rbd_dev = rbd_dev;
1835 img_request->offset = offset;
1836 img_request->length = length;
1837 img_request->flags = 0;
1838 if (write_request) {
1839 img_request_write_set(img_request);
1840 img_request->snapc = rbd_dev->header.snapc;
1842 img_request->snap_id = rbd_dev->spec->snap_id;
1845 img_request_child_set(img_request);
1846 if (rbd_dev->parent_spec)
1847 img_request_layered_set(img_request);
1848 spin_lock_init(&img_request->completion_lock);
1849 img_request->next_completion = 0;
1850 img_request->callback = NULL;
1851 img_request->result = 0;
1852 img_request->obj_request_count = 0;
1853 INIT_LIST_HEAD(&img_request->obj_requests);
1854 kref_init(&img_request->kref);
1856 rbd_img_request_get(img_request); /* Avoid a warning */
1857 rbd_img_request_put(img_request); /* TEMPORARY */
1859 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1860 write_request ? "write" : "read", offset, length,
1866 static void rbd_img_request_destroy(struct kref *kref)
1868 struct rbd_img_request *img_request;
1869 struct rbd_obj_request *obj_request;
1870 struct rbd_obj_request *next_obj_request;
1872 img_request = container_of(kref, struct rbd_img_request, kref);
1874 dout("%s: img %p\n", __func__, img_request);
1876 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1877 rbd_img_obj_request_del(img_request, obj_request);
1878 rbd_assert(img_request->obj_request_count == 0);
1880 if (img_request_write_test(img_request))
1881 ceph_put_snap_context(img_request->snapc);
1883 if (img_request_child_test(img_request))
1884 rbd_obj_request_put(img_request->obj_request);
1886 kmem_cache_free(rbd_img_request_cache, img_request);
1889 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1891 struct rbd_img_request *img_request;
1892 unsigned int xferred;
1896 rbd_assert(obj_request_img_data_test(obj_request));
1897 img_request = obj_request->img_request;
1899 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1900 xferred = (unsigned int)obj_request->xferred;
1901 result = obj_request->result;
1903 struct rbd_device *rbd_dev = img_request->rbd_dev;
1905 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1906 img_request_write_test(img_request) ? "write" : "read",
1907 obj_request->length, obj_request->img_offset,
1908 obj_request->offset);
1909 rbd_warn(rbd_dev, " result %d xferred %x\n",
1911 if (!img_request->result)
1912 img_request->result = result;
1915 /* Image object requests don't own their page array */
1917 if (obj_request->type == OBJ_REQUEST_PAGES) {
1918 obj_request->pages = NULL;
1919 obj_request->page_count = 0;
1922 if (img_request_child_test(img_request)) {
1923 rbd_assert(img_request->obj_request != NULL);
1924 more = obj_request->which < img_request->obj_request_count - 1;
1926 rbd_assert(img_request->rq != NULL);
1927 more = blk_end_request(img_request->rq, result, xferred);
1933 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1935 struct rbd_img_request *img_request;
1936 u32 which = obj_request->which;
1939 rbd_assert(obj_request_img_data_test(obj_request));
1940 img_request = obj_request->img_request;
1942 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1943 rbd_assert(img_request != NULL);
1944 rbd_assert(img_request->obj_request_count > 0);
1945 rbd_assert(which != BAD_WHICH);
1946 rbd_assert(which < img_request->obj_request_count);
1947 rbd_assert(which >= img_request->next_completion);
1949 spin_lock_irq(&img_request->completion_lock);
1950 if (which != img_request->next_completion)
1953 for_each_obj_request_from(img_request, obj_request) {
1955 rbd_assert(which < img_request->obj_request_count);
1957 if (!obj_request_done_test(obj_request))
1959 more = rbd_img_obj_end_request(obj_request);
1963 rbd_assert(more ^ (which == img_request->obj_request_count));
1964 img_request->next_completion = which;
1966 spin_unlock_irq(&img_request->completion_lock);
1969 rbd_img_request_complete(img_request);
1973 * Split up an image request into one or more object requests, each
1974 * to a different object. The "type" parameter indicates whether
1975 * "data_desc" is the pointer to the head of a list of bio
1976 * structures, or the base of a page array. In either case this
1977 * function assumes data_desc describes memory sufficient to hold
1978 * all data described by the image request.
1980 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1981 enum obj_request_type type,
1984 struct rbd_device *rbd_dev = img_request->rbd_dev;
1985 struct rbd_obj_request *obj_request = NULL;
1986 struct rbd_obj_request *next_obj_request;
1987 bool write_request = img_request_write_test(img_request);
1988 struct bio *bio_list;
1989 unsigned int bio_offset = 0;
1990 struct page **pages;
1995 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1996 (int)type, data_desc);
1998 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1999 img_offset = img_request->offset;
2000 resid = img_request->length;
2001 rbd_assert(resid > 0);
2003 if (type == OBJ_REQUEST_BIO) {
2004 bio_list = data_desc;
2005 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2007 rbd_assert(type == OBJ_REQUEST_PAGES);
2012 struct ceph_osd_request *osd_req;
2013 const char *object_name;
2017 object_name = rbd_segment_name(rbd_dev, img_offset);
2020 offset = rbd_segment_offset(rbd_dev, img_offset);
2021 length = rbd_segment_length(rbd_dev, img_offset, resid);
2022 obj_request = rbd_obj_request_create(object_name,
2023 offset, length, type);
2024 /* object request has its own copy of the object name */
2025 rbd_segment_name_free(object_name);
2029 if (type == OBJ_REQUEST_BIO) {
2030 unsigned int clone_size;
2032 rbd_assert(length <= (u64)UINT_MAX);
2033 clone_size = (unsigned int)length;
2034 obj_request->bio_list =
2035 bio_chain_clone_range(&bio_list,
2039 if (!obj_request->bio_list)
2042 unsigned int page_count;
2044 obj_request->pages = pages;
2045 page_count = (u32)calc_pages_for(offset, length);
2046 obj_request->page_count = page_count;
2047 if ((offset + length) & ~PAGE_MASK)
2048 page_count--; /* more on last page */
2049 pages += page_count;
2052 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2056 obj_request->osd_req = osd_req;
2057 obj_request->callback = rbd_img_obj_callback;
2059 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2061 if (type == OBJ_REQUEST_BIO)
2062 osd_req_op_extent_osd_data_bio(osd_req, 0,
2063 obj_request->bio_list, length);
2065 osd_req_op_extent_osd_data_pages(osd_req, 0,
2066 obj_request->pages, length,
2067 offset & ~PAGE_MASK, false, false);
2070 rbd_osd_req_format_write(obj_request);
2072 rbd_osd_req_format_read(obj_request);
2074 obj_request->img_offset = img_offset;
2075 rbd_img_obj_request_add(img_request, obj_request);
2077 img_offset += length;
2084 rbd_obj_request_put(obj_request);
2086 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2087 rbd_obj_request_put(obj_request);
2093 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2095 struct rbd_img_request *img_request;
2096 struct rbd_device *rbd_dev;
2100 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2101 rbd_assert(obj_request_img_data_test(obj_request));
2102 img_request = obj_request->img_request;
2103 rbd_assert(img_request);
2105 rbd_dev = img_request->rbd_dev;
2106 rbd_assert(rbd_dev);
2107 length = (u64)1 << rbd_dev->header.obj_order;
2108 page_count = (u32)calc_pages_for(0, length);
2110 rbd_assert(obj_request->copyup_pages);
2111 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2112 obj_request->copyup_pages = NULL;
2115 * We want the transfer count to reflect the size of the
2116 * original write request. There is no such thing as a
2117 * successful short write, so if the request was successful
2118 * we can just set it to the originally-requested length.
2120 if (!obj_request->result)
2121 obj_request->xferred = obj_request->length;
2123 /* Finish up with the normal image object callback */
2125 rbd_img_obj_callback(obj_request);
2129 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2131 struct rbd_obj_request *orig_request;
2132 struct ceph_osd_request *osd_req;
2133 struct ceph_osd_client *osdc;
2134 struct rbd_device *rbd_dev;
2135 struct page **pages;
2140 rbd_assert(img_request_child_test(img_request));
2142 /* First get what we need from the image request */
2144 pages = img_request->copyup_pages;
2145 rbd_assert(pages != NULL);
2146 img_request->copyup_pages = NULL;
2148 orig_request = img_request->obj_request;
2149 rbd_assert(orig_request != NULL);
2150 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2151 result = img_request->result;
2152 obj_size = img_request->length;
2153 xferred = img_request->xferred;
2155 rbd_dev = img_request->rbd_dev;
2156 rbd_assert(rbd_dev);
2157 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2159 rbd_img_request_put(img_request);
2164 /* Allocate the new copyup osd request for the original request */
2167 rbd_assert(!orig_request->osd_req);
2168 osd_req = rbd_osd_req_create_copyup(orig_request);
2171 orig_request->osd_req = osd_req;
2172 orig_request->copyup_pages = pages;
2174 /* Initialize the copyup op */
2176 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2177 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2180 /* Then the original write request op */
2182 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2183 orig_request->offset,
2184 orig_request->length, 0, 0);
2185 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2186 orig_request->length);
2188 rbd_osd_req_format_write(orig_request);
2190 /* All set, send it off. */
2192 orig_request->callback = rbd_img_obj_copyup_callback;
2193 osdc = &rbd_dev->rbd_client->client->osdc;
2194 result = rbd_obj_request_submit(osdc, orig_request);
2198 /* Record the error code and complete the request */
2200 orig_request->result = result;
2201 orig_request->xferred = 0;
2202 obj_request_done_set(orig_request);
2203 rbd_obj_request_complete(orig_request);
2207 * Read from the parent image the range of data that covers the
2208 * entire target of the given object request. This is used for
2209 * satisfying a layered image write request when the target of an
2210 * object request from the image request does not exist.
2212 * A page array big enough to hold the returned data is allocated
2213 * and supplied to rbd_img_request_fill() as the "data descriptor."
2214 * When the read completes, this page array will be transferred to
2215 * the original object request for the copyup operation.
2217 * If an error occurs, record it as the result of the original
2218 * object request and mark it done so it gets completed.
2220 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2222 struct rbd_img_request *img_request = NULL;
2223 struct rbd_img_request *parent_request = NULL;
2224 struct rbd_device *rbd_dev;
2227 struct page **pages = NULL;
2231 rbd_assert(obj_request_img_data_test(obj_request));
2232 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2234 img_request = obj_request->img_request;
2235 rbd_assert(img_request != NULL);
2236 rbd_dev = img_request->rbd_dev;
2237 rbd_assert(rbd_dev->parent != NULL);
2240 * First things first. The original osd request is of no
2241 * use to use any more, we'll need a new one that can hold
2242 * the two ops in a copyup request. We'll get that later,
2243 * but for now we can release the old one.
2245 rbd_osd_req_destroy(obj_request->osd_req);
2246 obj_request->osd_req = NULL;
2249 * Determine the byte range covered by the object in the
2250 * child image to which the original request was to be sent.
2252 img_offset = obj_request->img_offset - obj_request->offset;
2253 length = (u64)1 << rbd_dev->header.obj_order;
2256 * There is no defined parent data beyond the parent
2257 * overlap, so limit what we read at that boundary if
2260 if (img_offset + length > rbd_dev->parent_overlap) {
2261 rbd_assert(img_offset < rbd_dev->parent_overlap);
2262 length = rbd_dev->parent_overlap - img_offset;
2266 * Allocate a page array big enough to receive the data read
2269 page_count = (u32)calc_pages_for(0, length);
2270 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2271 if (IS_ERR(pages)) {
2272 result = PTR_ERR(pages);
2278 parent_request = rbd_img_request_create(rbd_dev->parent,
2281 if (!parent_request)
2283 rbd_obj_request_get(obj_request);
2284 parent_request->obj_request = obj_request;
2286 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2289 parent_request->copyup_pages = pages;
2291 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2292 result = rbd_img_request_submit(parent_request);
2296 parent_request->copyup_pages = NULL;
2297 parent_request->obj_request = NULL;
2298 rbd_obj_request_put(obj_request);
2301 ceph_release_page_vector(pages, page_count);
2303 rbd_img_request_put(parent_request);
2304 obj_request->result = result;
2305 obj_request->xferred = 0;
2306 obj_request_done_set(obj_request);
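/*
 * Callback for the STAT request issued by rbd_img_obj_exists_submit().
 * It records whether the target object exists, then resubmits the
 * original object request now that the answer is known.
 */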
2311 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2313 struct rbd_obj_request *orig_request;
2316 rbd_assert(!obj_request_img_data_test(obj_request));
2319 * All we need from the object request is the original
2320 * request and the result of the STAT op. Grab those, then
2321 * we're done with the request.
2323 orig_request = obj_request->obj_request;
2324 obj_request->obj_request = NULL;
2325 rbd_assert(orig_request);
2326 rbd_assert(orig_request->img_request);
2328 result = obj_request->result;
2329 obj_request->result = 0;
2331 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2332 obj_request, orig_request, result,
2333 obj_request->xferred, obj_request->length);
2334 rbd_obj_request_put(obj_request);
2336 rbd_assert(orig_request);
2337 rbd_assert(orig_request->img_request);
2340 * Our only purpose here is to determine whether the object
2341 * exists, and we don't want to treat the non-existence as
2342 * an error. If something else comes back, transfer the
2343 * error to the original request and complete it now.
2346 obj_request_existence_set(orig_request, true);
2347 } else if (result == -ENOENT) {
2348 obj_request_existence_set(orig_request, false);
2349 } else if (result) {
2350 orig_request->result = result;
2355 * Resubmit the original request now that we have recorded
2356 * whether the target object exists.
2358 orig_request->result = rbd_img_obj_request_submit(orig_request);
2360 if (orig_request->result)
2361 rbd_obj_request_complete(orig_request);
2362 rbd_obj_request_put(orig_request);
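/*
 * Issue a STAT request for an object to find out whether it already
 * exists, so a layered write can choose between a plain write and a
 * copyup. The result is handled by rbd_img_obj_exists_callback().
 */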
2365 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2367 struct rbd_obj_request *stat_request;
2368 struct rbd_device *rbd_dev;
2369 struct ceph_osd_client *osdc;
2370 struct page **pages = NULL;
2376 * The response data for a STAT call consists of:
 *     le64 length;
 *     struct {
 *         le32 tv_sec;
 *         le32 tv_nsec;
 *     } mtime;
2383 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2384 page_count = (u32)calc_pages_for(0, size);
2385 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2387 return PTR_ERR(pages);
2390 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2395 rbd_obj_request_get(obj_request);
2396 stat_request->obj_request = obj_request;
2397 stat_request->pages = pages;
2398 stat_request->page_count = page_count;
2400 rbd_assert(obj_request->img_request);
2401 rbd_dev = obj_request->img_request->rbd_dev;
2402 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2404 if (!stat_request->osd_req)
2406 stat_request->callback = rbd_img_obj_exists_callback;
2408 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2409 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2411 rbd_osd_req_format_read(stat_request);
2413 osdc = &rbd_dev->rbd_client->client->osdc;
2414 ret = rbd_obj_request_submit(osdc, stat_request);
2417 rbd_obj_request_put(obj_request);
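/*
 * Submit one object request belonging to an image request, routing
 * layered writes through the existence-check and copyup paths when
 * necessary.
 */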
2422 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2424 struct rbd_img_request *img_request;
2425 struct rbd_device *rbd_dev;
2428 rbd_assert(obj_request_img_data_test(obj_request));
2430 img_request = obj_request->img_request;
2431 rbd_assert(img_request);
2432 rbd_dev = img_request->rbd_dev;
2435 * Only writes to layered images need special handling.
2436 * Reads and non-layered writes are simple object requests.
2437 * Layered writes that start beyond the end of the overlap
2438 * with the parent have no parent data, so they too are
2439 * simple object requests. Finally, if the target object is
2440 * known to already exist, its parent data has already been
2441 * copied, so a write to the object can also be handled as a
2442 * simple object request.
2444 if (!img_request_write_test(img_request) ||
2445 !img_request_layered_test(img_request) ||
2446 rbd_dev->parent_overlap <= obj_request->img_offset ||
2447 ((known = obj_request_known_test(obj_request)) &&
2448 obj_request_exists_test(obj_request))) {
2450 struct rbd_device *rbd_dev;
2451 struct ceph_osd_client *osdc;
2453 rbd_dev = obj_request->img_request->rbd_dev;
2454 osdc = &rbd_dev->rbd_client->client->osdc;
2456 return rbd_obj_request_submit(osdc, obj_request);
2460 * It's a layered write. The target object might exist but
2461 * we may not know that yet. If we know it doesn't exist,
2462 * start by reading the data for the full target object from
2463 * the parent so we can use it for a copyup to the target.
2466 return rbd_img_obj_parent_read_full(obj_request);
2468 /* We don't know whether the target exists. Go find out. */
2470 return rbd_img_obj_exists_submit(obj_request);
2473 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2475 struct rbd_obj_request *obj_request;
2476 struct rbd_obj_request *next_obj_request;
2478 dout("%s: img %p\n", __func__, img_request);
2479 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2482 ret = rbd_img_obj_request_submit(obj_request);
2490 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2492 struct rbd_obj_request *obj_request;
2493 struct rbd_device *rbd_dev;
2496 rbd_assert(img_request_child_test(img_request));
2498 obj_request = img_request->obj_request;
2499 rbd_assert(obj_request);
2500 rbd_assert(obj_request->img_request);
2502 obj_request->result = img_request->result;
2503 if (obj_request->result)
2507 * We need to zero anything beyond the parent overlap
2508 * boundary. Since rbd_img_obj_request_read_callback()
2509 * will zero anything beyond the end of a short read, an
2510 * easy way to do this is to pretend the data from the
2511 * parent came up short--ending at the overlap boundary.
2513 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2514 obj_end = obj_request->img_offset + obj_request->length;
2515 rbd_dev = obj_request->img_request->rbd_dev;
2516 if (obj_end > rbd_dev->parent_overlap) {
2519 if (obj_request->img_offset < rbd_dev->parent_overlap)
2520 xferred = rbd_dev->parent_overlap -
2521 obj_request->img_offset;
2523 obj_request->xferred = min(img_request->xferred, xferred);
2525 obj_request->xferred = img_request->xferred;
2528 rbd_img_request_put(img_request);
2529 rbd_img_obj_request_read_callback(obj_request);
2530 rbd_obj_request_complete(obj_request);
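/*
 * Satisfy an object read by reading the corresponding range from the
 * parent image. Used when the target object turned out not to exist
 * in the child (the read completed with -ENOENT).
 */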
2533 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2535 struct rbd_device *rbd_dev;
2536 struct rbd_img_request *img_request;
2539 rbd_assert(obj_request_img_data_test(obj_request));
2540 rbd_assert(obj_request->img_request != NULL);
2541 rbd_assert(obj_request->result == (s32) -ENOENT);
2542 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2544 rbd_dev = obj_request->img_request->rbd_dev;
2545 rbd_assert(rbd_dev->parent != NULL);
2546 /* rbd_read_finish(obj_request, obj_request->length); */
2547 img_request = rbd_img_request_create(rbd_dev->parent,
2548 obj_request->img_offset,
2549 obj_request->length,
2555 rbd_obj_request_get(obj_request);
2556 img_request->obj_request = obj_request;
2558 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2559 obj_request->bio_list);
2563 img_request->callback = rbd_img_parent_read_callback;
2564 result = rbd_img_request_submit(img_request);
2571 rbd_img_request_put(img_request);
2572 obj_request->result = result;
2573 obj_request->xferred = 0;
2574 obj_request_done_set(obj_request);
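/*
 * Acknowledge a watch notification on the header object. The request
 * completes asynchronously; its callback simply drops the reference.
 */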
2577 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2579 struct rbd_obj_request *obj_request;
2580 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2583 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2584 OBJ_REQUEST_NODATA);
2589 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2590 if (!obj_request->osd_req)
2592 obj_request->callback = rbd_obj_request_put;
2594 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2596 rbd_osd_req_format_read(obj_request);
2598 ret = rbd_obj_request_submit(osdc, obj_request);
2601 rbd_obj_request_put(obj_request);
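/*
 * Watch callback, invoked when the header object is updated (e.g. on
 * resize or snapshot creation). Refresh the in-memory header, then
 * ack the notification.
 */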
2606 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2608 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2614 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2615 rbd_dev->header_name, (unsigned long long)notify_id,
2616 (unsigned int)opcode);
2617 ret = rbd_dev_refresh(rbd_dev);
2619 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2621 rbd_obj_notify_ack(rbd_dev, notify_id);
2625 * Request sync osd watch/unwatch. The value of "start" determines
2626 * whether a watch request is being initiated or torn down.
2628 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2630 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2631 struct rbd_obj_request *obj_request;
2634 rbd_assert(start ^ !!rbd_dev->watch_event);
2635 rbd_assert(start ^ !!rbd_dev->watch_request);
2638 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2639 &rbd_dev->watch_event);
2642 rbd_assert(rbd_dev->watch_event != NULL);
2646 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2647 OBJ_REQUEST_NODATA);
2651 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2652 if (!obj_request->osd_req)
2656 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2658 ceph_osdc_unregister_linger_request(osdc,
2659 rbd_dev->watch_request->osd_req);
2661 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2662 rbd_dev->watch_event->cookie, 0, start);
2663 rbd_osd_req_format_write(obj_request);
2665 ret = rbd_obj_request_submit(osdc, obj_request);
2668 ret = rbd_obj_request_wait(obj_request);
2671 ret = obj_request->result;
2676 * A watch request is set to linger, so the underlying osd
2677 * request won't go away until we unregister it. We retain
2678 * a pointer to the object request during that time (in
2679 * rbd_dev->watch_request), so we'll keep a reference to
2680 * it. We'll drop that reference (below) after we've
2684 rbd_dev->watch_request = obj_request;
2689 /* We have successfully torn down the watch request */
2691 rbd_obj_request_put(rbd_dev->watch_request);
2692 rbd_dev->watch_request = NULL;
2694 /* Cancel the event if we're tearing down, or on error */
2695 ceph_osdc_cancel_event(rbd_dev->watch_event);
2696 rbd_dev->watch_event = NULL;
2698 rbd_obj_request_put(obj_request);
2704 * Synchronous osd object method call. Returns the number of bytes
2705 * returned in the outbound buffer, or a negative error code.
2707 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2708 const char *object_name,
2709 const char *class_name,
2710 const char *method_name,
2711 const void *outbound,
2712 size_t outbound_size,
2714 size_t inbound_size)
2716 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2717 struct rbd_obj_request *obj_request;
2718 struct page **pages;
2723 * Method calls are ultimately read operations. The result
2724 * should be placed into the inbound buffer provided. They
2725 * also supply outbound data--parameters for the object
2726 * method. Currently if this is present it will be a
 * snapshot id.
2729 page_count = (u32)calc_pages_for(0, inbound_size);
2730 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2732 return PTR_ERR(pages);
2735 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2740 obj_request->pages = pages;
2741 obj_request->page_count = page_count;
2743 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2744 if (!obj_request->osd_req)
2747 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2748 class_name, method_name);
2749 if (outbound_size) {
2750 struct ceph_pagelist *pagelist;
2752 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2756 ceph_pagelist_init(pagelist);
2757 ceph_pagelist_append(pagelist, outbound, outbound_size);
2758 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2761 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2762 obj_request->pages, inbound_size,
2764 rbd_osd_req_format_read(obj_request);
2766 ret = rbd_obj_request_submit(osdc, obj_request);
2769 ret = rbd_obj_request_wait(obj_request);
2773 ret = obj_request->result;
2777 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2778 ret = (int)obj_request->xferred;
2779 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2782 rbd_obj_request_put(obj_request);
2784 ceph_release_page_vector(pages, page_count);
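/*
 * The block-layer request function. Entered with the queue lock held;
 * the lock is dropped while an image request is built and submitted
 * for each fetched request, then reacquired.
 */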
2789 static void rbd_request_fn(struct request_queue *q)
2790 __releases(q->queue_lock) __acquires(q->queue_lock)
2792 struct rbd_device *rbd_dev = q->queuedata;
2793 bool read_only = rbd_dev->mapping.read_only;
2797 while ((rq = blk_fetch_request(q))) {
2798 bool write_request = rq_data_dir(rq) == WRITE;
2799 struct rbd_img_request *img_request;
2803 /* Ignore any non-FS requests that filter through. */
2805 if (rq->cmd_type != REQ_TYPE_FS) {
2806 dout("%s: non-fs request type %d\n", __func__,
2807 (int) rq->cmd_type);
2808 __blk_end_request_all(rq, 0);
2812 /* Ignore/skip any zero-length requests */
2814 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2815 length = (u64) blk_rq_bytes(rq);
2818 dout("%s: zero-length request\n", __func__);
2819 __blk_end_request_all(rq, 0);
2823 spin_unlock_irq(q->queue_lock);
2825 /* Disallow writes to a read-only device */
2827 if (write_request) {
2831 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2835 * Quit early if the mapped snapshot no longer
2836 * exists. It's still possible the snapshot will
2837 * have disappeared by the time our request arrives
2838 * at the osd, but there's no sense in sending it if
 * we already know.
2841 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2842 dout("request for non-existent snapshot");
2843 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2849 if (offset && length > U64_MAX - offset + 1) {
2850 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2852 goto end_request; /* Shouldn't happen */
2856 if (offset + length > rbd_dev->mapping.size) {
2857 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2858 offset, length, rbd_dev->mapping.size);
2863 img_request = rbd_img_request_create(rbd_dev, offset, length,
2864 write_request, false);
2868 img_request->rq = rq;
2870 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2873 result = rbd_img_request_submit(img_request);
2875 rbd_img_request_put(img_request);
2877 spin_lock_irq(q->queue_lock);
2879 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2880 write_request ? "write" : "read",
2881 length, offset, result);
2883 __blk_end_request_all(rq, result);
2889 * a queue callback. Makes sure that we don't create a bio that spans across
2890 * multiple osd objects. One exception would be single-page bios,
2891 * which we handle later at bio_chain_clone_range()
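/*
 * For example, with the default 4 MiB objects, a bio whose start
 * sector lies 1 MiB into an object may accept at most 3 MiB more
 * data (less what it already contains) before it would cross into
 * the next object.
 */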
2893 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2894 struct bio_vec *bvec)
2896 struct rbd_device *rbd_dev = q->queuedata;
2897 sector_t sector_offset;
2898 sector_t sectors_per_obj;
2899 sector_t obj_sector_offset;
2903 * Find how far into its rbd object the partition-relative
 * bio start sector is, relative to the enclosing device.
2907 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2908 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2909 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2912 * Compute the number of bytes from that offset to the end
2913 * of the object. Account for what's already used by the bio.
2915 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2916 if (ret > bmd->bi_size)
2917 ret -= bmd->bi_size;
2922 * Don't send back more than was asked for. And if the bio
2923 * was empty, let the whole thing through because: "Note
2924 * that a block device *must* allow a single page to be
2925 * added to an empty bio."
2927 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2928 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2929 ret = (int) bvec->bv_len;
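/* Tear down the gendisk and its request queue for this device. */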
2934 static void rbd_free_disk(struct rbd_device *rbd_dev)
2936 struct gendisk *disk = rbd_dev->disk;
2941 rbd_dev->disk = NULL;
2942 if (disk->flags & GENHD_FL_UP) {
2945 blk_cleanup_queue(disk->queue);
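/*
 * Synchronously read a byte range from the named object into the
 * supplied buffer. Returns the number of bytes read, or a negative
 * errno.
 */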
2950 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2951 const char *object_name,
2952 u64 offset, u64 length, void *buf)
2955 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2956 struct rbd_obj_request *obj_request;
2957 struct page **pages = NULL;
2962 page_count = (u32) calc_pages_for(offset, length);
2963 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2965 ret = PTR_ERR(pages);
2968 obj_request = rbd_obj_request_create(object_name, offset, length,
2973 obj_request->pages = pages;
2974 obj_request->page_count = page_count;
2976 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2977 if (!obj_request->osd_req)
2980 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2981 offset, length, 0, 0);
2982 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2984 obj_request->length,
2985 obj_request->offset & ~PAGE_MASK,
2987 rbd_osd_req_format_read(obj_request);
2989 ret = rbd_obj_request_submit(osdc, obj_request);
2992 ret = rbd_obj_request_wait(obj_request);
2996 ret = obj_request->result;
3000 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3001 size = (size_t) obj_request->xferred;
3002 ceph_copy_from_page_vector(pages, buf, 0, size);
3003 rbd_assert(size <= (size_t)INT_MAX);
3007 rbd_obj_request_put(obj_request);
3009 ceph_release_page_vector(pages, page_count);
3015 * Read the complete header for the given rbd device.
3017 * Returns a pointer to a dynamically-allocated buffer containing
3018 * the complete and validated header.
3022 * Returns a pointer-coded errno if a failure occurs.
3024 static struct rbd_image_header_ondisk *
3025 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3027 struct rbd_image_header_ondisk *ondisk = NULL;
3034 * The complete header will include an array of its 64-bit
3035 * snapshot ids, followed by the names of those snapshots as
3036 * a contiguous block of NUL-terminated strings. Note that
3037 * the number of snapshots could change by the time we read
3038 * it in, in which case we re-read it.
3045 size = sizeof (*ondisk);
3046 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3048 ondisk = kmalloc(size, GFP_KERNEL);
3050 return ERR_PTR(-ENOMEM);
3052 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3056 if ((size_t)ret < size) {
3058 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3062 if (!rbd_dev_ondisk_valid(ondisk)) {
3064 rbd_warn(rbd_dev, "invalid header");
3068 names_size = le64_to_cpu(ondisk->snap_names_len);
3069 want_count = snap_count;
3070 snap_count = le32_to_cpu(ondisk->snap_count);
3071 } while (snap_count != want_count);
3078 return ERR_PTR(ret);
3082 * Reload the on-disk header.
3084 static int rbd_read_header(struct rbd_device *rbd_dev,
3085 struct rbd_image_header *header)
3087 struct rbd_image_header_ondisk *ondisk;
3090 ondisk = rbd_dev_v1_header_read(rbd_dev);
3092 return PTR_ERR(ondisk);
3093 ret = rbd_header_from_disk(header, ondisk);
3100 * Only read the first part of the on-disk header, without the snapshot info.
3102 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3105 struct rbd_image_header h;
3107 ret = rbd_read_header(rbd_dev, &h);
3111 down_write(&rbd_dev->header_rwsem);
3113 /* Update image size, and check for resize of mapped image */
3114 rbd_dev->header.image_size = h.image_size;
3115 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3116 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3117 rbd_dev->mapping.size = rbd_dev->header.image_size;
3119 /* rbd_dev->header.object_prefix shouldn't change */
3120 kfree(rbd_dev->header.snap_sizes);
3121 kfree(rbd_dev->header.snap_names);
3122 /* osd requests may still refer to snapc */
3123 ceph_put_snap_context(rbd_dev->header.snapc);
3125 rbd_dev->header.image_size = h.image_size;
3126 rbd_dev->header.snapc = h.snapc;
3127 rbd_dev->header.snap_names = h.snap_names;
3128 rbd_dev->header.snap_sizes = h.snap_sizes;
3129 /* Free the extra copy of the object prefix */
3130 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3131 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3132 kfree(h.object_prefix);
3134 up_write(&rbd_dev->header_rwsem);
3140 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3141 * has disappeared from the (just updated) snapshot context.
3143 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3147 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3150 snap_id = rbd_dev->spec->snap_id;
3151 if (snap_id == CEPH_NOSNAP)
3154 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3155 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
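/*
 * Re-read the image header and, if the size of the mapping changed,
 * update the block device capacity to match.
 */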
3158 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3163 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3164 mapping_size = rbd_dev->mapping.size;
3165 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3166 if (rbd_dev->image_format == 1)
3167 ret = rbd_dev_v1_refresh(rbd_dev);
3169 ret = rbd_dev_v2_refresh(rbd_dev);
3171 /* If it's a mapped snapshot, validate its EXISTS flag */
3173 rbd_exists_validate(rbd_dev);
3174 mutex_unlock(&ctl_mutex);
3175 if (mapping_size != rbd_dev->mapping.size) {
3178 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3179 dout("setting size to %llu sectors", (unsigned long long)size);
3180 set_capacity(rbd_dev->disk, size);
3181 revalidate_disk(rbd_dev->disk);
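/*
 * Allocate and set up the gendisk and request queue for the mapped
 * image, with I/O limits sized to the rbd object size.
 */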
3187 static int rbd_init_disk(struct rbd_device *rbd_dev)
3189 struct gendisk *disk;
3190 struct request_queue *q;
3193 /* create gendisk info */
3194 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3198 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3200 disk->major = rbd_dev->major;
3201 disk->first_minor = 0;
3202 disk->fops = &rbd_bd_ops;
3203 disk->private_data = rbd_dev;
3205 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3209 /* We use the default size, but let's be explicit about it. */
3210 blk_queue_physical_block_size(q, SECTOR_SIZE);
3212 /* set io sizes to object size */
3213 segment_size = rbd_obj_bytes(&rbd_dev->header);
3214 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3215 blk_queue_max_segment_size(q, segment_size);
3216 blk_queue_io_min(q, segment_size);
3217 blk_queue_io_opt(q, segment_size);
3219 blk_queue_merge_bvec(q, rbd_merge_bvec);
3222 q->queuedata = rbd_dev;
3224 rbd_dev->disk = disk;
3237 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3239 return container_of(dev, struct rbd_device, dev);
3242 static ssize_t rbd_size_show(struct device *dev,
3243 struct device_attribute *attr, char *buf)
3245 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3247 return sprintf(buf, "%llu\n",
3248 (unsigned long long)rbd_dev->mapping.size);
3252 * Note this shows the features for whatever's mapped, which is not
3253 * necessarily the base image.
3255 static ssize_t rbd_features_show(struct device *dev,
3256 struct device_attribute *attr, char *buf)
3258 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3260 return sprintf(buf, "0x%016llx\n",
3261 (unsigned long long)rbd_dev->mapping.features);
3264 static ssize_t rbd_major_show(struct device *dev,
3265 struct device_attribute *attr, char *buf)
3267 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270 return sprintf(buf, "%d\n", rbd_dev->major);
3272 return sprintf(buf, "(none)\n");
3276 static ssize_t rbd_client_id_show(struct device *dev,
3277 struct device_attribute *attr, char *buf)
3279 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3281 return sprintf(buf, "client%lld\n",
3282 ceph_client_id(rbd_dev->rbd_client->client));
3285 static ssize_t rbd_pool_show(struct device *dev,
3286 struct device_attribute *attr, char *buf)
3288 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3290 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3293 static ssize_t rbd_pool_id_show(struct device *dev,
3294 struct device_attribute *attr, char *buf)
3296 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3298 return sprintf(buf, "%llu\n",
3299 (unsigned long long) rbd_dev->spec->pool_id);
3302 static ssize_t rbd_name_show(struct device *dev,
3303 struct device_attribute *attr, char *buf)
3305 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3307 if (rbd_dev->spec->image_name)
3308 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3310 return sprintf(buf, "(unknown)\n");
3313 static ssize_t rbd_image_id_show(struct device *dev,
3314 struct device_attribute *attr, char *buf)
3316 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3318 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3322 * Shows the name of the currently-mapped snapshot (or
3323 * RBD_SNAP_HEAD_NAME for the base image).
3325 static ssize_t rbd_snap_show(struct device *dev,
3326 struct device_attribute *attr,
3329 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3331 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3335 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3336 * for the parent image. If there is no parent, simply shows
3337 * "(no parent image)".
3339 static ssize_t rbd_parent_show(struct device *dev,
3340 struct device_attribute *attr,
3343 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3344 struct rbd_spec *spec = rbd_dev->parent_spec;
3349 return sprintf(buf, "(no parent image)\n");
3351 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3352 (unsigned long long) spec->pool_id, spec->pool_name);
3357 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3358 spec->image_name ? spec->image_name : "(unknown)");
3363 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3364 (unsigned long long) spec->snap_id, spec->snap_name);
3369 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3374 return (ssize_t) (bufp - buf);
3377 static ssize_t rbd_image_refresh(struct device *dev,
3378 struct device_attribute *attr,
3382 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3385 ret = rbd_dev_refresh(rbd_dev);
3387 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3389 return ret < 0 ? ret : size;
3392 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3393 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3394 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3395 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3396 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3397 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3398 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3399 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3400 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3401 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3402 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3404 static struct attribute *rbd_attrs[] = {
3405 &dev_attr_size.attr,
3406 &dev_attr_features.attr,
3407 &dev_attr_major.attr,
3408 &dev_attr_client_id.attr,
3409 &dev_attr_pool.attr,
3410 &dev_attr_pool_id.attr,
3411 &dev_attr_name.attr,
3412 &dev_attr_image_id.attr,
3413 &dev_attr_current_snap.attr,
3414 &dev_attr_parent.attr,
3415 &dev_attr_refresh.attr,
3419 static struct attribute_group rbd_attr_group = {
3423 static const struct attribute_group *rbd_attr_groups[] = {
3428 static void rbd_sysfs_dev_release(struct device *dev)
3432 static struct device_type rbd_device_type = {
3434 .groups = rbd_attr_groups,
3435 .release = rbd_sysfs_dev_release,
3438 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3440 kref_get(&spec->kref);
3445 static void rbd_spec_free(struct kref *kref);
3446 static void rbd_spec_put(struct rbd_spec *spec)
3449 kref_put(&spec->kref, rbd_spec_free);
3452 static struct rbd_spec *rbd_spec_alloc(void)
3454 struct rbd_spec *spec;
3456 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3459 kref_init(&spec->kref);
3464 static void rbd_spec_free(struct kref *kref)
3466 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3468 kfree(spec->pool_name);
3469 kfree(spec->image_id);
3470 kfree(spec->image_name);
3471 kfree(spec->snap_name);
3475 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3476 struct rbd_spec *spec)
3478 struct rbd_device *rbd_dev;
3480 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3484 spin_lock_init(&rbd_dev->lock);
3486 INIT_LIST_HEAD(&rbd_dev->node);
3487 init_rwsem(&rbd_dev->header_rwsem);
3489 rbd_dev->spec = spec;
3490 rbd_dev->rbd_client = rbdc;
3492 /* Initialize the layout used for all rbd requests */
3494 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3495 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3496 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3497 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3502 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3504 rbd_put_client(rbd_dev->rbd_client);
3505 rbd_spec_put(rbd_dev->spec);
3510 * Get the size and object order for an image snapshot, or if
3511 * snap_id is CEPH_NOSNAP, gets this information for the base
3514 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3515 u8 *order, u64 *snap_size)
3517 __le64 snapid = cpu_to_le64(snap_id);
3522 } __attribute__ ((packed)) size_buf = { 0 };
3524 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3526 &snapid, sizeof (snapid),
3527 &size_buf, sizeof (size_buf));
3528 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3531 if (ret < sizeof (size_buf))
3535 *order = size_buf.order;
3536 *snap_size = le64_to_cpu(size_buf.size);
3538 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3539 (unsigned long long)snap_id, (unsigned int)*order,
3540 (unsigned long long)*snap_size);
3545 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3547 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3548 &rbd_dev->header.obj_order,
3549 &rbd_dev->header.image_size);
3552 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3558 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3562 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3563 "rbd", "get_object_prefix", NULL, 0,
3564 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3565 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3570 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3571 p + ret, NULL, GFP_NOIO);
3574 if (IS_ERR(rbd_dev->header.object_prefix)) {
3575 ret = PTR_ERR(rbd_dev->header.object_prefix);
3576 rbd_dev->header.object_prefix = NULL;
3578 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3586 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3589 __le64 snapid = cpu_to_le64(snap_id);
3593 } __attribute__ ((packed)) features_buf = { 0 };
3597 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3598 "rbd", "get_features",
3599 &snapid, sizeof (snapid),
3600 &features_buf, sizeof (features_buf));
3601 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3604 if (ret < sizeof (features_buf))
3607 incompat = le64_to_cpu(features_buf.incompat);
3608 if (incompat & ~RBD_FEATURES_SUPPORTED)
3611 *snap_features = le64_to_cpu(features_buf.features);
3613 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3614 (unsigned long long)snap_id,
3615 (unsigned long long)*snap_features,
3616 (unsigned long long)le64_to_cpu(features_buf.incompat));
3621 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3623 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3624 &rbd_dev->header.features);
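/*
 * Fetch parent (layering) information for a format 2 image using the
 * "get_parent" class method, recording the parent spec and overlap
 * in the device structure.
 */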
3627 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3629 struct rbd_spec *parent_spec;
3631 void *reply_buf = NULL;
3639 parent_spec = rbd_spec_alloc();
3643 size = sizeof (__le64) + /* pool_id */
3644 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3645 sizeof (__le64) + /* snap_id */
3646 sizeof (__le64); /* overlap */
3647 reply_buf = kmalloc(size, GFP_KERNEL);
3653 snapid = cpu_to_le64(CEPH_NOSNAP);
3654 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3655 "rbd", "get_parent",
3656 &snapid, sizeof (snapid),
3658 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3663 end = reply_buf + ret;
3665 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3666 if (parent_spec->pool_id == CEPH_NOPOOL)
3667 goto out; /* No parent? No problem. */
3669 /* The ceph file layout needs to fit pool id in 32 bits */
3672 if (parent_spec->pool_id > (u64)U32_MAX) {
3673 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3674 (unsigned long long)parent_spec->pool_id, U32_MAX);
3678 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3679 if (IS_ERR(image_id)) {
3680 ret = PTR_ERR(image_id);
3683 parent_spec->image_id = image_id;
3684 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3685 ceph_decode_64_safe(&p, end, overlap, out_err);
3687 rbd_dev->parent_overlap = overlap;
3688 rbd_dev->parent_spec = parent_spec;
3689 parent_spec = NULL; /* rbd_dev now owns this */
3694 rbd_spec_put(parent_spec);
3699 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3703 __le64 stripe_count;
3704 } __attribute__ ((packed)) striping_info_buf = { 0 };
3705 size_t size = sizeof (striping_info_buf);
3712 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3713 "rbd", "get_stripe_unit_count", NULL, 0,
3714 (char *)&striping_info_buf, size);
3715 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3722 * We don't actually support the "fancy striping" feature
3723 * (STRIPINGV2) yet, but if the striping sizes are the
3724 * defaults the behavior is the same as before. So find
3725 * out, and only fail if the image has non-default values.
3728 obj_size = (u64)1 << rbd_dev->header.obj_order;
3729 p = &striping_info_buf;
3730 stripe_unit = ceph_decode_64(&p);
3731 if (stripe_unit != obj_size) {
3732 rbd_warn(rbd_dev, "unsupported stripe unit "
3733 "(got %llu want %llu)",
3734 stripe_unit, obj_size);
3737 stripe_count = ceph_decode_64(&p);
3738 if (stripe_count != 1) {
3739 rbd_warn(rbd_dev, "unsupported stripe count "
3740 "(got %llu want 1)", stripe_count);
3743 rbd_dev->header.stripe_unit = stripe_unit;
3744 rbd_dev->header.stripe_count = stripe_count;
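/*
 * Look up the image name for this image's id by calling the
 * "dir_get_name" method on the rbd directory object. Returns the
 * dynamically-allocated name, or NULL on failure (the caller treats
 * the name as optional).
 */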
3749 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3751 size_t image_id_size;
3756 void *reply_buf = NULL;
3758 char *image_name = NULL;
3761 rbd_assert(!rbd_dev->spec->image_name);
3763 len = strlen(rbd_dev->spec->image_id);
3764 image_id_size = sizeof (__le32) + len;
3765 image_id = kmalloc(image_id_size, GFP_KERNEL);
3770 end = image_id + image_id_size;
3771 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3773 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3774 reply_buf = kmalloc(size, GFP_KERNEL);
3778 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3779 "rbd", "dir_get_name",
3780 image_id, image_id_size,
3785 end = reply_buf + ret;
3787 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3788 if (IS_ERR(image_name))
3791 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
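/*
 * For a format 1 image, find the id of the named snapshot by walking
 * the snapshot names block in step with the snapshot context.
 * Returns CEPH_NOSNAP if the name is not found.
 */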
3799 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3801 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3802 const char *snap_name;
3805 /* Skip over names until we find the one we are looking for */
3807 snap_name = rbd_dev->header.snap_names;
3808 while (which < snapc->num_snaps) {
3809 if (!strcmp(name, snap_name))
3810 return snapc->snaps[which];
3811 snap_name += strlen(snap_name) + 1;
3817 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3819 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3824 for (which = 0; !found && which < snapc->num_snaps; which++) {
3825 const char *snap_name;
3827 snap_id = snapc->snaps[which];
3828 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3829 if (IS_ERR(snap_name))
3831 found = !strcmp(name, snap_name);
3834 return found ? snap_id : CEPH_NOSNAP;
3838 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3839 * no snapshot by that name is found, or if an error occurs.
3841 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3843 if (rbd_dev->image_format == 1)
3844 return rbd_v1_snap_id_by_name(rbd_dev, name);
3846 return rbd_v2_snap_id_by_name(rbd_dev, name);
3850 * When an rbd image has a parent image, it is identified by the
3851 * pool, image, and snapshot ids (not names). This function fills
3852 * in the names for those ids. (It's OK if we can't figure out the
3853 * name for an image id, but the pool and snapshot ids should always
3854 * exist and have names.) All names in an rbd spec are dynamically
3857 * When an image being mapped (not a parent) is probed, we have the
3858 * pool name and pool id, image name and image id, and the snapshot
3859 * name. The only thing we're missing is the snapshot id.
3861 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3863 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3864 struct rbd_spec *spec = rbd_dev->spec;
3865 const char *pool_name;
3866 const char *image_name;
3867 const char *snap_name;
3871 * An image being mapped will have the pool name (etc.), but
3872 * we need to look up the snapshot id.
3874 if (spec->pool_name) {
3875 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3878 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3879 if (snap_id == CEPH_NOSNAP)
3881 spec->snap_id = snap_id;
3883 spec->snap_id = CEPH_NOSNAP;
3889 /* Get the pool name; we have to make our own copy of this */
3891 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3893 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3896 pool_name = kstrdup(pool_name, GFP_KERNEL);
3900 /* Fetch the image name; tolerate failure here */
3902 image_name = rbd_dev_image_name(rbd_dev);
3904 rbd_warn(rbd_dev, "unable to get image name");
3906 /* Look up the snapshot name, and make a copy */
3908 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3914 spec->pool_name = pool_name;
3915 spec->image_name = image_name;
3916 spec->snap_name = snap_name;
3926 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3935 struct ceph_snap_context *snapc;
3939 * We'll need room for the seq value (maximum snapshot id),
3940 * snapshot count, and array of that many snapshot ids.
3941 * For now we have a fixed upper limit on the number we're
3942 * prepared to receive.
3944 size = sizeof (__le64) + sizeof (__le32) +
3945 RBD_MAX_SNAP_COUNT * sizeof (__le64);
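/* With RBD_MAX_SNAP_COUNT of 510, this is 8 + 4 + 510 * 8 = 4092
 * bytes, so the reply buffer fits within a single 4 KiB page. */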
3946 reply_buf = kzalloc(size, GFP_KERNEL);
3950 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3951 "rbd", "get_snapcontext", NULL, 0,
3953 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3958 end = reply_buf + ret;
3960 ceph_decode_64_safe(&p, end, seq, out);
3961 ceph_decode_32_safe(&p, end, snap_count, out);
3964 * Make sure the reported number of snapshot ids wouldn't go
3965 * beyond the end of our buffer. But before checking that,
3966 * make sure the computed size of the snapshot context we
3967 * allocate is representable in a size_t.
3969 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3974 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3978 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3984 for (i = 0; i < snap_count; i++)
3985 snapc->snaps[i] = ceph_decode_64(&p);
3987 ceph_put_snap_context(rbd_dev->header.snapc);
3988 rbd_dev->header.snapc = snapc;
3990 dout(" snap context seq = %llu, snap_count = %u\n",
3991 (unsigned long long)seq, (unsigned int)snap_count);
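/*
 * Fetch the name of the snapshot with the given id using the
 * "get_snapshot_name" class method. Returns a dynamically-allocated
 * name, or a pointer-coded errno on failure.
 */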
3998 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4009 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4010 reply_buf = kmalloc(size, GFP_KERNEL);
4012 return ERR_PTR(-ENOMEM);
4014 snapid = cpu_to_le64(snap_id);
4015 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4016 "rbd", "get_snapshot_name",
4017 &snapid, sizeof (snapid),
4019 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4021 snap_name = ERR_PTR(ret);
4026 end = reply_buf + ret;
4027 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4028 if (IS_ERR(snap_name))
4031 dout(" snap_id 0x%016llx snap_name = %s\n",
4032 (unsigned long long)snap_id, snap_name);
4039 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4043 down_write(&rbd_dev->header_rwsem);
4045 ret = rbd_dev_v2_image_size(rbd_dev);
4048 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4049 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4050 rbd_dev->mapping.size = rbd_dev->header.image_size;
4052 ret = rbd_dev_v2_snap_context(rbd_dev);
4053 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4057 up_write(&rbd_dev->header_rwsem);
4062 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4067 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4069 dev = &rbd_dev->dev;
4070 dev->bus = &rbd_bus_type;
4071 dev->type = &rbd_device_type;
4072 dev->parent = &rbd_root_dev;
4073 dev->release = rbd_dev_device_release;
4074 dev_set_name(dev, "%d", rbd_dev->dev_id);
4075 ret = device_register(dev);
4077 mutex_unlock(&ctl_mutex);
4082 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4084 device_unregister(&rbd_dev->dev);
4087 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4090 * Get a unique rbd identifier for the given new rbd_dev, and add
4091 * the rbd_dev to the global list. The minimum rbd id is 1.
4093 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4095 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4097 spin_lock(&rbd_dev_list_lock);
4098 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4099 spin_unlock(&rbd_dev_list_lock);
4100 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4101 (unsigned long long) rbd_dev->dev_id);
4105 * Remove an rbd_dev from the global list, and record that its
4106 * identifier is no longer in use.
4108 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4110 struct list_head *tmp;
4111 int rbd_id = rbd_dev->dev_id;
4114 rbd_assert(rbd_id > 0);
4116 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4117 (unsigned long long) rbd_dev->dev_id);
4118 spin_lock(&rbd_dev_list_lock);
4119 list_del_init(&rbd_dev->node);
4122 * If the id being "put" is not the current maximum, there
4123 * is nothing special we need to do.
4125 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4126 spin_unlock(&rbd_dev_list_lock);
4131 * We need to update the current maximum id. Search the
4132 * list to find out what it is. We're more likely to find
4133 * the maximum at the end, so search the list backward.
4136 list_for_each_prev(tmp, &rbd_dev_list) {
4137 struct rbd_device *rbd_dev;
4139 rbd_dev = list_entry(tmp, struct rbd_device, node);
4140 if (rbd_dev->dev_id > max_id)
4141 max_id = rbd_dev->dev_id;
4143 spin_unlock(&rbd_dev_list_lock);
4146 * The max id could have been updated by rbd_dev_id_get(), in
4147 * which case it now accurately reflects the new maximum.
4148 * Be careful not to overwrite the maximum value in that
4151 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4152 dout(" max dev id has been reset\n");
4156 * Skips over white space at *buf, and updates *buf to point to the
4157 * first found non-space character (if any). Returns the length of
4158 * the token (string of non-white space characters) found. Note
4159 * that *buf must be terminated with '\0'.
4161 static inline size_t next_token(const char **buf)
4164 * These are the characters that produce nonzero for
4165 * isspace() in the "C" and "POSIX" locales.
4167 const char *spaces = " \f\n\r\t\v";
4169 *buf += strspn(*buf, spaces); /* Find start of token */
4171 return strcspn(*buf, spaces); /* Return token length */
4175 * Finds the next token in *buf, and if the provided token buffer is
4176 * big enough, copies the found token into it. The result, if
4177 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4178 * must be terminated with '\0' on entry.
4180 * Returns the length of the token found (not including the '\0').
4181 * Return value will be 0 if no token is found, and it will be >=
4182 * token_size if the token would not fit.
4184 * The *buf pointer will be updated to point beyond the end of the
4185 * found token. Note that this occurs even if the token buffer is
4186 * too small to hold it.
4188 static inline size_t copy_token(const char **buf,
4194 len = next_token(buf);
4195 if (len < token_size) {
4196 memcpy(token, *buf, len);
4197 *(token + len) = '\0';
4205 * Finds the next token in *buf, dynamically allocates a buffer big
4206 * enough to hold a copy of it, and copies the token into the new
4207 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4208 * that a duplicate buffer is created even for a zero-length token.
4210 * Returns a pointer to the newly-allocated duplicate, or a null
4211 * pointer if memory for the duplicate was not available. If
4212 * the lenp argument is a non-null pointer, the length of the token
4213 * (not including the '\0') is returned in *lenp.
4215 * If successful, the *buf pointer will be updated to point beyond
4216 * the end of the found token.
4218 * Note: uses GFP_KERNEL for allocation.
4220 static inline char *dup_token(const char **buf, size_t *lenp)
4225 len = next_token(buf);
4226 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4229 *(dup + len) = '\0';
4239 * Parse the options provided for an "rbd add" (i.e., rbd image
4240 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4241 * and the data written is passed here via a NUL-terminated buffer.
4242 * Returns 0 if successful or an error code otherwise.
4244 * The information extracted from these options is recorded in
4245 * the other parameters which return dynamically-allocated
4248 * The address of a pointer that will refer to a ceph options
4249 * structure. Caller must release the returned pointer using
4250 * ceph_destroy_options() when it is no longer needed.
4252 * Address of an rbd options pointer. Fully initialized by
4253 * this function; caller must release with kfree().
4255 * Address of an rbd image specification pointer. Fully
4256 * initialized by this function based on parsed options.
4257 * Caller must release with rbd_spec_put().
4259 * The options passed take this form:
4260 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4263 * A comma-separated list of one or more monitor addresses.
4264 * A monitor address is an ip address, optionally followed
4265 * by a port number (separated by a colon).
4266 * I.e.: ip1[:port1][,ip2[:port2]...]
4268 * A comma-separated list of ceph and/or rbd options.
4270 * The name of the rados pool containing the rbd image.
4272 * The name of the image in that pool to map.
4274 * An optional snapshot id. If provided, the mapping will
4275 * present data from the image at the time that snapshot was
4276 * created. The image head is used if no snapshot id is
4277 * provided. Snapshot mappings are always read-only.
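 *
 * A minimal example of the add command (all values hypothetical):
 *
 *   echo '1.2.3.4:6789 name=admin rbd myimage mysnap' \
 *       > /sys/bus/rbd/add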
4279 static int rbd_add_parse_args(const char *buf,
4280 struct ceph_options **ceph_opts,
4281 struct rbd_options **opts,
4282 struct rbd_spec **rbd_spec)
4286 const char *mon_addrs;
4288 size_t mon_addrs_size;
4289 struct rbd_spec *spec = NULL;
4290 struct rbd_options *rbd_opts = NULL;
4291 struct ceph_options *copts;
4294 /* The first four tokens are required */
4296 len = next_token(&buf);
4298 rbd_warn(NULL, "no monitor address(es) provided");
4302 mon_addrs_size = len + 1;
4306 options = dup_token(&buf, NULL);
4310 rbd_warn(NULL, "no options provided");
4314 spec = rbd_spec_alloc();
4318 spec->pool_name = dup_token(&buf, NULL);
4319 if (!spec->pool_name)
4321 if (!*spec->pool_name) {
4322 rbd_warn(NULL, "no pool name provided");
4326 spec->image_name = dup_token(&buf, NULL);
4327 if (!spec->image_name)
4329 if (!*spec->image_name) {
4330 rbd_warn(NULL, "no image name provided");
4335 * Snapshot name is optional; default is to use "-"
4336 * (indicating the head/no snapshot).
4338 len = next_token(&buf);
4340 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4341 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4342 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4343 ret = -ENAMETOOLONG;
4346 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4349 *(snap_name + len) = '\0';
4350 spec->snap_name = snap_name;
4352 /* Initialize all rbd options to the defaults */
4354 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4358 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4360 copts = ceph_parse_options(options, mon_addrs,
4361 mon_addrs + mon_addrs_size - 1,
4362 parse_rbd_opts_token, rbd_opts);
4363 if (IS_ERR(copts)) {
4364 ret = PTR_ERR(copts);
4385 * An rbd format 2 image has a unique identifier, distinct from the
4386 * name given to it by the user. Internally, that identifier is
4387 * what's used to specify the names of objects related to the image.
4389 * A special "rbd id" object is used to map an rbd image name to its
4390 * id. If that object doesn't exist, then there is no v2 rbd image
4391 * with the supplied name.
4393 * This function will record the given rbd_dev's image_id field if
4394 * it can be determined, and in that case will return 0. If any
4395 * errors occur a negative errno will be returned and the rbd_dev's
4396 * image_id field will be unchanged (and should be NULL).
4398 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4407 * When probing a parent image, the image id is already
4408 * known (and the image name likely is not). There's no
4409 * need to fetch the image id again in this case. We
4410 * do still need to set the image format though.
4412 if (rbd_dev->spec->image_id) {
4413 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4419 * First, see if the format 2 image id file exists, and if
4420 * so, get the image's persistent id from it.
4422 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4423 object_name = kmalloc(size, GFP_NOIO);
4426 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4427 dout("rbd id object name is %s\n", object_name);
4429 /* Response will be an encoded string, which includes a length */
4431 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4432 response = kzalloc(size, GFP_NOIO);
4438 /* If it doesn't exist we'll assume it's a format 1 image */
4440 ret = rbd_obj_method_sync(rbd_dev, object_name,
4441 "rbd", "get_id", NULL, 0,
4442 response, RBD_IMAGE_ID_LEN_MAX);
4443 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4444 if (ret == -ENOENT) {
4445 image_id = kstrdup("", GFP_KERNEL);
4446 ret = image_id ? 0 : -ENOMEM;
4448 rbd_dev->image_format = 1;
4449 } else if (ret > sizeof (__le32)) {
4452 image_id = ceph_extract_encoded_string(&p, p + ret,
4454 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4456 rbd_dev->image_format = 2;
4462 rbd_dev->spec->image_id = image_id;
4463 dout("image_id is %s\n", image_id);
4472 /* Undo whatever state changes are made by v1 or v2 image probe */
4474 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4476 struct rbd_image_header *header;
4478 rbd_dev_remove_parent(rbd_dev);
4479 rbd_spec_put(rbd_dev->parent_spec);
4480 rbd_dev->parent_spec = NULL;
4481 rbd_dev->parent_overlap = 0;
4483 /* Free dynamic fields from the header, then zero it out */
4485 header = &rbd_dev->header;
4486 ceph_put_snap_context(header->snapc);
4487 kfree(header->snap_sizes);
4488 kfree(header->snap_names);
4489 kfree(header->object_prefix);
4490 memset(header, 0, sizeof (*header));
4493 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4497 /* Populate rbd image metadata */
4499 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4503 /* Version 1 images have no parent (no layering) */
4505 rbd_dev->parent_spec = NULL;
4506 rbd_dev->parent_overlap = 0;
4508 dout("discovered version 1 image, header name is %s\n",
4509 rbd_dev->header_name);
4514 kfree(rbd_dev->header_name);
4515 rbd_dev->header_name = NULL;
4516 kfree(rbd_dev->spec->image_id);
4517 rbd_dev->spec->image_id = NULL;
4522 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4526 ret = rbd_dev_v2_image_size(rbd_dev);
4530 /* Get the object prefix (a.k.a. block_name) for the image */
4532 ret = rbd_dev_v2_object_prefix(rbd_dev);
4536 /* Get and check the features for the image */
4538 ret = rbd_dev_v2_features(rbd_dev);
4542 /* If the image supports layering, get the parent info */
4544 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4545 ret = rbd_dev_v2_parent_info(rbd_dev);
4549 * Print a warning if this image has a parent.
4550 * Don't print it if the image now being probed
4551 * is itself a parent. We can tell at this point
4552 * because we won't know its pool name yet (just its
 * pool id).
4555 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4556 rbd_warn(rbd_dev, "WARNING: kernel layering "
4557 "is EXPERIMENTAL!");
4560 /* If the image supports fancy striping, get its parameters */
4562 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4563 ret = rbd_dev_v2_striping_info(rbd_dev);
4568 /* crypto and compression type aren't (yet) supported for v2 images */
4570 rbd_dev->header.crypt_type = 0;
4571 rbd_dev->header.comp_type = 0;
4573 /* Get the snapshot context, plus the header version */
4575 ret = rbd_dev_v2_snap_context(rbd_dev);
4579 dout("discovered version 2 image, header name is %s\n",
4580 rbd_dev->header_name);
4584 rbd_dev->parent_overlap = 0;
4585 rbd_spec_put(rbd_dev->parent_spec);
4586 rbd_dev->parent_spec = NULL;
4587 kfree(rbd_dev->header_name);
4588 rbd_dev->header_name = NULL;
4589 kfree(rbd_dev->header.object_prefix);
4590 rbd_dev->header.object_prefix = NULL;
4595 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4597 struct rbd_device *parent = NULL;
4598 struct rbd_spec *parent_spec;
4599 struct rbd_client *rbdc;
4602 if (!rbd_dev->parent_spec)
4605 * We need to pass a reference to the client and the parent
4606 * spec when creating the parent rbd_dev. Images related by
4607 * parent/child relationships always share both.
4609 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4610 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4613 parent = rbd_dev_create(rbdc, parent_spec);
4617 ret = rbd_dev_image_probe(parent, true);
4620 rbd_dev->parent = parent;
4625 rbd_spec_put(rbd_dev->parent_spec);
4626 kfree(rbd_dev->header_name);
4627 rbd_dev_destroy(parent);
4629 rbd_put_client(rbdc);
4630 rbd_spec_put(parent_spec);
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		return ret;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/* Everything's ready.  Announce the disk to the world. */

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	return ret;
}
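/*
 * Compute and record the header object name for this image: the
 * image name plus RBD_SUFFIX for a format 1 image, or
 * RBD_HEADER_PREFIX plus the image id for a format 2 image.
 */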
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	size_t size;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
	else
		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;

	if (rbd_dev->image_format == 1)
		sprintf(rbd_dev->header_name, "%s%s",
			spec->image_name, RBD_SUFFIX);
	else
		sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, spec->image_id);

	return 0;
}
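/*
 * Undo everything rbd_dev_image_probe() set up: drop the in-memory
 * header state, cancel the header object watch, and release the
 * name, format and id recorded for the image before destroying the
 * device itself.
 */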
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	int ret;

	rbd_dev_unprobe(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
{
	int ret;
	int tmp;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto out_header_name;

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_probe;

	/* If we are mapping a snapshot it must be marked read-only */

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	ret = rbd_dev_probe_parent(rbd_dev);
	if (!ret)
		return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (tmp)
		rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}
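/*
 * Handle a write to /sys/bus/rbd/add.  The buffer names the monitor
 * address(es), options, pool, image and (optionally) snapshot to map.
 * An illustrative invocation (the address, key and names here are
 * made up):
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=AQB... rbd myimage" \
 *		> /sys/bus/rbd/add
 *
 * On success a new device is created, probed and announced as a block
 * device; on any failure everything acquired along the way is unwound
 * through the err_out_ labels below.
 */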
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	bool read_only;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;
	read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64)rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
			(unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rc = rbd_dev_image_probe(rbd_dev, read_only);
	if (rc < 0)
		goto err_out_rbd_dev;

	rc = rbd_dev_device_setup(rbd_dev);
	if (!rc)
		return count;

	rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t)rc;
}
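/*
 * Walk the global device list under rbd_dev_list_lock and return the
 * device with the given id, or NULL if there is none.
 */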
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);

	return NULL;
}
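/*
 * Release callback for the rbd device: tear down the gendisk, block
 * device registration, device id and mapping that were set up by
 * rbd_dev_device_setup().
 */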
static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);
}
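/*
 * Dismantle an image's layering chain from the bottom up: repeatedly
 * find the ancestor that has no parent of its own, release it, and
 * detach it from its child, until rbd_dev itself has no parent left.
 */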
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;
		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}
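/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer carries the
 * numeric id of the device to unmap, matching the id in its "rbd<id>"
 * name, e.g. (illustrative only):
 *
 *	# echo 1 > /sys/bus/rbd/remove
 *
 * Removal is refused with -EBUSY while the device is held open.
 */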
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id;
	unsigned long ul;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;
	ret = count;
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
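/*
 * Create the slab caches used for image requests, object requests
 * and segment names.  On any failure the caches created so far are
 * destroyed and -ENOMEM is returned.
 */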
static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}
static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}
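/*
 * Module entry point: verify libceph compatibility, then set up the
 * slab caches and the sysfs control files.  rbd_exit() undoes both in
 * reverse order.
 */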
static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}
	rc = rbd_slab_init();
	if (rc)
		return rc;
	rc = rbd_sysfs_init();
	if (rc)
		rbd_slab_exit();
	else
		pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return rc;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");