[ovs-dev] [poll group RFC 3/6] lib: Update stream class to support poll group.
Andy Zhou
azhou at ovn.org
Fri Feb 19 20:40:21 UTC 2016
Add new APIs in stream class that works with poll group.
Signed-off-by: Andy Zhou <azhou at ovn.org>
---
lib/stream-fd.c | 32 ++++++++++++++++++++
lib/stream-provider.h | 17 +++++++++++
lib/stream-ssl.c | 40 ++++++++++++++++++++++++-
lib/stream-tcp.c | 3 ++
lib/stream-unix.c | 3 ++
lib/stream.c | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++-
lib/stream.h | 5 ++++
7 files changed, 181 insertions(+), 2 deletions(-)
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 31bfc6e..496a517 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -28,6 +28,7 @@
#include "socket-util.h"
#include "util.h"
#include "stream-provider.h"
+#include "poll-group.h"
#include "stream.h"
#include "openvswitch/vlog.h"
@@ -160,6 +161,34 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
}
}
+static int
+fd_join(struct stream *stream)
+{
+ struct stream_fd *s = stream_fd_cast(stream);
+ struct poll_group *group = stream->poll_group;
+ void *caller_event = stream->caller_event;
+
+ return poll_group_join(group, s->fd, caller_event);
+}
+
+static int
+fd_update(struct stream *stream, bool write)
+{
+ struct stream_fd *s = stream_fd_cast(stream);
+ struct poll_group *group = stream->poll_group;
+
+ return poll_group_update(group, s->fd, write, stream->caller_event);
+}
+
+static int
+fd_leave(struct stream *stream)
+{
+ struct stream_fd *s = stream_fd_cast(stream);
+ struct poll_group *group = stream->poll_group;
+
+ return poll_group_leave(group, s->fd);
+}
+
static const struct stream_class stream_fd_class = {
"fd", /* name */
false, /* needs_probes */
@@ -171,6 +200,9 @@ static const struct stream_class stream_fd_class = {
NULL, /* run */
NULL, /* run_wait */
fd_wait, /* wait */
+ fd_join, /* join */
+ fd_update, /* update */
+ fd_leave, /* leave */
};
/* Passive file descriptor stream. */
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 2226a80..74dad08 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -20,6 +20,7 @@
#include <sys/types.h>
#include "stream.h"
+struct poll_group;
/* Active stream connection. */
/* Active stream connection.
@@ -30,6 +31,11 @@ struct stream {
int state;
int error;
char *name;
+
+ /* poll_group related states. */
+ struct poll_group *poll_group;
+ void *caller_event;
+ bool joined;
};
void stream_init(struct stream *, const struct stream_class *,
@@ -123,6 +129,17 @@ struct stream_class {
/* Arranges for the poll loop to wake up when 'stream' is ready to take an
* action of the given 'type'. */
void (*wait)(struct stream *stream, enum stream_wait_type type);
+
+ /* Arranges for 'stream' to join poll group when it is connected. */
+ int (*join)(struct stream *stream);
+
+ /* Let 'stream' inform pull group about its interest to be waken up
+ * by tx_ready. */
+ int (*update)(struct stream *stream, bool write);
+
+ /* Disconnects stream from poll group. A disconnected stream will fall
+ * back to use poll loop directly. */
+ int (*leave)(struct stream *stream);
};
/* Passive listener for incoming stream connections.
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 2699633..88dea03 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -38,6 +38,7 @@
#include "ofpbuf.h"
#include "openflow/openflow.h"
#include "packets.h"
+#include "poll-group.h"
#include "poll-loop.h"
#include "shash.h"
#include "socket-util.h"
@@ -670,7 +671,6 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
case EAGAIN:
return n;
default:
- sslv->txbuf = NULL;
return -error;
}
}
@@ -684,6 +684,12 @@ ssl_run(struct stream *stream)
if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) {
ssl_clear_txbuf(sslv);
}
+
+ if (sslv->tx_want != SSL_NOTHING) {
+ if (want_to_poll_events(sslv->tx_want) == SSL_WRITING) {
+ stream_update(stream, true);
+ }
+ }
}
static void
@@ -747,6 +753,35 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
}
}
+static int
+ssl_join(struct stream *stream)
+{
+ struct ssl_stream *sslv = ssl_stream_cast(stream);
+ struct poll_group *group = stream->poll_group;
+ void *caller_event = stream->caller_event;
+
+ return poll_group_join(group, sslv->fd, caller_event);
+}
+
+static int
+ssl_update(struct stream *stream, bool write)
+{
+ struct ssl_stream *sslv = ssl_stream_cast(stream);
+ struct poll_group *group = stream->poll_group;
+ void *caller_event = stream->caller_event;
+
+ return poll_group_update(group, sslv->fd, write, caller_event);
+}
+
+static int
+ssl_leave(struct stream *stream)
+{
+ struct ssl_stream *sslv = ssl_stream_cast(stream);
+ struct poll_group *group = stream->poll_group;
+
+ return poll_group_leave(group, sslv->fd);
+}
+
const struct stream_class ssl_stream_class = {
"ssl", /* name */
true, /* needs_probes */
@@ -758,6 +793,9 @@ const struct stream_class ssl_stream_class = {
ssl_run, /* run */
ssl_run_wait, /* run_wait */
ssl_wait, /* wait */
+ ssl_join, /* join */
+ ssl_update, /* update */
+ ssl_leave, /* leave */
};
/* Passive SSL. */
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
index fc5a606..472cfa2 100644
--- a/lib/stream-tcp.c
+++ b/lib/stream-tcp.c
@@ -73,6 +73,9 @@ const struct stream_class tcp_stream_class = {
NULL, /* run */
NULL, /* run_wait */
NULL, /* wait */
+ NULL, /* join */
+ NULL, /* update */
+ NULL, /* leave */
};
#ifdef _WIN32
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index cadd180..b3ce2ce 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -72,6 +72,9 @@ const struct stream_class unix_stream_class = {
NULL, /* run */
NULL, /* run_wait */
NULL, /* wait */
+ NULL, /* join */
+ NULL, /* update */
+ NULL, /* leave */
};
/* Passive UNIX socket. */
diff --git a/lib/stream.c b/lib/stream.c
index 217191c..905f600 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2015 Nicira, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2015, 2016 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
#include <inttypes.h>
#include <netinet/in.h>
#include <poll.h>
+#include <poll-group.h>
#include <stdlib.h>
#include <string.h>
#include "coverage.h"
@@ -187,6 +188,23 @@ stream_lookup_class(const char *name, const struct stream_class **classp)
return EAFNOSUPPORT;
}
+/* Helper function that is called when a stream's state transit from
+ * SCS_CONNECTING to SCS_CONNECTED state */
+static void
+stream_join_poll_group(struct stream *stream)
+{
+ if (stream->poll_group && stream->caller_event && !stream->joined) {
+ int ret = stream->class->join(stream);
+ if (ret) {
+ VLOG_ERR("Stream [%s] failed to join poll_group [%s]\n",
+ stream_get_name(stream),
+ poll_group_get_name(stream->poll_group));
+ } else {
+ stream->joined = true;
+ }
+ }
+}
+
/* Returns 0 if 'name' is a stream name in the form "TYPE:ARGS" and TYPE is
* a supported stream type, otherwise EAFNOSUPPORT. */
int
@@ -278,6 +296,7 @@ stream_close(struct stream *stream)
{
if (stream != NULL) {
char *name = stream->name;
+ stream_leave(stream);
(stream->class->close)(stream);
free(name);
}
@@ -317,13 +336,16 @@ stream_connect(struct stream *stream)
last_state = stream->state;
switch (stream->state) {
case SCS_CONNECTING:
+ stream_leave(stream);
scs_connecting(stream);
break;
case SCS_CONNECTED:
+ stream_join_poll_group(stream);
return 0;
case SCS_DISCONNECTED:
+ stream_leave(stream);
return stream->error;
default:
@@ -430,6 +452,62 @@ stream_send_wait(struct stream *stream)
stream_wait(stream, STREAM_SEND);
}
+int
+stream_join(struct stream *stream, struct poll_group *group, void *caller_event)
+{
+ if (stream->poll_group || stream->caller_event || stream->joined) {
+ return 1;
+ }
+
+ if (!stream->class->join || !stream->class->update
+ ||!stream->class->leave) {
+ return 1;
+ }
+
+ stream->poll_group = group;
+ stream->caller_event= caller_event;
+
+ if (stream->state == SCS_CONNECTED) {
+ stream_join_poll_group(stream);
+ }
+
+ return 0;
+}
+
+int
+stream_update(struct stream *stream, bool write)
+{
+ int ret;
+
+ if (!stream->poll_group) {
+ return 1;
+ }
+
+ ret = stream->class->update(stream, write);
+
+ return ret;
+}
+
+int
+stream_leave(struct stream *stream)
+{
+ int ret = 0;
+
+ if (stream->poll_group && stream->joined) {
+ ret = stream->class->leave(stream);
+ stream->joined = false;
+ }
+
+ return ret;
+}
+
+/* Report if a stream is a member of a polll group. */
+bool
+stream_joined(struct stream *stream)
+{
+ return stream->joined;
+}
+
/* Given 'name', a pstream name in the form "TYPE:ARGS", stores the class
* named "TYPE" into '*classp' and returns 0. Returns EAFNOSUPPORT and stores
* a null pointer into '*classp' if 'name' is in the wrong form or if no such
@@ -628,6 +706,9 @@ stream_init(struct stream *stream, const struct stream_class *class,
: SCS_DISCONNECTED);
stream->error = connect_status;
stream->name = xstrdup(name);
+ stream->poll_group = NULL;
+ stream->caller_event = NULL;
+ stream->joined = false;
ovs_assert(stream->state != SCS_CONNECTING || class->connect);
}
diff --git a/lib/stream.h b/lib/stream.h
index f8e1891..c7648c7 100644
--- a/lib/stream.h
+++ b/lib/stream.h
@@ -27,6 +27,7 @@
struct pstream;
struct stream;
+struct poll_group;
struct vlog_module;
void stream_usage(const char *name, bool active, bool passive, bool bootstrap);
@@ -40,6 +41,10 @@ const char *stream_get_name(const struct stream *);
int stream_connect(struct stream *);
int stream_recv(struct stream *, void *buffer, size_t n);
int stream_send(struct stream *, const void *buffer, size_t n);
+int stream_join(struct stream *, struct poll_group *group, void *caller_event);
+int stream_update(struct stream *, bool write);
+int stream_leave(struct stream *);
+bool stream_joined(struct stream *);
void stream_run(struct stream *);
void stream_run_wait(struct stream *);
--
1.9.1
More information about the dev
mailing list