qcow2: add compress threads

Do data compression in separate threads. This significantly improves
performance for qemu-img convert with -W (allow async writes) and -c
(compressed) options.

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
diff --git a/block/qcow2.c b/block/qcow2.c
index 9cee653..33b61b7 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -44,6 +44,7 @@
 #include "qapi/qobject-input-visitor.h"
 #include "qapi/qapi-visit-block-core.h"
 #include "crypto.h"
+#include "block/thread-pool.h"
 
 /*
   Differences with QCOW:
@@ -1544,6 +1545,9 @@
         qcow2_check_refcounts(bs, &result, 0);
     }
 #endif
+
+    qemu_co_queue_init(&s->compress_wait_queue);
+
     return ret;
 
  fail:
@@ -3695,6 +3699,62 @@
     return ret;
 }
 
+#define MAX_COMPRESS_THREADS 4 /* max concurrent jobs per BDRVQcow2State */
+
+typedef struct Qcow2CompressData { /* job for qcow2_compress_pool_func() */
+    void *dest;      /* forwarded to qcow2_compress() as its first argument */
+    const void *src; /* forwarded to qcow2_compress() as its second argument */
+    size_t size;     /* forwarded to qcow2_compress() as its third argument */
+    ssize_t ret;     /* qcow2_compress() result, stored by the worker */
+} Qcow2CompressData;
+
+/* Thread-pool worker: runs one compression job (a Qcow2CompressData). */
+static int qcow2_compress_pool_func(void *opaque)
+{
+    Qcow2CompressData *job = opaque;
+
+    job->ret = qcow2_compress(job->dest, job->src, job->size);
+    return 0; /* the job outcome is job->ret, not this status */
+}
+
+static void qcow2_compress_complete(void *opaque, int ret)
+{
+    qemu_coroutine_enter(opaque); /* resume the submitter; ret is unused */
+}
+
+/*
+ * Run qcow2_compress() in a thread-pool worker while this coroutine yields;
+ * at most MAX_COMPRESS_THREADS jobs per BDRVQcow2State are in flight at once.
+ * See the qcow2_compress definition for the parameters. Returns its result,
+ * or -2 (the error value the caller tests for) if the job was not submitted.
+ */
+static coroutine_fn ssize_t qcow2_co_compress(BlockDriverState *bs, void *dest,
+                                              const void *src, size_t size)
+{
+    BDRVQcow2State *s = bs->opaque;
+    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+    Qcow2CompressData arg = {
+        .dest = dest, .src = src, .size = size,
+        .ret = -2, /* reported as-is if the worker never runs */
+    };
+
+    while (s->nb_compress_threads >= MAX_COMPRESS_THREADS) {
+        qemu_co_queue_wait(&s->compress_wait_queue, NULL);
+    }
+
+    s->nb_compress_threads++;
+    if (thread_pool_submit_aio(pool, qcow2_compress_pool_func, &arg,
+                               qcow2_compress_complete,
+                               qemu_coroutine_self())) {
+        qemu_coroutine_yield(); /* resumed by qcow2_compress_complete() */
+    }
+
+    /* Release the slot and wake one waiter, even if submission failed */
+    s->nb_compress_threads--;
+    qemu_co_queue_next(&s->compress_wait_queue);
+    return arg.ret;
+}
+
 /* XXX: put compressed sectors first, then all the cluster aligned
    tables to avoid losing bytes in alignment */
 static coroutine_fn int
@@ -3739,7 +3799,7 @@
 
     out_buf = g_malloc(s->cluster_size);
 
-    out_len = qcow2_compress(out_buf, buf, s->cluster_size);
+    out_len = qcow2_co_compress(bs, out_buf, buf, s->cluster_size);
     if (out_len == -2) {
         ret = -EINVAL;
         goto fail;