Merge remote-tracking branch 'remotes/dgilbert/tags/pull-migration-20180710a' into staging

Migration pull 2018-07-10 (for 3.0)

Migration fixes and migration test fixes, mostly
around postcopy and postcopy recovery

# gpg: Signature made Tue 10 Jul 2018 16:27:19 BST
# gpg:                using RSA key 0516331EBC5BFDE7
# gpg: Good signature from "Dr. David Alan Gilbert (RH2) <dgilbert@redhat.com>"
# Primary key fingerprint: 45F5 C71B 4A0C B7FB 977A  9FA9 0516 331E BC5B FDE7

* remotes/dgilbert/tags/pull-migration-20180710a:
  migration: reorder MIG_CMD_POSTCOPY_RESUME
  tests: hide stderr for postcopy recovery test
  tests: add postcopy recovery test
  tests: introduce wait_for_migration_status()
  tests: introduce migrate_query*() helpers
  tests: allow migrate() to take extra flags
  tests: introduce migrate_postcopy_* helpers
  migration: show pause/recover state on dst host
  migration: fix incorrect bitmap size calculation
  migration: loosen recovery check when load vm
  migration: simplify check to use qemu file buffer
  migration: unify incoming processing
  migration: unbreak postcopy recovery
  migration: move income process out of multifd
  migration: delay postcopy paused state

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
diff --git a/migration/exec.c b/migration/exec.c
index 0bbeb63..375d2e1 100644
--- a/migration/exec.c
+++ b/migration/exec.c
@@ -49,9 +49,6 @@
 {
     migration_channel_process_incoming(ioc);
     object_unref(OBJECT(ioc));
-    if (!migrate_use_multifd()) {
-        migration_incoming_process();
-    }
     return G_SOURCE_REMOVE;
 }
 
diff --git a/migration/fd.c b/migration/fd.c
index fee34ff..a7c13df 100644
--- a/migration/fd.c
+++ b/migration/fd.c
@@ -49,9 +49,6 @@
 {
     migration_channel_process_incoming(ioc);
     object_unref(OBJECT(ioc));
-    if (!migrate_use_multifd()) {
-        migration_incoming_process();
-    }
     return G_SOURCE_REMOVE;
 }
 
diff --git a/migration/migration.c b/migration/migration.c
index 94d71f8..8d56d56 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -466,7 +466,8 @@
     qemu_coroutine_enter(co);
 }
 
-void migration_fd_process_incoming(QEMUFile *f)
+/* Returns true if recovered from a paused migration, otherwise false */
+static bool postcopy_try_recover(QEMUFile *f)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
 
@@ -491,23 +492,52 @@
          * that source is ready to reply to page requests.
          */
         qemu_sem_post(&mis->postcopy_pause_sem_dst);
-    } else {
-        /* New incoming migration */
-        migration_incoming_setup(f);
-        migration_incoming_process();
+        return true;
     }
+
+    return false;
+}
+
+void migration_fd_process_incoming(QEMUFile *f)
+{
+    if (postcopy_try_recover(f)) {
+        return;
+    }
+
+    migration_incoming_setup(f);
+    migration_incoming_process();
 }
 
 void migration_ioc_process_incoming(QIOChannel *ioc)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
+    bool start_migration;
 
     if (!mis->from_src_file) {
+        /* The first connection (multifd may have multiple) */
         QEMUFile *f = qemu_fopen_channel_input(ioc);
+
+        /* If it's a recovery, we're done */
+        if (postcopy_try_recover(f)) {
+            return;
+        }
+
         migration_incoming_setup(f);
-        return;
+
+        /*
+         * Common migration only needs one channel, so we can start
+         * right now.  Multifd needs more than one channel, we wait.
+         */
+        start_migration = !migrate_use_multifd();
+    } else {
+        /* Multiple connections */
+        assert(migrate_use_multifd());
+        start_migration = multifd_recv_new_channel(ioc);
     }
-    multifd_recv_new_channel(ioc);
+
+    if (start_migration) {
+        migration_incoming_process();
+    }
 }
 
 /**
@@ -881,6 +911,8 @@
     case MIGRATION_STATUS_CANCELLED:
     case MIGRATION_STATUS_ACTIVE:
     case MIGRATION_STATUS_POSTCOPY_ACTIVE:
+    case MIGRATION_STATUS_POSTCOPY_PAUSED:
+    case MIGRATION_STATUS_POSTCOPY_RECOVER:
     case MIGRATION_STATUS_FAILED:
     case MIGRATION_STATUS_COLO:
         info->has_status = true;
diff --git a/migration/ram.c b/migration/ram.c
index 1cd98d6..52dd678 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -235,7 +235,7 @@
     bitmap_to_le(le_bitmap, block->receivedmap, nbits);
 
     /* Size of the bitmap, in bytes */
-    size = nbits / 8;
+    size = DIV_ROUND_UP(nbits, 8);
 
     /*
      * size is always aligned to 8 bytes for 64bit machines, but it
@@ -1311,7 +1311,8 @@
     return thread_count == atomic_read(&multifd_recv_state->count);
 }
 
-void multifd_recv_new_channel(QIOChannel *ioc)
+/* Return true if multifd is ready for the migration, otherwise false */
+bool multifd_recv_new_channel(QIOChannel *ioc)
 {
     MultiFDRecvParams *p;
     Error *local_err = NULL;
@@ -1320,7 +1321,7 @@
     id = multifd_recv_initial_packet(ioc, &local_err);
     if (id < 0) {
         multifd_recv_terminate_threads(local_err);
-        return;
+        return false;
     }
 
     p = &multifd_recv_state->params[id];
@@ -1328,7 +1329,7 @@
         error_setg(&local_err, "multifd: received id '%d' already setup'",
                    id);
         multifd_recv_terminate_threads(local_err);
-        return;
+        return false;
     }
     p->c = ioc;
     object_ref(OBJECT(ioc));
@@ -1339,9 +1340,7 @@
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                        QEMU_THREAD_JOINABLE);
     atomic_inc(&multifd_recv_state->count);
-    if (multifd_recv_state->count == migrate_multifd_channels()) {
-        migration_incoming_process();
-    }
+    return multifd_recv_state->count == migrate_multifd_channels();
 }
 
 /**
@@ -3581,7 +3580,7 @@
 {
     int flags = 0, ret = 0;
     bool place_needed = false;
-    bool matching_page_sizes = false;
+    bool matches_target_page_size = false;
     MigrationIncomingState *mis = migration_incoming_get_current();
     /* Temporary page that is later 'placed' */
     void *postcopy_host_page = postcopy_get_tmp_page(mis);
@@ -3621,7 +3620,7 @@
                 ret = -EINVAL;
                 break;
             }
-            matching_page_sizes = block->page_size == TARGET_PAGE_SIZE;
+            matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
             /*
              * Postcopy requires that we place whole host pages atomically;
              * these may be huge pages for RAMBlocks that are backed by
@@ -3669,12 +3668,17 @@
 
         case RAM_SAVE_FLAG_PAGE:
             all_zero = false;
-            if (!place_needed || !matching_page_sizes) {
+            if (!matches_target_page_size) {
+                /* For huge pages, we always use temporary buffer */
                 qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
             } else {
-                /* Avoids the qemu_file copy during postcopy, which is
-                 * going to do a copy later; can only do it when we
-                 * do this read in one go (matching page sizes)
+                /*
+                 * For small pages that matches target page size, we
+                 * avoid the qemu_file copy.  Instead we directly use
+                 * the buffer of QEMUFile to place the page.  Note: we
+                 * cannot do any QEMUFile operation before using that
+                 * buffer to make sure the buffer is valid when
+                 * placing the page.
                  */
                 qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
                                          TARGET_PAGE_SIZE);
@@ -3940,7 +3944,7 @@
     int ret = -EINVAL;
     QEMUFile *file = s->rp_state.from_dst_file;
     unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
-    uint64_t local_size = nbits / 8;
+    uint64_t local_size = DIV_ROUND_UP(nbits, 8);
     uint64_t size, end_mark;
 
     trace_ram_dirty_bitmap_reload_begin(block->idstr);
diff --git a/migration/ram.h b/migration/ram.h
index d386f4d..457bf54 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -46,7 +46,7 @@
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
 bool multifd_recv_all_channels_created(void);
-void multifd_recv_new_channel(QIOChannel *ioc);
+bool multifd_recv_new_channel(QIOChannel *ioc);
 
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
diff --git a/migration/savevm.c b/migration/savevm.c
index c2f34ff..7f92567 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -81,8 +81,8 @@
     MIG_CMD_POSTCOPY_RAM_DISCARD,  /* A list of pages to discard that
                                       were previously sent during
                                       precopy but are dirty. */
-    MIG_CMD_POSTCOPY_RESUME,       /* resume postcopy on dest */
     MIG_CMD_PACKAGED,          /* Send a wrapped stream within this stream */
+    MIG_CMD_POSTCOPY_RESUME,   /* resume postcopy on dest */
     MIG_CMD_RECV_BITMAP,       /* Request for recved bitmap on dst */
     MIG_CMD_MAX
 };
@@ -2194,9 +2194,6 @@
     /* Clear the triggered bit to allow one recovery */
     mis->postcopy_recover_triggered = false;
 
-    migrate_set_state(&mis->state, MIGRATION_STATUS_POSTCOPY_ACTIVE,
-                      MIGRATION_STATUS_POSTCOPY_PAUSED);
-
     assert(mis->from_src_file);
     qemu_file_shutdown(mis->from_src_file);
     qemu_fclose(mis->from_src_file);
@@ -2209,6 +2206,9 @@
     mis->to_src_file = NULL;
     qemu_mutex_unlock(&mis->rp_mutex);
 
+    migrate_set_state(&mis->state, MIGRATION_STATUS_POSTCOPY_ACTIVE,
+                      MIGRATION_STATUS_POSTCOPY_PAUSED);
+
     /* Notify the fault thread for the invalidated file handle */
     postcopy_fault_thread_notify(mis);
 
@@ -2276,18 +2276,14 @@
         qemu_file_set_error(f, ret);
 
         /*
-         * Detect whether it is:
-         *
-         * 1. postcopy running (after receiving all device data, which
-         *    must be in POSTCOPY_INCOMING_RUNNING state.  Note that
-         *    POSTCOPY_INCOMING_LISTENING is still not enough, it's
-         *    still receiving device states).
-         * 2. network failure (-EIO)
-         *
-         * If so, we try to wait for a recovery.
+         * If we are during an active postcopy, then we pause instead
+         * of bail out to at least keep the VM's dirty data.  Note
+         * that POSTCOPY_INCOMING_LISTENING stage is still not enough,
+         * during which we're still receiving device states and we
+         * still haven't yet started the VM on destination.
          */
         if (postcopy_state_get() == POSTCOPY_INCOMING_RUNNING &&
-            ret == -EIO && postcopy_pause_incoming(mis)) {
+            postcopy_pause_incoming(mis)) {
             /* Reset f to point to the newly created channel */
             f = mis->from_src_file;
             goto retry;
diff --git a/migration/socket.c b/migration/socket.c
index 3456eb7..f4c8174 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -168,12 +168,7 @@
     if (migration_has_all_channels()) {
         /* Close listening socket as its no longer needed */
         qio_net_listener_disconnect(listener);
-
         object_unref(OBJECT(listener));
-
-        if (!migrate_use_multifd()) {
-            migration_incoming_process();
-        }
     }
 }
 
diff --git a/tests/migration-test.c b/tests/migration-test.c
index 331efb0..086f727 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -168,6 +168,37 @@
     return response;
 }
 
+/*
+ * Note: caller is responsible to free the returned object via
+ * qobject_unref() after use
+ */
+static QDict *migrate_query(QTestState *who)
+{
+    QDict *rsp, *rsp_return;
+
+    rsp = wait_command(who, "{ 'execute': 'query-migrate' }");
+    rsp_return = qdict_get_qdict(rsp, "return");
+    g_assert(rsp_return);
+    qobject_ref(rsp_return);
+    qobject_unref(rsp);
+
+    return rsp_return;
+}
+
+/*
+ * Note: caller is responsible to free the returned object via
+ * g_free() after use
+ */
+static gchar *migrate_query_status(QTestState *who)
+{
+    QDict *rsp_return = migrate_query(who);
+    gchar *status = g_strdup(qdict_get_str(rsp_return, "status"));
+
+    g_assert(status);
+    qobject_unref(rsp_return);
+
+    return status;
+}
 
 /*
  * It's tricky to use qemu's migration event capability with qtest,
@@ -176,11 +207,10 @@
 
 static uint64_t get_migration_pass(QTestState *who)
 {
-    QDict *rsp, *rsp_return, *rsp_ram;
+    QDict *rsp_return, *rsp_ram;
     uint64_t result;
 
-    rsp = wait_command(who, "{ 'execute': 'query-migrate' }");
-    rsp_return = qdict_get_qdict(rsp, "return");
+    rsp_return = migrate_query(who);
     if (!qdict_haskey(rsp_return, "ram")) {
         /* Still in setup */
         result = 0;
@@ -188,33 +218,30 @@
         rsp_ram = qdict_get_qdict(rsp_return, "ram");
         result = qdict_get_try_int(rsp_ram, "dirty-sync-count", 0);
     }
-    qobject_unref(rsp);
+    qobject_unref(rsp_return);
     return result;
 }
 
 static void read_blocktime(QTestState *who)
 {
-    QDict *rsp, *rsp_return;
+    QDict *rsp_return;
 
-    rsp = wait_command(who, "{ 'execute': 'query-migrate' }");
-    rsp_return = qdict_get_qdict(rsp, "return");
+    rsp_return = migrate_query(who);
     g_assert(qdict_haskey(rsp_return, "postcopy-blocktime"));
-    qobject_unref(rsp);
+    qobject_unref(rsp_return);
 }
 
-static void wait_for_migration_complete(QTestState *who)
+static void wait_for_migration_status(QTestState *who,
+                                      const char *goal)
 {
     while (true) {
-        QDict *rsp, *rsp_return;
         bool completed;
-        const char *status;
+        char *status;
 
-        rsp = wait_command(who, "{ 'execute': 'query-migrate' }");
-        rsp_return = qdict_get_qdict(rsp, "return");
-        status = qdict_get_str(rsp_return, "status");
-        completed = strcmp(status, "completed") == 0;
+        status = migrate_query_status(who);
+        completed = strcmp(status, goal) == 0;
         g_assert_cmpstr(status, !=,  "failed");
-        qobject_unref(rsp);
+        g_free(status);
         if (completed) {
             return;
         }
@@ -222,6 +249,11 @@
     }
 }
 
+static void wait_for_migration_complete(QTestState *who)
+{
+    wait_for_migration_status(who, "completed");
+}
+
 static void wait_for_migration_pass(QTestState *who)
 {
     uint64_t initial_pass = get_migration_pass(who);
@@ -320,6 +352,29 @@
     migrate_check_parameter(who, parameter, value);
 }
 
+static void migrate_pause(QTestState *who)
+{
+    QDict *rsp;
+
+    rsp = wait_command(who, "{ 'execute': 'migrate-pause' }");
+    g_assert(qdict_haskey(rsp, "return"));
+    qobject_unref(rsp);
+}
+
+static void migrate_recover(QTestState *who, const char *uri)
+{
+    QDict *rsp;
+    gchar *cmd = g_strdup_printf(
+        "{ 'execute': 'migrate-recover', "
+        "  'id': 'recover-cmd', "
+        "  'arguments': { 'uri': '%s' } }", uri);
+
+    rsp = wait_command(who, cmd);
+    g_assert(qdict_haskey(rsp, "return"));
+    g_free(cmd);
+    qobject_unref(rsp);
+}
+
 static void migrate_set_capability(QTestState *who, const char *capability,
                                    const char *value)
 {
@@ -337,27 +392,33 @@
     qobject_unref(rsp);
 }
 
-static void migrate(QTestState *who, const char *uri)
+static void migrate(QTestState *who, const char *uri, const char *extra)
 {
     QDict *rsp;
     gchar *cmd;
 
     cmd = g_strdup_printf("{ 'execute': 'migrate',"
-                          "'arguments': { 'uri': '%s' } }",
-                          uri);
+                          "  'arguments': { 'uri': '%s' %s } }",
+                          uri, extra ? extra : "");
     rsp = qtest_qmp(who, cmd);
     g_free(cmd);
     g_assert(qdict_haskey(rsp, "return"));
     qobject_unref(rsp);
 }
 
-static void migrate_start_postcopy(QTestState *who)
+static void migrate_postcopy_start(QTestState *from, QTestState *to)
 {
     QDict *rsp;
 
-    rsp = wait_command(who, "{ 'execute': 'migrate-start-postcopy' }");
+    rsp = wait_command(from, "{ 'execute': 'migrate-start-postcopy' }");
     g_assert(qdict_haskey(rsp, "return"));
     qobject_unref(rsp);
+
+    if (!got_stop) {
+        qtest_qmp_eventwait(from, "STOP");
+    }
+
+    qtest_qmp_eventwait(to, "RESUME");
 }
 
 static int test_migrate_start(QTestState **from, QTestState **to,
@@ -510,13 +571,15 @@
     qtest_quit(from);
 }
 
-static void test_postcopy(void)
+static int migrate_postcopy_prepare(QTestState **from_ptr,
+                                     QTestState **to_ptr,
+                                     bool hide_error)
 {
     char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
     QTestState *from, *to;
 
-    if (test_migrate_start(&from, &to, uri, false)) {
-        return;
+    if (test_migrate_start(&from, &to, uri, hide_error)) {
+        return -1;
     }
 
     migrate_set_capability(from, "postcopy-ram", "true");
@@ -533,49 +596,114 @@
     /* Wait for the first serial output from the source */
     wait_for_serial("src_serial");
 
-    migrate(from, uri);
+    migrate(from, uri, NULL);
+    g_free(uri);
 
     wait_for_migration_pass(from);
 
-    migrate_start_postcopy(from);
+    *from_ptr = from;
+    *to_ptr = to;
 
-    if (!got_stop) {
-        qtest_qmp_eventwait(from, "STOP");
-    }
+    return 0;
+}
 
-    qtest_qmp_eventwait(to, "RESUME");
-
-    wait_for_serial("dest_serial");
+static void migrate_postcopy_complete(QTestState *from, QTestState *to)
+{
     wait_for_migration_complete(from);
 
+    /* Make sure we get at least one "B" on destination */
+    wait_for_serial("dest_serial");
+
     if (uffd_feature_thread_id) {
         read_blocktime(to);
     }
-    g_free(uri);
 
     test_migrate_end(from, to, true);
 }
 
+static void test_postcopy(void)
+{
+    QTestState *from, *to;
+
+    if (migrate_postcopy_prepare(&from, &to, false)) {
+        return;
+    }
+    migrate_postcopy_start(from, to);
+    migrate_postcopy_complete(from, to);
+}
+
+static void test_postcopy_recovery(void)
+{
+    QTestState *from, *to;
+    char *uri;
+
+    if (migrate_postcopy_prepare(&from, &to, true)) {
+        return;
+    }
+
+    /* Turn postcopy speed down, 4K/s is slow enough on any machines */
+    migrate_set_parameter(from, "max-postcopy-bandwidth", "4096");
+
+    /* Now we start the postcopy */
+    migrate_postcopy_start(from, to);
+
+    /*
+     * Wait until postcopy is really started; we can only run the
+     * migrate-pause command during a postcopy
+     */
+    wait_for_migration_status(from, "postcopy-active");
+
+    /*
+     * Manually stop the postcopy migration. This emulates a network
+     * failure with the migration socket
+     */
+    migrate_pause(from);
+
+    /*
+     * Wait for destination side to reach postcopy-paused state.  The
+     * migrate-recover command can only succeed if destination machine
+     * is in the paused state
+     */
+    wait_for_migration_status(to, "postcopy-paused");
+
+    /*
+     * Create a new socket to emulate a new channel that is different
+     * from the broken migration channel; tell the destination to
+     * listen to the new port
+     */
+    uri = g_strdup_printf("unix:%s/migsocket-recover", tmpfs);
+    migrate_recover(to, uri);
+
+    /*
+     * Try to rebuild the migration channel using the resume flag and
+     * the newly created channel
+     */
+    wait_for_migration_status(from, "postcopy-paused");
+    migrate(from, uri, ", 'resume': true");
+    g_free(uri);
+
+    /* Restore the postcopy bandwidth to unlimited */
+    migrate_set_parameter(from, "max-postcopy-bandwidth", "0");
+
+    migrate_postcopy_complete(from, to);
+}
+
 static void test_baddest(void)
 {
     QTestState *from, *to;
     QDict *rsp, *rsp_return;
-    const char *status;
+    char *status;
     bool failed;
 
     if (test_migrate_start(&from, &to, "tcp:0:0", true)) {
         return;
     }
-    migrate(from, "tcp:0:0");
+    migrate(from, "tcp:0:0", NULL);
     do {
-        rsp = wait_command(from, "{ 'execute': 'query-migrate' }");
-        rsp_return = qdict_get_qdict(rsp, "return");
-
-        status = qdict_get_str(rsp_return, "status");
-
+        status = migrate_query_status(from);
         g_assert(!strcmp(status, "setup") || !(strcmp(status, "failed")));
         failed = !strcmp(status, "failed");
-        qobject_unref(rsp);
+        g_free(status);
     } while (!failed);
 
     /* Is the machine currently running? */
@@ -610,7 +738,7 @@
     /* Wait for the first serial output from the source */
     wait_for_serial("src_serial");
 
-    migrate(from, uri);
+    migrate(from, uri, NULL);
 
     wait_for_migration_pass(from);
 
@@ -650,6 +778,7 @@
     module_call_init(MODULE_INIT_QOM);
 
     qtest_add_func("/migration/postcopy/unix", test_postcopy);
+    qtest_add_func("/migration/postcopy/recovery", test_postcopy_recovery);
     qtest_add_func("/migration/deprecated", test_deprecated);
     qtest_add_func("/migration/bad_dest", test_baddest);
     qtest_add_func("/migration/precopy/unix", test_precopy_unix);