Merge remote-tracking branch 'remotes/ericb/tags/pull-nbd-2019-09-05-v2' into staging

nbd patches for 2019-09-05

- Advertise NBD_FLAG_CAN_MULTI_CONN on readonly images
- Tolerate larger set of server error responses during handshake
- More precision on handling fallocate() failures due to alignment
- Better documentation of NBD connection URIs
- Implement new extension NBD_CMD_FLAG_FAST_ZERO to benefit qemu-img convert

# gpg: Signature made Thu 05 Sep 2019 22:08:17 BST
# gpg:                using RSA key 71C2CC22B1C4602927D2F3AAA7A16B4A2527436A
# gpg: Good signature from "Eric Blake <eblake@redhat.com>" [full]
# gpg:                 aka "Eric Blake (Free Software Programmer) <ebb9@byu.net>" [full]
# gpg:                 aka "[jpeg image of size 6874]" [full]
# Primary key fingerprint: 71C2 CC22 B1C4 6029 27D2  F3AA A7A1 6B4A 2527 436A

* remotes/ericb/tags/pull-nbd-2019-09-05-v2:
  nbd: Implement server use of NBD FAST_ZERO
  nbd: Implement client use of NBD FAST_ZERO
  nbd: Prepare for NBD_CMD_FLAG_FAST_ZERO
  nbd: Improve per-export flag handling in server
  docs: Update preferred NBD device syntax
  block: workaround for unaligned byte range in fallocate()
  nbd: Tolerate more errors to structured reply request
  nbd: Use g_autofree in a few places
  nbd: Advertise multi-conn for shared read-only connections

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
diff --git a/block/file-posix.c b/block/file-posix.c
index 71f168e..87c5a4c 100644
--- a/block/file-posix.c
+++ b/block/file-posix.c
@@ -1588,6 +1588,13 @@
     if (s->has_write_zeroes) {
         int ret = do_fallocate(s->fd, FALLOC_FL_ZERO_RANGE,
                                aiocb->aio_offset, aiocb->aio_nbytes);
+        if (ret == -EINVAL) {
+            /*
+             * Allow falling back to pwrite for file systems that
+             * do not support fallocate() for an unaligned byte range.
+             */
+            return -ENOTSUP;
+        }
         if (ret == 0 || ret != -ENOTSUP) {
             return ret;
         }
diff --git a/block/io.c b/block/io.c
index 0fa10831..16a598f 100644
--- a/block/io.c
+++ b/block/io.c
@@ -1746,7 +1746,7 @@
             assert(!bs->supported_zero_flags);
         }
 
-        if (ret < 0 && !(flags & BDRV_REQ_NO_FALLBACK)) {
+        if (ret == -ENOTSUP && !(flags & BDRV_REQ_NO_FALLBACK)) {
             /* Fall back to bounce buffer if write zeroes is unsupported */
             BdrvRequestFlags write_flags = flags & ~BDRV_REQ_ZERO_WRITE;
 
diff --git a/block/nbd.c b/block/nbd.c
index beed46f..813c40d 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -1044,6 +1044,10 @@
     if (!(flags & BDRV_REQ_MAY_UNMAP)) {
         request.flags |= NBD_CMD_FLAG_NO_HOLE;
     }
+    if (flags & BDRV_REQ_NO_FALLBACK) {
+        assert(s->info.flags & NBD_FLAG_SEND_FAST_ZERO);
+        request.flags |= NBD_CMD_FLAG_FAST_ZERO;
+    }
 
     if (!bytes) {
         return 0;
@@ -1239,6 +1243,9 @@
     }
     if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
         bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
+        if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
+            bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
+        }
     }
 
     s->sioc = sioc;
@@ -1374,7 +1381,7 @@
 static void nbd_parse_filename(const char *filename, QDict *options,
                                Error **errp)
 {
-    char *file;
+    g_autofree char *file = NULL;
     char *export_name;
     const char *host_spec;
     const char *unixpath;
@@ -1396,7 +1403,7 @@
     export_name = strstr(file, EN_OPTSTR);
     if (export_name) {
         if (export_name[strlen(EN_OPTSTR)] == 0) {
-            goto out;
+            return;
         }
         export_name[0] = 0; /* truncate 'file' */
         export_name += strlen(EN_OPTSTR);
@@ -1407,11 +1414,11 @@
     /* extract the host_spec - fail if it's not nbd:... */
     if (!strstart(file, "nbd:", &host_spec)) {
         error_setg(errp, "File name string for NBD must start with 'nbd:'");
-        goto out;
+        return;
     }
 
     if (!*host_spec) {
-        goto out;
+        return;
     }
 
     /* are we a UNIX or TCP socket? */
@@ -1431,9 +1438,6 @@
     out_inet:
         qapi_free_InetSocketAddress(addr);
     }
-
-out:
-    g_free(file);
 }
 
 static bool nbd_process_legacy_socket_options(QDict *output_options,
diff --git a/blockdev-nbd.c b/blockdev-nbd.c
index c621686..213f226 100644
--- a/blockdev-nbd.c
+++ b/blockdev-nbd.c
@@ -187,8 +187,7 @@
         writable = false;
     }
 
-    exp = nbd_export_new(bs, 0, len, name, NULL, bitmap,
-                         writable ? 0 : NBD_FLAG_READ_ONLY,
+    exp = nbd_export_new(bs, 0, len, name, NULL, bitmap, !writable, !writable,
                          NULL, false, on_eject_blk, errp);
     if (!exp) {
         return;
diff --git a/docs/interop/nbd.txt b/docs/interop/nbd.txt
index fc64473..4511880 100644
--- a/docs/interop/nbd.txt
+++ b/docs/interop/nbd.txt
@@ -53,3 +53,5 @@
 * 2.12: NBD_CMD_BLOCK_STATUS for "base:allocation"
 * 3.0: NBD_OPT_STARTTLS with TLS Pre-Shared Keys (PSK),
 NBD_CMD_BLOCK_STATUS for "qemu:dirty-bitmap:", NBD_CMD_CACHE
+* 4.2: NBD_FLAG_CAN_MULTI_CONN for sharable read-only exports,
+NBD_CMD_FLAG_FAST_ZERO
diff --git a/include/block/nbd.h b/include/block/nbd.h
index 7b36d67..2155074 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -140,6 +140,7 @@
     NBD_FLAG_CAN_MULTI_CONN_BIT     =  8, /* Multi-client cache consistent */
     NBD_FLAG_SEND_RESIZE_BIT        =  9, /* Send resize */
     NBD_FLAG_SEND_CACHE_BIT         = 10, /* Send CACHE (prefetch) */
+    NBD_FLAG_SEND_FAST_ZERO_BIT     = 11, /* FAST_ZERO flag for WRITE_ZEROES */
 };
 
 #define NBD_FLAG_HAS_FLAGS         (1 << NBD_FLAG_HAS_FLAGS_BIT)
@@ -153,6 +154,7 @@
 #define NBD_FLAG_CAN_MULTI_CONN    (1 << NBD_FLAG_CAN_MULTI_CONN_BIT)
 #define NBD_FLAG_SEND_RESIZE       (1 << NBD_FLAG_SEND_RESIZE_BIT)
 #define NBD_FLAG_SEND_CACHE        (1 << NBD_FLAG_SEND_CACHE_BIT)
+#define NBD_FLAG_SEND_FAST_ZERO    (1 << NBD_FLAG_SEND_FAST_ZERO_BIT)
 
 /* New-style handshake (global) flags, sent from server to client, and
    control what will happen during handshake phase. */
@@ -205,6 +207,7 @@
 #define NBD_CMD_FLAG_DF         (1 << 2) /* don't fragment structured read */
 #define NBD_CMD_FLAG_REQ_ONE    (1 << 3) /* only one extent in BLOCK_STATUS
                                           * reply chunk */
+#define NBD_CMD_FLAG_FAST_ZERO  (1 << 4) /* fail if WRITE_ZEROES is not fast */
 
 /* Supported request types */
 enum {
@@ -270,6 +273,7 @@
 #define NBD_EINVAL     22
 #define NBD_ENOSPC     28
 #define NBD_EOVERFLOW  75
+#define NBD_ENOTSUP    95
 #define NBD_ESHUTDOWN  108
 
 /* Details collected by NBD_OPT_EXPORT_NAME and NBD_OPT_GO */
@@ -326,7 +330,7 @@
 
 NBDExport *nbd_export_new(BlockDriverState *bs, uint64_t dev_offset,
                           uint64_t size, const char *name, const char *desc,
-                          const char *bitmap, uint16_t nbdflags,
+                          const char *bitmap, bool readonly, bool shared,
                           void (*close)(NBDExport *), bool writethrough,
                           BlockBackend *on_eject_blk, Error **errp);
 void nbd_export_close(NBDExport *exp);
diff --git a/nbd/client.c b/nbd/client.c
index 49bf990..b9dc829 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -1,5 +1,5 @@
 /*
- *  Copyright (C) 2016-2018 Red Hat, Inc.
+ *  Copyright (C) 2016-2019 Red Hat, Inc.
  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
  *
  *  Network Block Device Client Side
@@ -142,17 +142,18 @@
     return 0;
 }
 
-/* If reply represents success, return 1 without further action.
- * If reply represents an error, consume the optional payload of
- * the packet on ioc.  Then return 0 for unsupported (so the client
- * can fall back to other approaches), or -1 with errp set for other
- * errors.
+/*
+ * If reply represents success, return 1 without further action.  If
+ * reply represents an error, consume the optional payload of the
+ * packet on ioc.  Then return 0 for unsupported (so the client can
+ * fall back to other approaches), where @strict determines if only
+ * ERR_UNSUP or all errors fit that category, or -1 with errp set for
+ * other errors.
  */
 static int nbd_handle_reply_err(QIOChannel *ioc, NBDOptionReply *reply,
-                                Error **errp)
+                                bool strict, Error **errp)
 {
-    char *msg = NULL;
-    int result = -1;
+    g_autofree char *msg = NULL;
 
     if (!(reply->type & (1 << 31))) {
         return 1;
@@ -163,26 +164,28 @@
             error_setg(errp, "server error %" PRIu32
                        " (%s) message is too long",
                        reply->type, nbd_rep_lookup(reply->type));
-            goto cleanup;
+            goto err;
         }
         msg = g_malloc(reply->length + 1);
         if (nbd_read(ioc, msg, reply->length, NULL, errp) < 0) {
             error_prepend(errp, "Failed to read option error %" PRIu32
                           " (%s) message: ",
                           reply->type, nbd_rep_lookup(reply->type));
-            goto cleanup;
+            goto err;
         }
         msg[reply->length] = '\0';
         trace_nbd_server_error_msg(reply->type,
                                    nbd_reply_type_lookup(reply->type), msg);
     }
 
-    switch (reply->type) {
-    case NBD_REP_ERR_UNSUP:
-        trace_nbd_reply_err_unsup(reply->option, nbd_opt_lookup(reply->option));
-        result = 0;
-        goto cleanup;
+    if (reply->type == NBD_REP_ERR_UNSUP || !strict) {
+        trace_nbd_reply_err_ignored(reply->option,
+                                    nbd_opt_lookup(reply->option),
+                                    reply->type, nbd_rep_lookup(reply->type));
+        return 0;
+    }
 
+    switch (reply->type) {
     case NBD_REP_ERR_POLICY:
         error_setg(errp, "Denied by server for option %" PRIu32 " (%s)",
                    reply->option, nbd_opt_lookup(reply->option));
@@ -227,12 +230,9 @@
         error_append_hint(errp, "server reported: %s\n", msg);
     }
 
- cleanup:
-    g_free(msg);
-    if (result < 0) {
-        nbd_send_opt_abort(ioc);
-    }
-    return result;
+ err:
+    nbd_send_opt_abort(ioc);
+    return -1;
 }
 
 /* nbd_receive_list:
@@ -247,18 +247,17 @@
 static int nbd_receive_list(QIOChannel *ioc, char **name, char **description,
                             Error **errp)
 {
-    int ret = -1;
     NBDOptionReply reply;
     uint32_t len;
     uint32_t namelen;
-    char *local_name = NULL;
-    char *local_desc = NULL;
+    g_autofree char *local_name = NULL;
+    g_autofree char *local_desc = NULL;
     int error;
 
     if (nbd_receive_option_reply(ioc, NBD_OPT_LIST, &reply, errp) < 0) {
         return -1;
     }
-    error = nbd_handle_reply_err(ioc, &reply, errp);
+    error = nbd_handle_reply_err(ioc, &reply, true, errp);
     if (error <= 0) {
         return error;
     }
@@ -298,7 +297,7 @@
     local_name = g_malloc(namelen + 1);
     if (nbd_read(ioc, local_name, namelen, "export name", errp) < 0) {
         nbd_send_opt_abort(ioc);
-        goto out;
+        return -1;
     }
     local_name[namelen] = '\0';
     len -= namelen;
@@ -306,24 +305,17 @@
         local_desc = g_malloc(len + 1);
         if (nbd_read(ioc, local_desc, len, "export description", errp) < 0) {
             nbd_send_opt_abort(ioc);
-            goto out;
+            return -1;
         }
         local_desc[len] = '\0';
     }
 
     trace_nbd_receive_list(local_name, local_desc ?: "");
-    *name = local_name;
-    local_name = NULL;
+    *name = g_steal_pointer(&local_name);
     if (description) {
-        *description = local_desc;
-        local_desc = NULL;
+        *description = g_steal_pointer(&local_desc);
     }
-    ret = 1;
-
- out:
-    g_free(local_name);
-    g_free(local_desc);
-    return ret;
+    return 1;
 }
 
 
@@ -371,7 +363,7 @@
         if (nbd_receive_option_reply(ioc, opt, &reply, errp) < 0) {
             return -1;
         }
-        error = nbd_handle_reply_err(ioc, &reply, errp);
+        error = nbd_handle_reply_err(ioc, &reply, true, errp);
         if (error <= 0) {
             return error;
         }
@@ -546,12 +538,15 @@
     }
 }
 
-/* nbd_request_simple_option: Send an option request, and parse the reply
+/*
+ * nbd_request_simple_option: Send an option request, and parse the reply.
+ * @strict controls whether ERR_UNSUP or all errors produce 0 status.
  * return 1 for successful negotiation,
  *        0 if operation is unsupported,
  *        -1 with errp set for any other error
  */
-static int nbd_request_simple_option(QIOChannel *ioc, int opt, Error **errp)
+static int nbd_request_simple_option(QIOChannel *ioc, int opt, bool strict,
+                                     Error **errp)
 {
     NBDOptionReply reply;
     int error;
@@ -563,7 +558,7 @@
     if (nbd_receive_option_reply(ioc, opt, &reply, errp) < 0) {
         return -1;
     }
-    error = nbd_handle_reply_err(ioc, &reply, errp);
+    error = nbd_handle_reply_err(ioc, &reply, strict, errp);
     if (error <= 0) {
         return error;
     }
@@ -595,7 +590,7 @@
     QIOChannelTLS *tioc;
     struct NBDTLSHandshakeData data = { 0 };
 
-    ret = nbd_request_simple_option(ioc, NBD_OPT_STARTTLS, errp);
+    ret = nbd_request_simple_option(ioc, NBD_OPT_STARTTLS, true, errp);
     if (ret <= 0) {
         if (ret == 0) {
             error_setg(errp, "Server don't support STARTTLS option");
@@ -695,7 +690,7 @@
         return -1;
     }
 
-    ret = nbd_handle_reply_err(ioc, &reply, errp);
+    ret = nbd_handle_reply_err(ioc, &reply, false, errp);
     if (ret <= 0) {
         return ret;
     }
@@ -951,7 +946,7 @@
             if (structured_reply) {
                 result = nbd_request_simple_option(ioc,
                                                    NBD_OPT_STRUCTURED_REPLY,
-                                                   errp);
+                                                   false, errp);
                 if (result < 0) {
                     return -EINVAL;
                 }
diff --git a/nbd/common.c b/nbd/common.c
index cc8b278..ddfe7d1 100644
--- a/nbd/common.c
+++ b/nbd/common.c
@@ -201,6 +201,8 @@
         return "ENOSPC";
     case NBD_EOVERFLOW:
         return "EOVERFLOW";
+    case NBD_ENOTSUP:
+        return "ENOTSUP";
     case NBD_ESHUTDOWN:
         return "ESHUTDOWN";
     default:
@@ -231,6 +233,9 @@
     case NBD_EOVERFLOW:
         ret = EOVERFLOW;
         break;
+    case NBD_ENOTSUP:
+        ret = ENOTSUP;
+        break;
     case NBD_ESHUTDOWN:
         ret = ESHUTDOWN;
         break;
diff --git a/nbd/server.c b/nbd/server.c
index f55ccf8..28c3c8b 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -55,6 +55,11 @@
         return NBD_ENOSPC;
     case EOVERFLOW:
         return NBD_EOVERFLOW;
+    case ENOTSUP:
+#if ENOTSUP != EOPNOTSUPP
+    case EOPNOTSUPP:
+#endif
+        return NBD_ENOTSUP;
     case ESHUTDOWN:
         return NBD_ESHUTDOWN;
     case EINVAL:
@@ -206,7 +211,7 @@
 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
                             Error **errp, const char *fmt, va_list va)
 {
-    char *msg;
+    g_autofree char *msg = NULL;
     int ret;
     size_t len;
 
@@ -216,18 +221,14 @@
     trace_nbd_negotiate_send_rep_err(msg);
     ret = nbd_negotiate_send_rep_len(client, type, len, errp);
     if (ret < 0) {
-        goto out;
+        return ret;
     }
     if (nbd_write(client->ioc, msg, len, errp) < 0) {
         error_prepend(errp, "write failed (error message): ");
-        ret = -EIO;
-    } else {
-        ret = 0;
+        return -EIO;
     }
 
-out:
-    g_free(msg);
-    return ret;
+    return 0;
 }
 
 /* Send an error reply.
@@ -423,14 +424,14 @@
 
 /* Send a reply to NBD_OPT_EXPORT_NAME.
  * Return -errno on error, 0 on success. */
-static int nbd_negotiate_handle_export_name(NBDClient *client,
-                                            uint16_t myflags, bool no_zeroes,
+static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
                                             Error **errp)
 {
     char name[NBD_MAX_NAME_SIZE + 1];
     char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
     size_t len;
     int ret;
+    uint16_t myflags;
 
     /* Client sends:
         [20 ..  xx]   export name (length bytes)
@@ -458,10 +459,13 @@
         return -EINVAL;
     }
 
-    trace_nbd_negotiate_new_style_size_flags(client->exp->size,
-                                             client->exp->nbdflags | myflags);
+    myflags = client->exp->nbdflags;
+    if (client->structured_reply) {
+        myflags |= NBD_FLAG_SEND_DF;
+    }
+    trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
     stq_be_p(buf, client->exp->size);
-    stw_be_p(buf + 8, client->exp->nbdflags | myflags);
+    stw_be_p(buf + 8, myflags);
     len = no_zeroes ? 10 : sizeof(buf);
     ret = nbd_write(client->ioc, buf, len, errp);
     if (ret < 0) {
@@ -526,8 +530,7 @@
 /* Handle NBD_OPT_INFO and NBD_OPT_GO.
  * Return -errno on error, 0 if ready for next option, and 1 to move
  * into transmission phase.  */
-static int nbd_negotiate_handle_info(NBDClient *client, uint16_t myflags,
-                                     Error **errp)
+static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
 {
     int rc;
     char name[NBD_MAX_NAME_SIZE + 1];
@@ -540,6 +543,7 @@
     uint32_t sizes[3];
     char buf[sizeof(uint64_t) + sizeof(uint16_t)];
     uint32_t check_align = 0;
+    uint16_t myflags;
 
     /* Client sends:
         4 bytes: L, name length (can be 0)
@@ -637,10 +641,13 @@
     }
 
     /* Send NBD_INFO_EXPORT always */
-    trace_nbd_negotiate_new_style_size_flags(exp->size,
-                                             exp->nbdflags | myflags);
+    myflags = exp->nbdflags;
+    if (client->structured_reply) {
+        myflags |= NBD_FLAG_SEND_DF;
+    }
+    trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
     stq_be_p(buf, exp->size);
-    stw_be_p(buf + 8, exp->nbdflags | myflags);
+    stw_be_p(buf + 8, myflags);
     rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
                                  sizeof(buf), buf, errp);
     if (rc < 0) {
@@ -1037,8 +1044,7 @@
  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
  *         errp is not set
  */
-static int nbd_negotiate_options(NBDClient *client, uint16_t myflags,
-                                 Error **errp)
+static int nbd_negotiate_options(NBDClient *client, Error **errp)
 {
     uint32_t flags;
     bool fixedNewstyle = false;
@@ -1172,13 +1178,12 @@
                 return 1;
 
             case NBD_OPT_EXPORT_NAME:
-                return nbd_negotiate_handle_export_name(client,
-                                                        myflags, no_zeroes,
+                return nbd_negotiate_handle_export_name(client, no_zeroes,
                                                         errp);
 
             case NBD_OPT_INFO:
             case NBD_OPT_GO:
-                ret = nbd_negotiate_handle_info(client, myflags, errp);
+                ret = nbd_negotiate_handle_info(client, errp);
                 if (ret == 1) {
                     assert(option == NBD_OPT_GO);
                     return 0;
@@ -1209,7 +1214,6 @@
                 } else {
                     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
                     client->structured_reply = true;
-                    myflags |= NBD_FLAG_SEND_DF;
                 }
                 break;
 
@@ -1232,8 +1236,7 @@
              */
             switch (option) {
             case NBD_OPT_EXPORT_NAME:
-                return nbd_negotiate_handle_export_name(client,
-                                                        myflags, no_zeroes,
+                return nbd_negotiate_handle_export_name(client, no_zeroes,
                                                         errp);
 
             default:
@@ -1259,9 +1262,6 @@
 {
     char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
     int ret;
-    const uint16_t myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
-                              NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA |
-                              NBD_FLAG_SEND_WRITE_ZEROES | NBD_FLAG_SEND_CACHE);
 
     /* Old style negotiation header, no room for options
         [ 0 ..   7]   passwd       ("NBDMAGIC")
@@ -1289,7 +1289,7 @@
         error_prepend(errp, "write failed: ");
         return -EINVAL;
     }
-    ret = nbd_negotiate_options(client, myflags, errp);
+    ret = nbd_negotiate_options(client, errp);
     if (ret != 0) {
         if (ret < 0) {
             error_prepend(errp, "option negotiation failed: ");
@@ -1461,7 +1461,7 @@
 
 NBDExport *nbd_export_new(BlockDriverState *bs, uint64_t dev_offset,
                           uint64_t size, const char *name, const char *desc,
-                          const char *bitmap, uint16_t nbdflags,
+                          const char *bitmap, bool readonly, bool shared,
                           void (*close)(NBDExport *), bool writethrough,
                           BlockBackend *on_eject_blk, Error **errp)
 {
@@ -1485,7 +1485,7 @@
     /* Don't allow resize while the NBD server is running, otherwise we don't
      * care what happens with the node. */
     perm = BLK_PERM_CONSISTENT_READ;
-    if ((nbdflags & NBD_FLAG_READ_ONLY) == 0) {
+    if (!readonly) {
         perm |= BLK_PERM_WRITE;
     }
     blk = blk_new(bdrv_get_aio_context(bs), perm,
@@ -1505,7 +1505,17 @@
     exp->dev_offset = dev_offset;
     exp->name = g_strdup(name);
     exp->description = g_strdup(desc);
-    exp->nbdflags = nbdflags;
+    exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
+                     NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
+    if (readonly) {
+        exp->nbdflags |= NBD_FLAG_READ_ONLY;
+        if (shared) {
+            exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
+        }
+    } else {
+        exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
+                          NBD_FLAG_SEND_FAST_ZERO);
+    }
     assert(size <= INT64_MAX - dev_offset);
     exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
 
@@ -1530,7 +1540,7 @@
             goto fail;
         }
 
-        if ((nbdflags & NBD_FLAG_READ_ONLY) && bdrv_is_writable(bs) &&
+        if (readonly && bdrv_is_writable(bs) &&
             bdrv_dirty_bitmap_enabled(bm)) {
             error_setg(errp,
                        "Enabled bitmap '%s' incompatible with readonly export",
@@ -2157,7 +2167,7 @@
     if (request->type == NBD_CMD_READ && client->structured_reply) {
         valid_flags |= NBD_CMD_FLAG_DF;
     } else if (request->type == NBD_CMD_WRITE_ZEROES) {
-        valid_flags |= NBD_CMD_FLAG_NO_HOLE;
+        valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
     } else if (request->type == NBD_CMD_BLOCK_STATUS) {
         valid_flags |= NBD_CMD_FLAG_REQ_ONE;
     }
@@ -2296,6 +2306,9 @@
         if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
             flags |= BDRV_REQ_MAY_UNMAP;
         }
+        if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
+            flags |= BDRV_REQ_NO_FALLBACK;
+        }
         ret = blk_pwrite_zeroes(exp->blk, request->from + exp->dev_offset,
                                 request->len, flags);
         return nbd_send_generic_reply(client, request->handle, ret,
diff --git a/nbd/trace-events b/nbd/trace-events
index 7ab6b37..f6cde96 100644
--- a/nbd/trace-events
+++ b/nbd/trace-events
@@ -4,7 +4,7 @@
 nbd_send_option_request(uint32_t opt, const char *name, uint32_t len) "Sending option request %" PRIu32" (%s), len %" PRIu32
 nbd_receive_option_reply(uint32_t option, const char *optname, uint32_t type, const char *typename, uint32_t length) "Received option reply %" PRIu32" (%s), type %" PRIu32" (%s), len %" PRIu32
 nbd_server_error_msg(uint32_t err, const char *type, const char *msg) "server reported error 0x%" PRIx32 " (%s) with additional message: %s"
-nbd_reply_err_unsup(uint32_t option, const char *name) "server doesn't understand request %" PRIu32 " (%s), attempting fallback"
+nbd_reply_err_ignored(uint32_t option, const char *name, uint32_t reply, const char *reply_name) "server failed request %" PRIu32 " (%s) with error 0x%" PRIx32 " (%s), attempting fallback"
 nbd_receive_list(const char *name, const char *desc) "export list includes '%s', description '%s'"
 nbd_opt_info_go_start(const char *opt, const char *name) "Attempting %s for export '%s'"
 nbd_opt_info_go_success(const char *opt) "Export is ready after %s request"
diff --git a/qemu-doc.texi b/qemu-doc.texi
index b2654c7..4d828a5 100644
--- a/qemu-doc.texi
+++ b/qemu-doc.texi
@@ -301,9 +301,16 @@
 
 @item NBD
 QEMU supports NBD (Network Block Devices) both using TCP protocol as well
-as Unix Domain Sockets.
+as Unix Domain Sockets.  With TCP, the default port is 10809.
 
-Syntax for specifying a NBD device using TCP
+Syntax for specifying a NBD device using TCP, in preferred URI form:
+``nbd://<server-ip>[:<port>]/[<export>]''
+
+Syntax for specifying a NBD device using Unix Domain Sockets; remember
+that '?' is a shell glob character and may need quoting:
+``nbd+unix:///[<export>]?socket=<domain-socket>''
+
+Older syntax that is also recognized:
 ``nbd:<server-ip>:<port>[:exportname=<export>]''
 
 Syntax for specifying a NBD device using Unix Domain Sockets
diff --git a/qemu-nbd.c b/qemu-nbd.c
index 83b6c32..9032b6d 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -294,6 +294,7 @@
                 [NBD_FLAG_CAN_MULTI_CONN_BIT]       = "multi",
                 [NBD_FLAG_SEND_RESIZE_BIT]          = "resize",
                 [NBD_FLAG_SEND_CACHE_BIT]           = "cache",
+                [NBD_FLAG_SEND_FAST_ZERO_BIT]       = "fast-zero",
             };
 
             printf("  size:  %" PRIu64 "\n", list[i].size);
@@ -600,7 +601,7 @@
     BlockBackend *blk;
     BlockDriverState *bs;
     uint64_t dev_offset = 0;
-    uint16_t nbdflags = 0;
+    bool readonly = false;
     bool disconnect = false;
     const char *bindto = NULL;
     const char *port = NULL;
@@ -782,7 +783,7 @@
             }
             /* fall through */
         case 'r':
-            nbdflags |= NBD_FLAG_READ_ONLY;
+            readonly = true;
             flags &= ~BDRV_O_RDWR;
             break;
         case 'P':
@@ -1173,7 +1174,7 @@
     }
 
     export = nbd_export_new(bs, dev_offset, fd_size, export_name,
-                            export_description, bitmap, nbdflags,
+                            export_description, bitmap, readonly, shared > 1,
                             nbd_export_closed, writethrough, NULL,
                             &error_fatal);
 
diff --git a/tests/qemu-iotests/223.out b/tests/qemu-iotests/223.out
index d5201b2..5d00398 100644
--- a/tests/qemu-iotests/223.out
+++ b/tests/qemu-iotests/223.out
@@ -40,7 +40,7 @@
 exports available: 2
  export: 'n'
   size:  4194304
-  flags: 0x4ef ( readonly flush fua trim zeroes df cache )
+  flags: 0x58f ( readonly flush fua df multi cache )
   min block: 1
   opt block: 4096
   max block: 33554432
@@ -49,7 +49,7 @@
    qemu:dirty-bitmap:b
  export: 'n2'
   size:  4194304
-  flags: 0x4ed ( flush fua trim zeroes df cache )
+  flags: 0xced ( flush fua trim zeroes df cache fast-zero )
   min block: 1
   opt block: 4096
   max block: 33554432
diff --git a/tests/qemu-iotests/233.out b/tests/qemu-iotests/233.out
index 9b46284..24321ef 100644
--- a/tests/qemu-iotests/233.out
+++ b/tests/qemu-iotests/233.out
@@ -20,10 +20,10 @@
 server reported: TLS not configured
 
 == check plain client to TLS server fails ==
-qemu-img: Could not open 'nbd://localhost:PORT': TLS negotiation required before option 8 (structured reply)
-server reported: Option 0x8 not permitted before TLS
-qemu-nbd: TLS negotiation required before option 8 (structured reply)
-server reported: Option 0x8 not permitted before TLS
+qemu-img: Could not open 'nbd://localhost:PORT': TLS negotiation required before option 7 (go)
+server reported: Option 0x7 not permitted before TLS
+qemu-nbd: TLS negotiation required before option 3 (list)
+server reported: Option 0x3 not permitted before TLS
 
 == check TLS works ==
 image: nbd://127.0.0.1:PORT
@@ -37,7 +37,7 @@
 exports available: 1
  export: ''
   size:  67108864
-  flags: 0x4ed ( flush fua trim zeroes df cache )
+  flags: 0xced ( flush fua trim zeroes df cache fast-zero )
   min block: 1
   opt block: 4096
   max block: 33554432