nbd: convert qemu-nbd server to use I/O channels for connection setup

This converts the qemu-nbd server to use the QIOChannelSocket
class for initial listener socket setup and accepting of client
connections. Actual I/O is still being performed against the
socket file descriptor using the POSIX socket APIs.

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
Message-Id: <1455129674-17255-5-git-send-email-berrange@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
diff --git a/qemu-nbd.c b/qemu-nbd.c
index 130c306..bc309e0 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -22,19 +22,17 @@
 #include "block/block_int.h"
 #include "block/nbd.h"
 #include "qemu/main-loop.h"
-#include "qemu/sockets.h"
 #include "qemu/error-report.h"
 #include "qemu/config-file.h"
 #include "block/snapshot.h"
 #include "qapi/util.h"
 #include "qapi/qmp/qstring.h"
 #include "qom/object_interfaces.h"
+#include "io/channel-socket.h"
 
 #include <getopt.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <arpa/inet.h>
+#include <sys/types.h>
+#include <signal.h>
 #include <libgen.h>
 #include <pthread.h>
 
@@ -53,7 +51,8 @@
 static enum { RUNNING, TERMINATE, TERMINATING, TERMINATED } state;
 static int shared = 1;
 static int nb_fds;
-static int server_fd;
+static QIOChannelSocket *server_ioc;
+static int server_watch = -1; /* accept watch id; -1 while not installed */
 
 static void usage(const char *name)
 {
@@ -236,19 +235,21 @@
     char *device = arg;
     off_t size;
     uint32_t nbdflags;
-    int fd, sock;
+    QIOChannelSocket *sioc;
+    int fd;
     int ret;
     pthread_t show_parts_thread;
     Error *local_error = NULL;
 
-
-    sock = socket_connect(saddr, &local_error, NULL, NULL);
-    if (sock < 0) {
+    sioc = qio_channel_socket_new();
+    if (qio_channel_socket_connect_sync(sioc,
+                                        saddr,
+                                        &local_error) < 0) {
         error_report_err(local_error);
         goto out;
     }
 
-    ret = nbd_receive_negotiate(sock, NULL, &nbdflags,
+    ret = nbd_receive_negotiate(sioc->fd, NULL, &nbdflags,
                                 &size, &local_error);
     if (ret < 0) {
         if (local_error) {
@@ -264,7 +265,7 @@
         goto out_socket;
     }
 
-    ret = nbd_init(fd, sock, nbdflags, size);
+    ret = nbd_init(fd, sioc->fd, nbdflags, size);
     if (ret < 0) {
         goto out_fd;
     }
@@ -285,13 +286,14 @@
         goto out_fd;
     }
     close(fd);
+    object_unref(OBJECT(sioc));
     kill(getpid(), SIGTERM);
     return (void *) EXIT_SUCCESS;
 
 out_fd:
     close(fd);
 out_socket:
-    closesocket(sock);
+    object_unref(OBJECT(sioc));
 out:
     kill(getpid(), SIGTERM);
     return (void *) EXIT_FAILURE;
@@ -308,7 +310,7 @@
     state = TERMINATED;
 }
 
-static void nbd_update_server_fd_handler(int fd);
+static void nbd_update_server_watch(void);
 
 static void nbd_client_closed(NBDClient *client)
 {
@@ -316,37 +318,51 @@
     if (nb_fds == 0 && !persistent && state == RUNNING) {
         state = TERMINATE;
     }
-    nbd_update_server_fd_handler(server_fd);
+    nbd_update_server_watch();
     nbd_client_put(client);
 }
 
-static void nbd_accept(void *opaque)
+static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
 {
-    struct sockaddr_in addr;
-    socklen_t addr_len = sizeof(addr);
+    QIOChannelSocket *cioc;
+    int fd;
 
-    int fd = accept(server_fd, (struct sockaddr *)&addr, &addr_len);
-    if (fd < 0) {
-        perror("accept");
-        return;
+    cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
+                                     NULL);
+    if (!cioc) {
+        return TRUE;
     }
 
     if (state >= TERMINATE) {
-        close(fd);
-        return;
+        object_unref(OBJECT(cioc));
+        return TRUE;
     }
 
-    nb_fds++;
-    nbd_update_server_fd_handler(server_fd);
-    nbd_client_new(exp, fd, nbd_client_closed);
+    /* Count the client only if dup() worked; a failed dup() creates none. */
+    fd = dup(cioc->fd);
+    object_unref(OBJECT(cioc));
+    if (fd >= 0) {
+        nb_fds++;
+        nbd_update_server_watch();
+        nbd_client_new(exp, fd, nbd_client_closed);
+    }
+    return TRUE;
 }
 
-static void nbd_update_server_fd_handler(int fd)
+static void nbd_update_server_watch(void)
 {
     if (nbd_can_accept()) {
-        qemu_set_fd_handler(fd, nbd_accept, NULL, (void *)(uintptr_t)fd);
+        if (server_watch == -1) { /* -1: no accept watch installed yet */
+            server_watch = qio_channel_add_watch(QIO_CHANNEL(server_ioc),
+                                                 G_IO_IN,
+                                                 nbd_accept,
+                                                 NULL, NULL);
+        }
     } else {
-        qemu_set_fd_handler(fd, NULL, NULL, NULL);
+        if (server_watch != -1) { /* stop calling nbd_accept */
+            g_source_remove(server_watch);
+            server_watch = -1;
+        }
     }
 }
 
@@ -433,7 +449,6 @@
     int flags = BDRV_O_RDWR;
     int partition = -1;
     int ret = 0;
-    int fd;
     bool seen_cache = false;
     bool seen_discard = false;
     bool seen_aio = false;
@@ -632,15 +647,15 @@
     }
 
     if (disconnect) {
-        fd = open(argv[optind], O_RDWR);
-        if (fd < 0) {
+        int nbdfd = open(argv[optind], O_RDWR);
+        if (nbdfd < 0) {
             error_report("Cannot open %s: %s", argv[optind],
                          strerror(errno));
             exit(EXIT_FAILURE);
         }
-        nbd_disconnect(fd);
+        nbd_disconnect(nbdfd);
 
-        close(fd);
+        close(nbdfd);
 
         printf("%s disconnected\n", argv[optind]);
 
@@ -773,8 +788,9 @@
         exit(EXIT_FAILURE);
     }
 
-    fd = socket_listen(saddr, &local_err);
-    if (fd < 0) {
+    server_ioc = qio_channel_socket_new();
+    if (qio_channel_socket_listen_sync(server_ioc, saddr, &local_err) < 0) {
+        object_unref(OBJECT(server_ioc));
         error_report_err(local_err);
         return 1;
     }
@@ -792,8 +808,7 @@
         memset(&client_thread, 0, sizeof(client_thread));
     }
 
-    server_fd = fd;
-    nbd_update_server_fd_handler(fd);
+    nbd_update_server_watch();
 
     /* now when the initialization is (almost) complete, chdir("/")
      * to free any busy filesystems */