net/colo-compare.c: Fix deadlock in compare_chr_send
The chr_out chardev is connected to a filter-redirector
running in the main loop. qemu_chr_fe_write_all might block
here in compare_chr_send if the (socket-)buffer is full.
If another filter-redirector in the main loop want's to
send data to chr_pri_in it might also block if the buffer
is full. This leads to a deadlock because both event loops
get blocked.
Fix this by converting compare_chr_send to a coroutine and
putting the packets in a send queue.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Zhang Chen <chen.zhang@intel.com>
Tested-by: Zhang Chen <chen.zhang@intel.com>
Signed-off-by: Zhang Chen <chen.zhang@intel.com>
Signed-off-by: Jason Wang <jasowang@redhat.com>
diff --git a/net/colo-compare.c b/net/colo-compare.c
index e557da7..62ecd38 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -32,6 +32,9 @@
#include "migration/migration.h"
#include "util.h"
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
+
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
@@ -77,6 +80,23 @@
* |packet | |packet + |packet | |packet +
* +--------+ +--------+ +--------+ +--------+
*/
+
+typedef struct SendCo {
+ Coroutine *co;
+ struct CompareState *s;
+ CharBackend *chr;
+ GQueue send_list;
+ bool notify_remote_frame;
+ bool done;
+ int ret;
+} SendCo;
+
+typedef struct SendEntry {
+ uint32_t size;
+ uint32_t vnet_hdr_len;
+ uint8_t *buf;
+} SendEntry;
+
typedef struct CompareState {
Object parent;
@@ -91,6 +111,8 @@
SocketReadState pri_rs;
SocketReadState sec_rs;
SocketReadState notify_rs;
+ SendCo out_sendco;
+ SendCo notify_sendco;
bool vnet_hdr;
uint32_t compare_timeout;
uint32_t expired_scan_cycle;
@@ -124,10 +146,11 @@
static int compare_chr_send(CompareState *s,
- const uint8_t *buf,
+ uint8_t *buf,
uint32_t size,
uint32_t vnet_hdr_len,
- bool notify_remote_frame);
+ bool notify_remote_frame,
+ bool zero_copy);
static bool packet_matches_str(const char *str,
const uint8_t *buf,
@@ -145,7 +168,7 @@
char msg[] = "DO_CHECKPOINT";
int ret = 0;
- ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+ ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
if (ret < 0) {
error_report("Notify Xen COLO-frame failed");
}
@@ -272,12 +295,13 @@
pkt->data,
pkt->size,
pkt->vnet_hdr_len,
- false);
+ false,
+ true);
if (ret < 0) {
error_report("colo send primary packet failed");
}
trace_colo_compare_main("packet same and release packet");
- packet_destroy(pkt, NULL);
+ packet_destroy_partial(pkt, NULL);
}
/*
@@ -699,65 +723,115 @@
}
}
+static void coroutine_fn _compare_chr_send(void *opaque)
+{
+ SendCo *sendco = opaque;
+ CompareState *s = sendco->s;
+ int ret = 0;
+
+ while (!g_queue_is_empty(&sendco->send_list)) {
+ SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+ uint32_t len = htonl(entry->size);
+
+ ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
+
+ if (ret != sizeof(len)) {
+ g_free(entry->buf);
+ g_slice_free(SendEntry, entry);
+ goto err;
+ }
+
+ if (!sendco->notify_remote_frame && s->vnet_hdr) {
+ /*
+ * We send vnet header len make other module(like filter-redirector)
+ * know how to parse net packet correctly.
+ */
+ len = htonl(entry->vnet_hdr_len);
+
+ ret = qemu_chr_fe_write_all(sendco->chr,
+ (uint8_t *)&len,
+ sizeof(len));
+
+ if (ret != sizeof(len)) {
+ g_free(entry->buf);
+ g_slice_free(SendEntry, entry);
+ goto err;
+ }
+ }
+
+ ret = qemu_chr_fe_write_all(sendco->chr,
+ (uint8_t *)entry->buf,
+ entry->size);
+
+ if (ret != entry->size) {
+ g_free(entry->buf);
+ g_slice_free(SendEntry, entry);
+ goto err;
+ }
+
+ g_free(entry->buf);
+ g_slice_free(SendEntry, entry);
+ }
+
+ sendco->ret = 0;
+ goto out;
+
+err:
+ while (!g_queue_is_empty(&sendco->send_list)) {
+ SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+ g_free(entry->buf);
+ g_slice_free(SendEntry, entry);
+ }
+ sendco->ret = ret < 0 ? ret : -EIO;
+out:
+ sendco->co = NULL;
+ sendco->done = true;
+ aio_wait_kick();
+}
+
static int compare_chr_send(CompareState *s,
- const uint8_t *buf,
+ uint8_t *buf,
uint32_t size,
uint32_t vnet_hdr_len,
- bool notify_remote_frame)
+ bool notify_remote_frame,
+ bool zero_copy)
{
- int ret = 0;
- uint32_t len = htonl(size);
+ SendCo *sendco;
+ SendEntry *entry;
+
+ if (notify_remote_frame) {
+ sendco = &s->notify_sendco;
+ } else {
+ sendco = &s->out_sendco;
+ }
if (!size) {
return 0;
}
- if (notify_remote_frame) {
- ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
- (uint8_t *)&len,
- sizeof(len));
+ entry = g_slice_new(SendEntry);
+ entry->size = size;
+ entry->vnet_hdr_len = vnet_hdr_len;
+ if (zero_copy) {
+ entry->buf = buf;
} else {
- ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
+ entry->buf = g_malloc(size);
+ memcpy(entry->buf, buf, size);
}
+ g_queue_push_head(&sendco->send_list, entry);
- if (ret != sizeof(len)) {
- goto err;
- }
-
- if (s->vnet_hdr) {
- /*
- * We send vnet header len make other module(like filter-redirector)
- * know how to parse net packet correctly.
- */
- len = htonl(vnet_hdr_len);
-
- if (!notify_remote_frame) {
- ret = qemu_chr_fe_write_all(&s->chr_out,
- (uint8_t *)&len,
- sizeof(len));
- }
-
- if (ret != sizeof(len)) {
- goto err;
+ if (sendco->done) {
+ sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
+ sendco->done = false;
+ qemu_coroutine_enter(sendco->co);
+ if (sendco->done) {
+ /* report early errors */
+ return sendco->ret;
}
}
- if (notify_remote_frame) {
- ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
- (uint8_t *)buf,
- size);
- } else {
- ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
- }
-
- if (ret != size) {
- goto err;
- }
-
+ /* assume success */
return 0;
-
-err:
- return ret < 0 ? ret : -EIO;
}
static int compare_chr_can_read(void *opaque)
@@ -1063,6 +1137,7 @@
pri_rs->buf,
pri_rs->packet_len,
pri_rs->vnet_hdr_len,
+ false,
false);
} else {
/* compare packet in the specified connection */
@@ -1093,7 +1168,7 @@
if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
notify_rs->buf,
notify_rs->packet_len)) {
- ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+ ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
if (ret < 0) {
error_report("Notify Xen COLO-frame INIT failed");
}
@@ -1199,6 +1274,20 @@
QTAILQ_INSERT_TAIL(&net_compares, s, next);
+ s->out_sendco.s = s;
+ s->out_sendco.chr = &s->chr_out;
+ s->out_sendco.notify_remote_frame = false;
+ s->out_sendco.done = true;
+ g_queue_init(&s->out_sendco.send_list);
+
+ if (s->notify_dev) {
+ s->notify_sendco.s = s;
+ s->notify_sendco.chr = &s->chr_notify_dev;
+ s->notify_sendco.notify_remote_frame = true;
+ s->notify_sendco.done = true;
+ g_queue_init(&s->notify_sendco.send_list);
+ }
+
g_queue_init(&s->conn_list);
qemu_mutex_init(&event_mtx);
@@ -1225,8 +1314,9 @@
pkt->data,
pkt->size,
pkt->vnet_hdr_len,
- false);
- packet_destroy(pkt, NULL);
+ false,
+ true);
+ packet_destroy_partial(pkt, NULL);
}
while (!g_queue_is_empty(&conn->secondary_list)) {
pkt = g_queue_pop_head(&conn->secondary_list);
@@ -1297,10 +1387,23 @@
}
}
+ AioContext *ctx = iothread_get_aio_context(s->iothread);
+ aio_context_acquire(ctx);
+ AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
+ if (s->notify_dev) {
+ AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
+ }
+ aio_context_release(ctx);
+
/* Release all unhandled packets after compare thead exited */
g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+ AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
g_queue_clear(&s->conn_list);
+ g_queue_clear(&s->out_sendco.send_list);
+ if (s->notify_dev) {
+ g_queue_clear(&s->notify_sendco.send_list);
+ }
if (s->connection_track_table) {
g_hash_table_destroy(s->connection_track_table);
diff --git a/net/colo.c b/net/colo.c
index 8196b35..a6c66d8 100644
--- a/net/colo.c
+++ b/net/colo.c
@@ -185,6 +185,13 @@
g_slice_free(Packet, pkt);
}
+void packet_destroy_partial(void *opaque, void *user_data)
+{
+ Packet *pkt = opaque;
+
+ g_slice_free(Packet, pkt);
+}
+
/*
* Clear hashtable, stop this hash growing really huge
*/
diff --git a/net/colo.h b/net/colo.h
index 679314b..573ab91 100644
--- a/net/colo.h
+++ b/net/colo.h
@@ -102,5 +102,6 @@
void connection_hashtable_reset(GHashTable *connection_track_table);
Packet *packet_new(const void *data, int size, int vnet_hdr_len);
void packet_destroy(void *opaque, void *user_data);
+void packet_destroy_partial(void *opaque, void *user_data);
#endif /* NET_COLO_H */