|  | /* | 
|  | * Multifd device state migration | 
|  | * | 
|  | * Copyright (C) 2024,2025 Oracle and/or its affiliates. | 
|  | * | 
|  | * This work is licensed under the terms of the GNU GPL, version 2 or later. | 
|  | * See the COPYING file in the top-level directory. | 
|  | * | 
|  | * SPDX-License-Identifier: GPL-2.0-or-later | 
|  | */ | 
|  |  | 
|  | #include "qemu/osdep.h" | 
|  | #include "qapi/error.h" | 
|  | #include "qemu/lockable.h" | 
|  | #include "block/thread-pool.h" | 
|  | #include "migration.h" | 
|  | #include "migration/misc.h" | 
|  | #include "multifd.h" | 
|  | #include "options.h" | 
|  |  | 
|  | static struct { | 
|  | QemuMutex queue_job_mutex; | 
|  |  | 
|  | MultiFDSendData *send_data; | 
|  |  | 
|  | ThreadPool *threads; | 
|  | bool threads_abort; | 
|  | } *multifd_send_device_state; | 
|  |  | 
|  | void multifd_device_state_send_setup(void) | 
|  | { | 
|  | assert(!multifd_send_device_state); | 
|  | multifd_send_device_state = g_malloc(sizeof(*multifd_send_device_state)); | 
|  |  | 
|  | qemu_mutex_init(&multifd_send_device_state->queue_job_mutex); | 
|  |  | 
|  | multifd_send_device_state->send_data = multifd_send_data_alloc(); | 
|  |  | 
|  | multifd_send_device_state->threads = thread_pool_new(); | 
|  | multifd_send_device_state->threads_abort = false; | 
|  | } | 
|  |  | 
|  | void multifd_device_state_send_cleanup(void) | 
|  | { | 
|  | g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free); | 
|  | g_clear_pointer(&multifd_send_device_state->send_data, | 
|  | multifd_send_data_free); | 
|  |  | 
|  | qemu_mutex_destroy(&multifd_send_device_state->queue_job_mutex); | 
|  |  | 
|  | g_clear_pointer(&multifd_send_device_state, g_free); | 
|  | } | 
|  |  | 
|  | void multifd_send_data_clear_device_state(MultiFDDeviceState_t *device_state) | 
|  | { | 
|  | g_clear_pointer(&device_state->idstr, g_free); | 
|  | g_clear_pointer(&device_state->buf, g_free); | 
|  | } | 
|  |  | 
|  | static void multifd_device_state_fill_packet(MultiFDSendParams *p) | 
|  | { | 
|  | MultiFDDeviceState_t *device_state = &p->data->u.device_state; | 
|  | MultiFDPacketDeviceState_t *packet = p->packet_device_state; | 
|  |  | 
|  | packet->hdr.flags = cpu_to_be32(p->flags); | 
|  | strncpy(packet->idstr, device_state->idstr, sizeof(packet->idstr) - 1); | 
|  | packet->idstr[sizeof(packet->idstr) - 1] = 0; | 
|  | packet->instance_id = cpu_to_be32(device_state->instance_id); | 
|  | packet->next_packet_size = cpu_to_be32(p->next_packet_size); | 
|  | } | 
|  |  | 
|  | static void multifd_prepare_header_device_state(MultiFDSendParams *p) | 
|  | { | 
|  | p->iov[0].iov_len = sizeof(*p->packet_device_state); | 
|  | p->iov[0].iov_base = p->packet_device_state; | 
|  | p->iovs_num++; | 
|  | } | 
|  |  | 
|  | void multifd_device_state_send_prepare(MultiFDSendParams *p) | 
|  | { | 
|  | MultiFDDeviceState_t *device_state = &p->data->u.device_state; | 
|  |  | 
|  | assert(multifd_payload_device_state(p->data)); | 
|  |  | 
|  | multifd_prepare_header_device_state(p); | 
|  |  | 
|  | assert(!(p->flags & MULTIFD_FLAG_SYNC)); | 
|  |  | 
|  | p->next_packet_size = device_state->buf_len; | 
|  | if (p->next_packet_size > 0) { | 
|  | p->iov[p->iovs_num].iov_base = device_state->buf; | 
|  | p->iov[p->iovs_num].iov_len = p->next_packet_size; | 
|  | p->iovs_num++; | 
|  | } | 
|  |  | 
|  | p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE; | 
|  |  | 
|  | multifd_device_state_fill_packet(p); | 
|  | } | 
|  |  | 
|  | bool multifd_queue_device_state(char *idstr, uint32_t instance_id, | 
|  | char *data, size_t len) | 
|  | { | 
|  | /* Device state submissions can come from multiple threads */ | 
|  | QEMU_LOCK_GUARD(&multifd_send_device_state->queue_job_mutex); | 
|  | MultiFDDeviceState_t *device_state; | 
|  |  | 
|  | assert(multifd_payload_empty(multifd_send_device_state->send_data)); | 
|  |  | 
|  | multifd_set_payload_type(multifd_send_device_state->send_data, | 
|  | MULTIFD_PAYLOAD_DEVICE_STATE); | 
|  | device_state = &multifd_send_device_state->send_data->u.device_state; | 
|  | device_state->idstr = g_strdup(idstr); | 
|  | device_state->instance_id = instance_id; | 
|  | device_state->buf = g_memdup2(data, len); | 
|  | device_state->buf_len = len; | 
|  |  | 
|  | if (!multifd_send(&multifd_send_device_state->send_data)) { | 
|  | multifd_send_data_clear(multifd_send_device_state->send_data); | 
|  | return false; | 
|  | } | 
|  |  | 
|  | return true; | 
|  | } | 
|  |  | 
|  | bool multifd_device_state_supported(void) | 
|  | { | 
|  | return migrate_multifd() && !migrate_mapped_ram() && | 
|  | migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE; | 
|  | } | 
|  |  | 
|  | static void multifd_device_state_save_thread_data_free(void *opaque) | 
|  | { | 
|  | SaveLiveCompletePrecopyThreadData *data = opaque; | 
|  |  | 
|  | g_clear_pointer(&data->idstr, g_free); | 
|  | g_free(data); | 
|  | } | 
|  |  | 
|  | static int multifd_device_state_save_thread(void *opaque) | 
|  | { | 
|  | SaveLiveCompletePrecopyThreadData *data = opaque; | 
|  | g_autoptr(Error) local_err = NULL; | 
|  |  | 
|  | if (!data->hdlr(data, &local_err)) { | 
|  | MigrationState *s = migrate_get_current(); | 
|  |  | 
|  | /* | 
|  | * Can't call abort_device_state_save_threads() here since new | 
|  | * save threads could still be in process of being launched | 
|  | * (if, for example, the very first save thread launched exited | 
|  | * with an error very quickly). | 
|  | */ | 
|  |  | 
|  | assert(local_err); | 
|  |  | 
|  | /* | 
|  | * In case of multiple save threads failing which thread error | 
|  | * return we end setting is purely arbitrary. | 
|  | */ | 
|  | migrate_set_error(s, local_err); | 
|  | } | 
|  |  | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | bool multifd_device_state_save_thread_should_exit(void) | 
|  | { | 
|  | return qatomic_read(&multifd_send_device_state->threads_abort); | 
|  | } | 
|  |  | 
|  | void | 
|  | multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr, | 
|  | char *idstr, uint32_t instance_id, | 
|  | void *opaque) | 
|  | { | 
|  | SaveLiveCompletePrecopyThreadData *data; | 
|  |  | 
|  | assert(multifd_device_state_supported()); | 
|  | assert(multifd_send_device_state); | 
|  |  | 
|  | assert(!qatomic_read(&multifd_send_device_state->threads_abort)); | 
|  |  | 
|  | data = g_new(SaveLiveCompletePrecopyThreadData, 1); | 
|  | data->hdlr = hdlr; | 
|  | data->idstr = g_strdup(idstr); | 
|  | data->instance_id = instance_id; | 
|  | data->handler_opaque = opaque; | 
|  |  | 
|  | thread_pool_submit_immediate(multifd_send_device_state->threads, | 
|  | multifd_device_state_save_thread, | 
|  | data, | 
|  | multifd_device_state_save_thread_data_free); | 
|  | } | 
|  |  | 
|  | void multifd_abort_device_state_save_threads(void) | 
|  | { | 
|  | assert(multifd_device_state_supported()); | 
|  |  | 
|  | qatomic_set(&multifd_send_device_state->threads_abort, true); | 
|  | } | 
|  |  | 
|  | bool multifd_join_device_state_save_threads(void) | 
|  | { | 
|  | MigrationState *s = migrate_get_current(); | 
|  |  | 
|  | assert(multifd_device_state_supported()); | 
|  |  | 
|  | thread_pool_wait(multifd_send_device_state->threads); | 
|  |  | 
|  | return !migrate_has_error(s); | 
|  | } |