sheepdog: reorganize coroutine flow

Delimit co_recv's lifetime clearly in aio_read_response.

Do a simple qemu_coroutine_enter in aio_read_response, letting
sd_co_writev call sd_write_done.

Handle nr_pending in the same way in sd_co_rw_vector,
sd_write_done and sd_co_flush_to_disk.

Remove sd_co_rw_vector's return value; just leave with no
pending requests.

[Jeff: added missing 'return' back, spotted by Paolo after
       series was applied.]

Signed-off-by: Jeff Cody <jcody@redhat.com>
diff --git a/block/sheepdog.c b/block/sheepdog.c
index 5fde37a..e0985df 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -345,8 +345,6 @@
     enum AIOCBState aiocb_type;
 
     Coroutine *coroutine;
-    void (*aio_done_func)(SheepdogAIOCB *);
-
     int nr_pending;
 
     uint32_t min_affect_data_idx;
@@ -449,14 +447,13 @@
  *
  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
  *    link the requests to the inflight_list in the
- *    BDRVSheepdogState.  The function exits without waiting for
+ *    BDRVSheepdogState.  The function yields while waiting for
  *    receiving the response.
  *
  * 2. We receive the response in aio_read_response, the fd handler to
- *    the sheepdog connection.  If metadata update is needed, we send
- *    the write request to the vdi object in sd_write_done, the write
- *    completion function.  We switch back to sd_co_readv/writev after
- *    all the requests belonging to the AIOCB are finished.
+ *    the sheepdog connection.  We switch back to sd_co_readv/writev
+ *    after all the requests belonging to the AIOCB are finished.  If
+ *    needed, sd_co_writev will send another request for the vdi object.
  */
 
 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -491,12 +488,6 @@
     acb->nr_pending--;
 }
 
-static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
-{
-    qemu_coroutine_enter(acb->coroutine);
-    qemu_aio_unref(acb);
-}
-
 static const AIOCBInfo sd_aiocb_info = {
     .aiocb_size     = sizeof(SheepdogAIOCB),
 };
@@ -517,7 +508,6 @@
     acb->sector_num = sector_num;
     acb->nb_sectors = nb_sectors;
 
-    acb->aio_done_func = NULL;
     acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
     acb->nr_pending = 0;
@@ -788,9 +778,6 @@
 
     switch (acb->aiocb_type) {
     case AIOCB_WRITE_UDATA:
-        /* this coroutine context is no longer suitable for co_recv
-         * because we may send data to update vdi objects */
-        s->co_recv = NULL;
         if (!is_data_obj(aio_req->oid)) {
             break;
         }
@@ -838,6 +825,11 @@
         }
     }
 
+    /* No more data for this aio_req (reload_inode below uses its own file
+     * descriptor handler which doesn't use co_recv).
+     */
+    s->co_recv = NULL;
+
     switch (rsp.result) {
     case SD_RES_SUCCESS:
         break;
@@ -855,7 +847,7 @@
             aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
         }
         resend_aioreq(s, aio_req);
-        goto out;
+        return;
     default:
         acb->ret = -EIO;
         error_report("%s", sd_strerror(rsp.result));
@@ -868,13 +860,12 @@
          * We've finished all requests which belong to the AIOCB, so
          * we can switch back to sd_co_readv/writev now.
          */
-        acb->aio_done_func(acb);
+        qemu_coroutine_enter(acb->coroutine);
     }
-out:
-    s->co_recv = NULL;
+
     return;
+
 err:
-    s->co_recv = NULL;
     reconnect_to_sdog(opaque);
 }
 
@@ -1973,7 +1964,6 @@
 /*
  * This function is called after writing data objects.  If we need to
  * update metadata, this sends a write request to the vdi object.
- * Otherwise, this switches back to sd_co_readv/writev.
  */
 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
 {
@@ -1986,6 +1976,7 @@
     mx = acb->max_dirty_data_idx;
     if (mn <= mx) {
         /* we need to update the vdi object. */
+        ++acb->nr_pending;
         offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
             mn * sizeof(s->inode.data_vdi_id[0]);
         data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
@@ -1999,13 +1990,10 @@
                                 data_len, offset, 0, false, 0, offset);
         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
-
-        acb->aio_done_func = sd_finish_aiocb;
-        acb->aiocb_type = AIOCB_WRITE_UDATA;
-        return;
+        if (--acb->nr_pending) {
+            qemu_coroutine_yield();
+        }
     }
-
-    sd_finish_aiocb(acb);
 }
 
 /* Delete current working VDI on the snapshot chain */
@@ -2117,7 +2105,7 @@
  * Returns 1 when we need to wait a response, 0 when there is no sent
  * request and -errno in error cases.
  */
-static int coroutine_fn sd_co_rw_vector(void *p)
+static void coroutine_fn sd_co_rw_vector(void *p)
 {
     SheepdogAIOCB *acb = p;
     int ret = 0;
@@ -2138,7 +2126,7 @@
         ret = sd_create_branch(s);
         if (ret) {
             acb->ret = -EIO;
-            goto out;
+            return;
         }
     }
 
@@ -2212,11 +2200,9 @@
         idx++;
         done += len;
     }
-out:
-    if (!--acb->nr_pending) {
-        return acb->ret;
+    if (--acb->nr_pending) {
+        qemu_coroutine_yield();
     }
-    return 1;
 }
 
 static bool check_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
@@ -2249,7 +2235,6 @@
     }
 
     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
-    acb->aio_done_func = sd_write_done;
     acb->aiocb_type = AIOCB_WRITE_UDATA;
 
 retry:
@@ -2258,20 +2243,14 @@
         goto retry;
     }
 
-    ret = sd_co_rw_vector(acb);
-    if (ret <= 0) {
-        QLIST_REMOVE(acb, aiocb_siblings);
-        qemu_co_queue_restart_all(&s->overlapping_queue);
-        qemu_aio_unref(acb);
-        return ret;
-    }
-
-    qemu_coroutine_yield();
+    sd_co_rw_vector(acb);
+    sd_write_done(acb);
 
     QLIST_REMOVE(acb, aiocb_siblings);
     qemu_co_queue_restart_all(&s->overlapping_queue);
-
-    return acb->ret;
+    ret = acb->ret;
+    qemu_aio_unref(acb);
+    return ret;
 }
 
 static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
@@ -2283,7 +2262,6 @@
 
     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
     acb->aiocb_type = AIOCB_READ_UDATA;
-    acb->aio_done_func = sd_finish_aiocb;
 
 retry:
     if (check_overlapping_aiocb(s, acb)) {
@@ -2291,25 +2269,20 @@
         goto retry;
     }
 
-    ret = sd_co_rw_vector(acb);
-    if (ret <= 0) {
-        QLIST_REMOVE(acb, aiocb_siblings);
-        qemu_co_queue_restart_all(&s->overlapping_queue);
-        qemu_aio_unref(acb);
-        return ret;
-    }
-
-    qemu_coroutine_yield();
+    sd_co_rw_vector(acb);
 
     QLIST_REMOVE(acb, aiocb_siblings);
     qemu_co_queue_restart_all(&s->overlapping_queue);
-    return acb->ret;
+    ret = acb->ret;
+    qemu_aio_unref(acb);
+    return ret;
 }
 
 static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
 {
     BDRVSheepdogState *s = bs->opaque;
     SheepdogAIOCB *acb;
+    int ret;
     AIOReq *aio_req;
 
     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
@@ -2318,15 +2291,19 @@
 
     acb = sd_aio_setup(bs, NULL, 0, 0);
     acb->aiocb_type = AIOCB_FLUSH_CACHE;
-    acb->aio_done_func = sd_finish_aiocb;
 
+    acb->nr_pending++;
     aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                             0, 0, 0, false, 0, 0);
     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
     add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
 
-    qemu_coroutine_yield();
-    return acb->ret;
+    if (--acb->nr_pending) {
+        qemu_coroutine_yield();
+    }
+    ret = acb->ret;
+    qemu_aio_unref(acb);
+    return ret;
 }
 
 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -2783,7 +2760,6 @@
     acb = sd_aio_setup(bs, &discard_iov, offset >> BDRV_SECTOR_BITS,
                        count >> BDRV_SECTOR_BITS);
     acb->aiocb_type = AIOCB_DISCARD_OBJ;
-    acb->aio_done_func = sd_finish_aiocb;
 
 retry:
     if (check_overlapping_aiocb(s, acb)) {
@@ -2791,20 +2767,13 @@
         goto retry;
     }
 
-    ret = sd_co_rw_vector(acb);
-    if (ret <= 0) {
-        QLIST_REMOVE(acb, aiocb_siblings);
-        qemu_co_queue_restart_all(&s->overlapping_queue);
-        qemu_aio_unref(acb);
-        return ret;
-    }
-
-    qemu_coroutine_yield();
+    sd_co_rw_vector(acb);
 
     QLIST_REMOVE(acb, aiocb_siblings);
     qemu_co_queue_restart_all(&s->overlapping_queue);
-
-    return acb->ret;
+    ret = acb->ret;
+    qemu_aio_unref(acb);
+    return ret;
 }
 
 static coroutine_fn int64_t