sheepdog: serialize requests to overlapping areas
The current sheepdog driver only serializes create requests, and only
on a per-oid basis. This mechanism isn't enough for handling requests
to overlapping areas that span multiple oids, so it can result in bugs
like the one below:
https://bugs.launchpad.net/sheepdog-project/+bug/1456421
This patch adds a new serialization mechanism for the problem. The
differences from the old one are:
1. serialize entire aiocbs if their target areas overlap
2. serialize all requests (read, write, and discard), not only creates
This patch also removes the old mechanism because the new one can
serve as its replacement.
Cc: Kevin Wolf <kwolf@redhat.com>
Cc: Stefan Hajnoczi <stefanha@redhat.com>
Cc: Teruaki Ishizaki <ishizaki.teruaki@lab.ntt.co.jp>
Cc: Vasiliy Tolstov <v.tolstov@selfip.ru>
Signed-off-by: Hitoshi Mitake <mitake.hitoshi@lab.ntt.co.jp>
Tested-by: Vasiliy Tolstov <v.tolstov@selfip.ru>
Signed-off-by: Jeff Cody <jcody@redhat.com>
diff --git a/block/sheepdog.c b/block/sheepdog.c
index bd7cbed..9585beb 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -318,6 +318,10 @@
AIOCB_DISCARD_OBJ,
};
+#define AIOCBOverwrapping(x, y) \
+ (!(x->max_affect_data_idx < y->min_affect_data_idx \
+ || y->max_affect_data_idx < x->min_affect_data_idx))
+
struct SheepdogAIOCB {
BlockAIOCB common;
@@ -334,6 +338,11 @@
bool cancelable;
int nr_pending;
+
+ uint32_t min_affect_data_idx;
+ uint32_t max_affect_data_idx;
+
+ QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
};
typedef struct BDRVSheepdogState {
@@ -362,8 +371,10 @@
/* Every aio request must be linked to either of these queues. */
QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
- QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
+
+ CoQueue overwrapping_queue;
+ QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
} BDRVSheepdogState;
static const char * sd_strerror(int err)
@@ -498,13 +509,7 @@
AIOReq *aioreq, *next;
if (sd_acb_cancelable(acb)) {
- /* Remove outstanding requests from pending and failed queues. */
- QLIST_FOREACH_SAFE(aioreq, &s->pending_aio_head, aio_siblings,
- next) {
- if (aioreq->aiocb == acb) {
- free_aio_req(s, aioreq);
- }
- }
+ /* Remove outstanding requests from failed queue. */
QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
next) {
if (aioreq->aiocb == acb) {
@@ -529,6 +534,10 @@
int64_t sector_num, int nb_sectors)
{
SheepdogAIOCB *acb;
+ uint32_t object_size;
+ BDRVSheepdogState *s = bs->opaque;
+
+ object_size = (UINT32_C(1) << s->inode.block_size_shift);
acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
@@ -542,6 +551,11 @@
acb->coroutine = qemu_coroutine_self();
acb->ret = 0;
acb->nr_pending = 0;
+
+ acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
+ acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
+ acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
+
return acb;
}
@@ -703,38 +717,6 @@
static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
static void co_write_request(void *opaque);
-static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
-{
- AIOReq *aio_req;
-
- QLIST_FOREACH(aio_req, &s->pending_aio_head, aio_siblings) {
- if (aio_req->oid == oid) {
- return aio_req;
- }
- }
-
- return NULL;
-}
-
-/*
- * This function searchs pending requests to the object `oid', and
- * sends them.
- */
-static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
-{
- AIOReq *aio_req;
- SheepdogAIOCB *acb;
-
- while ((aio_req = find_pending_req(s, oid)) != NULL) {
- acb = aio_req->aiocb;
- /* move aio_req from pending list to inflight one */
- QLIST_REMOVE(aio_req, aio_siblings);
- QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
- add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
- acb->aiocb_type);
- }
-}
-
static coroutine_fn void reconnect_to_sdog(void *opaque)
{
BDRVSheepdogState *s = opaque;
@@ -840,12 +822,6 @@
s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
}
- /*
- * Some requests may be blocked because simultaneous
- * create requests are not allowed, so we search the
- * pending requests here.
- */
- send_pending_req(s, aio_req->oid);
}
break;
case AIOCB_READ_UDATA:
@@ -1341,30 +1317,6 @@
return ret;
}
-/* Return true if the specified request is linked to the pending list. */
-static bool check_simultaneous_create(BDRVSheepdogState *s, AIOReq *aio_req)
-{
- AIOReq *areq;
- QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
- if (areq != aio_req && areq->oid == aio_req->oid) {
- /*
- * Sheepdog cannot handle simultaneous create requests to the same
- * object, so we cannot send the request until the previous request
- * finishes.
- */
- DPRINTF("simultaneous create to %" PRIx64 "\n", aio_req->oid);
- aio_req->flags = 0;
- aio_req->base_oid = 0;
- aio_req->create = false;
- QLIST_REMOVE(aio_req, aio_siblings);
- QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req, aio_siblings);
- return true;
- }
- }
-
- return false;
-}
-
static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
{
SheepdogAIOCB *acb = aio_req->aiocb;
@@ -1379,10 +1331,6 @@
goto out;
}
- if (check_simultaneous_create(s, aio_req)) {
- return;
- }
-
if (s->inode.data_vdi_id[idx]) {
aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
aio_req->flags |= SD_FLAG_CMD_COW;
@@ -1458,8 +1406,8 @@
filename = qemu_opt_get(opts, "filename");
QLIST_INIT(&s->inflight_aio_head);
- QLIST_INIT(&s->pending_aio_head);
QLIST_INIT(&s->failed_aio_head);
+ QLIST_INIT(&s->inflight_aiocb_head);
s->fd = -1;
memset(vdi, 0, sizeof(vdi));
@@ -1524,6 +1472,7 @@
bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
pstrcpy(s->name, sizeof(s->name), vdi);
qemu_co_mutex_init(&s->lock);
+ qemu_co_queue_init(&s->overwrapping_queue);
qemu_opts_del(opts);
g_free(buf);
return 0;
@@ -2195,12 +2144,6 @@
old_oid, done);
QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
- if (create) {
- if (check_simultaneous_create(s, aio_req)) {
- goto done;
- }
- }
-
add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
acb->aiocb_type);
done:
@@ -2215,6 +2158,20 @@
return 1;
}
+static bool check_overwrapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
+{
+ SheepdogAIOCB *cb;
+
+ QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
+ if (AIOCBOverwrapping(aiocb, cb)) {
+ return true;
+ }
+ }
+
+ QLIST_INSERT_HEAD(&s->inflight_aiocb_head, aiocb, aiocb_siblings);
+ return false;
+}
+
static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
int nb_sectors, QEMUIOVector *qiov)
{
@@ -2234,14 +2191,25 @@
acb->aio_done_func = sd_write_done;
acb->aiocb_type = AIOCB_WRITE_UDATA;
+retry:
+ if (check_overwrapping_aiocb(s, acb)) {
+ qemu_co_queue_wait(&s->overwrapping_queue);
+ goto retry;
+ }
+
ret = sd_co_rw_vector(acb);
if (ret <= 0) {
+ QLIST_REMOVE(acb, aiocb_siblings);
+ qemu_co_queue_restart_all(&s->overwrapping_queue);
qemu_aio_unref(acb);
return ret;
}
qemu_coroutine_yield();
+ QLIST_REMOVE(acb, aiocb_siblings);
+ qemu_co_queue_restart_all(&s->overwrapping_queue);
+
return acb->ret;
}
@@ -2250,19 +2218,30 @@
{
SheepdogAIOCB *acb;
int ret;
+ BDRVSheepdogState *s = bs->opaque;
acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
acb->aiocb_type = AIOCB_READ_UDATA;
acb->aio_done_func = sd_finish_aiocb;
+retry:
+ if (check_overwrapping_aiocb(s, acb)) {
+ qemu_co_queue_wait(&s->overwrapping_queue);
+ goto retry;
+ }
+
ret = sd_co_rw_vector(acb);
if (ret <= 0) {
+ QLIST_REMOVE(acb, aiocb_siblings);
+ qemu_co_queue_restart_all(&s->overwrapping_queue);
qemu_aio_unref(acb);
return ret;
}
qemu_coroutine_yield();
+ QLIST_REMOVE(acb, aiocb_siblings);
+ qemu_co_queue_restart_all(&s->overwrapping_queue);
return acb->ret;
}
@@ -2610,14 +2589,25 @@
acb->aiocb_type = AIOCB_DISCARD_OBJ;
acb->aio_done_func = sd_finish_aiocb;
+retry:
+ if (check_overwrapping_aiocb(s, acb)) {
+ qemu_co_queue_wait(&s->overwrapping_queue);
+ goto retry;
+ }
+
ret = sd_co_rw_vector(acb);
if (ret <= 0) {
+ QLIST_REMOVE(acb, aiocb_siblings);
+ qemu_co_queue_restart_all(&s->overwrapping_queue);
qemu_aio_unref(acb);
return ret;
}
qemu_coroutine_yield();
+ QLIST_REMOVE(acb, aiocb_siblings);
+ qemu_co_queue_restart_all(&s->overwrapping_queue);
+
return acb->ret;
}