| /* |
| * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) |
| * (a.k.a. Fault Tolerance or Continuous Replication) |
| * |
| * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. |
| * Copyright (c) 2016 FUJITSU LIMITED |
| * Copyright (c) 2016 Intel Corporation |
| * |
| * This work is licensed under the terms of the GNU GPL, version 2 or |
| * later. See the COPYING file in the top-level directory. |
| */ |
| |
| #include "qemu/osdep.h" |
| #include "sysemu/sysemu.h" |
| #include "qapi/error.h" |
| #include "qapi/qapi-commands-migration.h" |
| #include "migration.h" |
| #include "qemu-file.h" |
| #include "savevm.h" |
| #include "migration/colo.h" |
| #include "io/channel-buffer.h" |
| #include "trace.h" |
| #include "qemu/error-report.h" |
| #include "qemu/main-loop.h" |
| #include "qemu/rcu.h" |
| #include "migration/failover.h" |
| #include "migration/ram.h" |
| #include "block/replication.h" |
| #include "net/colo-compare.h" |
| #include "net/colo.h" |
| #include "block/block.h" |
| #include "qapi/qapi-events-migration.h" |
| #include "sysemu/cpus.h" |
| #include "sysemu/runstate.h" |
| #include "net/filter.h" |
| #include "options.h" |
| |
| static bool vmstate_loading; |
| static Notifier packets_compare_notifier; |
| |
| /* User need to know colo mode after COLO failover */ |
| static COLOMode last_colo_mode; |
| |
| #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024) |
| |
| bool migration_in_colo_state(void) |
| { |
| MigrationState *s = migrate_get_current(); |
| |
| return (s->state == MIGRATION_STATUS_COLO); |
| } |
| |
| bool migration_incoming_in_colo_state(void) |
| { |
| MigrationIncomingState *mis = migration_incoming_get_current(); |
| |
| return mis && (mis->state == MIGRATION_STATUS_COLO); |
| } |
| |
| static bool colo_runstate_is_stopped(void) |
| { |
| return runstate_check(RUN_STATE_COLO) || !runstate_is_running(); |
| } |
| |
| static void colo_checkpoint_notify(void) |
| { |
| MigrationState *s = migrate_get_current(); |
| int64_t next_notify_time; |
| |
| qemu_event_set(&s->colo_checkpoint_event); |
| s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); |
| next_notify_time = s->colo_checkpoint_time + migrate_checkpoint_delay(); |
| timer_mod(s->colo_delay_timer, next_notify_time); |
| } |
| |
| static void colo_checkpoint_notify_timer(void *opaque) |
| { |
| colo_checkpoint_notify(); |
| } |
| |
| void colo_checkpoint_delay_set(void) |
| { |
| if (migration_in_colo_state()) { |
| colo_checkpoint_notify(); |
| } |
| } |
| |
| static void secondary_vm_do_failover(void) |
| { |
| /* COLO needs enable block-replication */ |
| int old_state; |
| MigrationIncomingState *mis = migration_incoming_get_current(); |
| Error *local_err = NULL; |
| |
| /* Can not do failover during the process of VM's loading VMstate, Or |
| * it will break the secondary VM. |
| */ |
| if (vmstate_loading) { |
| old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, |
| FAILOVER_STATUS_RELAUNCH); |
| if (old_state != FAILOVER_STATUS_ACTIVE) { |
| error_report("Unknown error while do failover for secondary VM," |
| "old_state: %s", FailoverStatus_str(old_state)); |
| } |
| return; |
| } |
| |
| migrate_set_state(&mis->state, MIGRATION_STATUS_COLO, |
| MIGRATION_STATUS_COMPLETED); |
| |
| replication_stop_all(true, &local_err); |
| if (local_err) { |
| error_report_err(local_err); |
| local_err = NULL; |
| } |
| |
| /* Notify all filters of all NIC to do checkpoint */ |
| colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err); |
| if (local_err) { |
| error_report_err(local_err); |
| } |
| |
| if (!autostart) { |
| error_report("\"-S\" qemu option will be ignored in secondary side"); |
| /* recover runstate to normal migration finish state */ |
| autostart = true; |
| } |
| /* |
| * Make sure COLO incoming thread not block in recv or send, |
| * If mis->from_src_file and mis->to_src_file use the same fd, |
| * The second shutdown() will return -1, we ignore this value, |
| * It is harmless. |
| */ |
| if (mis->from_src_file) { |
| qemu_file_shutdown(mis->from_src_file); |
| } |
| if (mis->to_src_file) { |
| qemu_file_shutdown(mis->to_src_file); |
| } |
| |
| old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, |
| FAILOVER_STATUS_COMPLETED); |
| if (old_state != FAILOVER_STATUS_ACTIVE) { |
| error_report("Incorrect state (%s) while doing failover for " |
| "secondary VM", FailoverStatus_str(old_state)); |
| return; |
| } |
| /* Notify COLO incoming thread that failover work is finished */ |
| qemu_sem_post(&mis->colo_incoming_sem); |
| |
| /* For Secondary VM, jump to incoming co */ |
| if (mis->colo_incoming_co) { |
| qemu_coroutine_enter(mis->colo_incoming_co); |
| } |
| } |
| |
| static void primary_vm_do_failover(void) |
| { |
| MigrationState *s = migrate_get_current(); |
| int old_state; |
| Error *local_err = NULL; |
| |
| migrate_set_state(&s->state, MIGRATION_STATUS_COLO, |
| MIGRATION_STATUS_COMPLETED); |
| /* |
| * kick COLO thread which might wait at |
| * qemu_sem_wait(&s->colo_checkpoint_sem). |
| */ |
| colo_checkpoint_notify(); |
| |
| /* |
| * Wake up COLO thread which may blocked in recv() or send(), |
| * The s->rp_state.from_dst_file and s->to_dst_file may use the |
| * same fd, but we still shutdown the fd for twice, it is harmless. |
| */ |
| if (s->to_dst_file) { |
| qemu_file_shutdown(s->to_dst_file); |
| } |
| if (s->rp_state.from_dst_file) { |
| qemu_file_shutdown(s->rp_state.from_dst_file); |
| } |
| |
| old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, |
| FAILOVER_STATUS_COMPLETED); |
| if (old_state != FAILOVER_STATUS_ACTIVE) { |
| error_report("Incorrect state (%s) while doing failover for Primary VM", |
| FailoverStatus_str(old_state)); |
| return; |
| } |
| |
| replication_stop_all(true, &local_err); |
| if (local_err) { |
| error_report_err(local_err); |
| local_err = NULL; |
| } |
| |
| /* Notify COLO thread that failover work is finished */ |
| qemu_sem_post(&s->colo_exit_sem); |
| } |
| |
| COLOMode get_colo_mode(void) |
| { |
| if (migration_in_colo_state()) { |
| return COLO_MODE_PRIMARY; |
| } else if (migration_incoming_in_colo_state()) { |
| return COLO_MODE_SECONDARY; |
| } else { |
| return COLO_MODE_NONE; |
| } |
| } |
| |
| void colo_do_failover(void) |
| { |
| /* Make sure VM stopped while failover happened. */ |
| if (!colo_runstate_is_stopped()) { |
| vm_stop_force_state(RUN_STATE_COLO); |
| } |
| |
| switch (last_colo_mode = get_colo_mode()) { |
| case COLO_MODE_PRIMARY: |
| primary_vm_do_failover(); |
| break; |
| case COLO_MODE_SECONDARY: |
| secondary_vm_do_failover(); |
| break; |
| default: |
| error_report("colo_do_failover failed because the colo mode" |
| " could not be obtained"); |
| } |
| } |
| |
| void qmp_xen_set_replication(bool enable, bool primary, |
| bool has_failover, bool failover, |
| Error **errp) |
| { |
| ReplicationMode mode = primary ? |
| REPLICATION_MODE_PRIMARY : |
| REPLICATION_MODE_SECONDARY; |
| |
| if (has_failover && enable) { |
| error_setg(errp, "Parameter 'failover' is only for" |
| " stopping replication"); |
| return; |
| } |
| |
| if (enable) { |
| replication_start_all(mode, errp); |
| } else { |
| if (!has_failover) { |
| failover = NULL; |
| } |
| replication_stop_all(failover, failover ? NULL : errp); |
| } |
| } |
| |
| ReplicationStatus *qmp_query_xen_replication_status(Error **errp) |
| { |
| Error *err = NULL; |
| ReplicationStatus *s = g_new0(ReplicationStatus, 1); |
| |
| replication_get_error_all(&err); |
| if (err) { |
| s->error = true; |
| s->desc = g_strdup(error_get_pretty(err)); |
| } else { |
| s->error = false; |
| } |
| |
| error_free(err); |
| return s; |
| } |
| |
| void qmp_xen_colo_do_checkpoint(Error **errp) |
| { |
| Error *err = NULL; |
| |
| replication_do_checkpoint_all(&err); |
| if (err) { |
| error_propagate(errp, err); |
| return; |
| } |
| /* Notify all filters of all NIC to do checkpoint */ |
| colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp); |
| } |
| |
| COLOStatus *qmp_query_colo_status(Error **errp) |
| { |
| COLOStatus *s = g_new0(COLOStatus, 1); |
| |
| s->mode = get_colo_mode(); |
| s->last_mode = last_colo_mode; |
| |
| switch (failover_get_state()) { |
| case FAILOVER_STATUS_NONE: |
| s->reason = COLO_EXIT_REASON_NONE; |
| break; |
| case FAILOVER_STATUS_COMPLETED: |
| s->reason = COLO_EXIT_REASON_REQUEST; |
| break; |
| default: |
| if (migration_in_colo_state()) { |
| s->reason = COLO_EXIT_REASON_PROCESSING; |
| } else { |
| s->reason = COLO_EXIT_REASON_ERROR; |
| } |
| } |
| |
| return s; |
| } |
| |
| static void colo_send_message(QEMUFile *f, COLOMessage msg, |
| Error **errp) |
| { |
| int ret; |
| |
| if (msg >= COLO_MESSAGE__MAX) { |
| error_setg(errp, "%s: Invalid message", __func__); |
| return; |
| } |
| qemu_put_be32(f, msg); |
| ret = qemu_fflush(f); |
| if (ret < 0) { |
| error_setg_errno(errp, -ret, "Can't send COLO message"); |
| } |
| trace_colo_send_message(COLOMessage_str(msg)); |
| } |
| |
| static void colo_send_message_value(QEMUFile *f, COLOMessage msg, |
| uint64_t value, Error **errp) |
| { |
| Error *local_err = NULL; |
| int ret; |
| |
| colo_send_message(f, msg, &local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| return; |
| } |
| qemu_put_be64(f, value); |
| ret = qemu_fflush(f); |
| if (ret < 0) { |
| error_setg_errno(errp, -ret, "Failed to send value for message:%s", |
| COLOMessage_str(msg)); |
| } |
| } |
| |
| static COLOMessage colo_receive_message(QEMUFile *f, Error **errp) |
| { |
| COLOMessage msg; |
| int ret; |
| |
| msg = qemu_get_be32(f); |
| ret = qemu_file_get_error(f); |
| if (ret < 0) { |
| error_setg_errno(errp, -ret, "Can't receive COLO message"); |
| return msg; |
| } |
| if (msg >= COLO_MESSAGE__MAX) { |
| error_setg(errp, "%s: Invalid message", __func__); |
| return msg; |
| } |
| trace_colo_receive_message(COLOMessage_str(msg)); |
| return msg; |
| } |
| |
| static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg, |
| Error **errp) |
| { |
| COLOMessage msg; |
| Error *local_err = NULL; |
| |
| msg = colo_receive_message(f, &local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| return; |
| } |
| if (msg != expect_msg) { |
| error_setg(errp, "Unexpected COLO message %d, expected %d", |
| msg, expect_msg); |
| } |
| } |
| |
| static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg, |
| Error **errp) |
| { |
| Error *local_err = NULL; |
| uint64_t value; |
| int ret; |
| |
| colo_receive_check_message(f, expect_msg, &local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| return 0; |
| } |
| |
| value = qemu_get_be64(f); |
| ret = qemu_file_get_error(f); |
| if (ret < 0) { |
| error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s", |
| COLOMessage_str(expect_msg)); |
| } |
| return value; |
| } |
| |
| static int colo_do_checkpoint_transaction(MigrationState *s, |
| QIOChannelBuffer *bioc, |
| QEMUFile *fb) |
| { |
| Error *local_err = NULL; |
| int ret = -1; |
| |
| colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST, |
| &local_err); |
| if (local_err) { |
| goto out; |
| } |
| |
| colo_receive_check_message(s->rp_state.from_dst_file, |
| COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); |
| if (local_err) { |
| goto out; |
| } |
| /* Reset channel-buffer directly */ |
| qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); |
| bioc->usage = 0; |
| |
| bql_lock(); |
| if (failover_get_state() != FAILOVER_STATUS_NONE) { |
| bql_unlock(); |
| goto out; |
| } |
| vm_stop_force_state(RUN_STATE_COLO); |
| bql_unlock(); |
| trace_colo_vm_state_change("run", "stop"); |
| /* |
| * Failover request bh could be called after vm_stop_force_state(), |
| * So we need check failover_request_is_active() again. |
| */ |
| if (failover_get_state() != FAILOVER_STATUS_NONE) { |
| goto out; |
| } |
| bql_lock(); |
| |
| replication_do_checkpoint_all(&local_err); |
| if (local_err) { |
| bql_unlock(); |
| goto out; |
| } |
| |
| colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); |
| if (local_err) { |
| bql_unlock(); |
| goto out; |
| } |
| /* Note: device state is saved into buffer */ |
| ret = qemu_save_device_state(fb); |
| |
| bql_unlock(); |
| if (ret < 0) { |
| goto out; |
| } |
| |
| if (migrate_auto_converge()) { |
| mig_throttle_counter_reset(); |
| } |
| /* |
| * Only save VM's live state, which not including device state. |
| * TODO: We may need a timeout mechanism to prevent COLO process |
| * to be blocked here. |
| */ |
| qemu_savevm_live_state(s->to_dst_file); |
| |
| qemu_fflush(fb); |
| |
| /* |
| * We need the size of the VMstate data in Secondary side, |
| * With which we can decide how much data should be read. |
| */ |
| colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE, |
| bioc->usage, &local_err); |
| if (local_err) { |
| goto out; |
| } |
| |
| qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage); |
| ret = qemu_fflush(s->to_dst_file); |
| if (ret < 0) { |
| goto out; |
| } |
| |
| colo_receive_check_message(s->rp_state.from_dst_file, |
| COLO_MESSAGE_VMSTATE_RECEIVED, &local_err); |
| if (local_err) { |
| goto out; |
| } |
| |
| qemu_event_reset(&s->colo_checkpoint_event); |
| colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err); |
| if (local_err) { |
| goto out; |
| } |
| |
| colo_receive_check_message(s->rp_state.from_dst_file, |
| COLO_MESSAGE_VMSTATE_LOADED, &local_err); |
| if (local_err) { |
| goto out; |
| } |
| |
| ret = 0; |
| |
| bql_lock(); |
| vm_start(); |
| bql_unlock(); |
| trace_colo_vm_state_change("stop", "run"); |
| |
| out: |
| if (local_err) { |
| error_report_err(local_err); |
| } |
| return ret; |
| } |
| |
| static void colo_compare_notify_checkpoint(Notifier *notifier, void *data) |
| { |
| colo_checkpoint_notify(); |
| } |
| |
| static void colo_process_checkpoint(MigrationState *s) |
| { |
| QIOChannelBuffer *bioc; |
| QEMUFile *fb = NULL; |
| Error *local_err = NULL; |
| int ret; |
| |
| if (get_colo_mode() != COLO_MODE_PRIMARY) { |
| error_report("COLO mode must be COLO_MODE_PRIMARY"); |
| return; |
| } |
| |
| failover_init_state(); |
| |
| s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file); |
| if (!s->rp_state.from_dst_file) { |
| error_report("Open QEMUFile from_dst_file failed"); |
| goto out; |
| } |
| |
| packets_compare_notifier.notify = colo_compare_notify_checkpoint; |
| colo_compare_register_notifier(&packets_compare_notifier); |
| |
| /* |
| * Wait for Secondary finish loading VM states and enter COLO |
| * restore. |
| */ |
| colo_receive_check_message(s->rp_state.from_dst_file, |
| COLO_MESSAGE_CHECKPOINT_READY, &local_err); |
| if (local_err) { |
| goto out; |
| } |
| bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); |
| fb = qemu_file_new_output(QIO_CHANNEL(bioc)); |
| object_unref(OBJECT(bioc)); |
| |
| bql_lock(); |
| replication_start_all(REPLICATION_MODE_PRIMARY, &local_err); |
| if (local_err) { |
| bql_unlock(); |
| goto out; |
| } |
| |
| vm_start(); |
| bql_unlock(); |
| trace_colo_vm_state_change("stop", "run"); |
| |
| timer_mod(s->colo_delay_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) + |
| migrate_checkpoint_delay()); |
| |
| while (s->state == MIGRATION_STATUS_COLO) { |
| if (failover_get_state() != FAILOVER_STATUS_NONE) { |
| error_report("failover request"); |
| goto out; |
| } |
| |
| qemu_event_wait(&s->colo_checkpoint_event); |
| |
| if (s->state != MIGRATION_STATUS_COLO) { |
| goto out; |
| } |
| ret = colo_do_checkpoint_transaction(s, bioc, fb); |
| if (ret < 0) { |
| goto out; |
| } |
| } |
| |
| out: |
| /* Throw the unreported error message after exited from loop */ |
| if (local_err) { |
| error_report_err(local_err); |
| } |
| |
| if (fb) { |
| qemu_fclose(fb); |
| } |
| |
| /* |
| * There are only two reasons we can get here, some error happened |
| * or the user triggered failover. |
| */ |
| switch (failover_get_state()) { |
| case FAILOVER_STATUS_COMPLETED: |
| qapi_event_send_colo_exit(COLO_MODE_PRIMARY, |
| COLO_EXIT_REASON_REQUEST); |
| break; |
| default: |
| qapi_event_send_colo_exit(COLO_MODE_PRIMARY, |
| COLO_EXIT_REASON_ERROR); |
| } |
| |
| /* Hope this not to be too long to wait here */ |
| qemu_sem_wait(&s->colo_exit_sem); |
| qemu_sem_destroy(&s->colo_exit_sem); |
| |
| /* |
| * It is safe to unregister notifier after failover finished. |
| * Besides, colo_delay_timer and colo_checkpoint_sem can't be |
| * released before unregister notifier, or there will be use-after-free |
| * error. |
| */ |
| colo_compare_unregister_notifier(&packets_compare_notifier); |
| timer_free(s->colo_delay_timer); |
| qemu_event_destroy(&s->colo_checkpoint_event); |
| |
| /* |
| * Must be called after failover BH is completed, |
| * Or the failover BH may shutdown the wrong fd that |
| * re-used by other threads after we release here. |
| */ |
| if (s->rp_state.from_dst_file) { |
| qemu_fclose(s->rp_state.from_dst_file); |
| s->rp_state.from_dst_file = NULL; |
| } |
| } |
| |
| void migrate_start_colo_process(MigrationState *s) |
| { |
| bql_unlock(); |
| qemu_event_init(&s->colo_checkpoint_event, false); |
| s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST, |
| colo_checkpoint_notify_timer, NULL); |
| |
| qemu_sem_init(&s->colo_exit_sem, 0); |
| colo_process_checkpoint(s); |
| bql_lock(); |
| } |
| |
| static void colo_incoming_process_checkpoint(MigrationIncomingState *mis, |
| QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp) |
| { |
| uint64_t total_size; |
| uint64_t value; |
| Error *local_err = NULL; |
| int ret; |
| |
| bql_lock(); |
| vm_stop_force_state(RUN_STATE_COLO); |
| bql_unlock(); |
| trace_colo_vm_state_change("run", "stop"); |
| |
| /* FIXME: This is unnecessary for periodic checkpoint mode */ |
| colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY, |
| &local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| return; |
| } |
| |
| colo_receive_check_message(mis->from_src_file, |
| COLO_MESSAGE_VMSTATE_SEND, &local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| return; |
| } |
| |
| bql_lock(); |
| cpu_synchronize_all_states(); |
| ret = qemu_loadvm_state_main(mis->from_src_file, mis); |
| bql_unlock(); |
| |
| if (ret < 0) { |
| error_setg(errp, "Load VM's live state (ram) error"); |
| return; |
| } |
| |
| value = colo_receive_message_value(mis->from_src_file, |
| COLO_MESSAGE_VMSTATE_SIZE, &local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| return; |
| } |
| |
| /* |
| * Read VM device state data into channel buffer, |
| * It's better to re-use the memory allocated. |
| * Here we need to handle the channel buffer directly. |
| */ |
| if (value > bioc->capacity) { |
| bioc->capacity = value; |
| bioc->data = g_realloc(bioc->data, bioc->capacity); |
| } |
| total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value); |
| if (total_size != value) { |
| error_setg(errp, "Got %" PRIu64 " VMState data, less than expected" |
| " %" PRIu64, total_size, value); |
| return; |
| } |
| bioc->usage = total_size; |
| qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); |
| |
| colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED, |
| &local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| return; |
| } |
| |
| bql_lock(); |
| vmstate_loading = true; |
| colo_flush_ram_cache(); |
| ret = qemu_load_device_state(fb); |
| if (ret < 0) { |
| error_setg(errp, "COLO: load device state failed"); |
| vmstate_loading = false; |
| bql_unlock(); |
| return; |
| } |
| |
| replication_get_error_all(&local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| vmstate_loading = false; |
| bql_unlock(); |
| return; |
| } |
| |
| /* discard colo disk buffer */ |
| replication_do_checkpoint_all(&local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| vmstate_loading = false; |
| bql_unlock(); |
| return; |
| } |
| /* Notify all filters of all NIC to do checkpoint */ |
| colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err); |
| |
| if (local_err) { |
| error_propagate(errp, local_err); |
| vmstate_loading = false; |
| bql_unlock(); |
| return; |
| } |
| |
| vmstate_loading = false; |
| vm_start(); |
| bql_unlock(); |
| trace_colo_vm_state_change("stop", "run"); |
| |
| if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) { |
| return; |
| } |
| |
| colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED, |
| &local_err); |
| error_propagate(errp, local_err); |
| } |
| |
| static void colo_wait_handle_message(MigrationIncomingState *mis, |
| QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp) |
| { |
| COLOMessage msg; |
| Error *local_err = NULL; |
| |
| msg = colo_receive_message(mis->from_src_file, &local_err); |
| if (local_err) { |
| error_propagate(errp, local_err); |
| return; |
| } |
| |
| switch (msg) { |
| case COLO_MESSAGE_CHECKPOINT_REQUEST: |
| colo_incoming_process_checkpoint(mis, fb, bioc, errp); |
| break; |
| default: |
| error_setg(errp, "Got unknown COLO message: %d", msg); |
| break; |
| } |
| } |
| |
| void colo_shutdown(void) |
| { |
| MigrationIncomingState *mis = NULL; |
| MigrationState *s = NULL; |
| |
| switch (get_colo_mode()) { |
| case COLO_MODE_PRIMARY: |
| s = migrate_get_current(); |
| qemu_event_set(&s->colo_checkpoint_event); |
| qemu_sem_post(&s->colo_exit_sem); |
| break; |
| case COLO_MODE_SECONDARY: |
| mis = migration_incoming_get_current(); |
| qemu_sem_post(&mis->colo_incoming_sem); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| static void *colo_process_incoming_thread(void *opaque) |
| { |
| MigrationIncomingState *mis = opaque; |
| QEMUFile *fb = NULL; |
| QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */ |
| Error *local_err = NULL; |
| |
| rcu_register_thread(); |
| qemu_sem_init(&mis->colo_incoming_sem, 0); |
| |
| migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, |
| MIGRATION_STATUS_COLO); |
| |
| if (get_colo_mode() != COLO_MODE_SECONDARY) { |
| error_report("COLO mode must be COLO_MODE_SECONDARY"); |
| return NULL; |
| } |
| |
| /* Make sure all file formats throw away their mutable metadata */ |
| bql_lock(); |
| bdrv_activate_all(&local_err); |
| if (local_err) { |
| bql_unlock(); |
| error_report_err(local_err); |
| return NULL; |
| } |
| bql_unlock(); |
| |
| failover_init_state(); |
| |
| mis->to_src_file = qemu_file_get_return_path(mis->from_src_file); |
| if (!mis->to_src_file) { |
| error_report("COLO incoming thread: Open QEMUFile to_src_file failed"); |
| goto out; |
| } |
| /* |
| * Note: the communication between Primary side and Secondary side |
| * should be sequential, we set the fd to unblocked in migration incoming |
| * coroutine, and here we are in the COLO incoming thread, so it is ok to |
| * set the fd back to blocked. |
| */ |
| qemu_file_set_blocking(mis->from_src_file, true); |
| |
| colo_incoming_start_dirty_log(); |
| |
| bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); |
| fb = qemu_file_new_input(QIO_CHANNEL(bioc)); |
| object_unref(OBJECT(bioc)); |
| |
| bql_lock(); |
| replication_start_all(REPLICATION_MODE_SECONDARY, &local_err); |
| if (local_err) { |
| bql_unlock(); |
| goto out; |
| } |
| vm_start(); |
| bql_unlock(); |
| trace_colo_vm_state_change("stop", "run"); |
| |
| colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY, |
| &local_err); |
| if (local_err) { |
| goto out; |
| } |
| |
| while (mis->state == MIGRATION_STATUS_COLO) { |
| colo_wait_handle_message(mis, fb, bioc, &local_err); |
| if (local_err) { |
| error_report_err(local_err); |
| break; |
| } |
| |
| if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) { |
| failover_set_state(FAILOVER_STATUS_RELAUNCH, |
| FAILOVER_STATUS_NONE); |
| failover_request_active(NULL); |
| break; |
| } |
| |
| if (failover_get_state() != FAILOVER_STATUS_NONE) { |
| error_report("failover request"); |
| break; |
| } |
| } |
| |
| out: |
| /* |
| * There are only two reasons we can get here, some error happened |
| * or the user triggered failover. |
| */ |
| switch (failover_get_state()) { |
| case FAILOVER_STATUS_COMPLETED: |
| qapi_event_send_colo_exit(COLO_MODE_SECONDARY, |
| COLO_EXIT_REASON_REQUEST); |
| break; |
| default: |
| qapi_event_send_colo_exit(COLO_MODE_SECONDARY, |
| COLO_EXIT_REASON_ERROR); |
| } |
| |
| if (fb) { |
| qemu_fclose(fb); |
| } |
| |
| /* Hope this not to be too long to loop here */ |
| qemu_sem_wait(&mis->colo_incoming_sem); |
| qemu_sem_destroy(&mis->colo_incoming_sem); |
| |
| rcu_unregister_thread(); |
| return NULL; |
| } |
| |
| int coroutine_fn colo_incoming_co(void) |
| { |
| MigrationIncomingState *mis = migration_incoming_get_current(); |
| QemuThread th; |
| |
| assert(bql_locked()); |
| |
| if (!migration_incoming_colo_enabled()) { |
| return 0; |
| } |
| |
| qemu_thread_create(&th, "COLO incoming", colo_process_incoming_thread, |
| mis, QEMU_THREAD_JOINABLE); |
| |
| mis->colo_incoming_co = qemu_coroutine_self(); |
| qemu_coroutine_yield(); |
| mis->colo_incoming_co = NULL; |
| |
| bql_unlock(); |
| /* Wait checkpoint incoming thread exit before free resource */ |
| qemu_thread_join(&th); |
| bql_lock(); |
| |
| /* We hold the global BQL, so it is safe here */ |
| colo_release_ram_cache(); |
| |
| return 0; |
| } |