linux-aio: queue requests that cannot be submitted
Keep a queue of requests that were not submitted; pass them to
the kernel when a completion is reported, unless the queue is
plugged.
The array of iocbs is rebuilt from scratch on every ioq_submit() call.
This avoids having to keep the iocbs array and the pending list synchronized.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Message-id: 1418305950-30924-2-git-send-email-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..b6fbfd8 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -35,14 +35,13 @@
size_t nbytes;
QEMUIOVector *qiov;
bool is_read;
- QLIST_ENTRY(qemu_laiocb) node;
+ QSIMPLEQ_ENTRY(qemu_laiocb) next;
};
typedef struct {
- struct iocb *iocbs[MAX_QUEUED_IO];
int plugged;
- unsigned int size;
unsigned int idx;
+ QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;
struct qemu_laio_state {
@@ -59,6 +58,8 @@
int event_max;
};
+static int ioq_submit(struct qemu_laio_state *s);
+
static inline ssize_t io_event_ret(struct io_event *ev)
{
return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -135,6 +136,10 @@
qemu_laio_process_completion(s, laiocb);
}
+
+ if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
+ ioq_submit(s);
+ }
}
static void qemu_laio_completion_cb(EventNotifier *e)
@@ -172,52 +177,40 @@
static void ioq_init(LaioQueue *io_q)
{
- io_q->size = MAX_QUEUED_IO;
- io_q->idx = 0;
+ QSIMPLEQ_INIT(&io_q->pending);
io_q->plugged = 0;
+ io_q->idx = 0;
}
static int ioq_submit(struct qemu_laio_state *s)
{
- int ret, i = 0;
- int len = s->io_q.idx;
+ int ret, i;
+ int len = 0;
+ struct qemu_laiocb *aiocb;
+ struct iocb *iocbs[MAX_QUEUED_IO];
- do {
- ret = io_submit(s->ctx, len, s->io_q.iocbs);
- } while (i++ < 3 && ret == -EAGAIN);
-
- /* empty io queue */
- s->io_q.idx = 0;
-
- if (ret < 0) {
- i = 0;
- } else {
- i = ret;
+ QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
+ iocbs[len++] = &aiocb->iocb;
+ if (len == MAX_QUEUED_IO) {
+ break;
+ }
}
- for (; i < len; i++) {
- struct qemu_laiocb *laiocb =
- container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+ ret = io_submit(s->ctx, len, iocbs);
+ if (ret == -EAGAIN) {
+ ret = 0;
+ }
+ if (ret < 0) {
+ abort();
+ }
- laiocb->ret = (ret < 0) ? ret : -EIO;
- qemu_laio_process_completion(s, laiocb);
+ for (i = 0; i < ret; i++) {
+ s->io_q.idx--;
+ QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
}
return ret;
}
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
-{
- unsigned int idx = s->io_q.idx;
-
- s->io_q.iocbs[idx++] = iocb;
- s->io_q.idx = idx;
-
- /* submit immediately if queue is full */
- if (idx == s->io_q.size) {
- ioq_submit(s);
- }
-}
-
void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
{
struct qemu_laio_state *s = aio_ctx;
@@ -236,7 +229,7 @@
return 0;
}
- if (s->io_q.idx > 0) {
+ if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ret = ioq_submit(s);
}
@@ -276,12 +269,10 @@
}
io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
- if (!s->io_q.plugged) {
- if (io_submit(s->ctx, 1, &iocbs) < 0) {
- goto out_free_aiocb;
- }
- } else {
- ioq_enqueue(s, iocbs);
+ QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
+ s->io_q.idx++;
+ if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) {
+ ioq_submit(s);
}
return &laiocb->common;