[ovs-dev] [poll group RFC 5/6] lib: Add poll group support in jsonrpc library.

Andy Zhou azhou at ovn.org
Fri Feb 19 04:20:55 UTC 2016


Signed-off-by: Andy Zhou <azhou at ovn.org>
---
 lib/jsonrpc.c | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 lib/jsonrpc.h |  7 ++++++
 2 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index 35428a6..e8ea8b4 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -138,11 +138,15 @@ jsonrpc_run(struct jsonrpc *rpc)
 }
 
 /* Arranges for the poll loop to wake up when 'rpc' needs to perform
- * maintenance activities. */
+ * maintenance activities.
+ *
+ * This function should not be called when 'rpc' has joined a poll
+ * group. Use poll_group_get_events() instead.   */
 void
 jsonrpc_wait(struct jsonrpc *rpc)
 {
     if (!rpc->status) {
+        ovs_assert(!stream_joined(rpc->stream));
         stream_run_wait(rpc->stream);
         if (!list_is_empty(&rpc->output)) {
             stream_send_wait(rpc->stream);
@@ -353,11 +357,36 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
     return EAGAIN;
 }
 
+bool
+jsonrpc_joined_poll_group(const struct jsonrpc *rpc)
+{
+    return rpc->status ? false : stream_joined(rpc->stream);
+}
+
+bool
+jsonrpc_has_pending_input(const struct jsonrpc *rpc)
+{
+    return rpc->status ? false : !byteq_is_empty(&rpc->input);
+}
+
+void
+jsonrpc_poll_group_update(struct jsonrpc *rpc, bool write)
+{
+    if (!rpc->status) {
+        ovs_assert(stream_joined(rpc->stream));
+        stream_update(rpc->stream, write);
+    }
+}
+
 /* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
  * than EAGAIN. */
 void
 jsonrpc_recv_wait(struct jsonrpc *rpc)
 {
+    if (!rpc->status) {
+        ovs_assert(!stream_joined(rpc->stream));
+    }
+
     if (rpc->status || !byteq_is_empty(&rpc->input)) {
         poll_immediate_wake_at(rpc->name);
     } else {
@@ -377,6 +406,8 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 
     fatal_signal_run();
 
+    ovs_assert(!stream_joined(rpc->stream));
+
     error = jsonrpc_send(rpc, msg);
     if (error) {
         return error;
@@ -397,6 +428,8 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 int
 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
 {
+    ovs_assert(!stream_joined(rpc->stream));
+
     for (;;) {
         int error = jsonrpc_recv(rpc, msgp);
         if (error != EAGAIN) {
@@ -827,7 +860,6 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
     s->stream = NULL;
     s->pstream = NULL;
     s->seqno = 0;
-
     return s;
 }
 
@@ -976,12 +1008,26 @@ jsonrpc_session_run(struct jsonrpc_session *s)
     }
 }
 
+/* Only wait for stream within the 's'. Poll group does not
+ * handle pstream, and stream's initial connection, these
+ * are still using poll loop.   */
 void
 jsonrpc_session_wait(struct jsonrpc_session *s)
 {
+    /*  When s->rpc is set, The jsonrpc session is in connected
+     *  state. Check if 's' has already registered with a poll group.
+     *
+     *  s->stream is set when the stream is not in a connected state
+     *  continue to let poll loop handle it.
+     *
+     *  poll group currently does not work with pstream. Let
+     *  poll loop handle all pstreams. */
     if (s->rpc) {
-        jsonrpc_wait(s->rpc);
-    } else if (s->stream) {
+       if (!jsonrpc_joined_poll_group(s->rpc)) {
+            jsonrpc_wait(s->rpc);
+        }
+    }
+    if (s->stream) {
         stream_run_wait(s->stream);
         stream_connect_wait(s->stream);
     }
@@ -1055,6 +1101,26 @@ jsonrpc_session_recv(struct jsonrpc_session *s)
     return NULL;
 }
 
+bool
+jsonrpc_session_joined_poll_group(const struct jsonrpc_session *s)
+{
+    return s->rpc ? jsonrpc_joined_poll_group(s->rpc) : false;
+}
+
+bool
+jsonrpc_session_has_pending_input(const struct jsonrpc_session *s)
+{
+    return s->rpc ? jsonrpc_has_pending_input(s->rpc) : false;
+}
+
+void
+jsonrpc_session_poll_group_update(struct jsonrpc_session *s, bool write)
+{
+    if (s->rpc) {
+        jsonrpc_poll_group_update(s->rpc, write);
+    }
+}
+
 void
 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
 {
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index 5f46e3b..c8e6690 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -53,6 +53,10 @@ size_t jsonrpc_get_backlog(const struct jsonrpc *);
 unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
 const char *jsonrpc_get_name(const struct jsonrpc *);
 
+bool jsonrpc_joined_poll_group(const struct jsonrpc *);
+bool jsonrpc_has_pending_input(const struct jsonrpc *);
+void jsonrpc_poll_group_update(struct jsonrpc *, bool);
+
 int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *);
 int jsonrpc_recv(struct jsonrpc *, struct jsonrpc_msg **);
 void jsonrpc_recv_wait(struct jsonrpc *);
@@ -107,6 +111,9 @@ void jsonrpc_session_run(struct jsonrpc_session *);
 void jsonrpc_session_wait(struct jsonrpc_session *);
 
 size_t jsonrpc_session_get_backlog(const struct jsonrpc_session *);
+bool jsonrpc_session_joined_poll_group(const struct jsonrpc_session *);
+bool jsonrpc_session_has_pending_input(const struct jsonrpc_session *);
+void jsonrpc_session_poll_group_update(struct jsonrpc_session *, bool);
 const char *jsonrpc_session_get_name(const struct jsonrpc_session *);
 
 int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *);
-- 
1.9.1




More information about the dev mailing list