rcu: add call_rcu
Asynchronous callbacks provided by call_rcu are particularly important
for QEMU, because the BQL makes it hard to use synchronize_rcu.
In addition, the current RCU implementation is not particularly friendly
to multiple concurrent synchronize_rcu callers, making call_rcu even
more important.
Reviewed-by: Fam Zheng <famz@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
diff --git a/util/rcu.c b/util/rcu.c
index 1f737d5..c9c3e6e 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -26,6 +26,7 @@
* IBM's contributions to this file may be relicensed under LGPLv2 or later.
*/
+#include "qemu-common.h"
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
@@ -33,6 +34,7 @@
#include <errno.h>
#include "qemu/rcu.h"
#include "qemu/atomic.h"
+#include "qemu/thread.h"
/*
* Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
@@ -149,6 +151,116 @@
qemu_mutex_unlock(&rcu_gp_lock);
}
+
+#define RCU_CALL_MIN_SIZE 30 /* batch size call_rcu_thread tries to wait for */
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu. head is only used by the consumer; tail points at the
+ * next field of the last node and is swapped atomically by producers. */
+static struct rcu_head dummy; /* placeholder node; the queue starts as just this */
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+static int rcu_call_count; /* enqueued callbacks not yet claimed by call_rcu_thread */
+static QemuEvent rcu_call_ready_event; /* set by call_rcu1() after each enqueue */
+
+static void enqueue(struct rcu_head *node)
+{
+ struct rcu_head **old_tail;
+ /* Append node to the queue: step 1 claims the tail, step 2 links the node. */
+ node->next = NULL;
+ old_tail = atomic_xchg(&tail, &node->next); /* 1: tail now points at node->next */
+ atomic_mb_set(old_tail, node); /* 2: previous last node's next = node */
+}
+
+static struct rcu_head *try_dequeue(void)
+{
+ struct rcu_head *node, *next;
+ /* Consumer only: pop and return the head node, or NULL if head->next is unset. */
+retry:
+ /* Test for an empty list, which we do not expect. Note that for
+ * the consumer head and tail are always consistent. The head
+ * is consistent because only the consumer reads/writes it.
+ * The tail, because it is the first step in the enqueuing.
+ * It is only the next pointers that might be inconsistent.
+ */
+ if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
+ abort();
+ }
+
+ /* If the head node has NULL in its next pointer, an enqueuer has
+ * swapped the tail but not yet stored the link; let the caller wait.
+ */
+ node = head;
+ next = atomic_mb_read(&head->next);
+ if (!next) {
+ return NULL; /* caller retries after rcu_call_ready_event */
+ }
+
+ /* Since we are the sole consumer, and we excluded the empty case
+ * above, the queue will always have at least two nodes: the
+ * dummy node, and the one being removed. So we do not need to update
+ * the tail pointer.
+ */
+ head = next;
+
+ /* If we dequeued the dummy node, add it back at the end and retry. */
+ if (node == &dummy) {
+ enqueue(node);
+ goto retry;
+ }
+
+ return node;
+}
+
+static void *call_rcu_thread(void *opaque)
+{
+ struct rcu_head *node;
+ /* Thread body (opaque unused, never returns): run queued callbacks in batches. */
+ for (;;) {
+ int tries = 0;
+ int n = atomic_read(&rcu_call_count);
+
+ /* Wait up to 5 rounds for callbacks to pile up, but block on the event
+ * only while none are pending so a lone callback is never stranded.
+ * Sample n before synchronize_rcu(): only nodes queued by then may run.
+ */
+ while (n == 0 || (n < RCU_CALL_MIN_SIZE && ++tries <= 5)) {
+ g_usleep(100000);
+ qemu_event_reset(&rcu_call_ready_event);
+ n = atomic_read(&rcu_call_count);
+ if (n == 0) {
+ qemu_event_wait(&rcu_call_ready_event);
+ n = atomic_read(&rcu_call_count);
+ }
+ }
+
+ atomic_sub(&rcu_call_count, n); /* claim exactly the n callbacks sampled above */
+ synchronize_rcu();
+ while (n > 0) {
+ node = try_dequeue();
+ while (!node) { /* head not linked yet: wait for its enqueuer */
+ qemu_event_reset(&rcu_call_ready_event);
+ node = try_dequeue(); /* re-check after reset to avoid a lost wakeup */
+ if (!node) {
+ qemu_event_wait(&rcu_call_ready_event);
+ node = try_dequeue();
+ }
+ }
+
+ n--;
+ node->func(node);
+ }
+ }
+ abort(); /* not reached: the loop above has no exit */
+}
+
+void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{ /* Queue func(node) to be run by call_rcu_thread after synchronize_rcu(). */
+ node->func = func; /* later invoked as node->func(node) */
+ enqueue(node); /* link the node before it is counted */
+ atomic_inc(&rcu_call_count);
+ qemu_event_set(&rcu_call_ready_event); /* wake call_rcu_thread if it is waiting */
+}
+
void rcu_register_thread(void)
{
assert(rcu_reader.ctr == 0);
@@ -166,7 +278,14 @@
static void __attribute__((__constructor__)) rcu_init(void)
{
+ QemuThread thread; /* not used after creation: the thread is detached */
+
qemu_mutex_init(&rcu_gp_lock);
qemu_event_init(&rcu_gp_event, true);
+ /* The event must be initialized before the thread that waits on it starts. */
+ qemu_event_init(&rcu_call_ready_event, false);
+ qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
+ NULL, QEMU_THREAD_DETACHED);
+
rcu_register_thread();
}