Merge tag 'pull-nbd-2022-04-26' of https://repo.or.cz/qemu/ericb into staging

nbd patches for 2022-04-26

- Paolo Bonzini: thread-safety improvements to NBD client
- Vladimir Sementsov-Ogievsky: finer-grained selection of bitmaps during
  nbd-export

# -----BEGIN PGP SIGNATURE-----
#
# iQEzBAABCAAdFiEEccLMIrHEYCkn0vOqp6FrSiUnQ2oFAmJoUhAACgkQp6FrSiUn
# Q2qnpgf/YCuONdwAndjEo7he5c1BfB/F2sujQJJ00CebUqnz5OFKQ85RwLC8DCGB
# rXnxqC/NF4yyYM+6uYWDpggDd0bJVKbfG7NE/AZsEZrK+n9xMkvGLRwGqMugUii+
# Px4Ba98++0giqGoAI8pU/wQZNh1I6uGabv/DPRTpwzBjbfAcATqV09OzaGiK3SRC
# Zm/55zmXm1zM4XSUtUzN1gILPG09P+51m6NVkANZbzps9e2PtfFy8EsWc5+YhuBM
# 5K7sN+5g8GpRhz6j8RkrhrbNpvg3bGvgRJRMcW7Bo8KVUdvT1Jng6xs8CIRv39AF
# jDJwGe+cq5p5PNuyqOrVSA/ynBZxBw==
# =I1yM
# -----END PGP SIGNATURE-----
# gpg: Signature made Tue 26 Apr 2022 01:12:00 PM PDT
# gpg:                using RSA key 71C2CC22B1C4602927D2F3AAA7A16B4A2527436A
# gpg: Good signature from "Eric Blake <eblake@redhat.com>" [full]
# gpg:                 aka "Eric Blake (Free Software Programmer) <ebb9@byu.net>" [full]
# gpg:                 aka "[jpeg image of size 6874]" [full]

* tag 'pull-nbd-2022-04-26' of https://repo.or.cz/qemu/ericb:
  nbd: document what is protected by the CoMutexes
  nbd: take receive_mutex when reading requests[].receiving
  nbd: move s->state under requests_lock
  nbd: code motion and function renaming
  nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
  nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection
  nbd: remove peppering of nbd_client_connected
  nbd: mark more coroutine_fns
  nbd: safeguard against waking up invalid coroutine
  iotests/223: check new possibility of exporting bitmaps by node/name
  qapi: nbd-export: allow select bitmaps by node/name pair
  qapi: rename BlockDirtyBitmapMergeSource to BlockDirtyBitmapOrStr

Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
diff --git a/block/coroutines.h b/block/coroutines.h
index b293e94..8ea70d4 100644
--- a/block/coroutines.h
+++ b/block/coroutines.h
@@ -59,7 +59,8 @@
                                         QEMUIOVector *qiov, int64_t pos);
 
 int coroutine_fn
-nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp);
+nbd_co_do_establish_connection(BlockDriverState *bs, bool blocking,
+                               Error **errp);
 
 
 int coroutine_fn
@@ -109,7 +110,7 @@
                                BlockDriverState **file,
                                int *depth);
 int generated_co_wrapper
-nbd_do_establish_connection(BlockDriverState *bs, Error **errp);
+nbd_do_establish_connection(BlockDriverState *bs, bool blocking, Error **errp);
 
 int generated_co_wrapper
 blk_do_preadv(BlockBackend *blk, int64_t offset, int64_t bytes,
diff --git a/block/monitor/bitmap-qmp-cmds.c b/block/monitor/bitmap-qmp-cmds.c
index 8e35616..2b677c4 100644
--- a/block/monitor/bitmap-qmp-cmds.c
+++ b/block/monitor/bitmap-qmp-cmds.c
@@ -257,12 +257,12 @@
 }
 
 BdrvDirtyBitmap *block_dirty_bitmap_merge(const char *node, const char *target,
-                                          BlockDirtyBitmapMergeSourceList *bms,
+                                          BlockDirtyBitmapOrStrList *bms,
                                           HBitmap **backup, Error **errp)
 {
     BlockDriverState *bs;
     BdrvDirtyBitmap *dst, *src, *anon;
-    BlockDirtyBitmapMergeSourceList *lst;
+    BlockDirtyBitmapOrStrList *lst;
 
     GLOBAL_STATE_CODE();
 
@@ -317,7 +317,7 @@
 }
 
 void qmp_block_dirty_bitmap_merge(const char *node, const char *target,
-                                  BlockDirtyBitmapMergeSourceList *bitmaps,
+                                  BlockDirtyBitmapOrStrList *bitmaps,
                                   Error **errp)
 {
     block_dirty_bitmap_merge(node, target, bitmaps, NULL, errp);
diff --git a/block/nbd.c b/block/nbd.c
index 567872a..6085ab1 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -35,7 +35,6 @@
 #include "qemu/option.h"
 #include "qemu/cutils.h"
 #include "qemu/main-loop.h"
-#include "qemu/atomic.h"
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
@@ -58,7 +57,6 @@
     Coroutine *coroutine;
     uint64_t offset;        /* original offset of the request */
     bool receiving;         /* sleeping in the yield in nbd_receive_replies */
-    bool reply_possible;    /* reply header not yet received */
 } NBDClientRequest;
 
 typedef enum NBDClientState {
@@ -72,18 +70,29 @@
     QIOChannel *ioc; /* The current I/O channel */
     NBDExportInfo info;
 
-    CoMutex send_mutex;
-    CoQueue free_sema;
-
-    CoMutex receive_mutex;
-    int in_flight;
+    /*
+     * Protects state, free_sema, in_flight, requests[].coroutine,
+     * reconnect_delay_timer.
+     */
+    QemuMutex requests_lock;
     NBDClientState state;
-
+    CoQueue free_sema;
+    int in_flight;
+    NBDClientRequest requests[MAX_NBD_REQUESTS];
     QEMUTimer *reconnect_delay_timer;
+
+    /* Protects sending data on the socket.  */
+    CoMutex send_mutex;
+
+    /*
+     * Protects receiving reply headers from the socket, as well as the
+     * fields reply and requests[].receiving
+     */
+    CoMutex receive_mutex;
+    NBDReply reply;
+
     QEMUTimer *open_timer;
 
-    NBDClientRequest requests[MAX_NBD_REQUESTS];
-    NBDReply reply;
     BlockDriverState *bs;
 
     /* Connection parameters */
@@ -128,12 +137,8 @@
     s->x_dirty_bitmap = NULL;
 }
 
-static bool nbd_client_connected(BDRVNBDState *s)
-{
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
-}
-
-static bool nbd_recv_coroutine_wake_one(NBDClientRequest *req)
+/* Called with s->receive_mutex taken.  */
+static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
 {
     if (req->receiving) {
         req->receiving = false;
@@ -144,33 +149,39 @@
     return false;
 }
 
-static void nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
+static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
 {
     int i;
 
+    QEMU_LOCK_GUARD(&s->receive_mutex);
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
-        if (nbd_recv_coroutine_wake_one(&s->requests[i]) && !all) {
+        if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
             return;
         }
     }
 }
 
-static void nbd_channel_error(BDRVNBDState *s, int ret)
+/* Called with s->requests_lock held.  */
+static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
 {
-    if (nbd_client_connected(s)) {
+    if (s->state == NBD_CLIENT_CONNECTED) {
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
     }
 
     if (ret == -EIO) {
-        if (nbd_client_connected(s)) {
+        if (s->state == NBD_CLIENT_CONNECTED) {
             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                             NBD_CLIENT_CONNECTING_NOWAIT;
         }
     } else {
         s->state = NBD_CLIENT_QUIT;
     }
+}
 
-    nbd_recv_coroutines_wake(s, true);
+static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
+{
+    QEMU_LOCK_GUARD(&s->requests_lock);
+    nbd_channel_error_locked(s, ret);
 }
 
 static void reconnect_delay_timer_del(BDRVNBDState *s)
@@ -185,23 +196,18 @@
 {
     BDRVNBDState *s = opaque;
 
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        nbd_co_establish_connection_cancel(s->conn);
-        while (qemu_co_enter_next(&s->free_sema, NULL)) {
-            /* Resume all queued requests */
-        }
-    }
-
     reconnect_delay_timer_del(s);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
+            return;
+        }
+        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+    }
+    nbd_co_establish_connection_cancel(s->conn);
 }
 
 static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
 {
-    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
-        return;
-    }
-
     assert(!s->reconnect_delay_timer);
     s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                              QEMU_CLOCK_REALTIME,
@@ -224,7 +230,9 @@
         s->ioc = NULL;
     }
 
-    s->state = NBD_CLIENT_QUIT;
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        s->state = NBD_CLIENT_QUIT;
+    }
 }
 
 static void open_timer_del(BDRVNBDState *s)
@@ -253,16 +261,13 @@
     timer_mod(s->open_timer, expire_time_ns);
 }
 
-static bool nbd_client_connecting(BDRVNBDState *s)
+static bool nbd_client_will_reconnect(BDRVNBDState *s)
 {
-    NBDClientState state = qatomic_load_acquire(&s->state);
-    return state == NBD_CLIENT_CONNECTING_WAIT ||
-        state == NBD_CLIENT_CONNECTING_NOWAIT;
-}
-
-static bool nbd_client_connecting_wait(BDRVNBDState *s)
-{
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
+    /*
+     * Called only after a socket error, so this is not performance sensitive.
+     */
+    QEMU_LOCK_GUARD(&s->requests_lock);
+    return s->state == NBD_CLIENT_CONNECTING_WAIT;
 }
 
 /*
@@ -311,11 +316,10 @@
 }
 
 int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
-                                                Error **errp)
+                                                bool blocking, Error **errp)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int ret;
-    bool blocking = nbd_client_connecting_wait(s);
     IO_CODE();
 
     assert(!s->ioc);
@@ -350,34 +354,42 @@
     qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
 
     /* successfully connected */
-    s->state = NBD_CLIENT_CONNECTED;
-    qemu_co_queue_restart_all(&s->free_sema);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        s->state = NBD_CLIENT_CONNECTED;
+    }
 
     return 0;
 }
 
-/* called under s->send_mutex */
+/* Called with s->requests_lock held.  */
+static bool nbd_client_connecting(BDRVNBDState *s)
+{
+    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
+        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+}
+
+/* Called with s->requests_lock taken.  */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
-    assert(nbd_client_connecting(s));
-    assert(s->in_flight == 0);
-
-    if (nbd_client_connecting_wait(s) && s->reconnect_delay &&
-        !s->reconnect_delay_timer)
-    {
-        /*
-         * It's first reconnect attempt after switching to
-         * NBD_CLIENT_CONNECTING_WAIT
-         */
-        reconnect_delay_timer_init(s,
-            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
-            s->reconnect_delay * NANOSECONDS_PER_SECOND);
-    }
+    bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;
 
     /*
      * Now we are sure that nobody is accessing the channel, and no one will
      * try until we set the state to CONNECTED.
      */
+    assert(nbd_client_connecting(s));
+    assert(s->in_flight == 1);
+
+    if (blocking && !s->reconnect_delay_timer) {
+        /*
+         * It's the first reconnect attempt after switching to
+         * NBD_CLIENT_CONNECTING_WAIT
+         */
+        g_assert(s->reconnect_delay);
+        reconnect_delay_timer_init(s,
+            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+            s->reconnect_delay * NANOSECONDS_PER_SECOND);
+    }
 
     /* Finalize previous connection if any */
     if (s->ioc) {
@@ -388,7 +400,9 @@
         s->ioc = NULL;
     }
 
-    nbd_co_do_establish_connection(s->bs, NULL);
+    qemu_mutex_unlock(&s->requests_lock);
+    nbd_co_do_establish_connection(s->bs, blocking, NULL);
+    qemu_mutex_lock(&s->requests_lock);
 
     /*
      * The reconnect attempt is done (maybe successfully, maybe not), so
@@ -410,10 +424,6 @@
             return 0;
         }
 
-        if (!nbd_client_connected(s)) {
-            return -EIO;
-        }
-
         if (s->reply.handle != 0) {
             /*
              * Some other request is being handled now. It should already be
@@ -428,11 +438,10 @@
 
             qemu_coroutine_yield();
             /*
-             * We may be woken for 3 reasons:
+             * We may be woken for 2 reasons:
              * 1. From this function, executing in parallel coroutine, when our
              *    handle is received.
-             * 2. From nbd_channel_error(), when connection is lost.
-             * 3. From nbd_co_receive_one_chunk(), when previous request is
+             * 2. From nbd_co_receive_one_chunk(), when previous request is
              *    finished and s->reply.handle set to 0.
              * Anyway, it's OK to lock the mutex and go to the next iteration.
              */
@@ -454,44 +463,43 @@
             nbd_channel_error(s, -EINVAL);
             return -EINVAL;
         }
+        ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
+            nbd_channel_error(s, -EINVAL);
+            return -EINVAL;
+        }
         if (s->reply.handle == handle) {
             /* We are done */
             return 0;
         }
-        ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
-        if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].reply_possible) {
-            nbd_channel_error(s, -EINVAL);
-            return -EINVAL;
-        }
         nbd_recv_coroutine_wake_one(&s->requests[ind2]);
     }
 }
 
-static int nbd_co_send_request(BlockDriverState *bs,
-                               NBDRequest *request,
-                               QEMUIOVector *qiov)
+static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
+                                            NBDRequest *request,
+                                            QEMUIOVector *qiov)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int rc, i = -1;
 
-    qemu_co_mutex_lock(&s->send_mutex);
-
+    qemu_mutex_lock(&s->requests_lock);
     while (s->in_flight == MAX_NBD_REQUESTS ||
-           (!nbd_client_connected(s) && s->in_flight > 0))
-    {
-        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
-    }
-
-    if (nbd_client_connecting(s)) {
-        nbd_reconnect_attempt(s);
-    }
-
-    if (!nbd_client_connected(s)) {
-        rc = -EIO;
-        goto err;
+           (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
+        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
     }
 
     s->in_flight++;
+    if (s->state != NBD_CLIENT_CONNECTED) {
+        if (nbd_client_connecting(s)) {
+            nbd_reconnect_attempt(s);
+            qemu_co_queue_restart_all(&s->free_sema);
+        }
+        if (s->state != NBD_CLIENT_CONNECTED) {
+            rc = -EIO;
+            goto err;
+        }
+    }
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
         if (s->requests[i].coroutine == NULL) {
@@ -499,14 +507,13 @@
         }
     }
 
-    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
-
     s->requests[i].coroutine = qemu_coroutine_self();
     s->requests[i].offset = request->from;
     s->requests[i].receiving = false;
-    s->requests[i].reply_possible = true;
+    qemu_mutex_unlock(&s->requests_lock);
 
+    qemu_co_mutex_lock(&s->send_mutex);
     request->handle = INDEX_TO_HANDLE(s, i);
 
     assert(s->ioc);
@@ -514,7 +521,7 @@
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (nbd_client_connected(s) && rc >= 0) {
+        if (rc >= 0) {
             if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                        NULL) < 0) {
                 rc = -EIO;
@@ -526,17 +533,19 @@
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
+    qemu_co_mutex_unlock(&s->send_mutex);
 
-err:
     if (rc < 0) {
-        nbd_channel_error(s, rc);
+        qemu_mutex_lock(&s->requests_lock);
+err:
+        nbd_channel_error_locked(s, rc);
         if (i != -1) {
             s->requests[i].coroutine = NULL;
-            s->in_flight--;
         }
+        s->in_flight--;
         qemu_co_queue_next(&s->free_sema);
+        qemu_mutex_unlock(&s->requests_lock);
     }
-    qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
 
@@ -723,9 +732,9 @@
     return 0;
 }
 
-static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
-                                              uint64_t orig_offset,
-                                              QEMUIOVector *qiov, Error **errp)
+static int coroutine_fn
+nbd_co_receive_offset_data_payload(BDRVNBDState *s, uint64_t orig_offset,
+                                   QEMUIOVector *qiov, Error **errp)
 {
     QEMUIOVector sub_qiov;
     uint64_t offset;
@@ -831,8 +840,8 @@
     }
     *request_ret = 0;
 
-    nbd_receive_replies(s, handle);
-    if (!nbd_client_connected(s)) {
+    ret = nbd_receive_replies(s, handle);
+    if (ret < 0) {
         error_setg(errp, "Connection closed");
         return -EIO;
     }
@@ -924,7 +933,7 @@
     }
     s->reply.handle = 0;
 
-    nbd_recv_coroutines_wake(s, false);
+    nbd_recv_coroutines_wake(s);
 
     return ret;
 }
@@ -984,11 +993,6 @@
     NBDReply local_reply;
     NBDStructuredReplyChunk *chunk;
     Error *local_err = NULL;
-    if (!nbd_client_connected(s)) {
-        error_setg(&local_err, "Connection closed");
-        nbd_iter_channel_error(iter, -EIO, &local_err);
-        goto break_loop;
-    }
 
     if (iter->done) {
         /* Previous iteration was last. */
@@ -1009,7 +1013,7 @@
     }
 
     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
-    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
+    if (nbd_reply_is_simple(reply) || iter->ret < 0) {
         goto break_loop;
     }
 
@@ -1031,18 +1035,17 @@
     return true;
 
 break_loop:
+    qemu_mutex_lock(&s->requests_lock);
     s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
-    qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
     qemu_co_queue_next(&s->free_sema);
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_mutex_unlock(&s->requests_lock);
 
     return false;
 }
 
-static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
-                                      int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
+                                                   int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
 
@@ -1055,9 +1058,9 @@
     return iter.ret;
 }
 
-static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
-                                        uint64_t offset, QEMUIOVector *qiov,
-                                        int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
+                                                     uint64_t offset, QEMUIOVector *qiov,
+                                                     int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
     NBDReply reply;
@@ -1107,10 +1110,10 @@
     return iter.ret;
 }
 
-static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
-                                            uint64_t handle, uint64_t length,
-                                            NBDExtent *extent,
-                                            int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
+                                                         uint64_t handle, uint64_t length,
+                                                         NBDExtent *extent,
+                                                         int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
     NBDReply reply;
@@ -1167,8 +1170,8 @@
     return iter.ret;
 }
 
-static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
-                          QEMUIOVector *write_qiov)
+static int coroutine_fn nbd_co_request(BlockDriverState *bs, NBDRequest *request,
+                                       QEMUIOVector *write_qiov)
 {
     int ret, request_ret;
     Error *local_err = NULL;
@@ -1199,14 +1202,14 @@
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     return ret ? ret : request_ret;
 }
 
-static int nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
-                                int64_t bytes, QEMUIOVector *qiov,
-                                BdrvRequestFlags flags)
+static int coroutine_fn nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
+                                             int64_t bytes, QEMUIOVector *qiov,
+                                             BdrvRequestFlags flags)
 {
     int ret, request_ret;
     Error *local_err = NULL;
@@ -1258,14 +1261,14 @@
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     return ret ? ret : request_ret;
 }
 
-static int nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
-                                 int64_t bytes, QEMUIOVector *qiov,
-                                 BdrvRequestFlags flags)
+static int coroutine_fn nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
+                                              int64_t bytes, QEMUIOVector *qiov,
+                                              BdrvRequestFlags flags)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1288,8 +1291,8 @@
     return nbd_co_request(bs, &request, qiov);
 }
 
-static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
-                                       int64_t bytes, BdrvRequestFlags flags)
+static int coroutine_fn nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
+                                                    int64_t bytes, BdrvRequestFlags flags)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1323,7 +1326,7 @@
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int nbd_client_co_flush(BlockDriverState *bs)
+static int coroutine_fn nbd_client_co_flush(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = { .type = NBD_CMD_FLUSH };
@@ -1338,8 +1341,8 @@
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
-                                  int64_t bytes)
+static int coroutine_fn nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
+                                               int64_t bytes)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1416,7 +1419,7 @@
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     if (ret < 0 || request_ret < 0) {
         return ret ? ret : request_ret;
@@ -1448,8 +1451,9 @@
     BlockDriverState *bs = opaque;
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
+    QEMU_LOCK_GUARD(&s->requests_lock);
     qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    s->state = NBD_CLIENT_QUIT;
 }
 
 static void nbd_client_close(BlockDriverState *bs)
@@ -1869,8 +1873,9 @@
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
     s->bs = bs;
-    qemu_co_mutex_init(&s->send_mutex);
+    qemu_mutex_init(&s->requests_lock);
     qemu_co_queue_init(&s->free_sema);
+    qemu_co_mutex_init(&s->send_mutex);
     qemu_co_mutex_init(&s->receive_mutex);
 
     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
@@ -1893,7 +1898,7 @@
     }
 
     s->state = NBD_CLIENT_CONNECTING_WAIT;
-    ret = nbd_do_establish_connection(bs, errp);
+    ret = nbd_do_establish_connection(bs, true, errp);
     if (ret < 0) {
         goto fail;
     }
@@ -1915,7 +1920,7 @@
     return ret;
 }
 
-static int nbd_co_flush(BlockDriverState *bs)
+static int coroutine_fn nbd_co_flush(BlockDriverState *bs)
 {
     return nbd_client_co_flush(bs);
 }
@@ -2057,10 +2062,11 @@
 
     reconnect_delay_timer_del(s);
 
+    qemu_mutex_lock(&s->requests_lock);
     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        qemu_co_queue_restart_all(&s->free_sema);
     }
+    qemu_mutex_unlock(&s->requests_lock);
 
     nbd_co_establish_connection_cancel(s->conn);
 }
diff --git a/blockdev-nbd.c b/blockdev-nbd.c
index 9840d25..7f6531c 100644
--- a/blockdev-nbd.c
+++ b/blockdev-nbd.c
@@ -211,8 +211,14 @@
     QAPI_CLONE_MEMBERS(BlockExportOptionsNbdBase, &export_opts->u.nbd,
                        qapi_NbdServerAddOptions_base(arg));
     if (arg->has_bitmap) {
+        BlockDirtyBitmapOrStr *el = g_new(BlockDirtyBitmapOrStr, 1);
+
+        *el = (BlockDirtyBitmapOrStr) {
+            .type = QTYPE_QSTRING,
+            .u.local = g_strdup(arg->bitmap),
+        };
         export_opts->u.nbd.has_bitmaps = true;
-        QAPI_LIST_PREPEND(export_opts->u.nbd.bitmaps, g_strdup(arg->bitmap));
+        QAPI_LIST_PREPEND(export_opts->u.nbd.bitmaps, el);
     }
 
     /*
diff --git a/include/block/block_int-global-state.h b/include/block/block_int-global-state.h
index 0f21b05..8b2e95f 100644
--- a/include/block/block_int-global-state.h
+++ b/include/block/block_int-global-state.h
@@ -262,7 +262,7 @@
                                            BlockDriverState **pbs,
                                            Error **errp);
 BdrvDirtyBitmap *block_dirty_bitmap_merge(const char *node, const char *target,
-                                          BlockDirtyBitmapMergeSourceList *bms,
+                                          BlockDirtyBitmapOrStrList *bms,
                                           HBitmap **backup, Error **errp);
 BdrvDirtyBitmap *block_dirty_bitmap_remove(const char *node, const char *name,
                                            bool release,
diff --git a/nbd/server.c b/nbd/server.c
index c5644fd..4cdbc06 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -1643,7 +1643,7 @@
     uint64_t perm, shared_perm;
     bool readonly = !exp_args->writable;
     bool shared = !exp_args->writable;
-    strList *bitmaps;
+    BlockDirtyBitmapOrStrList *bitmaps;
     size_t i;
     int ret;
 
@@ -1709,40 +1709,59 @@
     }
     exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
     for (i = 0, bitmaps = arg->bitmaps; bitmaps;
-         i++, bitmaps = bitmaps->next) {
-        const char *bitmap = bitmaps->value;
+         i++, bitmaps = bitmaps->next)
+    {
+        const char *bitmap;
         BlockDriverState *bs = blk_bs(blk);
         BdrvDirtyBitmap *bm = NULL;
 
-        while (bs) {
-            bm = bdrv_find_dirty_bitmap(bs, bitmap);
-            if (bm != NULL) {
-                break;
+        switch (bitmaps->value->type) {
+        case QTYPE_QSTRING:
+            bitmap = bitmaps->value->u.local;
+            while (bs) {
+                bm = bdrv_find_dirty_bitmap(bs, bitmap);
+                if (bm != NULL) {
+                    break;
+                }
+
+                bs = bdrv_filter_or_cow_bs(bs);
             }
 
-            bs = bdrv_filter_or_cow_bs(bs);
+            if (bm == NULL) {
+                ret = -ENOENT;
+                error_setg(errp, "Bitmap '%s' is not found",
+                           bitmaps->value->u.local);
+                goto fail;
+            }
+
+            if (readonly && bdrv_is_writable(bs) &&
+                bdrv_dirty_bitmap_enabled(bm)) {
+                ret = -EINVAL;
+                error_setg(errp, "Enabled bitmap '%s' incompatible with "
+                           "readonly export", bitmap);
+                goto fail;
+            }
+            break;
+        case QTYPE_QDICT:
+            bitmap = bitmaps->value->u.external.name;
+            bm = block_dirty_bitmap_lookup(bitmaps->value->u.external.node,
+                                           bitmap, NULL, errp);
+            if (!bm) {
+                ret = -ENOENT;
+                goto fail;
+            }
+            break;
+        default:
+            abort();
         }
 
-        if (bm == NULL) {
-            ret = -ENOENT;
-            error_setg(errp, "Bitmap '%s' is not found", bitmap);
-            goto fail;
-        }
+        assert(bm);
 
         if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
             ret = -EINVAL;
             goto fail;
         }
 
-        if (readonly && bdrv_is_writable(bs) &&
-            bdrv_dirty_bitmap_enabled(bm)) {
-            ret = -EINVAL;
-            error_setg(errp,
-                       "Enabled bitmap '%s' incompatible with readonly export",
-                       bitmap);
-            goto fail;
-        }
-
         exp->export_bitmaps[i] = bm;
         assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
     }
diff --git a/qapi/block-core.json b/qapi/block-core.json
index beeb919..b66494e 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -2079,7 +2079,7 @@
             '*persistent': 'bool', '*disabled': 'bool' } }
 
 ##
-# @BlockDirtyBitmapMergeSource:
+# @BlockDirtyBitmapOrStr:
 #
 # @local: name of the bitmap, attached to the same node as target bitmap.
 #
@@ -2087,7 +2087,7 @@
 #
 # Since: 4.1
 ##
-{ 'alternate': 'BlockDirtyBitmapMergeSource',
+{ 'alternate': 'BlockDirtyBitmapOrStr',
   'data': { 'local': 'str',
             'external': 'BlockDirtyBitmap' } }
 
@@ -2106,7 +2106,7 @@
 ##
 { 'struct': 'BlockDirtyBitmapMerge',
   'data': { 'node': 'str', 'target': 'str',
-            'bitmaps': ['BlockDirtyBitmapMergeSource'] } }
+            'bitmaps': ['BlockDirtyBitmapOrStr'] } }
 
 ##
 # @block-dirty-bitmap-add:
diff --git a/qapi/block-export.json b/qapi/block-export.json
index 1e34927..1de16d2 100644
--- a/qapi/block-export.json
+++ b/qapi/block-export.json
@@ -6,6 +6,7 @@
 ##
 
 { 'include': 'sockets.json' }
+{ 'include': 'block-core.json' }
 
 ##
 # @NbdServerOptions:
@@ -89,6 +90,7 @@
 #           @device, so the NBD client can use NBD_OPT_SET_META_CONTEXT with
 #           the metadata context name "qemu:dirty-bitmap:BITMAP" to inspect
 #           each bitmap.
+#           Since 7.1 bitmap may be specified by node/name pair.
 #
 # @allocation-depth: Also export the allocation depth map for @device, so
 #                    the NBD client can use NBD_OPT_SET_META_CONTEXT with
@@ -99,7 +101,8 @@
 ##
 { 'struct': 'BlockExportOptionsNbd',
   'base': 'BlockExportOptionsNbdBase',
-  'data': { '*bitmaps': ['str'], '*allocation-depth': 'bool' } }
+  'data': { '*bitmaps': ['BlockDirtyBitmapOrStr'],
+            '*allocation-depth': 'bool' } }
 
 ##
 # @BlockExportOptionsVhostUserBlk:
diff --git a/qemu-img.c b/qemu-img.c
index 4c84134..4cf4d24 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -1623,16 +1623,16 @@
                                   const char *src_node, const char *src_name,
                                   Error **errp)
 {
-    BlockDirtyBitmapMergeSource *merge_src;
-    BlockDirtyBitmapMergeSourceList *list = NULL;
+    BlockDirtyBitmapOrStr *merge_src;
+    BlockDirtyBitmapOrStrList *list = NULL;
 
-    merge_src = g_new0(BlockDirtyBitmapMergeSource, 1);
+    merge_src = g_new0(BlockDirtyBitmapOrStr, 1);
     merge_src->type = QTYPE_QDICT;
     merge_src->u.external.node = g_strdup(src_node);
     merge_src->u.external.name = g_strdup(src_name);
     QAPI_LIST_PREPEND(list, merge_src);
     qmp_block_dirty_bitmap_merge(dst_node, dst_name, list, errp);
-    qapi_free_BlockDirtyBitmapMergeSourceList(list);
+    qapi_free_BlockDirtyBitmapOrStrList(list);
 }
 
 enum ImgConvertBlockStatus {
diff --git a/qemu-nbd.c b/qemu-nbd.c
index 397ffa6..db63980 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -567,7 +567,7 @@
     QDict *options = NULL;
     const char *export_name = NULL; /* defaults to "" later for server mode */
     const char *export_description = NULL;
-    strList *bitmaps = NULL;
+    BlockDirtyBitmapOrStrList *bitmaps = NULL;
     bool alloc_depth = false;
     const char *tlscredsid = NULL;
     const char *tlshostname = NULL;
@@ -687,7 +687,14 @@
             alloc_depth = true;
             break;
         case 'B':
-            QAPI_LIST_PREPEND(bitmaps, g_strdup(optarg));
+            {
+                BlockDirtyBitmapOrStr *el = g_new(BlockDirtyBitmapOrStr, 1);
+                *el = (BlockDirtyBitmapOrStr) {
+                    .type = QTYPE_QSTRING,
+                    .u.local = g_strdup(optarg),
+                };
+                QAPI_LIST_PREPEND(bitmaps, el);
+            }
             break;
         case 'k':
             sockpath = optarg;
diff --git a/tests/qemu-iotests/223 b/tests/qemu-iotests/223
index da87f2f..0bbb283 100755
--- a/tests/qemu-iotests/223
+++ b/tests/qemu-iotests/223
@@ -120,6 +120,11 @@
     "file":{"driver":"file", "filename":"'"$TEST_IMG"'"}}}' "return"
 _send_qemu_cmd $QEMU_HANDLE '{"execute":"block-dirty-bitmap-disable",
   "arguments":{"node":"n", "name":"b"}}' "return"
+_send_qemu_cmd $QEMU_HANDLE '{"execute":"blockdev-add",
+  "arguments":{"driver":"null-co", "node-name":"null",
+    "size": 4194304}}' "return"
+_send_qemu_cmd $QEMU_HANDLE '{"execute":"block-dirty-bitmap-add",
+  "arguments":{"node":"null", "name":"b3"}}' "return"
 
 for attempt in normal iothread; do
 
@@ -155,6 +160,9 @@
 _send_qemu_cmd $QEMU_HANDLE '{"execute":"nbd-server-add",
   "arguments":{"device":"n", "name":"n2", "writable":true,
   "description":"some text", "bitmap":"b2"}}' "return"
+_send_qemu_cmd $QEMU_HANDLE '{"execute":"block-export-add",
+  "arguments":{"type": "nbd", "node-name":"n", "id":"n3", "name": "n3",
+  "bitmaps":[{"node":"null","name":"b3"}]}}' "return"
 $QEMU_NBD_PROG -L -k "$SOCK_DIR/nbd"
 
 echo
@@ -179,6 +187,14 @@
   "$IMG,x-dirty-bitmap=qemu:dirty-bitmap:b2" | _filter_qemu_img_map
 
 echo
+echo "=== Check bitmap taken from another node ==="
+echo
+
+IMG="driver=nbd,export=n3,server.type=unix,server.path=$SOCK_DIR/nbd"
+$QEMU_IMG map --output=json --image-opts \
+  "$IMG,x-dirty-bitmap=qemu:dirty-bitmap:b3" | _filter_qemu_img_map
+
+echo
 echo "=== End qemu NBD server ==="
 echo
 
diff --git a/tests/qemu-iotests/223.out b/tests/qemu-iotests/223.out
index e58ea5a..0647941 100644
--- a/tests/qemu-iotests/223.out
+++ b/tests/qemu-iotests/223.out
@@ -33,6 +33,13 @@
 {"execute":"block-dirty-bitmap-disable",
   "arguments":{"node":"n", "name":"b"}}
 {"return": {}}
+{"execute":"blockdev-add",
+  "arguments":{"driver":"null-co", "node-name":"null",
+    "size": 4194304}}
+{"return": {}}
+{"execute":"block-dirty-bitmap-add",
+  "arguments":{"node":"null", "name":"b3"}}
+{"return": {}}
 
 === Set up NBD with normal access ===
 
@@ -69,7 +76,11 @@
   "arguments":{"device":"n", "name":"n2", "writable":true,
   "description":"some text", "bitmap":"b2"}}
 {"return": {}}
-exports available: 2
+{"execute":"block-export-add",
+  "arguments":{"type": "nbd", "node-name":"n", "id":"n3", "name": "n3",
+  "bitmaps":[{"node":"null","name":"b3"}]}}
+{"return": {}}
+exports available: 3
  export: 'n'
   size:  4194304
   flags: 0x58f ( readonly flush fua df multi cache )
@@ -89,6 +100,15 @@
   available meta contexts: 2
    base:allocation
    qemu:dirty-bitmap:b2
+ export: 'n3'
+  size:  4194304
+  flags: 0x58f ( readonly flush fua df multi cache )
+  min block: 1
+  opt block: 4096
+  max block: 33554432
+  available meta contexts: 2
+   base:allocation
+   qemu:dirty-bitmap:b3
 
 === Contrast normal status to large granularity dirty-bitmap ===
 
@@ -114,6 +134,10 @@
 { "start": 1024, "length": 2096128, "depth": 0, "present": true, "zero": false, "data": true, "offset": OFFSET},
 { "start": 2097152, "length": 2097152, "depth": 0, "present": false, "zero": false, "data": false}]
 
+=== Check bitmap taken from another node ===
+
+[{ "start": 0, "length": 4194304, "depth": 0, "present": true, "zero": false, "data": true, "offset": OFFSET}]
+
 === End qemu NBD server ===
 
 {"execute":"nbd-server-remove",
@@ -128,6 +152,7 @@
 {"timestamp": {"seconds":  TIMESTAMP, "microseconds":  TIMESTAMP}, "event": "BLOCK_EXPORT_DELETED", "data": {"id": "n2"}}
 {"error": {"class": "GenericError", "desc": "Export 'n2' is not found"}}
 {"execute":"nbd-server-stop"}
+{"timestamp": {"seconds":  TIMESTAMP, "microseconds":  TIMESTAMP}, "event": "BLOCK_EXPORT_DELETED", "data": {"id": "n3"}}
 {"return": {}}
 {"execute":"nbd-server-stop"}
 {"error": {"class": "GenericError", "desc": "NBD server not running"}}
@@ -170,7 +195,11 @@
   "arguments":{"device":"n", "name":"n2", "writable":true,
   "description":"some text", "bitmap":"b2"}}
 {"return": {}}
-exports available: 2
+{"execute":"block-export-add",
+  "arguments":{"type": "nbd", "node-name":"n", "id":"n3", "name": "n3",
+  "bitmaps":[{"node":"null","name":"b3"}]}}
+{"return": {}}
+exports available: 3
  export: 'n'
   size:  4194304
   flags: 0x58f ( readonly flush fua df multi cache )
@@ -190,6 +219,15 @@
   available meta contexts: 2
    base:allocation
    qemu:dirty-bitmap:b2
+ export: 'n3'
+  size:  4194304
+  flags: 0x58f ( readonly flush fua df multi cache )
+  min block: 1
+  opt block: 4096
+  max block: 33554432
+  available meta contexts: 2
+   base:allocation
+   qemu:dirty-bitmap:b3
 
 === Contrast normal status to large granularity dirty-bitmap ===
 
@@ -215,6 +253,10 @@
 { "start": 1024, "length": 2096128, "depth": 0, "present": true, "zero": false, "data": true, "offset": OFFSET},
 { "start": 2097152, "length": 2097152, "depth": 0, "present": false, "zero": false, "data": false}]
 
+=== Check bitmap taken from another node ===
+
+[{ "start": 0, "length": 4194304, "depth": 0, "present": true, "zero": false, "data": true, "offset": OFFSET}]
+
 === End qemu NBD server ===
 
 {"execute":"nbd-server-remove",
@@ -229,6 +271,7 @@
 {"timestamp": {"seconds":  TIMESTAMP, "microseconds":  TIMESTAMP}, "event": "BLOCK_EXPORT_DELETED", "data": {"id": "n2"}}
 {"error": {"class": "GenericError", "desc": "Export 'n2' is not found"}}
 {"execute":"nbd-server-stop"}
+{"timestamp": {"seconds":  TIMESTAMP, "microseconds":  TIMESTAMP}, "event": "BLOCK_EXPORT_DELETED", "data": {"id": "n3"}}
 {"return": {}}
 {"execute":"nbd-server-stop"}
 {"error": {"class": "GenericError", "desc": "NBD server not running"}}