| /* |
| * net stream generic functions |
| * |
| * Copyright Red Hat |
| * |
| * SPDX-License-Identifier: GPL-2.0-or-later |
| */ |
| |
| #include "qemu/osdep.h" |
| #include "qemu/iov.h" |
| #include "qapi/error.h" |
| #include "net/net.h" |
| #include "io/channel.h" |
| #include "io/net-listener.h" |
| |
| #include "stream_data.h" |
| |
| static gboolean net_stream_data_writable(QIOChannel *ioc, |
| GIOCondition condition, gpointer data) |
| { |
| NetStreamData *d = data; |
| |
| d->ioc_write_tag = 0; |
| |
| qemu_flush_queued_packets(&d->nc); |
| |
| return G_SOURCE_REMOVE; |
| } |
| |
| ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf, |
| size_t size) |
| { |
| uint32_t len = htonl(size); |
| struct iovec iov[] = { |
| { |
| .iov_base = &len, |
| .iov_len = sizeof(len), |
| }, { |
| .iov_base = (void *)buf, |
| .iov_len = size, |
| }, |
| }; |
| struct iovec local_iov[2]; |
| unsigned int nlocal_iov; |
| size_t remaining; |
| ssize_t ret; |
| |
| remaining = iov_size(iov, 2) - d->send_index; |
| nlocal_iov = iov_copy(local_iov, 2, iov, 2, d->send_index, remaining); |
| ret = qio_channel_writev(d->ioc, local_iov, nlocal_iov, NULL); |
| if (ret == QIO_CHANNEL_ERR_BLOCK) { |
| ret = 0; /* handled further down */ |
| } |
| if (ret == -1) { |
| d->send_index = 0; |
| return -errno; |
| } |
| if (ret < (ssize_t)remaining) { |
| d->send_index += ret; |
| d->ioc_write_tag = qio_channel_add_watch(d->ioc, G_IO_OUT, |
| net_stream_data_writable, d, |
| NULL); |
| return 0; |
| } |
| d->send_index = 0; |
| return size; |
| } |
| |
| static void net_stream_data_send_completed(NetClientState *nc, ssize_t len) |
| { |
| NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc); |
| |
| if (!d->ioc_read_tag) { |
| d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, |
| NULL); |
| } |
| } |
| |
| void net_stream_data_rs_finalize(SocketReadState *rs) |
| { |
| NetStreamData *d = container_of(rs, NetStreamData, rs); |
| |
| if (qemu_send_packet_async(&d->nc, rs->buf, |
| rs->packet_len, |
| net_stream_data_send_completed) == 0) { |
| if (d->ioc_read_tag) { |
| g_source_remove(d->ioc_read_tag); |
| d->ioc_read_tag = 0; |
| } |
| } |
| } |
| |
| gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition, |
| NetStreamData *d) |
| { |
| int size; |
| int ret; |
| QEMU_UNINITIALIZED char buf1[NET_BUFSIZE]; |
| const char *buf; |
| |
| size = qio_channel_read(d->ioc, buf1, sizeof(buf1), NULL); |
| if (size < 0) { |
| if (errno != EWOULDBLOCK) { |
| goto eoc; |
| } |
| } else if (size == 0) { |
| /* end of connection */ |
| eoc: |
| d->ioc_read_tag = 0; |
| if (d->ioc_write_tag) { |
| g_source_remove(d->ioc_write_tag); |
| d->ioc_write_tag = 0; |
| } |
| if (d->listener) { |
| qemu_set_info_str(&d->nc, "listening"); |
| qio_net_listener_set_client_func(d->listener, |
| d->listen, d, NULL); |
| } |
| object_unref(OBJECT(d->ioc)); |
| d->ioc = NULL; |
| |
| net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false); |
| d->nc.link_down = true; |
| |
| return G_SOURCE_REMOVE; |
| } |
| buf = buf1; |
| |
| ret = net_fill_rstate(&d->rs, (const uint8_t *)buf, size); |
| |
| if (ret == -1) { |
| goto eoc; |
| } |
| |
| return G_SOURCE_CONTINUE; |
| } |
| |
| void net_stream_data_listen(QIONetListener *listener, |
| QIOChannelSocket *cioc, |
| NetStreamData *d) |
| { |
| object_ref(OBJECT(cioc)); |
| |
| qio_net_listener_set_client_func(d->listener, NULL, d, NULL); |
| |
| d->ioc = QIO_CHANNEL(cioc); |
| qio_channel_set_name(d->ioc, "stream-server"); |
| d->nc.link_down = false; |
| |
| d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL); |
| } |
| |
| int net_stream_data_client_connected(QIOTask *task, NetStreamData *d) |
| { |
| QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc); |
| SocketAddress *addr; |
| int ret; |
| Error *err = NULL; |
| |
| if (qio_task_propagate_error(task, &err)) { |
| qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err)); |
| error_free(err); |
| goto error; |
| } |
| |
| addr = qio_channel_socket_get_remote_address(sioc, NULL); |
| g_assert(addr != NULL); |
| |
| ret = qemu_socket_try_set_nonblock(sioc->fd); |
| if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { |
| qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)", |
| addr->u.fd.str, -ret); |
| qapi_free_SocketAddress(addr); |
| goto error; |
| } |
| g_assert(ret == 0); |
| qapi_free_SocketAddress(addr); |
| |
| net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false); |
| |
| /* Disable Nagle algorithm on TCP sockets to reduce latency */ |
| qio_channel_set_delay(d->ioc, false); |
| |
| d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL); |
| d->nc.link_down = false; |
| |
| return 0; |
| error: |
| object_unref(OBJECT(d->ioc)); |
| d->ioc = NULL; |
| |
| return -1; |
| } |