qmp: cleanup qmp queues properly

Marc-André Lureau reported that we can have this happen:

1. client1 connects, sends command C1
2. client1 disconnects before getting the response for C1
3. client2 connects, and might receive the response for C1

However client2 should not receive remaining responses for client1.

Basically, we should clean up the request/response queue elements:

- after a session is closed
- before destroying the queues

Some helpers are introduced to achieve that.  We need to make sure we
hold the lock when operating on those queues.  This also requires
moving the declaration of QMPRequest earlier.

Reported-by: Marc-André Lureau <marcandre.lureau@redhat.com>
Signed-off-by: Peter Xu <peterx@redhat.com>
Message-Id: <20180326063901.27425-3-peterx@redhat.com>
Reviewed-by: Marc-André Lureau <marcandre.lureau@redhat.com>
[eblake: drop pointless qmp_response_free(), drop queue flush on connect
since a clean queue on disconnect is sufficient]
Tested-by: Christian Borntraeger <borntraeger@de.ibm.com>
Signed-off-by: Eric Blake <eblake@redhat.com>
diff --git a/monitor.c b/monitor.c
index 3b1ef34..32d6114 100644
--- a/monitor.c
+++ b/monitor.c
@@ -234,6 +234,22 @@
     QEMUBH *qmp_respond_bh;
 } mon_global;
 
+struct QMPRequest {
+    /* Owner of the request */
+    Monitor *mon;
+    /* "id" field of the request */
+    QObject *id;
+    /* Request object to be handled */
+    QObject *req;
+    /*
+     * Whether we need to resume the monitor afterward.  This flag is
+     * used to emulate the old QMP server behavior that the current
+     * command must be completed before execution of the next one.
+     */
+    bool need_resume;
+};
+typedef struct QMPRequest QMPRequest;
+
 /* QMP checker flags */
 #define QMP_ACCEPT_UNKNOWNS 1
 
@@ -310,6 +326,38 @@
     }
 }
 
+static void qmp_request_free(QMPRequest *req)
+{
+    qobject_decref(req->id);
+    qobject_decref(req->req);
+    g_free(req);
+}
+
+/* Free all queued requests; caller must hold mon->qmp.qmp_queue_lock */
+static void monitor_qmp_cleanup_req_queue_locked(Monitor *mon)
+{
+    while (!g_queue_is_empty(mon->qmp.qmp_requests)) {
+        qmp_request_free(g_queue_pop_head(mon->qmp.qmp_requests));
+    }
+}
+
+/* Unref all queued responses; caller must hold mon->qmp.qmp_queue_lock */
+static void monitor_qmp_cleanup_resp_queue_locked(Monitor *mon)
+{
+    while (!g_queue_is_empty(mon->qmp.qmp_responses)) {
+        qobject_decref(g_queue_pop_head(mon->qmp.qmp_responses));
+    }
+}
+
+static void monitor_qmp_cleanup_queues(Monitor *mon)
+{
+    qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+    monitor_qmp_cleanup_req_queue_locked(mon);
+    monitor_qmp_cleanup_resp_queue_locked(mon);
+    qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+}
+
+
 static void monitor_flush_locked(Monitor *mon);
 
 static gboolean monitor_unblocked(GIOChannel *chan, GIOCondition cond,
@@ -701,6 +749,8 @@
     QDECREF(mon->outbuf);
     qemu_mutex_destroy(&mon->out_lock);
     qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
+    monitor_qmp_cleanup_req_queue_locked(mon);
+    monitor_qmp_cleanup_resp_queue_locked(mon);
     g_queue_free(mon->qmp.qmp_requests);
     g_queue_free(mon->qmp.qmp_responses);
 }
@@ -4009,22 +4059,6 @@
     qobject_decref(rsp);
 }
 
-struct QMPRequest {
-    /* Owner of the request */
-    Monitor *mon;
-    /* "id" field of the request */
-    QObject *id;
-    /* Request object to be handled */
-    QObject *req;
-    /*
-     * Whether we need to resume the monitor afterward.  This flag is
-     * used to emulate the old QMP server behavior that the current
-     * command must be completed before execution of the next one.
-     */
-    bool need_resume;
-};
-typedef struct QMPRequest QMPRequest;
-
 /*
  * Dispatch one single QMP request. The function will free the req_obj
  * and objects inside it before return.
@@ -4191,9 +4225,7 @@
             qapi_event_send_command_dropped(id,
                                             COMMAND_DROP_REASON_QUEUE_FULL,
                                             &error_abort);
-            qobject_decref(id);
-            qobject_decref(req);
-            g_free(req_obj);
+            qmp_request_free(req_obj);
             return;
         }
     }
@@ -4335,6 +4367,7 @@
         mon_refcount++;
         break;
     case CHR_EVENT_CLOSED:
+        monitor_qmp_cleanup_queues(mon);
         json_message_parser_destroy(&mon->qmp.parser);
         json_message_parser_init(&mon->qmp.parser, handle_qmp_command);
         mon_refcount--;