blockjob: introduce .drain callback for jobs
This is required to decouple block jobs from running in an
AioContext. With multiqueue block devices, a BlockDriverState
does not really belong to a single AioContext.
The solution is to first wait until all I/O operations are
complete; then loop in the main thread for the block job to
complete entirely.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Message-Id: <1477565348-5458-3-git-send-email-pbonzini@redhat.com>
Signed-off-by: Fam Zheng <famz@redhat.com>
diff --git a/block/backup.c b/block/backup.c
index 02dbe48..81d4042 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -300,6 +300,21 @@
cow_request_end(req);
}
+static void backup_drain(BlockJob *job)
+{
+ BackupBlockJob *s = container_of(job, BackupBlockJob, common);
+
+ /* Need to keep a reference in case blk_drain triggers execution
+ * of backup_complete...
+ */
+ if (s->target) {
+ BlockBackend *target = s->target;
+ blk_ref(target);
+ blk_drain(target);
+ blk_unref(target);
+ }
+}
+
static const BlockJobDriver backup_job_driver = {
.instance_size = sizeof(BackupBlockJob),
.job_type = BLOCK_JOB_TYPE_BACKUP,
@@ -307,6 +322,7 @@
.commit = backup_commit,
.abort = backup_abort,
.attached_aio_context = backup_attached_aio_context,
+ .drain = backup_drain,
};
static BlockErrorAction backup_error_action(BackupBlockJob *job,
@@ -331,6 +347,7 @@
BackupCompleteData *data = opaque;
blk_unref(s->target);
+ s->target = NULL;
block_job_completed(job, data->ret);
g_free(data);
diff --git a/block/mirror.c b/block/mirror.c
index a433e68..52f879a 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -469,7 +469,11 @@
}
}
-static void mirror_drain(MirrorBlockJob *s)
+/* This is also used for the .pause callback. There is no matching
+ * mirror_resume() because mirror_run() will begin iterating again
+ * when the job is resumed.
+ */
+static void mirror_wait_for_all_io(MirrorBlockJob *s)
{
while (s->in_flight > 0) {
mirror_wait_for_io(s);
@@ -528,6 +532,7 @@
g_free(s->replaces);
bdrv_op_unblock_all(target_bs, s->common.blocker);
blk_unref(s->target);
+ s->target = NULL;
block_job_completed(&s->common, data->ret);
g_free(data);
bdrv_drained_end(src);
@@ -582,7 +587,7 @@
sector_num += nb_sectors;
}
- mirror_drain(s);
+ mirror_wait_for_all_io(s);
}
/* First part, loop on the sectors and initialize the dirty bitmap. */
@@ -787,7 +792,7 @@
* the target is a copy of the source.
*/
assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
- mirror_drain(s);
+ mirror_wait_for_all_io(s);
}
assert(s->in_flight == 0);
@@ -872,14 +877,11 @@
block_job_enter(&s->common);
}
-/* There is no matching mirror_resume() because mirror_run() will begin
- * iterating again when the job is resumed.
- */
-static void coroutine_fn mirror_pause(BlockJob *job)
+static void mirror_pause(BlockJob *job)
{
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
- mirror_drain(s);
+ mirror_wait_for_all_io(s);
}
static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
@@ -889,6 +891,21 @@
blk_set_aio_context(s->target, new_context);
}
+static void mirror_drain(BlockJob *job)
+{
+ MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
+
+ /* Need to keep a reference in case blk_drain triggers execution
+ * of mirror_complete...
+ */
+ if (s->target) {
+ BlockBackend *target = s->target;
+ blk_ref(target);
+ blk_drain(target);
+ blk_unref(target);
+ }
+}
+
static const BlockJobDriver mirror_job_driver = {
.instance_size = sizeof(MirrorBlockJob),
.job_type = BLOCK_JOB_TYPE_MIRROR,
@@ -896,6 +913,7 @@
.complete = mirror_complete,
.pause = mirror_pause,
.attached_aio_context = mirror_attached_aio_context,
+ .drain = mirror_drain,
};
static const BlockJobDriver commit_active_job_driver = {
@@ -905,6 +923,7 @@
.complete = mirror_complete,
.pause = mirror_pause,
.attached_aio_context = mirror_attached_aio_context,
+ .drain = mirror_drain,
};
static void mirror_start_job(const char *job_id, BlockDriverState *bs,
diff --git a/blockjob.c b/blockjob.c
index 43fecbe1..7c88b30 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -74,17 +74,6 @@
return NULL;
}
-/* Normally the job runs in its BlockBackend's AioContext. The exception is
- * block_job_defer_to_main_loop() where it runs in the QEMU main loop. Code
- * that supports both cases uses this helper function.
- */
-static AioContext *block_job_get_aio_context(BlockJob *job)
-{
- return job->deferred_to_main_loop ?
- qemu_get_aio_context() :
- blk_get_aio_context(job->blk);
-}
-
static void block_job_attached_aio_context(AioContext *new_context,
void *opaque)
{
@@ -97,6 +86,17 @@
block_job_resume(job);
}
+static void block_job_drain(BlockJob *job)
+{
+ /* If job is !job->busy this kicks it into the next pause point. */
+ block_job_enter(job);
+
+ blk_drain(job->blk);
+ if (job->driver->drain) {
+ job->driver->drain(job);
+ }
+}
+
static void block_job_detach_aio_context(void *opaque)
{
BlockJob *job = opaque;
@@ -106,12 +106,8 @@
block_job_pause(job);
- if (!job->paused) {
- /* If job is !job->busy this kicks it into the next pause point. */
- block_job_enter(job);
- }
while (!job->paused && !job->completed) {
- aio_poll(block_job_get_aio_context(job), true);
+ block_job_drain(job);
}
block_job_unref(job);
@@ -413,14 +409,21 @@
assert(blk_bs(job->blk)->job == job);
block_job_ref(job);
+
finish(job, &local_err);
if (local_err) {
error_propagate(errp, local_err);
block_job_unref(job);
return -EBUSY;
}
+ /* block_job_drain calls block_job_enter, and it should be enough to
+ * induce progress until the job completes or moves to the main thread.
+ */
+ while (!job->deferred_to_main_loop && !job->completed) {
+ block_job_drain(job);
+ }
while (!job->completed) {
- aio_poll(block_job_get_aio_context(job), true);
+ aio_poll(qemu_get_aio_context(), true);
}
ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
block_job_unref(job);
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 4ddb4ae..2bb39f4 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -92,6 +92,13 @@
* besides job->blk to the new AioContext.
*/
void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
+
+ /*
+ * If the callback is not NULL, it will be invoked when the job has to be
+ * synchronously cancelled or completed; it should drain BlockDriverStates
+ * as required to ensure progress.
+ */
+ void (*drain)(BlockJob *job);
} BlockJobDriver;
/**