sheepdog: reorganize coroutine flow
Delimit co_recv's lifetime clearly in aio_read_response.
Do a simple qemu_coroutine_enter in aio_read_response, letting
sd_co_writev call sd_write_done.
Handle nr_pending in the same way in sd_co_rw_vector,
sd_write_done and sd_co_flush_to_disk.
Remove sd_co_rw_vector's return value; just leave with no
pending requests.
[Jeff: added missing 'return' back, spotted by Paolo after
series was applied.]
Signed-off-by: Jeff Cody <jcody@redhat.com>
diff --git a/block/sheepdog.c b/block/sheepdog.c
index 5fde37a..e0985df 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -345,8 +345,6 @@
enum AIOCBState aiocb_type;
Coroutine *coroutine;
- void (*aio_done_func)(SheepdogAIOCB *);
-
int nr_pending;
uint32_t min_affect_data_idx;
@@ -449,14 +447,13 @@
*
* 1. In sd_co_rw_vector, we send the I/O requests to the server and
* link the requests to the inflight_list in the
- * BDRVSheepdogState. The function exits without waiting for
+ * BDRVSheepdogState. The function yields while waiting for
* receiving the response.
*
* 2. We receive the response in aio_read_response, the fd handler to
- * the sheepdog connection. If metadata update is needed, we send
- * the write request to the vdi object in sd_write_done, the write
- * completion function. We switch back to sd_co_readv/writev after
- * all the requests belonging to the AIOCB are finished.
+ * the sheepdog connection. We switch back to sd_co_readv/sd_writev
+ * after all the requests belonging to the AIOCB are finished. If
+ * needed, sd_co_writev will send another requests for the vdi object.
*/
static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -491,12 +488,6 @@
acb->nr_pending--;
}
-static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
-{
- qemu_coroutine_enter(acb->coroutine);
- qemu_aio_unref(acb);
-}
-
static const AIOCBInfo sd_aiocb_info = {
.aiocb_size = sizeof(SheepdogAIOCB),
};
@@ -517,7 +508,6 @@
acb->sector_num = sector_num;
acb->nb_sectors = nb_sectors;
- acb->aio_done_func = NULL;
acb->coroutine = qemu_coroutine_self();
acb->ret = 0;
acb->nr_pending = 0;
@@ -788,9 +778,6 @@
switch (acb->aiocb_type) {
case AIOCB_WRITE_UDATA:
- /* this coroutine context is no longer suitable for co_recv
- * because we may send data to update vdi objects */
- s->co_recv = NULL;
if (!is_data_obj(aio_req->oid)) {
break;
}
@@ -838,6 +825,11 @@
}
}
+ /* No more data for this aio_req (reload_inode below uses its own file
+ * descriptor handler which doesn't use co_recv).
+ */
+ s->co_recv = NULL;
+
switch (rsp.result) {
case SD_RES_SUCCESS:
break;
@@ -855,7 +847,7 @@
aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
}
resend_aioreq(s, aio_req);
- goto out;
+ return;
default:
acb->ret = -EIO;
error_report("%s", sd_strerror(rsp.result));
@@ -868,13 +860,12 @@
* We've finished all requests which belong to the AIOCB, so
* we can switch back to sd_co_readv/writev now.
*/
- acb->aio_done_func(acb);
+ qemu_coroutine_enter(acb->coroutine);
}
-out:
- s->co_recv = NULL;
+
return;
+
err:
- s->co_recv = NULL;
reconnect_to_sdog(opaque);
}
@@ -1973,7 +1964,6 @@
/*
* This function is called after writing data objects. If we need to
* update metadata, this sends a write request to the vdi object.
- * Otherwise, this switches back to sd_co_readv/writev.
*/
static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
{
@@ -1986,6 +1976,7 @@
mx = acb->max_dirty_data_idx;
if (mn <= mx) {
/* we need to update the vdi object. */
+ ++acb->nr_pending;
offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
mn * sizeof(s->inode.data_vdi_id[0]);
data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
@@ -1999,13 +1990,10 @@
data_len, offset, 0, false, 0, offset);
QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
-
- acb->aio_done_func = sd_finish_aiocb;
- acb->aiocb_type = AIOCB_WRITE_UDATA;
- return;
+ if (--acb->nr_pending) {
+ qemu_coroutine_yield();
+ }
}
-
- sd_finish_aiocb(acb);
}
/* Delete current working VDI on the snapshot chain */
@@ -2117,7 +2105,7 @@
* Returns 1 when we need to wait a response, 0 when there is no sent
* request and -errno in error cases.
*/
-static int coroutine_fn sd_co_rw_vector(void *p)
+static void coroutine_fn sd_co_rw_vector(void *p)
{
SheepdogAIOCB *acb = p;
int ret = 0;
@@ -2138,7 +2126,7 @@
ret = sd_create_branch(s);
if (ret) {
acb->ret = -EIO;
- goto out;
+ return;
}
}
@@ -2212,11 +2200,9 @@
idx++;
done += len;
}
-out:
- if (!--acb->nr_pending) {
- return acb->ret;
+ if (--acb->nr_pending) {
+ qemu_coroutine_yield();
}
- return 1;
}
static bool check_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
@@ -2249,7 +2235,6 @@
}
acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
- acb->aio_done_func = sd_write_done;
acb->aiocb_type = AIOCB_WRITE_UDATA;
retry:
@@ -2258,20 +2243,14 @@
goto retry;
}
- ret = sd_co_rw_vector(acb);
- if (ret <= 0) {
- QLIST_REMOVE(acb, aiocb_siblings);
- qemu_co_queue_restart_all(&s->overlapping_queue);
- qemu_aio_unref(acb);
- return ret;
- }
-
- qemu_coroutine_yield();
+ sd_co_rw_vector(acb);
+ sd_write_done(acb);
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overlapping_queue);
-
- return acb->ret;
+ ret = acb->ret;
+ qemu_aio_unref(acb);
+ return ret;
}
static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
@@ -2283,7 +2262,6 @@
acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
acb->aiocb_type = AIOCB_READ_UDATA;
- acb->aio_done_func = sd_finish_aiocb;
retry:
if (check_overlapping_aiocb(s, acb)) {
@@ -2291,25 +2269,20 @@
goto retry;
}
- ret = sd_co_rw_vector(acb);
- if (ret <= 0) {
- QLIST_REMOVE(acb, aiocb_siblings);
- qemu_co_queue_restart_all(&s->overlapping_queue);
- qemu_aio_unref(acb);
- return ret;
- }
-
- qemu_coroutine_yield();
+ sd_co_rw_vector(acb);
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overlapping_queue);
- return acb->ret;
+ ret = acb->ret;
+ qemu_aio_unref(acb);
+ return ret;
}
static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
{
BDRVSheepdogState *s = bs->opaque;
SheepdogAIOCB *acb;
+ int ret;
AIOReq *aio_req;
if (s->cache_flags != SD_FLAG_CMD_CACHE) {
@@ -2318,15 +2291,19 @@
acb = sd_aio_setup(bs, NULL, 0, 0);
acb->aiocb_type = AIOCB_FLUSH_CACHE;
- acb->aio_done_func = sd_finish_aiocb;
+ acb->nr_pending++;
aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
0, 0, 0, false, 0, 0);
QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
- qemu_coroutine_yield();
- return acb->ret;
+ if (--acb->nr_pending) {
+ qemu_coroutine_yield();
+ }
+ ret = acb->ret;
+ qemu_aio_unref(acb);
+ return ret;
}
static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -2783,7 +2760,6 @@
acb = sd_aio_setup(bs, &discard_iov, offset >> BDRV_SECTOR_BITS,
count >> BDRV_SECTOR_BITS);
acb->aiocb_type = AIOCB_DISCARD_OBJ;
- acb->aio_done_func = sd_finish_aiocb;
retry:
if (check_overlapping_aiocb(s, acb)) {
@@ -2791,20 +2767,13 @@
goto retry;
}
- ret = sd_co_rw_vector(acb);
- if (ret <= 0) {
- QLIST_REMOVE(acb, aiocb_siblings);
- qemu_co_queue_restart_all(&s->overlapping_queue);
- qemu_aio_unref(acb);
- return ret;
- }
-
- qemu_coroutine_yield();
+ sd_co_rw_vector(acb);
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overlapping_queue);
-
- return acb->ret;
+ ret = acb->ret;
+ qemu_aio_unref(acb);
+ return ret;
}
static coroutine_fn int64_t