rbd: use the higher level librbd instead of just librados

librbd stacks on top of librados to provide access
to rbd images.

Using librbd simplifies the qemu code, and allows
qemu to use new versions of the rbd format
with few (if any) changes.

Reviewed-by: Christian Brunner <chb@muc.de>
Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
diff --git a/block/rbd.c b/block/rbd.c
index 249a590..2cee70d 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -1,20 +1,22 @@
 /*
  * QEMU Block driver for RADOS (Ceph)
  *
- * Copyright (C) 2010 Christian Brunner <chb@muc.de>
+ * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
+ *                         Josh Durgin <josh.durgin@dreamhost.com>
  *
  * This work is licensed under the terms of the GNU GPL, version 2.  See
  * the COPYING file in the top-level directory.
  *
  */
 
+#include <inttypes.h>
+
 #include "qemu-common.h"
 #include "qemu-error.h"
 
-#include "rbd_types.h"
 #include "block_int.h"
 
-#include <rados/librados.h>
+#include <rbd/librbd.h>
 
 
 
@@ -40,6 +42,13 @@
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
+#define RBD_MAX_CONF_NAME_SIZE 128
+#define RBD_MAX_CONF_VAL_SIZE 512
+#define RBD_MAX_CONF_SIZE 1024
+#define RBD_MAX_POOL_NAME_SIZE 128
+#define RBD_MAX_SNAP_NAME_SIZE 128
+#define RBD_MAX_SNAPS 100
+
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
     QEMUBH *bh;
@@ -48,7 +57,6 @@
     char *bounce;
     int write;
     int64_t sector_num;
-    int aiocnt;
     int error;
     struct BDRVRBDState *s;
     int cancelled;
@@ -59,7 +67,7 @@
     RBDAIOCB *acb;
     struct BDRVRBDState *s;
     int done;
-    int64_t segsize;
+    int64_t size;
     char *buf;
     int ret;
 } RADOSCB;
@@ -69,25 +77,22 @@
 
 typedef struct BDRVRBDState {
     int fds[2];
-    rados_pool_t pool;
-    rados_pool_t header_pool;
-    char name[RBD_MAX_OBJ_NAME_SIZE];
-    char block_name[RBD_MAX_BLOCK_NAME_SIZE];
-    uint64_t size;
-    uint64_t objsize;
+    rados_t cluster;
+    rados_ioctx_t io_ctx;
+    rbd_image_t image;
+    char name[RBD_MAX_IMAGE_NAME_SIZE];
     int qemu_aio_count;
+    char *snap;
     int event_reader_pos;
     RADOSCB *event_rcb;
 } BDRVRBDState;
 
-typedef struct rbd_obj_header_ondisk RbdHeader1;
-
 static void rbd_aio_bh_cb(void *opaque);
 
-static int rbd_next_tok(char *dst, int dst_len,
-                        char *src, char delim,
-                        const char *name,
-                        char **p)
+static int qemu_rbd_next_tok(char *dst, int dst_len,
+                             char *src, char delim,
+                             const char *name,
+                             char **p)
 {
     int l;
     char *end;
@@ -115,10 +120,10 @@
     return 0;
 }
 
-static int rbd_parsename(const char *filename,
-                         char *pool, int pool_len,
-                         char *snap, int snap_len,
-                         char *name, int name_len)
+static int qemu_rbd_parsename(const char *filename,
+                              char *pool, int pool_len,
+                              char *snap, int snap_len,
+                              char *name, int name_len)
 {
     const char *start;
     char *p, *buf;
@@ -131,12 +136,12 @@
     buf = qemu_strdup(start);
     p = buf;
 
-    ret = rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
+    ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
     if (ret < 0 || !p) {
         ret = -EINVAL;
         goto done;
     }
-    ret = rbd_next_tok(name, name_len, p, '@', "object name", &p);
+    ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
     if (ret < 0) {
         goto done;
     }
@@ -145,123 +150,35 @@
         goto done;
     }
 
-    ret = rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p);
+    ret = qemu_rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p);
 
 done:
     qemu_free(buf);
     return ret;
 }
 
-static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
-{
-    uint32_t len = strlen(name);
-    uint32_t len_le = cpu_to_le32(len);
-    /* total_len = encoding op + name + empty buffer */
-    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
-    uint8_t *desc = NULL;
-
-    desc = qemu_malloc(total_len);
-
-    *tmap_desc = (char *)desc;
-
-    *desc = op;
-    desc++;
-    memcpy(desc, &len_le, sizeof(len_le));
-    desc += sizeof(len_le);
-    memcpy(desc, name, len);
-    desc += len;
-    len = 0; /* no need for endian conversion for 0 */
-    memcpy(desc, &len, sizeof(len));
-    desc += sizeof(len);
-
-    return (char *)desc - *tmap_desc;
-}
-
-static void free_tmap_op(char *tmap_desc)
-{
-    qemu_free(tmap_desc);
-}
-
-static int rbd_register_image(rados_pool_t pool, const char *name)
-{
-    char *tmap_desc;
-    const char *dir = RBD_DIRECTORY;
-    int ret;
-
-    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
-    if (ret < 0) {
-        return ret;
-    }
-
-    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
-    free_tmap_op(tmap_desc);
-
-    return ret;
-}
-
-static int touch_rbd_info(rados_pool_t pool, const char *info_oid)
-{
-    int r = rados_write(pool, info_oid, 0, NULL, 0);
-    if (r < 0) {
-        return r;
-    }
-    return 0;
-}
-
-static int rbd_assign_bid(rados_pool_t pool, uint64_t *id)
-{
-    uint64_t out[1];
-    const char *info_oid = RBD_INFO;
-
-    *id = 0;
-
-    int r = touch_rbd_info(pool, info_oid);
-    if (r < 0) {
-        return r;
-    }
-
-    r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL,
-                   0, (char *)out, sizeof(out));
-    if (r < 0) {
-        return r;
-    }
-
-    le64_to_cpus(out);
-    *id = out[0];
-
-    return 0;
-}
-
-static int rbd_create(const char *filename, QEMUOptionParameter *options)
+static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
 {
     int64_t bytes = 0;
     int64_t objsize;
-    uint64_t size;
-    time_t mtime;
-    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
-    char pool[RBD_MAX_SEG_NAME_SIZE];
-    char n[RBD_MAX_SEG_NAME_SIZE];
-    char name[RBD_MAX_OBJ_NAME_SIZE];
-    char snap_buf[RBD_MAX_SEG_NAME_SIZE];
+    int obj_order = 0;
+    char pool[RBD_MAX_POOL_NAME_SIZE];
+    char name[RBD_MAX_IMAGE_NAME_SIZE];
+    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
     char *snap = NULL;
-    RbdHeader1 header;
-    rados_pool_t p;
-    uint64_t bid;
-    uint32_t hi, lo;
+    rados_t cluster;
+    rados_ioctx_t io_ctx;
     int ret;
 
-    if (rbd_parsename(filename,
-                      pool, sizeof(pool),
-                      snap_buf, sizeof(snap_buf),
-                      name, sizeof(name)) < 0) {
+    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
+                           snap_buf, sizeof(snap_buf),
+                           name, sizeof(name)) < 0) {
         return -EINVAL;
     }
     if (snap_buf[0] != '\0') {
         snap = snap_buf;
     }
 
-    snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX);
-
     /* Read out options */
     while (options && options->name) {
         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
@@ -277,82 +194,55 @@
                     error_report("obj size too small");
                     return -EINVAL;
                 }
-		obj_order = ffs(objsize) - 1;
+                obj_order = ffs(objsize) - 1;
             }
         }
         options++;
     }
 
-    memset(&header, 0, sizeof(header));
-    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
-    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
-    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
-    header.image_size = cpu_to_le64(bytes);
-    header.options.order = obj_order;
-    header.options.crypt_type = RBD_CRYPT_NONE;
-    header.options.comp_type = RBD_COMP_NONE;
-    header.snap_seq = 0;
-    header.snap_count = 0;
-
-    if (rados_initialize(0, NULL) < 0) {
+    if (rados_create(&cluster, NULL) < 0) {
         error_report("error initializing");
         return -EIO;
     }
 
-    if (rados_open_pool(pool, &p)) {
+    if (rados_conf_read_file(cluster, NULL) < 0) {
+        error_report("error reading config file");
+        rados_shutdown(cluster);
+        return -EIO;
+    }
+
+    if (rados_connect(cluster) < 0) {
+        error_report("error connecting");
+        rados_shutdown(cluster);
+        return -EIO;
+    }
+
+    if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
         error_report("error opening pool %s", pool);
-        rados_deinitialize();
+        rados_shutdown(cluster);
         return -EIO;
     }
 
-    /* check for existing rbd header file */
-    ret = rados_stat(p, n, &size, &mtime);
-    if (ret == 0) {
-        ret=-EEXIST;
-        goto done;
-    }
-
-    ret = rbd_assign_bid(p, &bid);
-    if (ret < 0) {
-        error_report("failed assigning block id");
-        rados_deinitialize();
-        return -EIO;
-    }
-    hi = bid >> 32;
-    lo = bid & 0xFFFFFFFF;
-    snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo);
-
-    /* create header file */
-    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
-    if (ret < 0) {
-        goto done;
-    }
-
-    ret = rbd_register_image(p, name);
-done:
-    rados_close_pool(p);
-    rados_deinitialize();
+    ret = rbd_create(io_ctx, name, bytes, &obj_order);
+    rados_ioctx_destroy(io_ctx);
+    rados_shutdown(cluster);
 
     return ret;
 }
 
 /*
- * This aio completion is being called from rbd_aio_event_reader() and
- * runs in qemu context. It schedules a bh, but just in case the aio
+ * This aio completion is being called from qemu_rbd_aio_event_reader()
+ * and runs in qemu context. It schedules a bh, but just in case the aio
  * was not cancelled before.
  */
-static void rbd_complete_aio(RADOSCB *rcb)
+static void qemu_rbd_complete_aio(RADOSCB *rcb)
 {
     RBDAIOCB *acb = rcb->acb;
     int64_t r;
 
-    acb->aiocnt--;
-
     if (acb->cancelled) {
-        if (!acb->aiocnt) {
-            qemu_vfree(acb->bounce);
-            qemu_aio_release(acb);
-        }
+        qemu_vfree(acb->bounce);
+        qemu_aio_release(acb);
         goto done;
     }
 
@@ -363,32 +253,25 @@
             acb->ret = r;
             acb->error = 1;
         } else if (!acb->error) {
-            acb->ret += rcb->segsize;
+            acb->ret = rcb->size;
         }
     } else {
-        if (r == -ENOENT) {
-            memset(rcb->buf, 0, rcb->segsize);
-            if (!acb->error) {
-                acb->ret += rcb->segsize;
-            }
-        } else if (r < 0) {
-	    memset(rcb->buf, 0, rcb->segsize);
+        if (r < 0) {
+            memset(rcb->buf, 0, rcb->size);
             acb->ret = r;
             acb->error = 1;
-        } else if (r < rcb->segsize) {
-            memset(rcb->buf + r, 0, rcb->segsize - r);
+        } else if (r < rcb->size) {
+            memset(rcb->buf + r, 0, rcb->size - r);
             if (!acb->error) {
-                acb->ret += rcb->segsize;
+                acb->ret = rcb->size;
             }
         } else if (!acb->error) {
-            acb->ret += r;
+            acb->ret = r;
         }
     }
     /* Note that acb->bh can be NULL in case where the aio was cancelled */
-    if (!acb->aiocnt) {
-        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
-        qemu_bh_schedule(acb->bh);
-    }
+    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
+    qemu_bh_schedule(acb->bh);
 done:
     qemu_free(rcb);
 }
@@ -397,7 +280,7 @@
  * aio fd read handler. It runs in the qemu context and calls the
  * completion handling of completed rados aio operations.
  */
-static void rbd_aio_event_reader(void *opaque)
+static void qemu_rbd_aio_event_reader(void *opaque)
 {
     BDRVRBDState *s = opaque;
 
@@ -413,176 +296,74 @@
                 s->event_reader_pos += ret;
                 if (s->event_reader_pos == sizeof(s->event_rcb)) {
                     s->event_reader_pos = 0;
-                    rbd_complete_aio(s->event_rcb);
-                    s->qemu_aio_count --;
+                    qemu_rbd_complete_aio(s->event_rcb);
+                    s->qemu_aio_count--;
                 }
             }
         }
     } while (ret < 0 && errno == EINTR);
 }
 
-static int rbd_aio_flush_cb(void *opaque)
+static int qemu_rbd_aio_flush_cb(void *opaque)
 {
     BDRVRBDState *s = opaque;
 
     return (s->qemu_aio_count > 0);
 }
 
-
-static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header)
-{
-    uint32_t snap_count = le32_to_cpu(header->snap_count);
-    rados_snap_t *snaps = NULL;
-    rados_snap_t seq;
-    uint32_t i;
-    uint64_t snap_names_len = le64_to_cpu(header->snap_names_len);
-    int r;
-    rados_snap_t snapid = 0;
-
-    if (snap_count) {
-        const char *header_snap = (const char *)&header->snaps[snap_count];
-        const char *end = header_snap + snap_names_len;
-        snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count);
-
-        for (i=0; i < snap_count; i++) {
-            snaps[i] = le64_to_cpu(header->snaps[i].id);
-
-            if (snap && strcmp(snap, header_snap) == 0) {
-                snapid = snaps[i];
-            }
-
-            header_snap += strlen(header_snap) + 1;
-            if (header_snap > end) {
-                error_report("bad header, snapshot list broken");
-            }
-        }
-    }
-
-    if (snap && !snapid) {
-        error_report("snapshot not found");
-        qemu_free(snaps);
-        return -ENOENT;
-    }
-    seq = le32_to_cpu(header->snap_seq);
-
-    r = rados_set_snap_context(pool, seq, snaps, snap_count);
-
-    rados_set_snap(pool, snapid);
-
-    qemu_free(snaps);
-
-    return r;
-}
-
-#define BUF_READ_START_LEN    4096
-
-static int rbd_read_header(BDRVRBDState *s, char **hbuf)
-{
-    char *buf = NULL;
-    char n[RBD_MAX_SEG_NAME_SIZE];
-    uint64_t len = BUF_READ_START_LEN;
-    int r;
-
-    snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
-
-    buf = qemu_malloc(len);
-
-    r = rados_read(s->header_pool, n, 0, buf, len);
-    if (r < 0) {
-        goto failed;
-    }
-
-    if (r < len) {
-        goto done;
-    }
-
-    qemu_free(buf);
-    buf = qemu_malloc(len);
-
-    r = rados_stat(s->header_pool, n, &len, NULL);
-    if (r < 0) {
-        goto failed;
-    }
-
-    r = rados_read(s->header_pool, n, 0, buf, len);
-    if (r < 0) {
-        goto failed;
-    }
-
-done:
-    *hbuf = buf;
-    return 0;
-
-failed:
-    qemu_free(buf);
-    return r;
-}
-
-static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
+static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
 {
     BDRVRBDState *s = bs->opaque;
-    RbdHeader1 *header;
-    char pool[RBD_MAX_SEG_NAME_SIZE];
-    char snap_buf[RBD_MAX_SEG_NAME_SIZE];
-    char *snap = NULL;
-    char *hbuf = NULL;
+    char pool[RBD_MAX_POOL_NAME_SIZE];
+    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
     int r;
 
-    if (rbd_parsename(filename, pool, sizeof(pool),
-                      snap_buf, sizeof(snap_buf),
-                      s->name, sizeof(s->name)) < 0) {
+    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
+                           snap_buf, sizeof(snap_buf),
+                           s->name, sizeof(s->name)) < 0) {
         return -EINVAL;
     }
+    s->snap = NULL;
     if (snap_buf[0] != '\0') {
-        snap = snap_buf;
+        s->snap = qemu_strdup(snap_buf);
     }
 
-    if ((r = rados_initialize(0, NULL)) < 0) {
+    r = rados_create(&s->cluster, NULL);
+    if (r < 0) {
         error_report("error initializing");
         return r;
     }
 
-    if ((r = rados_open_pool(pool, &s->pool))) {
-        error_report("error opening pool %s", pool);
-        rados_deinitialize();
-        return r;
-    }
-
-    if ((r = rados_open_pool(pool, &s->header_pool))) {
-        error_report("error opening pool %s", pool);
-        rados_deinitialize();
-        return r;
-    }
-
-    if ((r = rbd_read_header(s, &hbuf)) < 0) {
-        error_report("error reading header from %s", s->name);
-        goto failed;
-    }
-
-    if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
-        error_report("Invalid header signature");
-        r = -EMEDIUMTYPE;
-        goto failed;
-    }
-
-    if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
-        error_report("Unknown image version");
-        r = -EMEDIUMTYPE;
-        goto failed;
-    }
-
-    header = (RbdHeader1 *) hbuf;
-    s->size = le64_to_cpu(header->image_size);
-    s->objsize = 1ULL << header->options.order;
-    memcpy(s->block_name, header->block_name, sizeof(header->block_name));
-
-    r = rbd_set_snapc(s->pool, snap, header);
+    r = rados_conf_read_file(s->cluster, NULL);
     if (r < 0) {
-        error_report("failed setting snap context: %s", strerror(-r));
-        goto failed;
+        error_report("error reading config file");
+        rados_shutdown(s->cluster);
+        return r;
     }
 
-    bs->read_only = (snap != NULL);
+    r = rados_connect(s->cluster);
+    if (r < 0) {
+        error_report("error connecting");
+        rados_shutdown(s->cluster);
+        return r;
+    }
+
+    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
+    if (r < 0) {
+        error_report("error opening pool %s", pool);
+        rados_shutdown(s->cluster);
+        return r;
+    }
+
+    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
+    if (r < 0) {
+        error_report("error reading header from %s", s->name);
+        rados_ioctx_destroy(s->io_ctx);
+        rados_shutdown(s->cluster);
+        return r;
+    }
+
+    bs->read_only = (s->snap != NULL);
 
     s->event_reader_pos = 0;
     r = qemu_pipe(s->fds);
@@ -592,23 +373,20 @@
     }
     fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
     fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
-    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL,
-        rbd_aio_flush_cb, NULL, s);
+    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
+                            NULL, qemu_rbd_aio_flush_cb, NULL, s);
 
-    qemu_free(hbuf);
 
     return 0;
 
 failed:
-    qemu_free(hbuf);
-
-    rados_close_pool(s->header_pool);
-    rados_close_pool(s->pool);
-    rados_deinitialize();
+    rbd_close(s->image);
+    rados_ioctx_destroy(s->io_ctx);
+    rados_shutdown(s->cluster);
     return r;
 }
 
-static void rbd_close(BlockDriverState *bs)
+static void qemu_rbd_close(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
 
@@ -617,16 +395,17 @@
     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
         NULL);
 
-    rados_close_pool(s->header_pool);
-    rados_close_pool(s->pool);
-    rados_deinitialize();
+    rbd_close(s->image);
+    rados_ioctx_destroy(s->io_ctx);
+    qemu_free(s->snap);
+    rados_shutdown(s->cluster);
 }
 
 /*
  * Cancel aio. Since we don't reference acb in a non qemu threads,
  * it is safe to access it here.
  */
-static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
+static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
     acb->cancelled = 1;
@@ -634,39 +413,28 @@
 
 static AIOPool rbd_aio_pool = {
     .aiocb_size = sizeof(RBDAIOCB),
-    .cancel = rbd_aio_cancel,
+    .cancel = qemu_rbd_aio_cancel,
 };
 
-/*
- * This is the callback function for rados_aio_read and _write
- *
- * Note: this function is being called from a non qemu thread so
- * we need to be careful about what we do here. Generally we only
- * write to the block notification pipe, and do the rest of the
- * io completion handling from rbd_aio_event_reader() which
- * runs in a qemu context.
- */
-static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
+static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
 {
-    int ret;
-    rcb->ret = rados_aio_get_return_value(c);
-    rados_aio_release(c);
+    int ret = 0;
     while (1) {
         fd_set wfd;
-        int fd = rcb->s->fds[RBD_FD_WRITE];
+        int fd = s->fds[RBD_FD_WRITE];
 
-        /* send the rcb pointer to the qemu thread that is responsible
-           for the aio completion. Must do it in a qemu thread context */
+        /* send the op pointer to the qemu thread that is responsible
+           for the aio/op completion. Must do it in a qemu thread context */
         ret = write(fd, (void *)&rcb, sizeof(rcb));
         if (ret >= 0) {
             break;
         }
         if (errno == EINTR) {
             continue;
-	}
+        }
         if (errno != EAGAIN) {
             break;
-	}
+        }
 
         FD_ZERO(&wfd);
         FD_SET(fd, &wfd);
@@ -675,13 +443,31 @@
         } while (ret < 0 && errno == EINTR);
     }
 
+    return ret;
+}
+
+/*
+ * This is the callback function for rbd_aio_read and _write
+ *
+ * Note: this function is being called from a non qemu thread so
+ * we need to be careful about what we do here. Generally we only
+ * write to the block notification pipe, and do the rest of the
+ * io completion handling from qemu_rbd_aio_event_reader() which
+ * runs in a qemu context.
+ */
+static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
+{
+    int ret;
+    rcb->ret = rbd_aio_get_return_value(c);
+    rbd_aio_release(c);
+    ret = qemu_rbd_send_pipe(rcb->s, rcb);
     if (ret < 0) {
-        error_report("failed writing to acb->s->fds\n");
+        error_report("failed writing to acb->s->fds");
         qemu_free(rcb);
     }
 }
 
-/* Callback when all queued rados_aio requests are complete */
+/* Callback when all queued rbd_aio requests are complete */
 
 static void rbd_aio_bh_cb(void *opaque)
 {
@@ -707,9 +493,7 @@
 {
     RBDAIOCB *acb;
     RADOSCB *rcb;
-    rados_completion_t c;
-    char n[RBD_MAX_SEG_NAME_SIZE];
-    int64_t segnr, segoffs, segsize, last_segnr;
+    rbd_completion_t c;
     int64_t off, size;
     char *buf;
 
@@ -719,7 +503,6 @@
     acb->write = write;
     acb->qiov = qiov;
     acb->bounce = qemu_blockalign(bs, qiov->size);
-    acb->aiocnt = 0;
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
@@ -734,95 +517,81 @@
 
     off = sector_num * BDRV_SECTOR_SIZE;
     size = nb_sectors * BDRV_SECTOR_SIZE;
-    segnr = off / s->objsize;
-    segoffs = off % s->objsize;
-    segsize = s->objsize - segoffs;
 
-    last_segnr = ((off + size - 1) / s->objsize);
-    acb->aiocnt = (last_segnr - segnr) + 1;
+    s->qemu_aio_count++; /* All the RADOSCB */
 
-    s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
+    rcb = qemu_malloc(sizeof(RADOSCB));
+    rcb->done = 0;
+    rcb->acb = acb;
+    rcb->buf = buf;
+    rcb->s = acb->s;
+    rcb->size = size;
 
-    while (size > 0) {
-        if (size < segsize) {
-            segsize = size;
-        }
-
-        snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name,
-                 segnr);
-
-        rcb = qemu_malloc(sizeof(RADOSCB));
-        rcb->done = 0;
-        rcb->acb = acb;
-        rcb->segsize = segsize;
-        rcb->buf = buf;
-        rcb->s = acb->s;
-
-        if (write) {
-            rados_aio_create_completion(rcb, NULL,
-                                        (rados_callback_t) rbd_finish_aiocb,
-                                        &c);
-            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
-        } else {
-            rados_aio_create_completion(rcb,
-                                        (rados_callback_t) rbd_finish_aiocb,
-                                        NULL, &c);
-            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
-        }
-
-        buf += segsize;
-        size -= segsize;
-        segoffs = 0;
-        segsize = s->objsize;
-        segnr++;
+    if (write) {
+        rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
+        rbd_aio_write(s->image, off, size, buf, c);
+    } else {
+        rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
+        rbd_aio_read(s->image, off, size, buf, c);
     }
 
     return &acb->common;
 }
 
-static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
-                                       int64_t sector_num, QEMUIOVector * qiov,
-                                       int nb_sectors,
-                                       BlockDriverCompletionFunc * cb,
-                                       void *opaque)
+static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
+                                            int64_t sector_num,
+                                            QEMUIOVector *qiov,
+                                            int nb_sectors,
+                                            BlockDriverCompletionFunc *cb,
+                                            void *opaque)
 {
     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
 }
 
-static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
-                                        int64_t sector_num, QEMUIOVector * qiov,
-                                        int nb_sectors,
-                                        BlockDriverCompletionFunc * cb,
-                                        void *opaque)
+static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
+                                             int64_t sector_num,
+                                             QEMUIOVector *qiov,
+                                             int nb_sectors,
+                                             BlockDriverCompletionFunc *cb,
+                                             void *opaque)
 {
     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
 }
 
-static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
+static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
 {
     BDRVRBDState *s = bs->opaque;
-    bdi->cluster_size = s->objsize;
+    rbd_image_info_t info;
+    int r;
+
+    r = rbd_stat(s->image, &info, sizeof(info));
+    if (r < 0) {
+        return r;
+    }
+
+    bdi->cluster_size = info.obj_size;
     return 0;
 }
 
-static int64_t rbd_getlength(BlockDriverState * bs)
+static int64_t qemu_rbd_getlength(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
+    rbd_image_info_t info;
+    int r;
 
-    return s->size;
+    r = rbd_stat(s->image, &info, sizeof(info));
+    if (r < 0) {
+        return r;
+    }
+
+    return info.size;
 }
 
-static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
+static int qemu_rbd_snap_create(BlockDriverState *bs,
+                                QEMUSnapshotInfo *sn_info)
 {
     BDRVRBDState *s = bs->opaque;
-    char inbuf[512], outbuf[128];
-    uint64_t snap_id;
     int r;
-    char *p = inbuf;
-    char *end = inbuf + sizeof(inbuf);
-    char n[RBD_MAX_SEG_NAME_SIZE];
-    char *hbuf = NULL;
-    RbdHeader1 *header;
 
     if (sn_info->name[0] == '\0') {
         return -EINVAL; /* we need a name for rbd snapshots */
@@ -841,185 +610,57 @@
         return -ERANGE;
     }
 
-    r = rados_selfmanaged_snap_create(s->header_pool, &snap_id);
+    r = rbd_snap_create(s->image, sn_info->name);
     if (r < 0) {
-        error_report("failed to create snap id: %s", strerror(-r));
+        error_report("failed to create snap: %s", strerror(-r));
         return r;
     }
 
-    *(uint32_t *)p = strlen(sn_info->name);
-    cpu_to_le32s((uint32_t *)p);
-    p += sizeof(uint32_t);
-    strncpy(p, sn_info->name, end - p);
-    p += strlen(p);
-    if (p + sizeof(snap_id) > end) {
-        error_report("invalid input parameter");
-        return -EINVAL;
-    }
-
-    *(uint64_t *)p = snap_id;
-    cpu_to_le64s((uint64_t *)p);
-
-    snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
-
-    r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf,
-                   sizeof(inbuf), outbuf, sizeof(outbuf));
-    if (r < 0) {
-        error_report("rbd.snap_add execution failed failed: %s", strerror(-r));
-        return r;
-    }
-
-    sprintf(sn_info->id_str, "%s", sn_info->name);
-
-    r = rbd_read_header(s, &hbuf);
-    if (r < 0) {
-        error_report("failed reading header: %s", strerror(-r));
-        return r;
-    }
-
-    header = (RbdHeader1 *) hbuf;
-    r = rbd_set_snapc(s->pool, sn_info->name, header);
-    if (r < 0) {
-        error_report("failed setting snap context: %s", strerror(-r));
-        goto failed;
-    }
-
     return 0;
-
-failed:
-    qemu_free(header);
-    return r;
 }
 
-static int decode32(char **p, const char *end, uint32_t *v)
-{
-    if (*p + 4 > end) {
-	return -ERANGE;
-    }
-
-    *v = *(uint32_t *)(*p);
-    le32_to_cpus(v);
-    *p += 4;
-    return 0;
-}
-
-static int decode64(char **p, const char *end, uint64_t *v)
-{
-    if (*p + 8 > end) {
-        return -ERANGE;
-    }
-
-    *v = *(uint64_t *)(*p);
-    le64_to_cpus(v);
-    *p += 8;
-    return 0;
-}
-
-static int decode_str(char **p, const char *end, char **s)
-{
-    uint32_t len;
-    int r;
-
-    if ((r = decode32(p, end, &len)) < 0) {
-        return r;
-    }
-
-    *s = qemu_malloc(len + 1);
-    memcpy(*s, *p, len);
-    *p += len;
-    (*s)[len] = '\0';
-
-    return len;
-}
-
-static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
+static int qemu_rbd_snap_list(BlockDriverState *bs,
+                              QEMUSnapshotInfo **psn_tab)
 {
     BDRVRBDState *s = bs->opaque;
-    char n[RBD_MAX_SEG_NAME_SIZE];
     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
-    RbdHeader1 *header;
-    char *hbuf = NULL;
-    char *outbuf = NULL, *end, *buf;
-    uint64_t len;
-    uint64_t snap_seq;
-    uint32_t snap_count;
-    int r, i;
+    int i, snap_count;
+    rbd_snap_info_t *snaps;
+    int max_snaps = RBD_MAX_SNAPS;
 
-    /* read header to estimate how much space we need to read the snap
-     * list */
-    if ((r = rbd_read_header(s, &hbuf)) < 0) {
-        goto done_err;
-    }
-    header = (RbdHeader1 *)hbuf;
-    len = le64_to_cpu(header->snap_names_len);
-    len += 1024; /* should have already been enough, but new snapshots might
-                    already been created since we read the header. just allocate
-                    a bit more, so that in most cases it'll suffice anyway */
-    qemu_free(hbuf);
-
-    snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
-    while (1) {
-        qemu_free(outbuf);
-        outbuf = qemu_malloc(len);
-
-        r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0,
-                       outbuf, len);
-        if (r < 0) {
-            error_report("rbd.snap_list execution failed failed: %s", strerror(-r));
-            goto done_err;
+    do {
+        snaps = qemu_malloc(sizeof(*snaps) * max_snaps);
+        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
+        if (snap_count < 0) {
+            qemu_free(snaps);
         }
-        if (r != len) {
-            break;
-	}
+    } while (snap_count == -ERANGE);
 
-        /* if we're here, we probably raced with some snaps creation */
-        len *= 2;
-    }
-    buf = outbuf;
-    end = buf + len;
-
-    if ((r = decode64(&buf, end, &snap_seq)) < 0) {
-        goto done_err;
-    }
-    if ((r = decode32(&buf, end, &snap_count)) < 0) {
-        goto done_err;
+    if (snap_count <= 0) {
+        return snap_count;
     }
 
     sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo));
-    for (i = 0; i < snap_count; i++) {
-        uint64_t id, image_size;
-        char *snap_name;
 
-        if ((r = decode64(&buf, end, &id)) < 0) {
-            goto done_err;
-        }
-        if ((r = decode64(&buf, end, &image_size)) < 0) {
-            goto done_err;
-        }
-        if ((r = decode_str(&buf, end, &snap_name)) < 0) {
-            goto done_err;
-        }
+    for (i = 0; i < snap_count; i++) {
+        const char *snap_name = snaps[i].name;
 
         sn_info = sn_tab + i;
         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
-        qemu_free(snap_name);
 
-        sn_info->vm_state_size = image_size;
+        sn_info->vm_state_size = snaps[i].size;
         sn_info->date_sec = 0;
         sn_info->date_nsec = 0;
         sn_info->vm_clock_nsec = 0;
     }
+    rbd_snap_list_end(snaps);
+
     *psn_tab = sn_tab;
-    qemu_free(outbuf);
     return snap_count;
-done_err:
-    qemu_free(sn_tab);
-    qemu_free(outbuf);
-    return r;
 }
 
-static QEMUOptionParameter rbd_create_options[] = {
+static QEMUOptionParameter qemu_rbd_create_options[] = {
     {
      .name = BLOCK_OPT_SIZE,
      .type = OPT_SIZE,
@@ -1036,19 +677,19 @@
 static BlockDriver bdrv_rbd = {
     .format_name        = "rbd",
     .instance_size      = sizeof(BDRVRBDState),
-    .bdrv_file_open     = rbd_open,
-    .bdrv_close         = rbd_close,
-    .bdrv_create        = rbd_create,
-    .bdrv_get_info      = rbd_getinfo,
-    .create_options     = rbd_create_options,
-    .bdrv_getlength     = rbd_getlength,
+    .bdrv_file_open     = qemu_rbd_open,
+    .bdrv_close         = qemu_rbd_close,
+    .bdrv_create        = qemu_rbd_create,
+    .bdrv_get_info      = qemu_rbd_getinfo,
+    .create_options     = qemu_rbd_create_options,
+    .bdrv_getlength     = qemu_rbd_getlength,
     .protocol_name      = "rbd",
 
-    .bdrv_aio_readv     = rbd_aio_readv,
-    .bdrv_aio_writev    = rbd_aio_writev,
+    .bdrv_aio_readv     = qemu_rbd_aio_readv,
+    .bdrv_aio_writev    = qemu_rbd_aio_writev,
 
-    .bdrv_snapshot_create = rbd_snap_create,
-    .bdrv_snapshot_list = rbd_snap_list,
+    .bdrv_snapshot_create = qemu_rbd_snap_create,
+    .bdrv_snapshot_list = qemu_rbd_snap_list,
 };
 
 static void bdrv_rbd_init(void)
diff --git a/block/rbd_types.h b/block/rbd_types.h
deleted file mode 100644
index f4cca99..0000000
--- a/block/rbd_types.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.LIB.
- *
- */
-
-#ifndef CEPH_RBD_TYPES_H
-#define CEPH_RBD_TYPES_H
-
-
-/*
- * rbd image 'foo' consists of objects
- *   foo.rbd      - image metadata
- *   foo.00000000
- *   foo.00000001
- *   ...          - data
- */
-
-#define RBD_SUFFIX              ".rbd"
-#define RBD_DIRECTORY           "rbd_directory"
-#define RBD_INFO                "rbd_info"
-
-#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
-
-#define RBD_MAX_OBJ_NAME_SIZE   96
-#define RBD_MAX_BLOCK_NAME_SIZE 24
-#define RBD_MAX_SEG_NAME_SIZE   128
-
-#define RBD_COMP_NONE           0
-#define RBD_CRYPT_NONE          0
-
-#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
-#define RBD_HEADER_SIGNATURE    "RBD"
-#define RBD_HEADER_VERSION      "001.005"
-
-struct rbd_info {
-    uint64_t max_id;
-} __attribute__ ((packed));
-
-struct rbd_obj_snap_ondisk {
-    uint64_t id;
-    uint64_t image_size;
-} __attribute__((packed));
-
-struct rbd_obj_header_ondisk {
-    char text[40];
-    char block_name[RBD_MAX_BLOCK_NAME_SIZE];
-    char signature[4];
-    char version[8];
-    struct {
-        uint8_t order;
-        uint8_t crypt_type;
-        uint8_t comp_type;
-        uint8_t unused;
-    } __attribute__((packed)) options;
-    uint64_t image_size;
-    uint64_t snap_seq;
-    uint32_t snap_count;
-    uint32_t reserved;
-    uint64_t snap_names_len;
-    struct rbd_obj_snap_ondisk snaps[0];
-} __attribute__((packed));
-
-
-#endif
diff --git a/configure b/configure
index d38b952..bdacb49 100755
--- a/configure
+++ b/configure
@@ -1917,41 +1917,24 @@
 if test "$rbd" != "no" ; then
   cat > $TMPC <<EOF
 #include <stdio.h>
-#include <rados/librados.h>
-int main(void) { rados_initialize(0, NULL); return 0; }
-EOF
-  rbd_libs="-lrados"
-  if compile_prog "" "$rbd_libs" ; then
-    librados_too_old=no
-    cat > $TMPC <<EOF
-#include <stdio.h>
-#include <rados/librados.h>
-#ifndef CEPH_OSD_TMAP_SET
-#error missing CEPH_OSD_TMAP_SET
-#endif
+#include <rbd/librbd.h>
 int main(void) {
-    int (*func)(const rados_pool_t pool, uint64_t *snapid) = rados_selfmanaged_snap_create;
-    rados_initialize(0, NULL);
+    rados_t cluster;
+    rados_create(&cluster, NULL);
     return 0;
 }
 EOF
-    if compile_prog "" "$rbd_libs" ; then
-      rbd=yes
-      libs_tools="$rbd_libs $libs_tools"
-      libs_softmmu="$rbd_libs $libs_softmmu"
-    else
-      rbd=no
-      librados_too_old=yes
-    fi
+  rbd_libs="-lrbd -lrados"
+  if compile_prog "" "$rbd_libs" ; then
+    rbd=yes
+    libs_tools="$rbd_libs $libs_tools"
+    libs_softmmu="$rbd_libs $libs_softmmu"
   else
     if test "$rbd" = "yes" ; then
       feature_not_found "rados block device"
     fi
     rbd=no
   fi
-  if test "$librados_too_old" = "yes" ; then
-    echo "-> Your librados version is too old - upgrade needed to have rbd support"
-  fi
 fi
 
 ##########################################