qcow2: improve qcow2_co_write_zeroes()

There is a possibility that qcow2_co_write_zeroes() will be called
with the partial block. This could be synthetically triggered with
    qemu-io -c "write -z 32k 4k"
and can happen in the real life in qemu-nbd. The latter happens under
the following conditions:
    (1) qemu-nbd is started with --detect-zeroes=on and is connected to the
        kernel NBD client
    (2) third party program opens kernel NBD device with O_DIRECT
    (3) third party program performs write operation with memory buffer
        not aligned to the page
In this case qcow2_co_write_zeroes() is unable to perform the operation
and mark entire cluster as zeroed and returns ENOTSUP. Thus the caller
switches to non-optimized version and writes real zeroes to the disk.

The patch creates a shortcut. If the block is read as zeroes, f.e. if
it is unallocated, the request is extended to cover full block.
User-visible situation with this block is not changed. Before the patch
the block is filled in the image with real zeroes. After that patch the
block is marked as zeroed in metadata. Thus any subsequent changes in
backing store chain are not affected.

Kevin, thank you for a cool suggestion.

Signed-off-by: Denis V. Lunev <den@openvz.org>
Reviewed-by: Roman Kagan <rkagan@virtuozzo.com>
CC: Kevin Wolf <kwolf@redhat.com>
CC: Max Reitz <mreitz@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
diff --git a/block/qcow2.c b/block/qcow2.c
index 3090538..555627a 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -2411,21 +2411,74 @@
     return ret;
 }
 
+
+static bool is_zero_cluster(BlockDriverState *bs, int64_t start)
+{
+    BDRVQcow2State *s = bs->opaque;
+    int nr;
+    BlockDriverState *file;
+    int64_t res = bdrv_get_block_status_above(bs, NULL, start,
+                                              s->cluster_sectors, &nr, &file);
+    return res >= 0 && ((res & BDRV_BLOCK_ZERO) || !(res & BDRV_BLOCK_DATA));
+}
+
+static bool is_zero_cluster_top_locked(BlockDriverState *bs, int64_t start)
+{
+    BDRVQcow2State *s = bs->opaque;
+    int nr = s->cluster_sectors;
+    uint64_t off;
+    int ret;
+
+    ret = qcow2_get_cluster_offset(bs, start << BDRV_SECTOR_BITS, &nr, &off);
+    return ret == QCOW2_CLUSTER_UNALLOCATED || ret == QCOW2_CLUSTER_ZERO;
+}
+
 static coroutine_fn int qcow2_co_write_zeroes(BlockDriverState *bs,
     int64_t sector_num, int nb_sectors, BdrvRequestFlags flags)
 {
     int ret;
     BDRVQcow2State *s = bs->opaque;
 
-    /* Emulate misaligned zero writes */
-    if (sector_num % s->cluster_sectors || nb_sectors % s->cluster_sectors) {
-        return -ENOTSUP;
+    int head = sector_num % s->cluster_sectors;
+    int tail = (sector_num + nb_sectors) % s->cluster_sectors;
+
+    if (head != 0 || tail != 0) {
+        int64_t cl_end = -1;
+
+        sector_num -= head;
+        nb_sectors += head;
+
+        if (tail != 0) {
+            nb_sectors += s->cluster_sectors - tail;
+        }
+
+        if (!is_zero_cluster(bs, sector_num)) {
+            return -ENOTSUP;
+        }
+
+        if (nb_sectors > s->cluster_sectors) {
+            /* Technically the request can cover 2 clusters, f.e. 4k write
+               at s->cluster_sectors - 2k offset. One of these cluster can
+               be zeroed, one unallocated */
+            cl_end = sector_num + nb_sectors - s->cluster_sectors;
+            if (!is_zero_cluster(bs, cl_end)) {
+                return -ENOTSUP;
+            }
+        }
+
+        qemu_co_mutex_lock(&s->lock);
+        /* We can have new write after previous check */
+        if (!is_zero_cluster_top_locked(bs, sector_num) ||
+                (cl_end > 0 && !is_zero_cluster_top_locked(bs, cl_end))) {
+            qemu_co_mutex_unlock(&s->lock);
+            return -ENOTSUP;
+        }
+    } else {
+        qemu_co_mutex_lock(&s->lock);
     }
 
     /* Whatever is left can use real zero clusters */
-    qemu_co_mutex_lock(&s->lock);
-    ret = qcow2_zero_clusters(bs, sector_num << BDRV_SECTOR_BITS,
-        nb_sectors);
+    ret = qcow2_zero_clusters(bs, sector_num << BDRV_SECTOR_BITS, nb_sectors);
     qemu_co_mutex_unlock(&s->lock);
 
     return ret;