|  | /* | 
|  | * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) | 
|  | * (a.k.a. Fault Tolerance or Continuous Replication) | 
|  | * | 
|  | * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. | 
|  | * Copyright (c) 2016 FUJITSU LIMITED | 
|  | * Copyright (c) 2016 Intel Corporation | 
|  | * | 
|  | * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> | 
|  | * | 
|  | * This work is licensed under the terms of the GNU GPL, version 2 or | 
|  | * later.  See the COPYING file in the top-level directory. | 
|  | */ | 
|  |  | 
|  | #include "qemu/osdep.h" | 
|  | #include "qemu/error-report.h" | 
|  | #include "trace.h" | 
|  | #include "qapi/error.h" | 
|  | #include "net/net.h" | 
|  | #include "net/eth.h" | 
|  | #include "qom/object_interfaces.h" | 
|  | #include "qemu/iov.h" | 
|  | #include "qom/object.h" | 
|  | #include "net/queue.h" | 
|  | #include "chardev/char-fe.h" | 
|  | #include "qemu/sockets.h" | 
|  | #include "colo.h" | 
|  | #include "system/iothread.h" | 
|  | #include "net/colo-compare.h" | 
|  | #include "migration/colo.h" | 
|  | #include "util.h" | 
|  |  | 
|  | #include "block/aio-wait.h" | 
|  | #include "qemu/coroutine.h" | 
|  |  | 
|  | #define TYPE_COLO_COMPARE "colo-compare" | 
|  | typedef struct CompareState CompareState; | 
|  | DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE, | 
|  | TYPE_COLO_COMPARE) | 
|  |  | 
|  | static QTAILQ_HEAD(, CompareState) net_compares = | 
|  | QTAILQ_HEAD_INITIALIZER(net_compares); | 
|  |  | 
|  | static NotifierList colo_compare_notifiers = | 
|  | NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers); | 
|  |  | 
|  | #define COMPARE_READ_LEN_MAX NET_BUFSIZE | 
|  | #define MAX_QUEUE_SIZE 1024 | 
|  |  | 
|  | #define COLO_COMPARE_FREE_PRIMARY     0x01 | 
|  | #define COLO_COMPARE_FREE_SECONDARY   0x02 | 
|  |  | 
|  | #define REGULAR_PACKET_CHECK_MS 1000 | 
|  | #define DEFAULT_TIME_OUT_MS 3000 | 
|  |  | 
|  | /* #define DEBUG_COLO_PACKETS */ | 
|  |  | 
|  | static QemuMutex colo_compare_mutex; | 
|  | static bool colo_compare_active; | 
|  | static QemuMutex event_mtx; | 
|  | static QemuCond event_complete_cond; | 
|  | static int event_unhandled_count; | 
|  | static uint32_t max_queue_size; | 
|  |  | 
|  | /* | 
|  | *  + CompareState ++ | 
|  | *  |               | | 
|  | *  +---------------+   +---------------+         +---------------+ | 
|  | *  |   conn list   + - >      conn     + ------- >      conn     + -- > ...... | 
|  | *  +---------------+   +---------------+         +---------------+ | 
|  | *  |               |     |           |             |          | | 
|  | *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+ | 
|  | *                    |primary |  |secondary    |primary | |secondary | 
|  | *                    |packet  |  |packet  +    |packet  | |packet  + | 
|  | *                    +--------+  +--------+    +--------+ +--------+ | 
|  | *                        |           |             |          | | 
|  | *                    +---v----+  +---v----+    +---v----+ +---v----+ | 
|  | *                    |primary |  |secondary    |primary | |secondary | 
|  | *                    |packet  |  |packet  +    |packet  | |packet  + | 
|  | *                    +--------+  +--------+    +--------+ +--------+ | 
|  | *                        |           |             |          | | 
|  | *                    +---v----+  +---v----+    +---v----+ +---v----+ | 
|  | *                    |primary |  |secondary    |primary | |secondary | 
|  | *                    |packet  |  |packet  +    |packet  | |packet  + | 
|  | *                    +--------+  +--------+    +--------+ +--------+ | 
|  | */ | 
|  |  | 
|  | typedef struct SendCo { | 
|  | Coroutine *co; | 
|  | struct CompareState *s; | 
|  | CharBackend *chr; | 
|  | GQueue send_list; | 
|  | bool notify_remote_frame; | 
|  | bool done; | 
|  | int ret; | 
|  | } SendCo; | 
|  |  | 
/* One queued packet waiting in SendCo.send_list. */
typedef struct SendEntry {
    /* Number of bytes in buf */
    uint32_t size;
    /* vnet header length announced to the peer when vnet_hdr is enabled */
    uint32_t vnet_hdr_len;
    /* Packet bytes; released with g_free() by _compare_chr_send() */
    uint8_t *buf;
} SendEntry;
|  |  | 
|  | struct CompareState { | 
|  | Object parent; | 
|  |  | 
|  | char *pri_indev; | 
|  | char *sec_indev; | 
|  | char *outdev; | 
|  | char *notify_dev; | 
|  | CharBackend chr_pri_in; | 
|  | CharBackend chr_sec_in; | 
|  | CharBackend chr_out; | 
|  | CharBackend chr_notify_dev; | 
|  | SocketReadState pri_rs; | 
|  | SocketReadState sec_rs; | 
|  | SocketReadState notify_rs; | 
|  | SendCo out_sendco; | 
|  | SendCo notify_sendco; | 
|  | bool vnet_hdr; | 
|  | uint64_t compare_timeout; | 
|  | uint32_t expired_scan_cycle; | 
|  |  | 
|  | /* | 
|  | * Record the connection that through the NIC | 
|  | * Element type: Connection | 
|  | */ | 
|  | GQueue conn_list; | 
|  | /* Record the connection without repetition */ | 
|  | GHashTable *connection_track_table; | 
|  |  | 
|  | IOThread *iothread; | 
|  | GMainContext *worker_context; | 
|  | QEMUTimer *packet_check_timer; | 
|  |  | 
|  | QEMUBH *event_bh; | 
|  | enum colo_event event; | 
|  |  | 
|  | QTAILQ_ENTRY(CompareState) next; | 
|  | }; | 
|  |  | 
|  | typedef struct CompareClass { | 
|  | ObjectClass parent_class; | 
|  | } CompareClass; | 
|  |  | 
/* Which input a packet arrived on; also indexes colo_mode[]. */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};

/* Printable names of the inputs, used in trace output. */
static const char *colo_mode[] = {
    [PRIMARY_IN] = "primary",
    [SECONDARY_IN] = "secondary",
};
|  |  | 
|  | static int compare_chr_send(CompareState *s, | 
|  | uint8_t *buf, | 
|  | uint32_t size, | 
|  | uint32_t vnet_hdr_len, | 
|  | bool notify_remote_frame, | 
|  | bool zero_copy); | 
|  |  | 
/*
 * Tell whether a packet consists of exactly the characters of @str.
 *
 * @str:        NUL-terminated reference text (terminator not compared)
 * @buf:        packet bytes
 * @packet_len: number of bytes in @buf
 *
 * Returns true only when the lengths agree and all bytes are equal.
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    size_t expected_len = strlen(str);

    return packet_len == expected_len && memcmp(str, buf, packet_len) == 0;
}
|  |  | 
|  | static void notify_remote_frame(CompareState *s) | 
|  | { | 
|  | char msg[] = "DO_CHECKPOINT"; | 
|  | int ret = 0; | 
|  |  | 
|  | ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false); | 
|  | if (ret < 0) { | 
|  | error_report("Notify Xen COLO-frame failed"); | 
|  | } | 
|  | } | 
|  |  | 
|  | static void colo_compare_inconsistency_notify(CompareState *s) | 
|  | { | 
|  | if (s->notify_dev) { | 
|  | notify_remote_frame(s); | 
|  | } else { | 
|  | notifier_list_notify(&colo_compare_notifiers, | 
|  | NULL); | 
|  | } | 
|  | } | 
|  |  | 
|  | /* Use restricted to colo_insert_packet() */ | 
|  | static gint seq_sorter(Packet *a, Packet *b, gpointer data) | 
|  | { | 
|  | return b->tcp_seq - a->tcp_seq; | 
|  | } | 
|  |  | 
|  | static void fill_pkt_tcp_info(void *data, uint32_t *max_ack) | 
|  | { | 
|  | Packet *pkt = data; | 
|  | struct tcp_hdr *tcphd; | 
|  |  | 
|  | tcphd = (struct tcp_hdr *)pkt->transport_header; | 
|  |  | 
|  | pkt->tcp_seq = ntohl(tcphd->th_seq); | 
|  | pkt->tcp_ack = ntohl(tcphd->th_ack); | 
|  | /* Need to consider ACK will bigger than uint32_t MAX */ | 
|  | *max_ack = pkt->tcp_ack - *max_ack > 0 ? pkt->tcp_ack : *max_ack; | 
|  | pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data | 
|  | + (tcphd->th_off << 2); | 
|  | pkt->payload_size = pkt->size - pkt->header_size; | 
|  | pkt->seq_end = pkt->tcp_seq + pkt->payload_size; | 
|  | pkt->flags = tcphd->th_flags; | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Return 1 on success, if return 0 means the | 
|  | * packet will be dropped | 
|  | */ | 
|  | static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack) | 
|  | { | 
|  | if (g_queue_get_length(queue) <= max_queue_size) { | 
|  | if (pkt->ip->ip_p == IPPROTO_TCP) { | 
|  | fill_pkt_tcp_info(pkt, max_ack); | 
|  | g_queue_insert_sorted(queue, | 
|  | pkt, | 
|  | (GCompareDataFunc)seq_sorter, | 
|  | NULL); | 
|  | } else { | 
|  | g_queue_push_tail(queue, pkt); | 
|  | } | 
|  | return 1; | 
|  | } | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Return 0 on success, if return -1 means the pkt | 
|  | * is unsupported(arp and ipv6) and will be sent later | 
|  | */ | 
|  | static int packet_enqueue(CompareState *s, int mode, Connection **con) | 
|  | { | 
|  | ConnectionKey key; | 
|  | Packet *pkt = NULL; | 
|  | Connection *conn; | 
|  | int ret; | 
|  |  | 
|  | if (mode == PRIMARY_IN) { | 
|  | pkt = packet_new(s->pri_rs.buf, | 
|  | s->pri_rs.packet_len, | 
|  | s->pri_rs.vnet_hdr_len); | 
|  | } else { | 
|  | pkt = packet_new(s->sec_rs.buf, | 
|  | s->sec_rs.packet_len, | 
|  | s->sec_rs.vnet_hdr_len); | 
|  | } | 
|  |  | 
|  | if (parse_packet_early(pkt)) { | 
|  | packet_destroy(pkt, NULL); | 
|  | pkt = NULL; | 
|  | return -1; | 
|  | } | 
|  | fill_connection_key(pkt, &key, false); | 
|  |  | 
|  | conn = connection_get(s->connection_track_table, | 
|  | &key, | 
|  | &s->conn_list); | 
|  |  | 
|  | if (!conn->processing) { | 
|  | g_queue_push_tail(&s->conn_list, conn); | 
|  | conn->processing = true; | 
|  | } | 
|  |  | 
|  | if (mode == PRIMARY_IN) { | 
|  | ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack); | 
|  | } else { | 
|  | ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack); | 
|  | } | 
|  |  | 
|  | if (!ret) { | 
|  | trace_colo_compare_drop_packet(colo_mode[mode], | 
|  | "queue size too big, drop packet"); | 
|  | packet_destroy(pkt, NULL); | 
|  | pkt = NULL; | 
|  | } | 
|  |  | 
|  | *con = conn; | 
|  |  | 
|  | return 0; | 
|  | } | 
|  |  | 
/*
 * Wrap-around aware comparison of two 32-bit sequence numbers: true when
 * the signed 32-bit distance from @seq2 to @seq1 is positive.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t distance = (int32_t)(seq1 - seq2);

    return distance > 0;
}
|  |  | 
|  | static void colo_release_primary_pkt(CompareState *s, Packet *pkt) | 
|  | { | 
|  | int ret; | 
|  | ret = compare_chr_send(s, | 
|  | pkt->data, | 
|  | pkt->size, | 
|  | pkt->vnet_hdr_len, | 
|  | false, | 
|  | true); | 
|  | if (ret < 0) { | 
|  | error_report("colo send primary packet failed"); | 
|  | } | 
|  | trace_colo_compare_main("packet same and release packet"); | 
|  | packet_destroy_partial(pkt, NULL); | 
|  | } | 
|  |  | 
|  | /* | 
|  | * The IP packets sent by primary and secondary | 
|  | * will be compared in here | 
|  | * TODO support ip fragment, Out-Of-Order | 
|  | * return:    0  means packet same | 
|  | *            > 0 || < 0 means packet different | 
|  | */ | 
|  | static int colo_compare_packet_payload(Packet *ppkt, | 
|  | Packet *spkt, | 
|  | uint16_t poffset, | 
|  | uint16_t soffset, | 
|  | uint16_t len) | 
|  |  | 
|  | { | 
|  | if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) { | 
|  | char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; | 
|  |  | 
|  | strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); | 
|  | strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); | 
|  | strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); | 
|  | strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); | 
|  |  | 
|  | trace_colo_compare_ip_info(ppkt->size, pri_ip_src, | 
|  | pri_ip_dst, spkt->size, | 
|  | sec_ip_src, sec_ip_dst); | 
|  | } | 
|  |  | 
|  | return memcmp(ppkt->data + poffset, spkt->data + soffset, len); | 
|  | } | 
|  |  | 
|  | /* | 
|  | * return true means that the payload is consist and | 
|  | * need to make the next comparison, false means do | 
|  | * the checkpoint | 
|  | */ | 
|  | static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt, | 
|  | int8_t *mark, uint32_t max_ack) | 
|  | { | 
|  | *mark = 0; | 
|  |  | 
|  | if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) { | 
|  | if (!colo_compare_packet_payload(ppkt, spkt, | 
|  | ppkt->header_size, spkt->header_size, | 
|  | ppkt->payload_size)) { | 
|  | *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY; | 
|  | return true; | 
|  | } | 
|  | } | 
|  |  | 
|  | /* one part of secondary packet payload still need to be compared */ | 
|  | if (!after(ppkt->seq_end, spkt->seq_end)) { | 
|  | if (!colo_compare_packet_payload(ppkt, spkt, | 
|  | ppkt->header_size + ppkt->offset, | 
|  | spkt->header_size + spkt->offset, | 
|  | ppkt->payload_size - ppkt->offset)) { | 
|  | if (!after(ppkt->tcp_ack, max_ack)) { | 
|  | *mark = COLO_COMPARE_FREE_PRIMARY; | 
|  | spkt->offset += ppkt->payload_size - ppkt->offset; | 
|  | return true; | 
|  | } else { | 
|  | /* secondary guest hasn't ack the data, don't send | 
|  | * out this packet | 
|  | */ | 
|  | return false; | 
|  | } | 
|  | } | 
|  | } else { | 
|  | /* primary packet is longer than secondary packet, compare | 
|  | * the same part and mark the primary packet offset | 
|  | */ | 
|  | if (!colo_compare_packet_payload(ppkt, spkt, | 
|  | ppkt->header_size + ppkt->offset, | 
|  | spkt->header_size + spkt->offset, | 
|  | spkt->payload_size - spkt->offset)) { | 
|  | *mark = COLO_COMPARE_FREE_SECONDARY; | 
|  | ppkt->offset += spkt->payload_size - spkt->offset; | 
|  | return true; | 
|  | } | 
|  | } | 
|  |  | 
|  | return false; | 
|  | } | 
|  |  | 
|  | static void colo_compare_tcp(CompareState *s, Connection *conn) | 
|  | { | 
|  | Packet *ppkt = NULL, *spkt = NULL; | 
|  | int8_t mark; | 
|  |  | 
|  | /* | 
|  | * If ppkt and spkt have the same payload, but ppkt's ACK | 
|  | * is greater than spkt's ACK, in this case we can not | 
|  | * send the ppkt because it will cause the secondary guest | 
|  | * to miss sending some data in the next. Therefore, we | 
|  | * record the maximum ACK in the current queue at both | 
|  | * primary side and secondary side. Only when the ack is | 
|  | * less than the smaller of the two maximum ack, then we | 
|  | * can ensure that the packet's payload is acknowledged by | 
|  | * primary and secondary. | 
|  | */ | 
|  | uint32_t min_ack = MIN(conn->pack, conn->sack); | 
|  |  | 
|  | pri: | 
|  | if (g_queue_is_empty(&conn->primary_list)) { | 
|  | return; | 
|  | } | 
|  | ppkt = g_queue_pop_tail(&conn->primary_list); | 
|  | sec: | 
|  | if (g_queue_is_empty(&conn->secondary_list)) { | 
|  | g_queue_push_tail(&conn->primary_list, ppkt); | 
|  | return; | 
|  | } | 
|  | spkt = g_queue_pop_tail(&conn->secondary_list); | 
|  |  | 
|  | if (ppkt->tcp_seq == ppkt->seq_end) { | 
|  | colo_release_primary_pkt(s, ppkt); | 
|  | ppkt = NULL; | 
|  | } | 
|  |  | 
|  | if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) { | 
|  | trace_colo_compare_main("pri: this packet has compared"); | 
|  | colo_release_primary_pkt(s, ppkt); | 
|  | ppkt = NULL; | 
|  | } | 
|  |  | 
|  | if (spkt->tcp_seq == spkt->seq_end) { | 
|  | packet_destroy(spkt, NULL); | 
|  | if (!ppkt) { | 
|  | goto pri; | 
|  | } else { | 
|  | goto sec; | 
|  | } | 
|  | } else { | 
|  | if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) { | 
|  | trace_colo_compare_main("sec: this packet has compared"); | 
|  | packet_destroy(spkt, NULL); | 
|  | if (!ppkt) { | 
|  | goto pri; | 
|  | } else { | 
|  | goto sec; | 
|  | } | 
|  | } | 
|  | if (!ppkt) { | 
|  | g_queue_push_tail(&conn->secondary_list, spkt); | 
|  | goto pri; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) { | 
|  | trace_colo_compare_tcp_info("pri", | 
|  | ppkt->tcp_seq, ppkt->tcp_ack, | 
|  | ppkt->header_size, ppkt->payload_size, | 
|  | ppkt->offset, ppkt->flags); | 
|  |  | 
|  | trace_colo_compare_tcp_info("sec", | 
|  | spkt->tcp_seq, spkt->tcp_ack, | 
|  | spkt->header_size, spkt->payload_size, | 
|  | spkt->offset, spkt->flags); | 
|  |  | 
|  | if (mark == COLO_COMPARE_FREE_PRIMARY) { | 
|  | conn->compare_seq = ppkt->seq_end; | 
|  | colo_release_primary_pkt(s, ppkt); | 
|  | g_queue_push_tail(&conn->secondary_list, spkt); | 
|  | goto pri; | 
|  | } else if (mark == COLO_COMPARE_FREE_SECONDARY) { | 
|  | conn->compare_seq = spkt->seq_end; | 
|  | packet_destroy(spkt, NULL); | 
|  | goto sec; | 
|  | } else if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) { | 
|  | conn->compare_seq = ppkt->seq_end; | 
|  | colo_release_primary_pkt(s, ppkt); | 
|  | packet_destroy(spkt, NULL); | 
|  | goto pri; | 
|  | } | 
|  | } else { | 
|  | g_queue_push_tail(&conn->primary_list, ppkt); | 
|  | g_queue_push_tail(&conn->secondary_list, spkt); | 
|  |  | 
|  | #ifdef DEBUG_COLO_PACKETS | 
|  | qemu_hexdump(stderr, "colo-compare ppkt", ppkt->data, ppkt->size); | 
|  | qemu_hexdump(stderr, "colo-compare spkt", spkt->data, spkt->size); | 
|  | #endif | 
|  |  | 
|  | colo_compare_inconsistency_notify(s); | 
|  | } | 
|  | } | 
|  |  | 
|  |  | 
|  | /* | 
|  | * Called from the compare thread on the primary | 
|  | * for compare udp packet | 
|  | */ | 
|  | static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt) | 
|  | { | 
|  | uint16_t network_header_length = ppkt->ip->ip_hl << 2; | 
|  | uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; | 
|  |  | 
|  | trace_colo_compare_main("compare udp"); | 
|  |  | 
|  | /* | 
|  | * Because of ppkt and spkt are both in the same connection, | 
|  | * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are | 
|  | * same with spkt. In addition, IP header's Identification is a random | 
|  | * field, we can handle it in IP fragmentation function later. | 
|  | * COLO just concern the response net packet payload from primary guest | 
|  | * and secondary guest are same or not, So we ignored all IP header include | 
|  | * other field like TOS,TTL,IP Checksum. we only need to compare | 
|  | * the ip payload here. | 
|  | */ | 
|  | if (ppkt->size != spkt->size) { | 
|  | trace_colo_compare_main("UDP: payload size of packets are different"); | 
|  | return -1; | 
|  | } | 
|  | if (colo_compare_packet_payload(ppkt, spkt, offset, offset, | 
|  | ppkt->size - offset)) { | 
|  | trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size); | 
|  | trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size); | 
|  | #ifdef DEBUG_COLO_PACKETS | 
|  | qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size); | 
|  | qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size); | 
|  | #endif | 
|  | return -1; | 
|  | } else { | 
|  | return 0; | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Called from the compare thread on the primary | 
|  | * for compare icmp packet | 
|  | */ | 
|  | static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt) | 
|  | { | 
|  | uint16_t network_header_length = ppkt->ip->ip_hl << 2; | 
|  | uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; | 
|  |  | 
|  | trace_colo_compare_main("compare icmp"); | 
|  |  | 
|  | /* | 
|  | * Because of ppkt and spkt are both in the same connection, | 
|  | * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are | 
|  | * same with spkt. In addition, IP header's Identification is a random | 
|  | * field, we can handle it in IP fragmentation function later. | 
|  | * COLO just concern the response net packet payload from primary guest | 
|  | * and secondary guest are same or not, So we ignored all IP header include | 
|  | * other field like TOS,TTL,IP Checksum. we only need to compare | 
|  | * the ip payload here. | 
|  | */ | 
|  | if (ppkt->size != spkt->size) { | 
|  | trace_colo_compare_main("ICMP: payload size of packets are different"); | 
|  | return -1; | 
|  | } | 
|  | if (colo_compare_packet_payload(ppkt, spkt, offset, offset, | 
|  | ppkt->size - offset)) { | 
|  | trace_colo_compare_icmp_miscompare("primary pkt size", | 
|  | ppkt->size); | 
|  | trace_colo_compare_icmp_miscompare("Secondary pkt size", | 
|  | spkt->size); | 
|  | #ifdef DEBUG_COLO_PACKETS | 
|  | qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size); | 
|  | qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size); | 
|  | #endif | 
|  | return -1; | 
|  | } else { | 
|  | return 0; | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Called from the compare thread on the primary | 
|  | * for compare other packet | 
|  | */ | 
|  | static int colo_packet_compare_other(Packet *spkt, Packet *ppkt) | 
|  | { | 
|  | uint16_t offset = ppkt->vnet_hdr_len; | 
|  |  | 
|  | trace_colo_compare_main("compare other"); | 
|  | if (ppkt->size != spkt->size) { | 
|  | trace_colo_compare_main("Other: payload size of packets are different"); | 
|  | return -1; | 
|  | } | 
|  | return colo_compare_packet_payload(ppkt, spkt, offset, offset, | 
|  | ppkt->size - offset); | 
|  | } | 
|  |  | 
|  | static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) | 
|  | { | 
|  | int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST); | 
|  |  | 
|  | if ((now - pkt->creation_ms) > (*check_time)) { | 
|  | trace_colo_old_packet_check_found(pkt->creation_ms); | 
|  | return 0; | 
|  | } else { | 
|  | return 1; | 
|  | } | 
|  | } | 
|  |  | 
|  | void colo_compare_register_notifier(Notifier *notify) | 
|  | { | 
|  | notifier_list_add(&colo_compare_notifiers, notify); | 
|  | } | 
|  |  | 
|  | void colo_compare_unregister_notifier(Notifier *notify) | 
|  | { | 
|  | notifier_remove(notify); | 
|  | } | 
|  |  | 
|  | static int colo_old_packet_check_one_conn(Connection *conn, | 
|  | CompareState *s) | 
|  | { | 
|  | if (!g_queue_is_empty(&conn->primary_list)) { | 
|  | if (g_queue_find_custom(&conn->primary_list, | 
|  | &s->compare_timeout, | 
|  | (GCompareFunc)colo_old_packet_check_one)) | 
|  | goto out; | 
|  | } | 
|  |  | 
|  | if (!g_queue_is_empty(&conn->secondary_list)) { | 
|  | if (g_queue_find_custom(&conn->secondary_list, | 
|  | &s->compare_timeout, | 
|  | (GCompareFunc)colo_old_packet_check_one)) | 
|  | goto out; | 
|  | } | 
|  |  | 
|  | return 1; | 
|  |  | 
|  | out: | 
|  | /* Do checkpoint will flush old packet */ | 
|  | colo_compare_inconsistency_notify(s); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Look for old packets that the secondary hasn't matched, | 
|  | * if we have some then we have to checkpoint to wake | 
|  | * the secondary up. | 
|  | */ | 
|  | static void colo_old_packet_check(void *opaque) | 
|  | { | 
|  | CompareState *s = opaque; | 
|  |  | 
|  | /* | 
|  | * If we find one old packet, stop finding job and notify | 
|  | * COLO frame do checkpoint. | 
|  | */ | 
|  | g_queue_find_custom(&s->conn_list, s, | 
|  | (GCompareFunc)colo_old_packet_check_one_conn); | 
|  | } | 
|  |  | 
|  | static void colo_compare_packet(CompareState *s, Connection *conn, | 
|  | int (*HandlePacket)(Packet *spkt, | 
|  | Packet *ppkt)) | 
|  | { | 
|  | Packet *pkt = NULL; | 
|  | GList *result = NULL; | 
|  |  | 
|  | while (!g_queue_is_empty(&conn->primary_list) && | 
|  | !g_queue_is_empty(&conn->secondary_list)) { | 
|  | pkt = g_queue_pop_tail(&conn->primary_list); | 
|  | result = g_queue_find_custom(&conn->secondary_list, | 
|  | pkt, (GCompareFunc)HandlePacket); | 
|  |  | 
|  | if (result) { | 
|  | colo_release_primary_pkt(s, pkt); | 
|  | packet_destroy(result->data, NULL); | 
|  | g_queue_delete_link(&conn->secondary_list, result); | 
|  | } else { | 
|  | /* | 
|  | * If one packet arrive late, the secondary_list or | 
|  | * primary_list will be empty, so we can't compare it | 
|  | * until next comparison. If the packets in the list are | 
|  | * timeout, it will trigger a checkpoint request. | 
|  | */ | 
|  | trace_colo_compare_main("packet different"); | 
|  | g_queue_push_tail(&conn->primary_list, pkt); | 
|  |  | 
|  | colo_compare_inconsistency_notify(s); | 
|  | break; | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Called from the compare thread on the primary | 
|  | * for compare packet with secondary list of the | 
|  | * specified connection when a new packet was | 
|  | * queued to it. | 
|  | */ | 
|  | static void colo_compare_connection(void *opaque, void *user_data) | 
|  | { | 
|  | CompareState *s = user_data; | 
|  | Connection *conn = opaque; | 
|  |  | 
|  | switch (conn->ip_proto) { | 
|  | case IPPROTO_TCP: | 
|  | colo_compare_tcp(s, conn); | 
|  | break; | 
|  | case IPPROTO_UDP: | 
|  | colo_compare_packet(s, conn, colo_packet_compare_udp); | 
|  | break; | 
|  | case IPPROTO_ICMP: | 
|  | colo_compare_packet(s, conn, colo_packet_compare_icmp); | 
|  | break; | 
|  | default: | 
|  | colo_compare_packet(s, conn, colo_packet_compare_other); | 
|  | break; | 
|  | } | 
|  | } | 
|  |  | 
|  | static void coroutine_fn _compare_chr_send(void *opaque) | 
|  | { | 
|  | SendCo *sendco = opaque; | 
|  | CompareState *s = sendco->s; | 
|  | int ret = 0; | 
|  |  | 
|  | while (!g_queue_is_empty(&sendco->send_list)) { | 
|  | SendEntry *entry = g_queue_pop_tail(&sendco->send_list); | 
|  | uint32_t len = htonl(entry->size); | 
|  |  | 
|  | ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len)); | 
|  |  | 
|  | if (ret != sizeof(len)) { | 
|  | g_free(entry->buf); | 
|  | g_slice_free(SendEntry, entry); | 
|  | goto err; | 
|  | } | 
|  |  | 
|  | if (!sendco->notify_remote_frame && s->vnet_hdr) { | 
|  | /* | 
|  | * We send vnet header len make other module(like filter-redirector) | 
|  | * know how to parse net packet correctly. | 
|  | */ | 
|  | len = htonl(entry->vnet_hdr_len); | 
|  |  | 
|  | ret = qemu_chr_fe_write_all(sendco->chr, | 
|  | (uint8_t *)&len, | 
|  | sizeof(len)); | 
|  |  | 
|  | if (ret != sizeof(len)) { | 
|  | g_free(entry->buf); | 
|  | g_slice_free(SendEntry, entry); | 
|  | goto err; | 
|  | } | 
|  | } | 
|  |  | 
|  | ret = qemu_chr_fe_write_all(sendco->chr, | 
|  | (uint8_t *)entry->buf, | 
|  | entry->size); | 
|  |  | 
|  | if (ret != entry->size) { | 
|  | g_free(entry->buf); | 
|  | g_slice_free(SendEntry, entry); | 
|  | goto err; | 
|  | } | 
|  |  | 
|  | g_free(entry->buf); | 
|  | g_slice_free(SendEntry, entry); | 
|  | } | 
|  |  | 
|  | sendco->ret = 0; | 
|  | goto out; | 
|  |  | 
|  | err: | 
|  | while (!g_queue_is_empty(&sendco->send_list)) { | 
|  | SendEntry *entry = g_queue_pop_tail(&sendco->send_list); | 
|  | g_free(entry->buf); | 
|  | g_slice_free(SendEntry, entry); | 
|  | } | 
|  | sendco->ret = ret < 0 ? ret : -EIO; | 
|  | out: | 
|  | sendco->co = NULL; | 
|  | sendco->done = true; | 
|  | aio_wait_kick(); | 
|  | } | 
|  |  | 
|  | static int compare_chr_send(CompareState *s, | 
|  | uint8_t *buf, | 
|  | uint32_t size, | 
|  | uint32_t vnet_hdr_len, | 
|  | bool notify_remote_frame, | 
|  | bool zero_copy) | 
|  | { | 
|  | SendCo *sendco; | 
|  | SendEntry *entry; | 
|  |  | 
|  | if (notify_remote_frame) { | 
|  | sendco = &s->notify_sendco; | 
|  | } else { | 
|  | sendco = &s->out_sendco; | 
|  | } | 
|  |  | 
|  | if (!size) { | 
|  | return -1; | 
|  | } | 
|  |  | 
|  | entry = g_slice_new(SendEntry); | 
|  | entry->size = size; | 
|  | entry->vnet_hdr_len = vnet_hdr_len; | 
|  | if (zero_copy) { | 
|  | entry->buf = buf; | 
|  | } else { | 
|  | entry->buf = g_malloc(size); | 
|  | memcpy(entry->buf, buf, size); | 
|  | } | 
|  | g_queue_push_tail(&sendco->send_list, entry); | 
|  |  | 
|  | if (sendco->done) { | 
|  | sendco->co = qemu_coroutine_create(_compare_chr_send, sendco); | 
|  | sendco->done = false; | 
|  | qemu_coroutine_enter(sendco->co); | 
|  | if (sendco->done) { | 
|  | /* report early errors */ | 
|  | return sendco->ret; | 
|  | } | 
|  | } | 
|  |  | 
|  | /* assume success */ | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | static int compare_chr_can_read(void *opaque) | 
|  | { | 
|  | return COMPARE_READ_LEN_MAX; | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Called from the main thread on the primary for packets | 
|  | * arriving over the socket from the primary. | 
|  | */ | 
|  | static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(opaque); | 
|  | int ret; | 
|  |  | 
|  | ret = net_fill_rstate(&s->pri_rs, buf, size); | 
|  | if (ret == -1) { | 
|  | qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, | 
|  | NULL, NULL, true); | 
|  | error_report("colo-compare primary_in error"); | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Called from the main thread on the primary for packets | 
|  | * arriving over the socket from the secondary. | 
|  | */ | 
|  | static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(opaque); | 
|  | int ret; | 
|  |  | 
|  | ret = net_fill_rstate(&s->sec_rs, buf, size); | 
|  | if (ret == -1) { | 
|  | qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, | 
|  | NULL, NULL, true); | 
|  | error_report("colo-compare secondary_in error"); | 
|  | } | 
|  | } | 
|  |  | 
|  | static void compare_notify_chr(void *opaque, const uint8_t *buf, int size) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(opaque); | 
|  | int ret; | 
|  |  | 
|  | ret = net_fill_rstate(&s->notify_rs, buf, size); | 
|  | if (ret == -1) { | 
|  | qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL, | 
|  | NULL, NULL, true); | 
|  | error_report("colo-compare notify_dev error"); | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Check old packet regularly so it can watch for any packets | 
|  | * that the secondary hasn't produced equivalents of. | 
|  | */ | 
|  | static void check_old_packet_regular(void *opaque) | 
|  | { | 
|  | CompareState *s = opaque; | 
|  |  | 
|  | /* if have old packet we will notify checkpoint */ | 
|  | colo_old_packet_check(s); | 
|  | timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) + | 
|  | s->expired_scan_cycle); | 
|  | } | 
|  |  | 
|  | /* Public API, Used for COLO frame to notify compare event */ | 
|  | void colo_notify_compares_event(void *opaque, int event, Error **errp) | 
|  | { | 
|  | CompareState *s; | 
|  | qemu_mutex_lock(&colo_compare_mutex); | 
|  |  | 
|  | if (!colo_compare_active) { | 
|  | qemu_mutex_unlock(&colo_compare_mutex); | 
|  | return; | 
|  | } | 
|  |  | 
|  | qemu_mutex_lock(&event_mtx); | 
|  | QTAILQ_FOREACH(s, &net_compares, next) { | 
|  | s->event = event; | 
|  | qemu_bh_schedule(s->event_bh); | 
|  | event_unhandled_count++; | 
|  | } | 
|  | /* Wait all compare threads to finish handling this event */ | 
|  | while (event_unhandled_count > 0) { | 
|  | qemu_cond_wait(&event_complete_cond, &event_mtx); | 
|  | } | 
|  |  | 
|  | qemu_mutex_unlock(&event_mtx); | 
|  | qemu_mutex_unlock(&colo_compare_mutex); | 
|  | } | 
|  |  | 
|  | static void colo_compare_timer_init(CompareState *s) | 
|  | { | 
|  | AioContext *ctx = iothread_get_aio_context(s->iothread); | 
|  |  | 
|  | s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_HOST, | 
|  | SCALE_MS, check_old_packet_regular, | 
|  | s); | 
|  | timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) + | 
|  | s->expired_scan_cycle); | 
|  | } | 
|  |  | 
|  | static void colo_compare_timer_del(CompareState *s) | 
|  | { | 
|  | if (s->packet_check_timer) { | 
|  | timer_free(s->packet_check_timer); | 
|  | s->packet_check_timer = NULL; | 
|  | } | 
|  | } | 
|  |  | 
|  | static void colo_flush_packets(void *opaque, void *user_data); | 
|  |  | 
|  | static void colo_compare_handle_event(void *opaque) | 
|  | { | 
|  | CompareState *s = opaque; | 
|  |  | 
|  | switch (s->event) { | 
|  | case COLO_EVENT_CHECKPOINT: | 
|  | g_queue_foreach(&s->conn_list, colo_flush_packets, s); | 
|  | break; | 
|  | case COLO_EVENT_FAILOVER: | 
|  | break; | 
|  | default: | 
|  | break; | 
|  | } | 
|  |  | 
|  | qemu_mutex_lock(&event_mtx); | 
|  | assert(event_unhandled_count > 0); | 
|  | event_unhandled_count--; | 
|  | qemu_cond_broadcast(&event_complete_cond); | 
|  | qemu_mutex_unlock(&event_mtx); | 
|  | } | 
|  |  | 
|  | static void colo_compare_iothread(CompareState *s) | 
|  | { | 
|  | AioContext *ctx = iothread_get_aio_context(s->iothread); | 
|  | object_ref(OBJECT(s->iothread)); | 
|  | s->worker_context = iothread_get_g_main_context(s->iothread); | 
|  |  | 
|  | qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, | 
|  | compare_pri_chr_in, NULL, NULL, | 
|  | s, s->worker_context, true); | 
|  | qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, | 
|  | compare_sec_chr_in, NULL, NULL, | 
|  | s, s->worker_context, true); | 
|  | if (s->notify_dev) { | 
|  | qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read, | 
|  | compare_notify_chr, NULL, NULL, | 
|  | s, s->worker_context, true); | 
|  | } | 
|  |  | 
|  | colo_compare_timer_init(s); | 
|  | s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s); | 
|  | } | 
|  |  | 
|  | static char *compare_get_pri_indev(Object *obj, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | return g_strdup(s->pri_indev); | 
|  | } | 
|  |  | 
|  | static void compare_set_pri_indev(Object *obj, const char *value, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | g_free(s->pri_indev); | 
|  | s->pri_indev = g_strdup(value); | 
|  | } | 
|  |  | 
|  | static char *compare_get_sec_indev(Object *obj, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | return g_strdup(s->sec_indev); | 
|  | } | 
|  |  | 
|  | static void compare_set_sec_indev(Object *obj, const char *value, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | g_free(s->sec_indev); | 
|  | s->sec_indev = g_strdup(value); | 
|  | } | 
|  |  | 
|  | static char *compare_get_outdev(Object *obj, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | return g_strdup(s->outdev); | 
|  | } | 
|  |  | 
|  | static void compare_set_outdev(Object *obj, const char *value, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | g_free(s->outdev); | 
|  | s->outdev = g_strdup(value); | 
|  | } | 
|  |  | 
|  | static bool compare_get_vnet_hdr(Object *obj, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | return s->vnet_hdr; | 
|  | } | 
|  |  | 
|  | static void compare_set_vnet_hdr(Object *obj, | 
|  | bool value, | 
|  | Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | s->vnet_hdr = value; | 
|  | } | 
|  |  | 
|  | static char *compare_get_notify_dev(Object *obj, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | return g_strdup(s->notify_dev); | 
|  | } | 
|  |  | 
|  | static void compare_set_notify_dev(Object *obj, const char *value, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | g_free(s->notify_dev); | 
|  | s->notify_dev = g_strdup(value); | 
|  | } | 
|  |  | 
|  | static void compare_get_timeout(Object *obj, Visitor *v, | 
|  | const char *name, void *opaque, | 
|  | Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  | uint64_t value = s->compare_timeout; | 
|  |  | 
|  | visit_type_uint64(v, name, &value, errp); | 
|  | } | 
|  |  | 
|  | static void compare_set_timeout(Object *obj, Visitor *v, | 
|  | const char *name, void *opaque, | 
|  | Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  | uint32_t value; | 
|  |  | 
|  | if (!visit_type_uint32(v, name, &value, errp)) { | 
|  | return; | 
|  | } | 
|  | if (!value) { | 
|  | error_setg(errp, "Property '%s.%s' requires a positive value", | 
|  | object_get_typename(obj), name); | 
|  | return; | 
|  | } | 
|  | s->compare_timeout = value; | 
|  | } | 
|  |  | 
|  | static void compare_get_expired_scan_cycle(Object *obj, Visitor *v, | 
|  | const char *name, void *opaque, | 
|  | Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  | uint32_t value = s->expired_scan_cycle; | 
|  |  | 
|  | visit_type_uint32(v, name, &value, errp); | 
|  | } | 
|  |  | 
|  | static void compare_set_expired_scan_cycle(Object *obj, Visitor *v, | 
|  | const char *name, void *opaque, | 
|  | Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  | uint32_t value; | 
|  |  | 
|  | if (!visit_type_uint32(v, name, &value, errp)) { | 
|  | return; | 
|  | } | 
|  | if (!value) { | 
|  | error_setg(errp, "Property '%s.%s' requires a positive value", | 
|  | object_get_typename(obj), name); | 
|  | return; | 
|  | } | 
|  | s->expired_scan_cycle = value; | 
|  | } | 
|  |  | 
|  | static void get_max_queue_size(Object *obj, Visitor *v, | 
|  | const char *name, void *opaque, | 
|  | Error **errp) | 
|  | { | 
|  | uint32_t value = max_queue_size; | 
|  |  | 
|  | visit_type_uint32(v, name, &value, errp); | 
|  | } | 
|  |  | 
|  | static void set_max_queue_size(Object *obj, Visitor *v, | 
|  | const char *name, void *opaque, | 
|  | Error **errp) | 
|  | { | 
|  | uint64_t value; | 
|  |  | 
|  | if (!visit_type_uint64(v, name, &value, errp)) { | 
|  | return; | 
|  | } | 
|  | if (!value) { | 
|  | error_setg(errp, "Property '%s.%s' requires a positive value", | 
|  | object_get_typename(obj), name); | 
|  | return; | 
|  | } | 
|  | max_queue_size = value; | 
|  | } | 
|  |  | 
|  | static void compare_pri_rs_finalize(SocketReadState *pri_rs) | 
|  | { | 
|  | CompareState *s = container_of(pri_rs, CompareState, pri_rs); | 
|  | Connection *conn = NULL; | 
|  |  | 
|  | if (packet_enqueue(s, PRIMARY_IN, &conn)) { | 
|  | trace_colo_compare_main("primary: unsupported packet in"); | 
|  | compare_chr_send(s, | 
|  | pri_rs->buf, | 
|  | pri_rs->packet_len, | 
|  | pri_rs->vnet_hdr_len, | 
|  | false, | 
|  | false); | 
|  | } else { | 
|  | /* compare packet in the specified connection */ | 
|  | colo_compare_connection(conn, s); | 
|  | } | 
|  | } | 
|  |  | 
|  | static void compare_sec_rs_finalize(SocketReadState *sec_rs) | 
|  | { | 
|  | CompareState *s = container_of(sec_rs, CompareState, sec_rs); | 
|  | Connection *conn = NULL; | 
|  |  | 
|  | if (packet_enqueue(s, SECONDARY_IN, &conn)) { | 
|  | trace_colo_compare_main("secondary: unsupported packet in"); | 
|  | } else { | 
|  | /* compare packet in the specified connection */ | 
|  | colo_compare_connection(conn, s); | 
|  | } | 
|  | } | 
|  |  | 
|  | static void compare_notify_rs_finalize(SocketReadState *notify_rs) | 
|  | { | 
|  | CompareState *s = container_of(notify_rs, CompareState, notify_rs); | 
|  |  | 
|  | const char msg[] = "COLO_COMPARE_GET_XEN_INIT"; | 
|  | int ret; | 
|  |  | 
|  | if (packet_matches_str("COLO_USERSPACE_PROXY_INIT", | 
|  | notify_rs->buf, | 
|  | notify_rs->packet_len)) { | 
|  | ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false); | 
|  | if (ret < 0) { | 
|  | error_report("Notify Xen COLO-frame INIT failed"); | 
|  | } | 
|  | } else if (packet_matches_str("COLO_CHECKPOINT", | 
|  | notify_rs->buf, | 
|  | notify_rs->packet_len)) { | 
|  | /* colo-compare do checkpoint, flush pri packet and remove sec packet */ | 
|  | g_queue_foreach(&s->conn_list, colo_flush_packets, s); | 
|  | } else { | 
|  | error_report("COLO compare got unsupported instruction"); | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Return 0 is success. | 
|  | * Return 1 is failed. | 
|  | */ | 
|  | static int find_and_check_chardev(Chardev **chr, | 
|  | char *chr_name, | 
|  | Error **errp) | 
|  | { | 
|  | *chr = qemu_chr_find(chr_name); | 
|  | if (*chr == NULL) { | 
|  | error_setg(errp, "Device '%s' not found", | 
|  | chr_name); | 
|  | return 1; | 
|  | } | 
|  |  | 
|  | if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) { | 
|  | error_setg(errp, "chardev \"%s\" is not reconnectable", | 
|  | chr_name); | 
|  | return 1; | 
|  | } | 
|  |  | 
|  | if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) { | 
|  | error_setg(errp, "chardev \"%s\" cannot switch context", | 
|  | chr_name); | 
|  | return 1; | 
|  | } | 
|  |  | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Called from the main thread on the primary | 
|  | * to setup colo-compare. | 
|  | */ | 
|  | static void colo_compare_complete(UserCreatable *uc, Error **errp) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(uc); | 
|  | Chardev *chr; | 
|  |  | 
|  | if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { | 
|  | error_setg(errp, "colo compare needs 'primary_in' ," | 
|  | "'secondary_in','outdev','iothread' property set"); | 
|  | return; | 
|  | } else if (!strcmp(s->pri_indev, s->outdev) || | 
|  | !strcmp(s->sec_indev, s->outdev) || | 
|  | !strcmp(s->pri_indev, s->sec_indev)) { | 
|  | error_setg(errp, "'indev' and 'outdev' could not be same " | 
|  | "for compare module"); | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (!s->compare_timeout) { | 
|  | /* Set default value to 3000 MS */ | 
|  | s->compare_timeout = DEFAULT_TIME_OUT_MS; | 
|  | } | 
|  |  | 
|  | if (!s->expired_scan_cycle) { | 
|  | /* Set default value to 1000 MS */ | 
|  | s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS; | 
|  | } | 
|  |  | 
|  | if (!max_queue_size) { | 
|  | /* Set default queue size to 1024 */ | 
|  | max_queue_size = MAX_QUEUE_SIZE; | 
|  | } | 
|  |  | 
|  | if (find_and_check_chardev(&chr, s->pri_indev, errp) || | 
|  | !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (find_and_check_chardev(&chr, s->sec_indev, errp) || | 
|  | !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (find_and_check_chardev(&chr, s->outdev, errp) || | 
|  | !qemu_chr_fe_init(&s->chr_out, chr, errp)) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); | 
|  | net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); | 
|  |  | 
|  | /* Try to enable remote notify chardev, currently just for Xen COLO */ | 
|  | if (s->notify_dev) { | 
|  | if (find_and_check_chardev(&chr, s->notify_dev, errp) || | 
|  | !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize, | 
|  | s->vnet_hdr); | 
|  | } | 
|  |  | 
|  | s->out_sendco.s = s; | 
|  | s->out_sendco.chr = &s->chr_out; | 
|  | s->out_sendco.notify_remote_frame = false; | 
|  | s->out_sendco.done = true; | 
|  | g_queue_init(&s->out_sendco.send_list); | 
|  |  | 
|  | if (s->notify_dev) { | 
|  | s->notify_sendco.s = s; | 
|  | s->notify_sendco.chr = &s->chr_notify_dev; | 
|  | s->notify_sendco.notify_remote_frame = true; | 
|  | s->notify_sendco.done = true; | 
|  | g_queue_init(&s->notify_sendco.send_list); | 
|  | } | 
|  |  | 
|  | g_queue_init(&s->conn_list); | 
|  |  | 
|  | s->connection_track_table = g_hash_table_new_full(connection_key_hash, | 
|  | connection_key_equal, | 
|  | g_free, | 
|  | NULL); | 
|  |  | 
|  | colo_compare_iothread(s); | 
|  |  | 
|  | qemu_mutex_lock(&colo_compare_mutex); | 
|  | if (!colo_compare_active) { | 
|  | qemu_mutex_init(&event_mtx); | 
|  | qemu_cond_init(&event_complete_cond); | 
|  | colo_compare_active = true; | 
|  | } | 
|  | QTAILQ_INSERT_TAIL(&net_compares, s, next); | 
|  | qemu_mutex_unlock(&colo_compare_mutex); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | static void colo_flush_packets(void *opaque, void *user_data) | 
|  | { | 
|  | CompareState *s = user_data; | 
|  | Connection *conn = opaque; | 
|  | Packet *pkt = NULL; | 
|  |  | 
|  | while (!g_queue_is_empty(&conn->primary_list)) { | 
|  | pkt = g_queue_pop_tail(&conn->primary_list); | 
|  | compare_chr_send(s, | 
|  | pkt->data, | 
|  | pkt->size, | 
|  | pkt->vnet_hdr_len, | 
|  | false, | 
|  | true); | 
|  | packet_destroy_partial(pkt, NULL); | 
|  | } | 
|  | while (!g_queue_is_empty(&conn->secondary_list)) { | 
|  | pkt = g_queue_pop_tail(&conn->secondary_list); | 
|  | packet_destroy(pkt, NULL); | 
|  | } | 
|  | } | 
|  |  | 
|  | static void colo_compare_class_init(ObjectClass *oc, void *data) | 
|  | { | 
|  | UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc); | 
|  |  | 
|  | ucc->complete = colo_compare_complete; | 
|  | } | 
|  |  | 
|  | static void colo_compare_init(Object *obj) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  |  | 
|  | object_property_add_str(obj, "primary_in", | 
|  | compare_get_pri_indev, compare_set_pri_indev); | 
|  | object_property_add_str(obj, "secondary_in", | 
|  | compare_get_sec_indev, compare_set_sec_indev); | 
|  | object_property_add_str(obj, "outdev", | 
|  | compare_get_outdev, compare_set_outdev); | 
|  | object_property_add_link(obj, "iothread", TYPE_IOTHREAD, | 
|  | (Object **)&s->iothread, | 
|  | object_property_allow_set_link, | 
|  | OBJ_PROP_LINK_STRONG); | 
|  | /* This parameter just for Xen COLO */ | 
|  | object_property_add_str(obj, "notify_dev", | 
|  | compare_get_notify_dev, compare_set_notify_dev); | 
|  |  | 
|  | object_property_add(obj, "compare_timeout", "uint64", | 
|  | compare_get_timeout, | 
|  | compare_set_timeout, NULL, NULL); | 
|  |  | 
|  | object_property_add(obj, "expired_scan_cycle", "uint32", | 
|  | compare_get_expired_scan_cycle, | 
|  | compare_set_expired_scan_cycle, NULL, NULL); | 
|  |  | 
|  | object_property_add(obj, "max_queue_size", "uint32", | 
|  | get_max_queue_size, | 
|  | set_max_queue_size, NULL, NULL); | 
|  |  | 
|  | s->vnet_hdr = false; | 
|  | object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr, | 
|  | compare_set_vnet_hdr); | 
|  | } | 
|  |  | 
|  | void colo_compare_cleanup(void) | 
|  | { | 
|  | CompareState *tmp = NULL; | 
|  | CompareState *n = NULL; | 
|  |  | 
|  | QTAILQ_FOREACH_SAFE(tmp, &net_compares, next, n) { | 
|  | object_unparent(OBJECT(tmp)); | 
|  | } | 
|  | } | 
|  |  | 
|  | static void colo_compare_finalize(Object *obj) | 
|  | { | 
|  | CompareState *s = COLO_COMPARE(obj); | 
|  | CompareState *tmp = NULL; | 
|  |  | 
|  | qemu_mutex_lock(&colo_compare_mutex); | 
|  | QTAILQ_FOREACH(tmp, &net_compares, next) { | 
|  | if (tmp == s) { | 
|  | QTAILQ_REMOVE(&net_compares, s, next); | 
|  | break; | 
|  | } | 
|  | } | 
|  | if (QTAILQ_EMPTY(&net_compares)) { | 
|  | colo_compare_active = false; | 
|  | qemu_mutex_destroy(&event_mtx); | 
|  | qemu_cond_destroy(&event_complete_cond); | 
|  | } | 
|  | qemu_mutex_unlock(&colo_compare_mutex); | 
|  |  | 
|  | qemu_chr_fe_deinit(&s->chr_pri_in, false); | 
|  | qemu_chr_fe_deinit(&s->chr_sec_in, false); | 
|  | qemu_chr_fe_deinit(&s->chr_out, false); | 
|  | if (s->notify_dev) { | 
|  | qemu_chr_fe_deinit(&s->chr_notify_dev, false); | 
|  | } | 
|  |  | 
|  | colo_compare_timer_del(s); | 
|  |  | 
|  | qemu_bh_delete(s->event_bh); | 
|  |  | 
|  | AioContext *ctx = iothread_get_aio_context(s->iothread); | 
|  | AIO_WAIT_WHILE(ctx, !s->out_sendco.done); | 
|  | if (s->notify_dev) { | 
|  | AIO_WAIT_WHILE(ctx, !s->notify_sendco.done); | 
|  | } | 
|  |  | 
|  | /* Release all unhandled packets after compare thead exited */ | 
|  | g_queue_foreach(&s->conn_list, colo_flush_packets, s); | 
|  | AIO_WAIT_WHILE(NULL, !s->out_sendco.done); | 
|  |  | 
|  | g_queue_clear(&s->conn_list); | 
|  | g_queue_clear(&s->out_sendco.send_list); | 
|  | if (s->notify_dev) { | 
|  | g_queue_clear(&s->notify_sendco.send_list); | 
|  | } | 
|  |  | 
|  | if (s->connection_track_table) { | 
|  | g_hash_table_destroy(s->connection_track_table); | 
|  | } | 
|  |  | 
|  | object_unref(OBJECT(s->iothread)); | 
|  |  | 
|  | g_free(s->pri_indev); | 
|  | g_free(s->sec_indev); | 
|  | g_free(s->outdev); | 
|  | g_free(s->notify_dev); | 
|  | } | 
|  |  | 
|  | static void __attribute__((__constructor__)) colo_compare_init_globals(void) | 
|  | { | 
|  | colo_compare_active = false; | 
|  | qemu_mutex_init(&colo_compare_mutex); | 
|  | } | 
|  |  | 
|  | static const TypeInfo colo_compare_info = { | 
|  | .name = TYPE_COLO_COMPARE, | 
|  | .parent = TYPE_OBJECT, | 
|  | .instance_size = sizeof(CompareState), | 
|  | .instance_init = colo_compare_init, | 
|  | .instance_finalize = colo_compare_finalize, | 
|  | .class_size = sizeof(CompareClass), | 
|  | .class_init = colo_compare_class_init, | 
|  | .interfaces = (InterfaceInfo[]) { | 
|  | { TYPE_USER_CREATABLE }, | 
|  | { } | 
|  | } | 
|  | }; | 
|  |  | 
|  | static void register_types(void) | 
|  | { | 
|  | type_register_static(&colo_compare_info); | 
|  | } | 
|  |  | 
|  | type_init(register_types); |