/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
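/*
 * A hedged usage sketch (the ABI document above is authoritative; the
 * exact field syntax below is an assumption based on it): an image is
 * mapped by writing a one-line spec to the bus "add" attribute, e.g.
 *
 *      echo "<mon addrs> <options> <pool> <image> [<snap>]" \
 *              > /sys/bus/rbd/add
 *
 * and unmapped by writing its device id to /sys/bus/rbd/remove.  The
 * add/remove attributes are declared in rbd_bus_attrs[] below.
 */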
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>

#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
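/*
 * Worked example (explanatory, not part of the original source): each
 * byte of an int contributes at most log10(256) ~= 2.41 decimal
 * digits, so budgeting 5/2 = 2.5 digits per byte over-estimates
 * safely.  With 4-byte ints, (5 * 4) / 2 + 1 = 11 characters, enough
 * for the 10 digits of UINT_MAX plus a sign or terminator.
 */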
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
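/*
 * For illustration only (these values are made up): a mapping of
 * snapshot "snap1" of image "foo" in pool "rbd" might carry pool_id 2
 * with pool_name "rbd", the looked-up image_id with image_name "foo",
 * and the snapshot's id with snap_name "snap1".  A mapping of the
 * image head uses snap_id CEPH_NOSNAP instead.
 */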
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};
enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL;       /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}
/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {Opt_last_bool, NULL},
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false
static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
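/*
 * Example (illustrative values): with snapc->snaps holding ids
 * { 12, 7, 3 } -- descending, as the osd keeps them -- a bsearch
 * for snap_id 7 using snapid_compare_reverse() lands on index 1,
 * while a search for 5 matches nothing and BAD_SNAP_INDEX is
 * returned.
 */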
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
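/*
 * Worked example (illustrative, not from the original source): with
 * obj_order 22, objects are 4 MiB (1 << 22).  Image byte offset
 * 0x1234567 falls in segment 0x4 (offset >> 22), i.e. in object
 * "<object_prefix>.000000000004", at offset 0x234567 within that
 * object; rbd_segment_length() clips any length that would cross
 * the 4 MiB object boundary.
 */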
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio_vec *bv;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
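/*
 * Usage sketch (illustrative, mirroring the caller below):
 * rbd_img_request_fill() walks an image request's bio chain with a
 * cursor, e.g.
 *
 *      struct bio *bio_list = ...;     // head of source chain
 *      unsigned int bio_offset = 0;    // byte offset into that bio
 *
 *      clone = bio_chain_clone_range(&bio_list, &bio_offset,
 *                                      clone_size, GFP_ATOMIC);
 *
 * Each successive call resumes at the first un-cloned byte, so
 * consecutive object requests receive consecutive byte ranges.
 */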
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}
static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (obj_request_img_data_test(obj_request)) {
                struct rbd_img_request *img_request = obj_request->img_request;

                rbd_assert(write_request ==
                                img_request_write_test(img_request));
                if (write_request)
                        snapc = img_request->snapc;
        }

        /* Allocate and initialize the request, for the single op */

        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;

        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        struct ceph_snap_context *snapc;
        struct rbd_device *rbd_dev;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;
        rbd_assert(img_request);
        rbd_assert(img_request_write_test(img_request));

        /* Allocate and initialize the request, for the two ops */

        snapc = img_request->snapc;
        rbd_dev = img_request->rbd_dev;
        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}
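/*
 * Resulting request layout, for illustration (this follows from the
 * code above and from rbd_img_obj_parent_read_full_callback() below,
 * which fills in both ops): op 0 is a CEPH_OSD_OP_CALL of the
 * "copyup" method in class "rbd", carrying the data read from the
 * parent image, and op 1 is the original CEPH_OSD_OP_WRITE of the
 * client's data.
 */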
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
                                                u64 offset, u64 length,
                                                enum obj_request_type type)
{
        struct rbd_obj_request *obj_request;
        size_t size;
        char *name;

        rbd_assert(obj_request_type_valid(type));

        size = strlen(object_name) + 1;
        name = kmalloc(size, GFP_KERNEL);
        if (!name)
                return NULL;

        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
        if (!obj_request) {
                kfree(name);
                return NULL;
        }

        obj_request->object_name = memcpy(name, object_name, size);
        obj_request->offset = offset;
        obj_request->length = length;
        obj_request->flags = 0;
        obj_request->which = BAD_WHICH;
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
        init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);

        dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
                offset, length, (int)type, obj_request);

        return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
        struct rbd_obj_request *obj_request;

        obj_request = container_of(kref, struct rbd_obj_request, kref);

        dout("%s: obj %p\n", __func__, obj_request);

        rbd_assert(obj_request->img_request == NULL);
        rbd_assert(obj_request->which == BAD_WHICH);

        if (obj_request->osd_req)
                rbd_osd_req_destroy(obj_request->osd_req);

        rbd_assert(obj_request_type_valid(obj_request->type));
        switch (obj_request->type) {
        case OBJ_REQUEST_NODATA:
                break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
                if (obj_request->bio_list)
                        bio_chain_put(obj_request->bio_list);
                break;
        case OBJ_REQUEST_PAGES:
                if (obj_request->pages)
                        ceph_release_page_vector(obj_request->pages,
                                                obj_request->page_count);
                break;
        }

        kfree(obj_request->object_name);
        obj_request->object_name = NULL;
        kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
                                        struct rbd_device *rbd_dev,
                                        u64 offset, u64 length,
                                        bool write_request,
                                        bool child_request)
{
        struct rbd_img_request *img_request;

        img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
        if (!img_request)
                return NULL;

        if (write_request) {
                down_read(&rbd_dev->header_rwsem);
                ceph_get_snap_context(rbd_dev->header.snapc);
                up_read(&rbd_dev->header_rwsem);
        }

        img_request->rq = NULL;
        img_request->rbd_dev = rbd_dev;
        img_request->offset = offset;
        img_request->length = length;
        img_request->flags = 0;
        if (write_request) {
                img_request_write_set(img_request);
                img_request->snapc = rbd_dev->header.snapc;
        } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
        }
        if (child_request)
                img_request_child_set(img_request);
        if (rbd_dev->parent_spec)
                img_request_layered_set(img_request);
        spin_lock_init(&img_request->completion_lock);
        img_request->next_completion = 0;
        img_request->callback = NULL;
        img_request->result = 0;
        img_request->obj_request_count = 0;
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);

        rbd_img_request_get(img_request);       /* Avoid a warning */
        rbd_img_request_put(img_request);       /* TEMPORARY */

        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
                write_request ? "write" : "read", offset, length,
                img_request);

        return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
        struct rbd_img_request *img_request;
        struct rbd_obj_request *obj_request;
        struct rbd_obj_request *next_obj_request;

        img_request = container_of(kref, struct rbd_img_request, kref);

        dout("%s: img %p\n", __func__, img_request);

        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_img_obj_request_del(img_request, obj_request);
        rbd_assert(img_request->obj_request_count == 0);

        if (img_request_write_test(img_request))
                ceph_put_snap_context(img_request->snapc);

        if (img_request_child_test(img_request))
                rbd_obj_request_put(img_request->obj_request);

        kmem_cache_free(rbd_img_request_cache, img_request);
}
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        unsigned int xferred;
        int result;
        bool more;

        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;

        rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
        xferred = (unsigned int)obj_request->xferred;
        result = obj_request->result;
        if (result) {
                struct rbd_device *rbd_dev = img_request->rbd_dev;

                rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
                        img_request_write_test(img_request) ? "write" : "read",
                        obj_request->length, obj_request->img_offset,
                        obj_request->offset);
                rbd_warn(rbd_dev, "  result %d xferred %x\n",
                        result, xferred);
                if (!img_request->result)
                        img_request->result = result;
        }

        /* Image object requests don't own their page array */

        if (obj_request->type == OBJ_REQUEST_PAGES) {
                obj_request->pages = NULL;
                obj_request->page_count = 0;
        }

        if (img_request_child_test(img_request)) {
                rbd_assert(img_request->obj_request != NULL);
                more = obj_request->which < img_request->obj_request_count - 1;
        } else {
                rbd_assert(img_request->rq != NULL);
                more = blk_end_request(img_request->rq, result, xferred);
        }

        return more;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        u32 which = obj_request->which;
        bool more = true;

        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;

        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
        rbd_assert(img_request != NULL);
        rbd_assert(img_request->obj_request_count > 0);
        rbd_assert(which != BAD_WHICH);
        rbd_assert(which < img_request->obj_request_count);
        rbd_assert(which >= img_request->next_completion);

        spin_lock_irq(&img_request->completion_lock);
        if (which != img_request->next_completion)
                goto out;

        for_each_obj_request_from(img_request, obj_request) {
                rbd_assert(more);
                rbd_assert(which < img_request->obj_request_count);

                if (!obj_request_done_test(obj_request))
                        break;
                more = rbd_img_obj_end_request(obj_request);
                which++;
        }

        rbd_assert(more ^ (which == img_request->obj_request_count));
        img_request->next_completion = which;
out:
        spin_unlock_irq(&img_request->completion_lock);

        if (!more)
                rbd_img_request_complete(img_request);
}
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
                                        enum obj_request_type type,
                                        void *data_desc)
{
        struct rbd_device *rbd_dev = img_request->rbd_dev;
        struct rbd_obj_request *obj_request = NULL;
        struct rbd_obj_request *next_obj_request;
        bool write_request = img_request_write_test(img_request);
        struct bio *bio_list;
        unsigned int bio_offset = 0;
        struct page **pages;
        u64 img_offset;
        u64 resid;
        u16 opcode;

        dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
                (int)type, data_desc);

        opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
        img_offset = img_request->offset;
        resid = img_request->length;
        rbd_assert(resid > 0);

        if (type == OBJ_REQUEST_BIO) {
                bio_list = data_desc;
                rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
        } else {
                rbd_assert(type == OBJ_REQUEST_PAGES);
                pages = data_desc;
        }

        while (resid) {
                struct ceph_osd_request *osd_req;
                const char *object_name;
                u64 offset;
                u64 length;

                object_name = rbd_segment_name(rbd_dev, img_offset);
                if (!object_name)
                        goto out_unwind;
                offset = rbd_segment_offset(rbd_dev, img_offset);
                length = rbd_segment_length(rbd_dev, img_offset, resid);
                obj_request = rbd_obj_request_create(object_name,
                                                offset, length, type);
                /* object request has its own copy of the object name */
                rbd_segment_name_free(object_name);
                if (!obj_request)
                        goto out_unwind;

                if (type == OBJ_REQUEST_BIO) {
                        unsigned int clone_size;

                        rbd_assert(length <= (u64)UINT_MAX);
                        clone_size = (unsigned int)length;
                        obj_request->bio_list =
                                        bio_chain_clone_range(&bio_list,
                                                                &bio_offset,
                                                                clone_size,
                                                                GFP_ATOMIC);
                        if (!obj_request->bio_list)
                                goto out_partial;
                } else {
                        unsigned int page_count;

                        obj_request->pages = pages;
                        page_count = (u32)calc_pages_for(offset, length);
                        obj_request->page_count = page_count;
                        if ((offset + length) & ~PAGE_MASK)
                                page_count--;   /* more on last page */
                        pages += page_count;
                }

                osd_req = rbd_osd_req_create(rbd_dev, write_request,
                                                obj_request);
                if (!osd_req)
                        goto out_partial;
                obj_request->osd_req = osd_req;
                obj_request->callback = rbd_img_obj_callback;

                osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
                                                0, 0);
                if (type == OBJ_REQUEST_BIO)
                        osd_req_op_extent_osd_data_bio(osd_req, 0,
                                        obj_request->bio_list, length);
                else
                        osd_req_op_extent_osd_data_pages(osd_req, 0,
                                        obj_request->pages, length,
                                        offset & ~PAGE_MASK, false, false);

                if (write_request)
                        rbd_osd_req_format_write(obj_request);
                else
                        rbd_osd_req_format_read(obj_request);

                obj_request->img_offset = img_offset;
                rbd_img_obj_request_add(img_request, obj_request);

                img_offset += length;
                resid -= length;
        }

        return 0;

out_partial:
        rbd_obj_request_put(obj_request);
out_unwind:
        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_obj_request_put(obj_request);

        return -ENOMEM;
}
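/*
 * Worked example (illustrative, not from the original source): with
 * obj_order 22 (4 MiB objects), an 8 MiB image request starting at
 * image offset 6 MiB is split into three object requests: the last
 * 2 MiB of one object (offset 2 MiB, length 2 MiB), all 4 MiB of the
 * next object, and the first 2 MiB of the one after that.
 */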
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        struct rbd_device *rbd_dev;
        u64 length;
        u32 page_count;

        rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;
        rbd_assert(img_request);

        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev);
        length = (u64)1 << rbd_dev->header.obj_order;
        page_count = (u32)calc_pages_for(0, length);

        rbd_assert(obj_request->copyup_pages);
        ceph_release_page_vector(obj_request->copyup_pages, page_count);
        obj_request->copyup_pages = NULL;

        /*
         * We want the transfer count to reflect the size of the
         * original write request.  There is no such thing as a
         * successful short write, so if the request was successful
         * we can just set it to the originally-requested length.
         */
        if (!obj_request->result)
                obj_request->xferred = obj_request->length;

        /* Finish up with the normal image object callback */

        rbd_img_obj_callback(obj_request);
}
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
        struct rbd_obj_request *orig_request;
        struct ceph_osd_request *osd_req;
        struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
        struct page **pages;
        int result;
        u64 obj_size;
        u64 xferred;

        rbd_assert(img_request_child_test(img_request));

        /* First get what we need from the image request */

        pages = img_request->copyup_pages;
        rbd_assert(pages != NULL);
        img_request->copyup_pages = NULL;

        orig_request = img_request->obj_request;
        rbd_assert(orig_request != NULL);
        rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
        result = img_request->result;
        obj_size = img_request->length;
        xferred = img_request->xferred;
        rbd_img_request_put(img_request);

        rbd_assert(orig_request->img_request);
        rbd_dev = orig_request->img_request->rbd_dev;
        rbd_assert(rbd_dev);
        rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

        if (result)
                goto out_err;

        /* Allocate the new copyup osd request for the original request */

        result = -ENOMEM;
        rbd_assert(!orig_request->osd_req);
        osd_req = rbd_osd_req_create_copyup(orig_request);
        if (!osd_req)
                goto out_err;
        orig_request->osd_req = osd_req;
        orig_request->copyup_pages = pages;

        /* Initialize the copyup op */

        osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
        osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
                                                false, false);

        /* Then the original write request op */

        osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
                                        orig_request->offset,
                                        orig_request->length, 0, 0);
        osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
                                        orig_request->length);

        rbd_osd_req_format_write(orig_request);

        /* All set, send it off. */

        orig_request->callback = rbd_img_obj_copyup_callback;
        osdc = &rbd_dev->rbd_client->client->osdc;
        result = rbd_obj_request_submit(osdc, orig_request);
        if (!result)
                return;
out_err:
        /* Record the error code and complete the request */

        orig_request->result = result;
        orig_request->xferred = 0;
        obj_request_done_set(orig_request);
        rbd_obj_request_complete(orig_request);
}
2242 * Read from the parent image the range of data that covers the
2243 * entire target of the given object request. This is used for
2244 * satisfying a layered image write request when the target of an
2245 * object request from the image request does not exist.
2247 * A page array big enough to hold the returned data is allocated
2248 * and supplied to rbd_img_request_fill() as the "data descriptor."
2249 * When the read completes, this page array will be transferred to
2250 * the original object request for the copyup operation.
2252 * If an error occurs, record it as the result of the original
2253 * object request and mark it done so it gets completed.
2255 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2257 struct rbd_img_request *img_request = NULL;
2258 struct rbd_img_request *parent_request = NULL;
2259 struct rbd_device *rbd_dev;
2262 struct page **pages = NULL;
2266 rbd_assert(obj_request_img_data_test(obj_request));
2267 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2269 img_request = obj_request->img_request;
2270 rbd_assert(img_request != NULL);
2271 rbd_dev = img_request->rbd_dev;
2272 rbd_assert(rbd_dev->parent != NULL);
2275 * First things first. The original osd request is of no
2276 * use to us any more; we'll need a new one that can hold
2277 * the two ops in a copyup request. We'll get that later,
2278 * but for now we can release the old one.
2280 rbd_osd_req_destroy(obj_request->osd_req);
2281 obj_request->osd_req = NULL;
2284 * Determine the byte range covered by the object in the
2285 * child image to which the original request was to be sent.
2287 img_offset = obj_request->img_offset - obj_request->offset;
2288 length = (u64)1 << rbd_dev->header.obj_order;
2291 * There is no defined parent data beyond the parent
2292 * overlap, so limit what we read at that boundary if
2293 * necessary.
2295 if (img_offset + length > rbd_dev->parent_overlap) {
2296 rbd_assert(img_offset < rbd_dev->parent_overlap);
2297 length = rbd_dev->parent_overlap - img_offset;
2301 * Allocate a page array big enough to receive the data read
2304 page_count = (u32)calc_pages_for(0, length);
2305 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2306 if (IS_ERR(pages)) {
2307 result = PTR_ERR(pages);
2313 parent_request = rbd_img_request_create(rbd_dev->parent,
2316 if (!parent_request)
2318 rbd_obj_request_get(obj_request);
2319 parent_request->obj_request = obj_request;
2321 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2324 parent_request->copyup_pages = pages;
2326 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2327 result = rbd_img_request_submit(parent_request);
2331 parent_request->copyup_pages = NULL;
2332 parent_request->obj_request = NULL;
2333 rbd_obj_request_put(obj_request);
2336 ceph_release_page_vector(pages, page_count);
2338 rbd_img_request_put(parent_request);
2339 obj_request->result = result;
2340 obj_request->xferred = 0;
2341 obj_request_done_set(obj_request);
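/*
 * Worked example of the overlap clamp above (numbers invented for
 * illustration): with 4 MB objects, a write to the object covering
 * child-image range [8 MB, 12 MB) on an image whose parent overlap
 * is 10 MB reads only [8 MB, 10 MB) from the parent:
 *
 *	img_offset = 8 MB, length = 4 MB
 *	img_offset + length (12 MB) > parent_overlap (10 MB)
 *		=> length = 10 MB - 8 MB = 2 MB
 */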
2346 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2348 struct rbd_obj_request *orig_request;
2351 rbd_assert(!obj_request_img_data_test(obj_request));
2354 * All we need from the object request is the original
2355 * request and the result of the STAT op. Grab those, then
2356 * we're done with the request.
2358 orig_request = obj_request->obj_request;
2359 obj_request->obj_request = NULL;
2360 rbd_assert(orig_request);
2361 rbd_assert(orig_request->img_request);
2363 result = obj_request->result;
2364 obj_request->result = 0;
2366 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2367 obj_request, orig_request, result,
2368 obj_request->xferred, obj_request->length);
2369 rbd_obj_request_put(obj_request);
2371 rbd_assert(orig_request);
2372 rbd_assert(orig_request->img_request);
2375 * Our only purpose here is to determine whether the object
2376 * exists, and we don't want to treat the non-existence as
2377 * an error. If something else comes back, transfer the
2378 * error to the original request and complete it now.
2380 if (!result) {
2381 obj_request_existence_set(orig_request, true);
2382 } else if (result == -ENOENT) {
2383 obj_request_existence_set(orig_request, false);
2384 } else if (result) {
2385 orig_request->result = result;
2390 * Resubmit the original request now that we have recorded
2391 * whether the target object exists.
2393 orig_request->result = rbd_img_obj_request_submit(orig_request);
2395 if (orig_request->result)
2396 rbd_obj_request_complete(orig_request);
2397 rbd_obj_request_put(orig_request);
2400 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2402 struct rbd_obj_request *stat_request;
2403 struct rbd_device *rbd_dev;
2404 struct ceph_osd_client *osdc;
2405 struct page **pages = NULL;
2411 * The response data for a STAT call consists of:
2412 *     le64 length;
2413 *     struct {
2414 *         le32 tv_sec;
2415 *         le32 tv_nsec;
2416 *     } mtime;
2418 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2419 page_count = (u32)calc_pages_for(0, size);
2420 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2422 return PTR_ERR(pages);
2425 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2430 rbd_obj_request_get(obj_request);
2431 stat_request->obj_request = obj_request;
2432 stat_request->pages = pages;
2433 stat_request->page_count = page_count;
2435 rbd_assert(obj_request->img_request);
2436 rbd_dev = obj_request->img_request->rbd_dev;
2437 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2439 if (!stat_request->osd_req)
2441 stat_request->callback = rbd_img_obj_exists_callback;
2443 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2444 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2446 rbd_osd_req_format_read(stat_request);
2448 osdc = &rbd_dev->rbd_client->client->osdc;
2449 ret = rbd_obj_request_submit(osdc, stat_request);
2452 rbd_obj_request_put(obj_request);
2457 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2459 struct rbd_img_request *img_request;
2460 struct rbd_device *rbd_dev;
2461 bool known;
2463 rbd_assert(obj_request_img_data_test(obj_request));
2465 img_request = obj_request->img_request;
2466 rbd_assert(img_request);
2467 rbd_dev = img_request->rbd_dev;
2470 * Only writes to layered images need special handling.
2471 * Reads and non-layered writes are simple object requests.
2472 * Layered writes that start beyond the end of the overlap
2473 * with the parent have no parent data, so they too are
2474 * simple object requests. Finally, if the target object is
2475 * known to already exist, its parent data has already been
2476 * copied, so a write to the object can also be handled as a
2477 * simple object request.
2479 if (!img_request_write_test(img_request) ||
2480 !img_request_layered_test(img_request) ||
2481 rbd_dev->parent_overlap <= obj_request->img_offset ||
2482 ((known = obj_request_known_test(obj_request)) &&
2483 obj_request_exists_test(obj_request))) {
2485 struct rbd_device *rbd_dev;
2486 struct ceph_osd_client *osdc;
2488 rbd_dev = obj_request->img_request->rbd_dev;
2489 osdc = &rbd_dev->rbd_client->client->osdc;
2491 return rbd_obj_request_submit(osdc, obj_request);
2495 * It's a layered write. The target object might exist but
2496 * we may not know that yet. If we know it doesn't exist,
2497 * start by reading the data for the full target object from
2498 * the parent so we can use it for a copyup to the target.
2500 if (known)
2501 return rbd_img_obj_parent_read_full(obj_request);
2503 /* We don't know whether the target exists. Go find out. */
2505 return rbd_img_obj_exists_submit(obj_request);
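/*
 * Summary of the dispatch decisions above (illustration only):
 *
 *	read				-> submit directly
 *	write, image not layered	-> submit directly
 *	write at/beyond parent overlap	-> submit directly
 *	write, object known to exist	-> submit directly
 *	write, object known missing	-> rbd_img_obj_parent_read_full()
 *	write, existence unknown	-> rbd_img_obj_exists_submit()
 */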
2508 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2510 struct rbd_obj_request *obj_request;
2511 struct rbd_obj_request *next_obj_request;
2513 dout("%s: img %p\n", __func__, img_request);
2514 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2517 ret = rbd_img_obj_request_submit(obj_request);
2525 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2527 struct rbd_obj_request *obj_request;
2528 struct rbd_device *rbd_dev;
2531 rbd_assert(img_request_child_test(img_request));
2533 obj_request = img_request->obj_request;
2534 rbd_assert(obj_request);
2535 rbd_assert(obj_request->img_request);
2537 obj_request->result = img_request->result;
2538 if (obj_request->result)
2539 goto out;
2542 * We need to zero anything beyond the parent overlap
2543 * boundary. Since rbd_img_obj_request_read_callback()
2544 * will zero anything beyond the end of a short read, an
2545 * easy way to do this is to pretend the data from the
2546 * parent came up short--ending at the overlap boundary.
2548 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2549 obj_end = obj_request->img_offset + obj_request->length;
2550 rbd_dev = obj_request->img_request->rbd_dev;
2551 if (obj_end > rbd_dev->parent_overlap) {
2552 u64 xferred = 0;
2554 if (obj_request->img_offset < rbd_dev->parent_overlap)
2555 xferred = rbd_dev->parent_overlap -
2556 obj_request->img_offset;
2558 obj_request->xferred = min(img_request->xferred, xferred);
2559 } else {
2560 obj_request->xferred = img_request->xferred;
2563 rbd_img_request_put(img_request);
2564 rbd_img_obj_request_read_callback(obj_request);
2565 rbd_obj_request_complete(obj_request);
2568 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2570 struct rbd_device *rbd_dev;
2571 struct rbd_img_request *img_request;
2574 rbd_assert(obj_request_img_data_test(obj_request));
2575 rbd_assert(obj_request->img_request != NULL);
2576 rbd_assert(obj_request->result == (s32) -ENOENT);
2577 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2579 rbd_dev = obj_request->img_request->rbd_dev;
2580 rbd_assert(rbd_dev->parent != NULL);
2581 /* rbd_read_finish(obj_request, obj_request->length); */
2582 img_request = rbd_img_request_create(rbd_dev->parent,
2583 obj_request->img_offset,
2584 obj_request->length,
2590 rbd_obj_request_get(obj_request);
2591 img_request->obj_request = obj_request;
2593 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2594 obj_request->bio_list);
2598 img_request->callback = rbd_img_parent_read_callback;
2599 result = rbd_img_request_submit(img_request);
2606 rbd_img_request_put(img_request);
2607 obj_request->result = result;
2608 obj_request->xferred = 0;
2609 obj_request_done_set(obj_request);
2612 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2614 struct rbd_obj_request *obj_request;
2615 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2618 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2619 OBJ_REQUEST_NODATA);
2624 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2625 if (!obj_request->osd_req)
2627 obj_request->callback = rbd_obj_request_put;
2629 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2631 rbd_osd_req_format_read(obj_request);
2633 ret = rbd_obj_request_submit(osdc, obj_request);
2636 rbd_obj_request_put(obj_request);
2641 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2643 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2649 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2650 rbd_dev->header_name, (unsigned long long)notify_id,
2651 (unsigned int)opcode);
2652 ret = rbd_dev_refresh(rbd_dev);
2654 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2656 rbd_obj_notify_ack(rbd_dev, notify_id);
2660 * Request sync osd watch/unwatch. The value of "start" determines
2661 * whether a watch request is being initiated or torn down.
2663 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2665 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2666 struct rbd_obj_request *obj_request;
2669 rbd_assert(start ^ !!rbd_dev->watch_event);
2670 rbd_assert(start ^ !!rbd_dev->watch_request);
2672 if (start) {
2673 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2674 &rbd_dev->watch_event);
2677 rbd_assert(rbd_dev->watch_event != NULL);
2681 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2682 OBJ_REQUEST_NODATA);
2686 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2687 if (!obj_request->osd_req)
2690 if (start)
2691 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2692 else
2693 ceph_osdc_unregister_linger_request(osdc,
2694 rbd_dev->watch_request->osd_req);
2696 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2697 rbd_dev->watch_event->cookie, 0, start);
2698 rbd_osd_req_format_write(obj_request);
2700 ret = rbd_obj_request_submit(osdc, obj_request);
2703 ret = rbd_obj_request_wait(obj_request);
2706 ret = obj_request->result;
2711 * A watch request is set to linger, so the underlying osd
2712 * request won't go away until we unregister it. We retain
2713 * a pointer to the object request during that time (in
2714 * rbd_dev->watch_request), so we'll keep a reference to
2715 * it. We'll drop that reference (below) after we've
2716 * unregistered it.
2718 if (start) {
2719 rbd_dev->watch_request = obj_request;
2721 return 0;
2724 /* We have successfully torn down the watch request */
2726 rbd_obj_request_put(rbd_dev->watch_request);
2727 rbd_dev->watch_request = NULL;
2729 /* Cancel the event if we're tearing down, or on error */
2730 ceph_osdc_cancel_event(rbd_dev->watch_event);
2731 rbd_dev->watch_event = NULL;
2733 rbd_obj_request_put(obj_request);
2739 * Synchronous osd object method call. Returns the number of bytes
2740 * returned in the inbound buffer, or a negative error code.
2742 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2743 const char *object_name,
2744 const char *class_name,
2745 const char *method_name,
2746 const void *outbound,
2747 size_t outbound_size,
2749 size_t inbound_size)
2751 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2752 struct rbd_obj_request *obj_request;
2753 struct page **pages;
2758 * Method calls are ultimately read operations. The result
2759 * should be placed into the inbound buffer provided. They
2760 * also supply outbound data--parameters for the object
2761 * method. Currently if this is present it will be a
2762 * snapshot id.
2764 page_count = (u32)calc_pages_for(0, inbound_size);
2765 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2767 return PTR_ERR(pages);
2770 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2775 obj_request->pages = pages;
2776 obj_request->page_count = page_count;
2778 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2779 if (!obj_request->osd_req)
2782 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2783 class_name, method_name);
2784 if (outbound_size) {
2785 struct ceph_pagelist *pagelist;
2787 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2791 ceph_pagelist_init(pagelist);
2792 ceph_pagelist_append(pagelist, outbound, outbound_size);
2793 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2796 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2797 obj_request->pages, inbound_size,
2799 rbd_osd_req_format_read(obj_request);
2801 ret = rbd_obj_request_submit(osdc, obj_request);
2804 ret = rbd_obj_request_wait(obj_request);
2808 ret = obj_request->result;
2812 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2813 ret = (int)obj_request->xferred;
2814 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2817 rbd_obj_request_put(obj_request);
2819 ceph_release_page_vector(pages, page_count);
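/*
 * Typical call pattern (a sketch mirroring _rbd_dev_v2_snap_size()
 * below; the method name and reply layout are taken from there):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 *	if (ret < sizeof (size_buf))
 *		return ret < 0 ? ret : -ERANGE;
 */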
2824 static void rbd_request_fn(struct request_queue *q)
2825 __releases(q->queue_lock) __acquires(q->queue_lock)
2827 struct rbd_device *rbd_dev = q->queuedata;
2828 bool read_only = rbd_dev->mapping.read_only;
2832 while ((rq = blk_fetch_request(q))) {
2833 bool write_request = rq_data_dir(rq) == WRITE;
2834 struct rbd_img_request *img_request;
2838 /* Ignore any non-FS requests that filter through. */
2840 if (rq->cmd_type != REQ_TYPE_FS) {
2841 dout("%s: non-fs request type %d\n", __func__,
2842 (int) rq->cmd_type);
2843 __blk_end_request_all(rq, 0);
2847 /* Ignore/skip any zero-length requests */
2849 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2850 length = (u64) blk_rq_bytes(rq);
2853 dout("%s: zero-length request\n", __func__);
2854 __blk_end_request_all(rq, 0);
2858 spin_unlock_irq(q->queue_lock);
2860 /* Disallow writes to a read-only device */
2862 if (write_request) {
2866 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2870 * Quit early if the mapped snapshot no longer
2871 * exists. It's still possible the snapshot will
2872 * have disappeared by the time our request arrives
2873 * at the osd, but there's no sense in sending it if
2874 * we already know.
2876 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2877 dout("request for non-existent snapshot");
2878 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2884 if (offset && length > U64_MAX - offset + 1) {
2885 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2887 goto end_request; /* Shouldn't happen */
2891 if (offset + length > rbd_dev->mapping.size) {
2892 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2893 offset, length, rbd_dev->mapping.size);
2898 img_request = rbd_img_request_create(rbd_dev, offset, length,
2899 write_request, false);
2903 img_request->rq = rq;
2905 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2908 result = rbd_img_request_submit(img_request);
2910 rbd_img_request_put(img_request);
2912 spin_lock_irq(q->queue_lock);
2914 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2915 write_request ? "write" : "read",
2916 length, offset, result);
2918 __blk_end_request_all(rq, result);
2924 * a queue callback. Makes sure that we don't create a bio that spans across
2925 * multiple osd objects. One exception is single-page bios,
2926 * which we handle later in bio_chain_clone_range().
2928 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2929 struct bio_vec *bvec)
2931 struct rbd_device *rbd_dev = q->queuedata;
2932 sector_t sector_offset;
2933 sector_t sectors_per_obj;
2934 sector_t obj_sector_offset;
2938 * Find how far into its rbd object the bio's start sector
2939 * falls. The bio sector is partition-relative, so first make it
2940 * relative to the enclosing device.
2942 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2943 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2944 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2947 * Compute the number of bytes from that offset to the end
2948 * of the object. Account for what's already used by the bio.
2950 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2951 if (ret > bmd->bi_size)
2952 ret -= bmd->bi_size;
2953 else
2954 ret = 0;
2957 * Don't send back more than was asked for. And if the bio
2958 * was empty, let the whole thing through because: "Note
2959 * that a block device *must* allow a single page to be
2960 * added to an empty bio."
2962 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2963 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2964 ret = (int) bvec->bv_len;
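/*
 * Worked example of the arithmetic above (numbers invented): with
 * 4 MB objects (obj_order 22), sectors_per_obj = 1 << (22 - 9) =
 * 8192.  A bio starting at device sector 12000 sits 12000 & 8191 =
 * 3808 sectors into its object, leaving (8192 - 3808) << 9 =
 * 2244608 bytes to the object boundary, which is then trimmed by
 * what the bio already holds and capped at bv_len.
 */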
2969 static void rbd_free_disk(struct rbd_device *rbd_dev)
2971 struct gendisk *disk = rbd_dev->disk;
2976 rbd_dev->disk = NULL;
2977 if (disk->flags & GENHD_FL_UP) {
2980 blk_cleanup_queue(disk->queue);
2985 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2986 const char *object_name,
2987 u64 offset, u64 length, void *buf)
2990 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2991 struct rbd_obj_request *obj_request;
2992 struct page **pages = NULL;
2997 page_count = (u32) calc_pages_for(offset, length);
2998 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3000 ret = PTR_ERR(pages);
3003 obj_request = rbd_obj_request_create(object_name, offset, length,
3008 obj_request->pages = pages;
3009 obj_request->page_count = page_count;
3011 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3012 if (!obj_request->osd_req)
3015 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3016 offset, length, 0, 0);
3017 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3019 obj_request->length,
3020 obj_request->offset & ~PAGE_MASK,
3022 rbd_osd_req_format_read(obj_request);
3024 ret = rbd_obj_request_submit(osdc, obj_request);
3027 ret = rbd_obj_request_wait(obj_request);
3031 ret = obj_request->result;
3035 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3036 size = (size_t) obj_request->xferred;
3037 ceph_copy_from_page_vector(pages, buf, 0, size);
3038 rbd_assert(size <= (size_t)INT_MAX);
3042 rbd_obj_request_put(obj_request);
3044 ceph_release_page_vector(pages, page_count);
3050 * Read the complete header for the given rbd device. On successful
3051 * return, the rbd_dev->header field will contain up-to-date
3052 * information about the image.
3054 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3056 struct rbd_image_header_ondisk *ondisk = NULL;
3063 * The complete header will include an array of its 64-bit
3064 * snapshot ids, followed by the names of those snapshots as
3065 * a contiguous block of NUL-terminated strings. Note that
3066 * the number of snapshots could change by the time we read
3067 * it in, in which case we re-read it.
3074 size = sizeof (*ondisk);
3075 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3077 ondisk = kmalloc(size, GFP_KERNEL);
3081 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3085 if ((size_t)ret < size) {
3087 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3091 if (!rbd_dev_ondisk_valid(ondisk)) {
3093 rbd_warn(rbd_dev, "invalid header");
3097 names_size = le64_to_cpu(ondisk->snap_names_len);
3098 want_count = snap_count;
3099 snap_count = le32_to_cpu(ondisk->snap_count);
3100 } while (snap_count != want_count);
3102 ret = rbd_header_from_disk(rbd_dev, ondisk);
3110 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3111 * has disappeared from the (just updated) snapshot context.
3113 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3117 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3120 snap_id = rbd_dev->spec->snap_id;
3121 if (snap_id == CEPH_NOSNAP)
3124 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3125 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3128 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3133 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3134 mapping_size = rbd_dev->mapping.size;
3135 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3136 if (rbd_dev->image_format == 1)
3137 ret = rbd_dev_v1_header_info(rbd_dev);
3139 ret = rbd_dev_v2_header_info(rbd_dev);
3141 /* If it's a mapped snapshot, validate its EXISTS flag */
3143 rbd_exists_validate(rbd_dev);
3144 mutex_unlock(&ctl_mutex);
3145 if (mapping_size != rbd_dev->mapping.size) {
3148 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3149 dout("setting size to %llu sectors", (unsigned long long)size);
3150 set_capacity(rbd_dev->disk, size);
3151 revalidate_disk(rbd_dev->disk);
3157 static int rbd_init_disk(struct rbd_device *rbd_dev)
3159 struct gendisk *disk;
3160 struct request_queue *q;
3163 /* create gendisk info */
3164 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3168 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3170 disk->major = rbd_dev->major;
3171 disk->first_minor = 0;
3172 disk->fops = &rbd_bd_ops;
3173 disk->private_data = rbd_dev;
3175 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3179 /* We use the default size, but let's be explicit about it. */
3180 blk_queue_physical_block_size(q, SECTOR_SIZE);
3182 /* set io sizes to object size */
3183 segment_size = rbd_obj_bytes(&rbd_dev->header);
3184 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3185 blk_queue_max_segment_size(q, segment_size);
3186 blk_queue_io_min(q, segment_size);
3187 blk_queue_io_opt(q, segment_size);
3189 blk_queue_merge_bvec(q, rbd_merge_bvec);
3192 q->queuedata = rbd_dev;
3194 rbd_dev->disk = disk;
3207 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3209 return container_of(dev, struct rbd_device, dev);
3212 static ssize_t rbd_size_show(struct device *dev,
3213 struct device_attribute *attr, char *buf)
3215 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3217 return sprintf(buf, "%llu\n",
3218 (unsigned long long)rbd_dev->mapping.size);
3222 * Note this shows the features for whatever's mapped, which is not
3223 * necessarily the base image.
3225 static ssize_t rbd_features_show(struct device *dev,
3226 struct device_attribute *attr, char *buf)
3228 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3230 return sprintf(buf, "0x%016llx\n",
3231 (unsigned long long)rbd_dev->mapping.features);
3234 static ssize_t rbd_major_show(struct device *dev,
3235 struct device_attribute *attr, char *buf)
3237 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3239 if (rbd_dev->major)
3240 return sprintf(buf, "%d\n", rbd_dev->major);
3242 return sprintf(buf, "(none)\n");
3246 static ssize_t rbd_client_id_show(struct device *dev,
3247 struct device_attribute *attr, char *buf)
3249 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3251 return sprintf(buf, "client%lld\n",
3252 ceph_client_id(rbd_dev->rbd_client->client));
3255 static ssize_t rbd_pool_show(struct device *dev,
3256 struct device_attribute *attr, char *buf)
3258 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3260 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3263 static ssize_t rbd_pool_id_show(struct device *dev,
3264 struct device_attribute *attr, char *buf)
3266 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3268 return sprintf(buf, "%llu\n",
3269 (unsigned long long) rbd_dev->spec->pool_id);
3272 static ssize_t rbd_name_show(struct device *dev,
3273 struct device_attribute *attr, char *buf)
3275 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3277 if (rbd_dev->spec->image_name)
3278 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3280 return sprintf(buf, "(unknown)\n");
3283 static ssize_t rbd_image_id_show(struct device *dev,
3284 struct device_attribute *attr, char *buf)
3286 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3288 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3292 * Shows the name of the currently-mapped snapshot (or
3293 * RBD_SNAP_HEAD_NAME for the base image).
3295 static ssize_t rbd_snap_show(struct device *dev,
3296 struct device_attribute *attr,
3299 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3301 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3305 * For an rbd v2 image, shows the ids and names of the pool, image,
3306 * and snapshot for the parent image, plus the overlap. If there is
3307 * no parent, simply shows "(no parent image)".
3309 static ssize_t rbd_parent_show(struct device *dev,
3310 struct device_attribute *attr,
3313 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3314 struct rbd_spec *spec = rbd_dev->parent_spec;
3319 return sprintf(buf, "(no parent image)\n");
3321 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3322 (unsigned long long) spec->pool_id, spec->pool_name);
3327 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3328 spec->image_name ? spec->image_name : "(unknown)");
3333 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3334 (unsigned long long) spec->snap_id, spec->snap_name);
3339 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3344 return (ssize_t) (bufp - buf);
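/*
 * Sample output for a mapped clone (all values invented for
 * illustration):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1028fe3a1d25
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */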
3347 static ssize_t rbd_image_refresh(struct device *dev,
3348 struct device_attribute *attr,
3352 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3355 ret = rbd_dev_refresh(rbd_dev);
3357 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3359 return ret < 0 ? ret : size;
3362 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3363 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3364 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3365 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3366 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3367 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3368 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3369 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3370 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3371 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3372 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3374 static struct attribute *rbd_attrs[] = {
3375 &dev_attr_size.attr,
3376 &dev_attr_features.attr,
3377 &dev_attr_major.attr,
3378 &dev_attr_client_id.attr,
3379 &dev_attr_pool.attr,
3380 &dev_attr_pool_id.attr,
3381 &dev_attr_name.attr,
3382 &dev_attr_image_id.attr,
3383 &dev_attr_current_snap.attr,
3384 &dev_attr_parent.attr,
3385 &dev_attr_refresh.attr,
3389 static struct attribute_group rbd_attr_group = {
3393 static const struct attribute_group *rbd_attr_groups[] = {
3398 static void rbd_sysfs_dev_release(struct device *dev)
3402 static struct device_type rbd_device_type = {
3404 .groups = rbd_attr_groups,
3405 .release = rbd_sysfs_dev_release,
3408 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3410 kref_get(&spec->kref);
3415 static void rbd_spec_free(struct kref *kref);
3416 static void rbd_spec_put(struct rbd_spec *spec)
3419 kref_put(&spec->kref, rbd_spec_free);
3422 static struct rbd_spec *rbd_spec_alloc(void)
3424 struct rbd_spec *spec;
3426 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3429 kref_init(&spec->kref);
3434 static void rbd_spec_free(struct kref *kref)
3436 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3438 kfree(spec->pool_name);
3439 kfree(spec->image_id);
3440 kfree(spec->image_name);
3441 kfree(spec->snap_name);
3445 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3446 struct rbd_spec *spec)
3448 struct rbd_device *rbd_dev;
3450 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3454 spin_lock_init(&rbd_dev->lock);
3456 INIT_LIST_HEAD(&rbd_dev->node);
3457 init_rwsem(&rbd_dev->header_rwsem);
3459 rbd_dev->spec = spec;
3460 rbd_dev->rbd_client = rbdc;
3462 /* Initialize the layout used for all rbd requests */
3464 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3465 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3466 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3467 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3472 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3474 rbd_put_client(rbd_dev->rbd_client);
3475 rbd_spec_put(rbd_dev->spec);
3480 * Get the size and object order for an image snapshot, or if
3481 * snap_id is CEPH_NOSNAP, gets this information for the base
3484 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3485 u8 *order, u64 *snap_size)
3487 __le64 snapid = cpu_to_le64(snap_id);
3489 struct {
3490 u8 order;
3491 __le64 size;
3492 } __attribute__ ((packed)) size_buf = { 0 };
3494 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3496 &snapid, sizeof (snapid),
3497 &size_buf, sizeof (size_buf));
3498 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3501 if (ret < sizeof (size_buf))
3505 *order = size_buf.order;
3506 *snap_size = le64_to_cpu(size_buf.size);
3508 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3509 (unsigned long long)snap_id, (unsigned int)*order,
3510 (unsigned long long)*snap_size);
3515 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3517 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3518 &rbd_dev->header.obj_order,
3519 &rbd_dev->header.image_size);
3522 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3528 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3532 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3533 "rbd", "get_object_prefix", NULL, 0,
3534 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3535 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3540 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3541 p + ret, NULL, GFP_NOIO);
3544 if (IS_ERR(rbd_dev->header.object_prefix)) {
3545 ret = PTR_ERR(rbd_dev->header.object_prefix);
3546 rbd_dev->header.object_prefix = NULL;
3548 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3556 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3559 __le64 snapid = cpu_to_le64(snap_id);
3560 struct {
3561 __le64 features;
3562 __le64 incompat;
3563 } __attribute__ ((packed)) features_buf = { 0 };
3567 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3568 "rbd", "get_features",
3569 &snapid, sizeof (snapid),
3570 &features_buf, sizeof (features_buf));
3571 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3574 if (ret < sizeof (features_buf))
3577 incompat = le64_to_cpu(features_buf.incompat);
3578 if (incompat & ~RBD_FEATURES_SUPPORTED)
3581 *snap_features = le64_to_cpu(features_buf.features);
3583 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3584 (unsigned long long)snap_id,
3585 (unsigned long long)*snap_features,
3586 (unsigned long long)le64_to_cpu(features_buf.incompat));
3591 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3593 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3594 &rbd_dev->header.features);
3597 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3599 struct rbd_spec *parent_spec;
3601 void *reply_buf = NULL;
3609 parent_spec = rbd_spec_alloc();
3613 size = sizeof (__le64) + /* pool_id */
3614 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3615 sizeof (__le64) + /* snap_id */
3616 sizeof (__le64); /* overlap */
3617 reply_buf = kmalloc(size, GFP_KERNEL);
3623 snapid = cpu_to_le64(CEPH_NOSNAP);
3624 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3625 "rbd", "get_parent",
3626 &snapid, sizeof (snapid),
3628 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3633 end = reply_buf + ret;
3635 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3636 if (parent_spec->pool_id == CEPH_NOPOOL)
3637 goto out; /* No parent? No problem. */
3639 /* The ceph file layout needs to fit pool id in 32 bits */
3642 if (parent_spec->pool_id > (u64)U32_MAX) {
3643 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3644 (unsigned long long)parent_spec->pool_id, U32_MAX);
3648 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3649 if (IS_ERR(image_id)) {
3650 ret = PTR_ERR(image_id);
3653 parent_spec->image_id = image_id;
3654 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3655 ceph_decode_64_safe(&p, end, overlap, out_err);
3657 rbd_dev->parent_overlap = overlap;
3658 rbd_dev->parent_spec = parent_spec;
3659 parent_spec = NULL; /* rbd_dev now owns this */
3664 rbd_spec_put(parent_spec);
3669 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3671 struct {
3672 __le64 stripe_unit;
3673 __le64 stripe_count;
3674 } __attribute__ ((packed)) striping_info_buf = { 0 };
3675 size_t size = sizeof (striping_info_buf);
3682 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3683 "rbd", "get_stripe_unit_count", NULL, 0,
3684 (char *)&striping_info_buf, size);
3685 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3692 * We don't actually support the "fancy striping" feature
3693 * (STRIPINGV2) yet, but if the striping sizes are the
3694 * defaults the behavior is the same as before. So find
3695 * out, and only fail if the image has non-default values.
3698 obj_size = (u64)1 << rbd_dev->header.obj_order;
3699 p = &striping_info_buf;
3700 stripe_unit = ceph_decode_64(&p);
3701 if (stripe_unit != obj_size) {
3702 rbd_warn(rbd_dev, "unsupported stripe unit "
3703 "(got %llu want %llu)",
3704 stripe_unit, obj_size);
3707 stripe_count = ceph_decode_64(&p);
3708 if (stripe_count != 1) {
3709 rbd_warn(rbd_dev, "unsupported stripe count "
3710 "(got %llu want 1)", stripe_count);
3713 rbd_dev->header.stripe_unit = stripe_unit;
3714 rbd_dev->header.stripe_count = stripe_count;
3719 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3721 size_t image_id_size;
3726 void *reply_buf = NULL;
3728 char *image_name = NULL;
3731 rbd_assert(!rbd_dev->spec->image_name);
3733 len = strlen(rbd_dev->spec->image_id);
3734 image_id_size = sizeof (__le32) + len;
3735 image_id = kmalloc(image_id_size, GFP_KERNEL);
3740 end = image_id + image_id_size;
3741 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3743 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3744 reply_buf = kmalloc(size, GFP_KERNEL);
3748 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3749 "rbd", "dir_get_name",
3750 image_id, image_id_size,
3755 end = reply_buf + ret;
3757 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3758 if (IS_ERR(image_name))
3761 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3769 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3771 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3772 const char *snap_name;
3775 /* Skip over names until we find the one we are looking for */
3777 snap_name = rbd_dev->header.snap_names;
3778 while (which < snapc->num_snaps) {
3779 if (!strcmp(name, snap_name))
3780 return snapc->snaps[which];
3781 snap_name += strlen(snap_name) + 1;
3782 which++;
3787 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3789 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3794 for (which = 0; !found && which < snapc->num_snaps; which++) {
3795 const char *snap_name;
3797 snap_id = snapc->snaps[which];
3798 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3799 if (IS_ERR(snap_name))
3801 found = !strcmp(name, snap_name);
3804 return found ? snap_id : CEPH_NOSNAP;
3808 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3809 * no snapshot by that name is found, or if an error occurs.
3811 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3813 if (rbd_dev->image_format == 1)
3814 return rbd_v1_snap_id_by_name(rbd_dev, name);
3816 return rbd_v2_snap_id_by_name(rbd_dev, name);
3820 * When an rbd image has a parent image, it is identified by the
3821 * pool, image, and snapshot ids (not names). This function fills
3822 * in the names for those ids. (It's OK if we can't figure out the
3823 * name for an image id, but the pool and snapshot ids should always
3824 * exist and have names.) All names in an rbd spec are dynamically
3827 * When an image being mapped (not a parent) is probed, we have the
3828 * pool name and pool id, image name and image id, and the snapshot
3829 * name. The only thing we're missing is the snapshot id.
3831 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3833 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3834 struct rbd_spec *spec = rbd_dev->spec;
3835 const char *pool_name;
3836 const char *image_name;
3837 const char *snap_name;
3841 * An image being mapped will have the pool name (etc.), but
3842 * we need to look up the snapshot id.
3844 if (spec->pool_name) {
3845 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3848 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3849 if (snap_id == CEPH_NOSNAP)
3850 return -ENOENT;
3851 spec->snap_id = snap_id;
3853 spec->snap_id = CEPH_NOSNAP;
3859 /* Get the pool name; we have to make our own copy of this */
3861 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3863 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3866 pool_name = kstrdup(pool_name, GFP_KERNEL);
3870 /* Fetch the image name; tolerate failure here */
3872 image_name = rbd_dev_image_name(rbd_dev);
3874 rbd_warn(rbd_dev, "unable to get image name");
3876 /* Look up the snapshot name, and make a copy */
3878 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3884 spec->pool_name = pool_name;
3885 spec->image_name = image_name;
3886 spec->snap_name = snap_name;
3896 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3905 struct ceph_snap_context *snapc;
3909 * We'll need room for the seq value (maximum snapshot id),
3910 * snapshot count, and array of that many snapshot ids.
3911 * For now we have a fixed upper limit on the number we're
3912 * prepared to receive.
3914 size = sizeof (__le64) + sizeof (__le32) +
3915 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3916 reply_buf = kzalloc(size, GFP_KERNEL);
3920 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3921 "rbd", "get_snapcontext", NULL, 0,
3923 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3928 end = reply_buf + ret;
3930 ceph_decode_64_safe(&p, end, seq, out);
3931 ceph_decode_32_safe(&p, end, snap_count, out);
3934 * Make sure the reported number of snapshot ids wouldn't go
3935 * beyond the end of our buffer. But before checking that,
3936 * make sure the computed size of the snapshot context we
3937 * allocate is representable in a size_t.
3939 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3944 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3948 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3954 for (i = 0; i < snap_count; i++)
3955 snapc->snaps[i] = ceph_decode_64(&p);
3957 ceph_put_snap_context(rbd_dev->header.snapc);
3958 rbd_dev->header.snapc = snapc;
3960 dout(" snap context seq = %llu, snap_count = %u\n",
3961 (unsigned long long)seq, (unsigned int)snap_count);
3968 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3979 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3980 reply_buf = kmalloc(size, GFP_KERNEL);
3982 return ERR_PTR(-ENOMEM);
3984 snapid = cpu_to_le64(snap_id);
3985 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3986 "rbd", "get_snapshot_name",
3987 &snapid, sizeof (snapid),
3989 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3991 snap_name = ERR_PTR(ret);
3996 end = reply_buf + ret;
3997 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3998 if (IS_ERR(snap_name))
4001 dout(" snap_id 0x%016llx snap_name = %s\n",
4002 (unsigned long long)snap_id, snap_name);
4009 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4011 bool first_time = rbd_dev->header.object_prefix == NULL;
4014 down_write(&rbd_dev->header_rwsem);
4017 ret = rbd_dev_v2_header_onetime(rbd_dev);
4022 ret = rbd_dev_v2_image_size(rbd_dev);
4025 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4026 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4027 rbd_dev->mapping.size = rbd_dev->header.image_size;
4029 ret = rbd_dev_v2_snap_context(rbd_dev);
4030 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4034 up_write(&rbd_dev->header_rwsem);
4039 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4044 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4046 dev = &rbd_dev->dev;
4047 dev->bus = &rbd_bus_type;
4048 dev->type = &rbd_device_type;
4049 dev->parent = &rbd_root_dev;
4050 dev->release = rbd_dev_device_release;
4051 dev_set_name(dev, "%d", rbd_dev->dev_id);
4052 ret = device_register(dev);
4054 mutex_unlock(&ctl_mutex);
4059 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4061 device_unregister(&rbd_dev->dev);
4064 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4067 * Get a unique rbd identifier for the given new rbd_dev, and add
4068 * the rbd_dev to the global list. The minimum rbd id is 1.
4070 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4072 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4074 spin_lock(&rbd_dev_list_lock);
4075 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4076 spin_unlock(&rbd_dev_list_lock);
4077 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4078 (unsigned long long) rbd_dev->dev_id);
4082 * Remove an rbd_dev from the global list, and record that its
4083 * identifier is no longer in use.
4085 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4087 struct list_head *tmp;
4088 int rbd_id = rbd_dev->dev_id;
4091 rbd_assert(rbd_id > 0);
4093 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4094 (unsigned long long) rbd_dev->dev_id);
4095 spin_lock(&rbd_dev_list_lock);
4096 list_del_init(&rbd_dev->node);
4099 * If the id being "put" is not the current maximum, there
4100 * is nothing special we need to do.
4102 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4103 spin_unlock(&rbd_dev_list_lock);
4108 * We need to update the current maximum id. Search the
4109 * list to find out what it is. We're more likely to find
4110 * the maximum at the end, so search the list backward.
4113 list_for_each_prev(tmp, &rbd_dev_list) {
4114 struct rbd_device *rbd_dev;
4116 rbd_dev = list_entry(tmp, struct rbd_device, node);
4117 if (rbd_dev->dev_id > max_id)
4118 max_id = rbd_dev->dev_id;
4120 spin_unlock(&rbd_dev_list_lock);
4123 * The max id could have been updated by rbd_dev_id_get(), in
4124 * which case it now accurately reflects the new maximum.
4125 * Be careful not to overwrite the maximum value in that
4126 * case.
4128 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4129 dout(" max dev id has been reset\n");
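/*
 * Example of the race the cmpxchg above tolerates (illustration
 * only): ids 1..3 are in use and id 3 is being put, so max_id is
 * computed as 2.  If rbd_dev_id_get() concurrently hands out id 4,
 * rbd_dev_id_max already holds 4; the cmpxchg expecting 3 then
 * fails, leaving the correct maximum in place.
 */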
4133 * Skips over white space at *buf, and updates *buf to point to the
4134 * first found non-space character (if any). Returns the length of
4135 * the token (string of non-white space characters) found. Note
4136 * that *buf must be terminated with '\0'.
4138 static inline size_t next_token(const char **buf)
4141 * These are the characters that produce nonzero for
4142 * isspace() in the "C" and "POSIX" locales.
4144 const char *spaces = " \f\n\r\t\v";
4146 *buf += strspn(*buf, spaces); /* Find start of token */
4148 return strcspn(*buf, spaces); /* Return token length */
4152 * Finds the next token in *buf, and if the provided token buffer is
4153 * big enough, copies the found token into it. The result, if
4154 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4155 * must be terminated with '\0' on entry.
4157 * Returns the length of the token found (not including the '\0').
4158 * Return value will be 0 if no token is found, and it will be >=
4159 * token_size if the token would not fit.
4161 * The *buf pointer will be updated to point beyond the end of the
4162 * found token. Note that this occurs even if the token buffer is
4163 * too small to hold it.
4165 static inline size_t copy_token(const char **buf,
4171 len = next_token(buf);
4172 if (len < token_size) {
4173 memcpy(token, *buf, len);
4174 *(token + len) = '\0';
4182 * Finds the next token in *buf, dynamically allocates a buffer big
4183 * enough to hold a copy of it, and copies the token into the new
4184 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4185 * that a duplicate buffer is created even for a zero-length token.
4187 * Returns a pointer to the newly-allocated duplicate, or a null
4188 * pointer if memory for the duplicate was not available. If
4189 * the lenp argument is a non-null pointer, the length of the token
4190 * (not including the '\0') is returned in *lenp.
4192 * If successful, the *buf pointer will be updated to point beyond
4193 * the end of the found token.
4195 * Note: uses GFP_KERNEL for allocation.
4197 static inline char *dup_token(const char **buf, size_t *lenp)
4202 len = next_token(buf);
4203 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4206 *(dup + len) = '\0';
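/*
 * Usage sketch for the token helpers (hypothetical input):
 *
 *	const char *buf = "  rbd foo";
 *	char *pool = dup_token(&buf, NULL);	pool  -> "rbd"
 *	char *image = dup_token(&buf, NULL);	image -> "foo"
 *
 * Each call skips leading white space, duplicates the token, and
 * advances *buf past it; both results must be freed with kfree().
 */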
4216 * Parse the options provided for an "rbd add" (i.e., rbd image
4217 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4218 * and the data written is passed here via a NUL-terminated buffer.
4219 * Returns 0 if successful or an error code otherwise.
4221 * The information extracted from these options is recorded in
4222 * the other parameters which return dynamically-allocated
4223 * structures:
4224 *  ceph_opts
4225 * The address of a pointer that will refer to a ceph options
4226 * structure. Caller must release the returned pointer using
4227 * ceph_destroy_options() when it is no longer needed.
4228 *  rbd_opts
4229 * Address of an rbd options pointer. Fully initialized by
4230 * this function; caller must release with kfree().
4231 *  rbd_spec
4232 * Address of an rbd image specification pointer. Fully
4233 * initialized by this function based on parsed options.
4234 * Caller must release with rbd_spec_put().
4236 * The options passed take this form:
4237 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4238 * where:
4239 *  <mon_addrs>
4240 * A comma-separated list of one or more monitor addresses.
4241 * A monitor address is an ip address, optionally followed
4242 * by a port number (separated by a colon).
4243 * I.e.: ip1[:port1][,ip2[:port2]...]
4244 *  <options>
4245 * A comma-separated list of ceph and/or rbd options.
4246 *  <pool_name>
4247 * The name of the rados pool containing the rbd image.
4248 *  <image_name>
4249 * The name of the image in that pool to map.
4250 *  <snap_name>
4251 * An optional snapshot name. If provided, the mapping will
4252 * present data from the image at the time that snapshot was
4253 * created. The image head is used if no snapshot name is
4254 * provided. Snapshot mappings are always read-only.
4256 static int rbd_add_parse_args(const char *buf,
4257 struct ceph_options **ceph_opts,
4258 struct rbd_options **opts,
4259 struct rbd_spec **rbd_spec)
4263 const char *mon_addrs;
4265 size_t mon_addrs_size;
4266 struct rbd_spec *spec = NULL;
4267 struct rbd_options *rbd_opts = NULL;
4268 struct ceph_options *copts;
4271 /* The first four tokens are required */
4273 len = next_token(&buf);
4275 rbd_warn(NULL, "no monitor address(es) provided");
4279 mon_addrs_size = len + 1;
4283 options = dup_token(&buf, NULL);
4287 rbd_warn(NULL, "no options provided");
4291 spec = rbd_spec_alloc();
4295 spec->pool_name = dup_token(&buf, NULL);
4296 if (!spec->pool_name)
4298 if (!*spec->pool_name) {
4299 rbd_warn(NULL, "no pool name provided");
4303 spec->image_name = dup_token(&buf, NULL);
4304 if (!spec->image_name)
4306 if (!*spec->image_name) {
4307 rbd_warn(NULL, "no image name provided");
4312 * Snapshot name is optional; default is to use "-"
4313 * (indicating the head/no snapshot).
4315 len = next_token(&buf);
4316 if (!len) {
4317 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4318 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4319 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4320 ret = -ENAMETOOLONG;
4323 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4326 *(snap_name + len) = '\0';
4327 spec->snap_name = snap_name;
4329 /* Initialize all rbd options to the defaults */
4331 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4335 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4337 copts = ceph_parse_options(options, mon_addrs,
4338 mon_addrs + mon_addrs_size - 1,
4339 parse_rbd_opts_token, rbd_opts);
4340 if (IS_ERR(copts)) {
4341 ret = PTR_ERR(copts);
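/*
 * Example of a buffer this function parses (monitor address and
 * names invented for illustration):
 *
 *	1.2.3.4:6789,5.6.7.8:6789 name=admin rbd foo -
 *
 * i.e. two monitors, one ceph option, pool "rbd", image "foo",
 * and "-" to map the image head rather than a snapshot.
 */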
4362 * An rbd format 2 image has a unique identifier, distinct from the
4363 * name given to it by the user. Internally, that identifier is
4364 * what's used to specify the names of objects related to the image.
4366 * A special "rbd id" object is used to map an rbd image name to its
4367 * id. If that object doesn't exist, then there is no v2 rbd image
4368 * with the supplied name.
4370 * This function will record the given rbd_dev's image_id field if
4371 * it can be determined, and in that case will return 0. If any
4372 * errors occur a negative errno will be returned and the rbd_dev's
4373 * image_id field will be unchanged (and should be NULL).
4375 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4384 * When probing a parent image, the image id is already
4385 * known (and the image name likely is not). There's no
4386 * need to fetch the image id again in this case. We
4387 * do still need to set the image format though.
4389 if (rbd_dev->spec->image_id) {
4390 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4396 * First, see if the format 2 image id object exists, and if
4397 * so, get the image's persistent id from it.
4399 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4400 object_name = kmalloc(size, GFP_NOIO);
4403 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4404 dout("rbd id object name is %s\n", object_name);
4406 /* Response will be an encoded string, which includes a length */
4408 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4409 response = kzalloc(size, GFP_NOIO);
4415 /* If it doesn't exist we'll assume it's a format 1 image */
4417 ret = rbd_obj_method_sync(rbd_dev, object_name,
4418 "rbd", "get_id", NULL, 0,
4419 response, RBD_IMAGE_ID_LEN_MAX);
4420 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4421 if (ret == -ENOENT) {
4422 image_id = kstrdup("", GFP_KERNEL);
4423 ret = image_id ? 0 : -ENOMEM;
4425 rbd_dev->image_format = 1;
4426 } else if (ret > sizeof (__le32)) {
4429 image_id = ceph_extract_encoded_string(&p, p + ret,
4431 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4433 rbd_dev->image_format = 2;
4439 rbd_dev->spec->image_id = image_id;
4440 dout("image_id is %s\n", image_id);
4449 /* Undo whatever state changes are made by v1 or v2 image probe */
4451 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4453 struct rbd_image_header *header;
4455 rbd_dev_remove_parent(rbd_dev);
4456 rbd_spec_put(rbd_dev->parent_spec);
4457 rbd_dev->parent_spec = NULL;
4458 rbd_dev->parent_overlap = 0;
4460 /* Free dynamic fields from the header, then zero it out */
4462 header = &rbd_dev->header;
4463 ceph_put_snap_context(header->snapc);
4464 kfree(header->snap_sizes);
4465 kfree(header->snap_names);
4466 kfree(header->object_prefix);
4467 memset(header, 0, sizeof (*header));
4470 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4474 ret = rbd_dev_v2_object_prefix(rbd_dev);
4479 * Get and check the features for the image. Currently the
4480 * features are assumed to never change.
4482 ret = rbd_dev_v2_features(rbd_dev);
4486 /* If the image supports layering, get the parent info */
4488 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4489 ret = rbd_dev_v2_parent_info(rbd_dev);
4493 * Print a warning if this image has a parent.
4494 * Don't print it if the image now being probed
4495 * is itself a parent. We can tell at this point
4496 * because we won't know its pool name yet (just its
4497 * pool id).
4499 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4500 rbd_warn(rbd_dev, "WARNING: kernel layering "
4501 "is EXPERIMENTAL!");
4504 /* If the image supports fancy striping, get its parameters */
4506 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4507 ret = rbd_dev_v2_striping_info(rbd_dev);
4511 /* No support for crypto or compression types in format 2 images */
4515 rbd_dev->parent_overlap = 0;
4516 rbd_spec_put(rbd_dev->parent_spec);
4517 rbd_dev->parent_spec = NULL;
4518 kfree(rbd_dev->header_name);
4519 rbd_dev->header_name = NULL;
4520 kfree(rbd_dev->header.object_prefix);
4521 rbd_dev->header.object_prefix = NULL;
4526 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4528 struct rbd_device *parent = NULL;
4529 struct rbd_spec *parent_spec;
4530 struct rbd_client *rbdc;
4533 if (!rbd_dev->parent_spec)
4536 * We need to pass a reference to the client and the parent
4537 * spec when creating the parent rbd_dev. Images related by
4538 * parent/child relationships always share both.
4540 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4541 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4544 parent = rbd_dev_create(rbdc, parent_spec);
4548 ret = rbd_dev_image_probe(parent, true);
4551 rbd_dev->parent = parent;
4556 rbd_spec_put(rbd_dev->parent_spec);
4557 kfree(rbd_dev->header_name);
4558 rbd_dev_destroy(parent);
4560 rbd_put_client(rbdc);
4561 rbd_spec_put(parent_spec);
4567 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4571 /* generate unique id: find highest unique id, add one */
4572 rbd_dev_id_get(rbd_dev);
4574 /* Fill in the device name, now that we have its id. */
4575 BUILD_BUG_ON(DEV_NAME_LEN
4576 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4577 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4579 /* Get our block major device number. */
4581 ret = register_blkdev(0, rbd_dev->name);
4584 rbd_dev->major = ret;
4586 /* Set up the blkdev mapping. */
4588 ret = rbd_init_disk(rbd_dev);
4590 goto err_out_blkdev;
4592 ret = rbd_dev_mapping_set(rbd_dev);
4595 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4597 ret = rbd_bus_add_dev(rbd_dev);
4599 goto err_out_mapping;
4601 /* Everything's ready. Announce the disk to the world. */
4603 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4604 add_disk(rbd_dev->disk);
4606 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4607 (unsigned long long) rbd_dev->mapping.size);
4612 rbd_dev_mapping_clear(rbd_dev);
4614 rbd_free_disk(rbd_dev);
4616 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4618 rbd_dev_id_put(rbd_dev);
4619 rbd_dev_mapping_clear(rbd_dev);
4624 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4626 struct rbd_spec *spec = rbd_dev->spec;
4629 /* Record the header object name for this rbd image. */
4631 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4633 if (rbd_dev->image_format == 1)
4634 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4636 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4638 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4639 if (!rbd_dev->header_name)
4642 if (rbd_dev->image_format == 1)
4643 sprintf(rbd_dev->header_name, "%s%s",
4644 spec->image_name, RBD_SUFFIX);
4646 sprintf(rbd_dev->header_name, "%s%s",
4647 RBD_HEADER_PREFIX, spec->image_id);
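/*
 * Resulting header object names (assuming the usual values of
 * RBD_SUFFIX and RBD_HEADER_PREFIX from rbd_types.h): a format 1
 * image "foo" uses "foo.rbd", while a format 2 image with id
 * "1028fe3a1d25" uses "rbd_header.1028fe3a1d25".
 */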
4651 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4655 rbd_dev_unprobe(rbd_dev);
4656 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4658 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4659 kfree(rbd_dev->header_name);
4660 rbd_dev->header_name = NULL;
4661 rbd_dev->image_format = 0;
4662 kfree(rbd_dev->spec->image_id);
4663 rbd_dev->spec->image_id = NULL;
4665 rbd_dev_destroy(rbd_dev);
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
{
	int ret;
	int tmp;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto out_header_name;

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_probe;

	/* If we are mapping a snapshot it must be marked read-only */

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	ret = rbd_dev_probe_parent(rbd_dev);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
		rbd_dev->image_format, rbd_dev->header_name);

	return 0;
err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (tmp)
		rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	bool read_only;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;
	read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64)rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
				(unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rc = rbd_dev_image_probe(rbd_dev, read_only);
	if (rc < 0)
		goto err_out_rbd_dev;

	rc = rbd_dev_device_setup(rbd_dev);
	if (!rc)
		return count;

	/*
	 * rbd_dev_image_release() also destroys rbd_dev, so jump past
	 * the destroy below rather than freeing the device twice.
	 */
	rbd_dev_image_release(rbd_dev);
	goto err_out_module;

err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t)rc;
}

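/*
 * Usage sketch for the add file (the authoritative format is in
 * Documentation/ABI/testing/sysfs-bus-rbd); the monitor address,
 * options and names below are placeholders:
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *         > /sys/bus/rbd/add
 *
 * i.e. "<mon addrs> <options> <pool name> <image name> [<snap name>]".
 * On success the write returns count and a new rbd device appears; on
 * failure it returns the (negative) error code set above.
 */
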
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_mapping_clear(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

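/*
 * Teardown order illustration: for a chain base <- child <- mapped
 * image, the inner loop above walks "second" down to base (the
 * ancestor with no parent of its own) and releases it first.  Each
 * pass of the outer loop thus trims the deepest ancestor, so no image
 * is released while a descendant still refers to it.
 */
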
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id;
	unsigned long ul;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;
	ret = count;
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
done:
	mutex_unlock(&ctl_mutex);

	return ret;
}

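/*
 * Usage sketch for the remove file: writing a device id unmaps the
 * corresponding image, e.g.
 *
 *   $ echo 0 > /sys/bus/rbd/remove
 *
 * As the code above shows, the write fails with -ENOENT if no mapping
 * has that id and with -EBUSY while the block device is still open.
 */
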
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_slab_init();
	if (rc)
		return rc;
	rc = rbd_sysfs_init();
	if (rc)
		rbd_slab_exit();
	else
		pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return rc;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");