thread-pool: use ThreadPool from the running thread
Use qemu_get_current_aio_context() where possible, since we always
submit work to the current thread anyways.
We want to also be sure that the thread submitting the work is
the same as the one processing the pool, to avoid adding
synchronization to the pool list.
Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
Message-Id: <20230203131731.851116-4-eesposit@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
diff --git a/block/file-posix.c b/block/file-posix.c
index 30cb4ae..173b3b1 100644
--- a/block/file-posix.c
+++ b/block/file-posix.c
@@ -2040,11 +2040,10 @@
return result;
}
-static int coroutine_fn raw_thread_pool_submit(BlockDriverState *bs,
- ThreadPoolFunc func, void *arg)
+static int coroutine_fn raw_thread_pool_submit(ThreadPoolFunc func, void *arg)
{
/* @bs can be NULL, bdrv_get_aio_context() returns the main context then */
- ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+ ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
return thread_pool_submit_co(pool, func, arg);
}
@@ -2112,7 +2111,7 @@
};
assert(qiov->size == bytes);
- return raw_thread_pool_submit(bs, handle_aiocb_rw, &acb);
+ return raw_thread_pool_submit(handle_aiocb_rw, &acb);
}
static int coroutine_fn raw_co_preadv(BlockDriverState *bs, int64_t offset,
@@ -2181,7 +2180,7 @@
return luring_co_submit(bs, s->fd, 0, NULL, QEMU_AIO_FLUSH);
}
#endif
- return raw_thread_pool_submit(bs, handle_aiocb_flush, &acb);
+ return raw_thread_pool_submit(handle_aiocb_flush, &acb);
}
static void raw_aio_attach_aio_context(BlockDriverState *bs,
@@ -2243,7 +2242,7 @@
},
};
- return raw_thread_pool_submit(bs, handle_aiocb_truncate, &acb);
+ return raw_thread_pool_submit(handle_aiocb_truncate, &acb);
}
static int coroutine_fn raw_co_truncate(BlockDriverState *bs, int64_t offset,
@@ -2992,7 +2991,7 @@
acb.aio_type |= QEMU_AIO_BLKDEV;
}
- ret = raw_thread_pool_submit(bs, handle_aiocb_discard, &acb);
+ ret = raw_thread_pool_submit(handle_aiocb_discard, &acb);
raw_account_discard(s, bytes, ret);
return ret;
}
@@ -3067,7 +3066,7 @@
handler = handle_aiocb_write_zeroes;
}
- return raw_thread_pool_submit(bs, handler, &acb);
+ return raw_thread_pool_submit(handler, &acb);
}
static int coroutine_fn raw_co_pwrite_zeroes(
@@ -3305,7 +3304,7 @@
},
};
- return raw_thread_pool_submit(bs, handle_aiocb_copy_range, &acb);
+ return raw_thread_pool_submit(handle_aiocb_copy_range, &acb);
}
BlockDriver bdrv_file = {
@@ -3635,7 +3634,7 @@
struct sg_io_hdr *io_hdr = buf;
if (io_hdr->cmdp[0] == PERSISTENT_RESERVE_OUT ||
io_hdr->cmdp[0] == PERSISTENT_RESERVE_IN) {
- return pr_manager_execute(s->pr_mgr, bdrv_get_aio_context(bs),
+ return pr_manager_execute(s->pr_mgr, qemu_get_current_aio_context(),
s->fd, io_hdr);
}
}
@@ -3651,7 +3650,7 @@
},
};
- return raw_thread_pool_submit(bs, handle_aiocb_ioctl, &acb);
+ return raw_thread_pool_submit(handle_aiocb_ioctl, &acb);
}
#endif /* linux */
diff --git a/block/file-win32.c b/block/file-win32.c
index 1763b86..0aedb08 100644
--- a/block/file-win32.c
+++ b/block/file-win32.c
@@ -168,7 +168,7 @@
acb->aio_offset = offset;
trace_file_paio_submit(acb, opaque, offset, count, type);
- pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+ pool = aio_get_thread_pool(qemu_get_current_aio_context());
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
}
diff --git a/block/qcow2-threads.c b/block/qcow2-threads.c
index 953bbe6..6d2e6b7 100644
--- a/block/qcow2-threads.c
+++ b/block/qcow2-threads.c
@@ -43,7 +43,7 @@
{
int ret;
BDRVQcow2State *s = bs->opaque;
- ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+ ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
qemu_co_mutex_lock(&s->lock);
while (s->nb_threads >= QCOW2_MAX_THREADS) {
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 95ff2b0..c408bde 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -29,12 +29,17 @@
ThreadPool *thread_pool_new(struct AioContext *ctx);
void thread_pool_free(ThreadPool *pool);
+/*
+ * thread_pool_submit* API: submit I/O requests in the thread's
+ * current AioContext.
+ */
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
ThreadPoolFunc *func, void *arg,
BlockCompletionFunc *cb, void *opaque);
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
ThreadPoolFunc *func, void *arg);
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+
void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
#endif
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 31113b5..a70abb8 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -48,7 +48,7 @@
/* Access to this list is protected by lock. */
QTAILQ_ENTRY(ThreadPoolElement) reqs;
- /* Access to this list is protected by the global mutex. */
+ /* This list is only written by the thread pool's mother thread. */
QLIST_ENTRY(ThreadPoolElement) all;
};
@@ -175,7 +175,6 @@
ThreadPool *pool = opaque;
ThreadPoolElement *elem, *next;
- aio_context_acquire(pool->ctx);
restart:
QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
if (elem->state != THREAD_DONE) {
@@ -195,9 +194,7 @@
*/
qemu_bh_schedule(pool->completion_bh);
- aio_context_release(pool->ctx);
elem->common.cb(elem->common.opaque, elem->ret);
- aio_context_acquire(pool->ctx);
/* We can safely cancel the completion_bh here regardless of someone
* else having scheduled it meanwhile because we reenter the
@@ -211,7 +208,6 @@
qemu_aio_unref(elem);
}
}
- aio_context_release(pool->ctx);
}
static void thread_pool_cancel(BlockAIOCB *acb)
@@ -251,6 +247,9 @@
{
ThreadPoolElement *req;
+ /* Assert that the thread submitting work is the same running the pool */
+ assert(pool->ctx == qemu_get_current_aio_context());
+
req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
req->func = func;
req->arg = arg;