[ovs-dev] [poll group RFC 5/6] lib: Add poll group support in jsonrpc library.
Andy Zhou
azhou at ovn.org
Fri Feb 19 04:20:55 UTC 2016
Signed-off-by: Andy Zhou <azhou at ovn.org>
---
lib/jsonrpc.c | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++++----
lib/jsonrpc.h | 7 ++++++
2 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index 35428a6..e8ea8b4 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -138,11 +138,15 @@ jsonrpc_run(struct jsonrpc *rpc)
}
/* Arranges for the poll loop to wake up when 'rpc' needs to perform
- * maintenance activities. */
+ * maintenance activities.
+ *
+ * This function should not be called when 'rpc' has joined a poll
+ * group. Use poll_group_get_events() instead. */
void
jsonrpc_wait(struct jsonrpc *rpc)
{
if (!rpc->status) {
+ ovs_assert(!stream_joined(rpc->stream));
stream_run_wait(rpc->stream);
if (!list_is_empty(&rpc->output)) {
stream_send_wait(rpc->stream);
@@ -353,11 +357,36 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
return EAGAIN;
}
+/* Returns false if 'rpc' has a nonzero status.  Otherwise returns whatever
+ * stream_joined() reports for the stream of 'rpc'. */
+bool
+jsonrpc_joined_poll_group(const struct jsonrpc *rpc)
+{
+    if (rpc->status) {
+        return false;
+    }
+    return stream_joined(rpc->stream);
+}
+
+/* Returns true if 'rpc' has a zero status and its input byte queue is not
+ * empty; returns false otherwise. */
+bool
+jsonrpc_has_pending_input(const struct jsonrpc *rpc)
+{
+    if (rpc->status) {
+        return false;
+    }
+    return !byteq_is_empty(&rpc->input);
+}
+
+/* Passes 'write' on to stream_update() for the stream of 'rpc'.  Does
+ * nothing if 'rpc' has a nonzero status.  Asserts that stream_joined() is
+ * true for the stream before updating it. */
+void
+jsonrpc_poll_group_update(struct jsonrpc *rpc, bool write)
+{
+    if (rpc->status) {
+        return;
+    }
+
+    ovs_assert(stream_joined(rpc->stream));
+    stream_update(rpc->stream, write);
+}
+
/* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
* than EAGAIN. */
void
jsonrpc_recv_wait(struct jsonrpc *rpc)
{
+ if (!rpc->status) {
+ ovs_assert(!stream_joined(rpc->stream));
+ }
+
if (rpc->status || !byteq_is_empty(&rpc->input)) {
poll_immediate_wake_at(rpc->name);
} else {
@@ -377,6 +406,8 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
fatal_signal_run();
+ ovs_assert(!stream_joined(rpc->stream));
+
error = jsonrpc_send(rpc, msg);
if (error) {
return error;
@@ -397,6 +428,8 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
int
jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
{
+ ovs_assert(!stream_joined(rpc->stream));
+
for (;;) {
int error = jsonrpc_recv(rpc, msgp);
if (error != EAGAIN) {
@@ -827,7 +860,6 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
s->stream = NULL;
s->pstream = NULL;
s->seqno = 0;
-
return s;
}
@@ -976,12 +1008,26 @@ jsonrpc_session_run(struct jsonrpc_session *s)
}
}
+/* Only waits for the stream within 's'.  Poll groups do not handle
+ * pstreams or a stream's initial connection; those still use the
+ * poll loop. */
void
jsonrpc_session_wait(struct jsonrpc_session *s)
{
+ /* When s->rpc is set, the jsonrpc session is in the connected
+ * state.  Check whether 's' has already registered with a poll group.
+ *
+ * s->stream is set when the stream is not in a connected state;
+ * continue to let the poll loop handle it.
+ *
+ * Poll groups currently do not work with pstreams.  Let the
+ * poll loop handle all pstreams. */
if (s->rpc) {
- jsonrpc_wait(s->rpc);
- } else if (s->stream) {
+ if (!jsonrpc_joined_poll_group(s->rpc)) {
+ jsonrpc_wait(s->rpc);
+ }
+ }
+ if (s->stream) {
stream_run_wait(s->stream);
stream_connect_wait(s->stream);
}
@@ -1055,6 +1101,26 @@ jsonrpc_session_recv(struct jsonrpc_session *s)
return NULL;
}
+/* Session-level wrapper: returns false when 's' has no rpc, otherwise
+ * returns the result of jsonrpc_joined_poll_group() on s->rpc. */
+bool
+jsonrpc_session_joined_poll_group(const struct jsonrpc_session *s)
+{
+    if (!s->rpc) {
+        return false;
+    }
+    return jsonrpc_joined_poll_group(s->rpc);
+}
+
+/* Session-level wrapper: returns false when 's' has no rpc, otherwise
+ * returns the result of jsonrpc_has_pending_input() on s->rpc. */
+bool
+jsonrpc_session_has_pending_input(const struct jsonrpc_session *s)
+{
+    if (!s->rpc) {
+        return false;
+    }
+    return jsonrpc_has_pending_input(s->rpc);
+}
+
+/* Session-level wrapper: forwards 'write' to jsonrpc_poll_group_update()
+ * on s->rpc.  Does nothing when 's' has no rpc. */
+void
+jsonrpc_session_poll_group_update(struct jsonrpc_session *s, bool write)
+{
+    if (!s->rpc) {
+        return;
+    }
+    jsonrpc_poll_group_update(s->rpc, write);
+}
+
void
jsonrpc_session_recv_wait(struct jsonrpc_session *s)
{
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index 5f46e3b..c8e6690 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -53,6 +53,10 @@ size_t jsonrpc_get_backlog(const struct jsonrpc *);
unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
const char *jsonrpc_get_name(const struct jsonrpc *);
+bool jsonrpc_joined_poll_group(const struct jsonrpc *);
+bool jsonrpc_has_pending_input(const struct jsonrpc *);
+void jsonrpc_poll_group_update(struct jsonrpc *, bool);
+
int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *);
int jsonrpc_recv(struct jsonrpc *, struct jsonrpc_msg **);
void jsonrpc_recv_wait(struct jsonrpc *);
@@ -107,6 +111,9 @@ void jsonrpc_session_run(struct jsonrpc_session *);
void jsonrpc_session_wait(struct jsonrpc_session *);
size_t jsonrpc_session_get_backlog(const struct jsonrpc_session *);
+bool jsonrpc_session_joined_poll_group(const struct jsonrpc_session *);
+bool jsonrpc_session_has_pending_input(const struct jsonrpc_session *);
+void jsonrpc_session_poll_group_update(struct jsonrpc_session *, bool);
const char *jsonrpc_session_get_name(const struct jsonrpc_session *);
int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *);
--
1.9.1
More information about the dev
mailing list