From 2a4ca9d5c3e49ca6df7c53878fac252312cc8806 Mon Sep 17 00:00:00 2001 From: Deomid Ryabkov <rojer@cesanta.com> Date: Thu, 22 Oct 2015 13:04:35 +0100 Subject: [PATCH] Introduce a network interface API, refactor UDP PUBLISHED_FROM=6e961e2760b2b64e211978ede5df8ca353ea5512 --- .../captive_dns_server/captive_dns_server.c | 1 + examples/rules.mk | 2 +- mongoose.c | 1740 +++++++++-------- mongoose.h | 66 +- 4 files changed, 1000 insertions(+), 809 deletions(-) diff --git a/examples/captive_dns_server/captive_dns_server.c b/examples/captive_dns_server/captive_dns_server.c index fa698417f..ac5781c28 100644 --- a/examples/captive_dns_server/captive_dns_server.c +++ b/examples/captive_dns_server/captive_dns_server.c @@ -43,6 +43,7 @@ static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) { */ mg_dns_send_reply(nc, &reply); + nc->flags |= MG_F_SEND_AND_CLOSE; break; } } diff --git a/examples/rules.mk b/examples/rules.mk index 06502591c..775c1d088 100644 --- a/examples/rules.mk +++ b/examples/rules.mk @@ -4,7 +4,7 @@ CFLAGS = -W -Wall -I../.. -pthread -DMG_ENABLE_SSL -DMG_ENABLE_IPV6 -DMG_ENABLE_ all: $(PROG) $(PROG): $(SOURCES) - $(CC) $(SOURCES) -o $@ $(CFLAGS) + $(CC) $(SOURCES) -g -o $@ $(CFLAGS) $(PROG).exe: $(SOURCES) cl $(SOURCES) /I../.. /DMG_ENABLE_SSL /MD /Fe$@ diff --git a/mongoose.c b/mongoose.c index 5773bde1e..b2342b790 100644 --- a/mongoose.c +++ b/mongoose.c @@ -66,10 +66,9 @@ /* Amalgamated: #include "../mongoose.h" */ /* internals that need to be accessible in unit tests */ -MG_INTERNAL struct mg_connection *mg_finish_connect(struct mg_connection *nc, - int proto, - union socket_address *sa, - struct mg_add_sock_opts); +MG_INTERNAL struct mg_connection *mg_do_connect(struct mg_connection *nc, + int proto, + union socket_address *sa); MG_INTERNAL int mg_parse_address(const char *str, union socket_address *sa, int *proto, char *host, size_t host_len); @@ -78,6 +77,8 @@ MG_INTERNAL void mg_forward(struct mg_connection *, struct mg_connection *); MG_INTERNAL void mg_add_conn(struct mg_mgr *mgr, struct mg_connection *c); MG_INTERNAL void mg_remove_conn(struct mg_connection *c); +MG_INTERNAL void mg_set_sock(struct mg_connection *nc, sock_t sock); + #ifndef MG_DISABLE_FILESYSTEM MG_INTERNAL int find_index_file(char *, size_t, const char *, cs_stat_t *); #endif @@ -1764,8 +1765,6 @@ int json_emit(char *buf, int buf_len, const char *fmt, ...) { #endif #define MG_CTL_MSG_MESSAGE_SIZE 8192 -#define MG_READ_BUFFER_SIZE 1024 -#define MG_UDP_RECEIVE_BUFFER_SIZE 1500 #define MG_VPRINTF_BUFFER_SIZE 100 #define MG_MAX_HOST_LEN 200 @@ -1791,12 +1790,16 @@ struct ctl_msg { char message[MG_CTL_MSG_MESSAGE_SIZE]; }; +int mg_is_error(int n); +void mg_set_non_blocking_mode(sock_t sock); + static void mg_ev_mgr_init(struct mg_mgr *mgr); static void mg_ev_mgr_free(struct mg_mgr *mgr); static void mg_ev_mgr_add_conn(struct mg_connection *nc); static void mg_ev_mgr_remove_conn(struct mg_connection *nc); MG_INTERNAL void mg_add_conn(struct mg_mgr *mgr, struct mg_connection *c) { + DBG(("%p %p", mgr, c)); c->mgr = mgr; c->next = mgr->active_connections; mgr->active_connections = c; @@ -1816,8 +1819,8 @@ MG_INTERNAL void mg_call(struct mg_connection *nc, int ev, void *ev_data) { unsigned long flags_before; mg_event_handler_t ev_handler; - DBG(("%p flags=%lu ev=%d ev_data=%p rmbl=%d", nc, nc->flags, ev, ev_data, - (int) nc->recv_mbuf.len)); + DBG(("%p ev=%d ev_data=%p flags=%lu rmbl=%d smbl=%d", nc, ev, ev_data, + nc->flags, (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len)); #ifndef MG_DISABLE_FILESYSTEM /* LCOV_EXCL_START */ @@ -1842,42 +1845,14 @@ MG_INTERNAL void mg_call(struct mg_connection *nc, int ev, void *ev_data) { (nc->flags & _MG_CALLBACK_MODIFIABLE_FLAGS_MASK); } } - DBG(("call done, flags %d", (int) nc->flags)); -} - -static size_t mg_out(struct mg_connection *nc, const void *buf, size_t len) { - if (nc->flags & MG_F_UDP) { - int n = sendto(nc->sock, buf, len, 0, &nc->sa.sa, sizeof(nc->sa.sin)); - DBG(("%p %d %d %d %s:%hu", nc, nc->sock, n, errno, - inet_ntoa(nc->sa.sin.sin_addr), ntohs(nc->sa.sin.sin_port))); - return n < 0 ? 0 : n; - } else { - return mbuf_append(&nc->send_mbuf, buf, len); - } + DBG(("%p after flags=%d rmbl=%d smbl=%d", nc, (int) nc->flags, + (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len)); } static void mg_destroy_conn(struct mg_connection *conn) { - if (conn->sock != INVALID_SOCKET) { - closesocket(conn->sock); - /* - * avoid users accidentally double close a socket - * because it can lead to difficult to debug situations. - * It would happen only if reusing a destroyed mg_connection - * but it's not always possible to run the code through an - * address sanitizer. - */ - conn->sock = INVALID_SOCKET; - } + mg_if_destroy_conn(conn); mbuf_free(&conn->recv_mbuf); mbuf_free(&conn->send_mbuf); -#ifdef MG_ENABLE_SSL - if (conn->ssl != NULL) { - SSL_free(conn->ssl); - } - if (conn->ssl_ctx != NULL) { - SSL_CTX_free(conn->ssl_ctx); - } -#endif MG_FREE(conn); } @@ -1906,12 +1881,6 @@ void mg_mgr_init(struct mg_mgr *m, void *user_data) { signal(SIGPIPE, SIG_IGN); #endif -#ifndef MG_DISABLE_SOCKETPAIR - do { - mg_socketpair(m->ctl, SOCK_DGRAM); - } while (m->ctl[0] == INVALID_SOCKET); -#endif - #ifdef MG_ENABLE_SSL { static int init_done; @@ -1976,7 +1945,7 @@ int mg_vprintf(struct mg_connection *nc, const char *fmt, va_list ap) { int len; if ((len = mg_avprintf(&buf, sizeof(mem), fmt, ap)) > 0) { - mg_out(nc, buf, len); + mg_send(nc, buf, len); } if (buf != mem && buf != NULL) { MG_FREE(buf); /* LCOV_EXCL_LINE */ @@ -1994,62 +1963,6 @@ int mg_printf(struct mg_connection *conn, const char *fmt, ...) { return len; } -static void mg_set_non_blocking_mode(sock_t sock) { -#ifdef _WIN32 - unsigned long on = 1; - ioctlsocket(sock, FIONBIO, &on); -#elif defined(MG_CC3200) - cc3200_set_non_blocking_mode(sock); -#else - int flags = fcntl(sock, F_GETFL, 0); - fcntl(sock, F_SETFL, flags | O_NONBLOCK); -#endif -} - -#ifndef MG_DISABLE_SOCKETPAIR -int mg_socketpair(sock_t sp[2], int sock_type) { - union socket_address sa; - sock_t sock; - socklen_t len = sizeof(sa.sin); - int ret = 0; - - sock = sp[0] = sp[1] = INVALID_SOCKET; - - (void) memset(&sa, 0, sizeof(sa)); - sa.sin.sin_family = AF_INET; - sa.sin.sin_port = htons(0); - sa.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ - - if ((sock = socket(AF_INET, sock_type, 0)) == INVALID_SOCKET) { - } else if (bind(sock, &sa.sa, len) != 0) { - } else if (sock_type == SOCK_STREAM && listen(sock, 1) != 0) { - } else if (getsockname(sock, &sa.sa, &len) != 0) { - } else if ((sp[0] = socket(AF_INET, sock_type, 0)) == INVALID_SOCKET) { - } else if (connect(sp[0], &sa.sa, len) != 0) { - } else if (sock_type == SOCK_DGRAM && - (getsockname(sp[0], &sa.sa, &len) != 0 || - connect(sock, &sa.sa, len) != 0)) { - } else if ((sp[1] = (sock_type == SOCK_DGRAM ? sock - : accept(sock, &sa.sa, &len))) == - INVALID_SOCKET) { - } else { - mg_set_close_on_exec(sp[0]); - mg_set_close_on_exec(sp[1]); - if (sock_type == SOCK_STREAM) closesocket(sock); - ret = 1; - } - - if (!ret) { - if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); - if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); - if (sock != INVALID_SOCKET) closesocket(sock); - sock = sp[0] = sp[1] = INVALID_SOCKET; - } - - return ret; -} -#endif /* MG_DISABLE_SOCKETPAIR */ - /* TODO(lsm): use non-blocking resolver */ static int mg_resolve2(const char *host, struct in_addr *ina) { #ifdef MG_ENABLE_GETADDRINFO @@ -2091,8 +2004,7 @@ MG_INTERNAL struct mg_connection *mg_create_connection( struct mg_add_sock_opts opts) { struct mg_connection *conn; - if ((conn = (struct mg_connection *) MG_MALLOC(sizeof(*conn))) != NULL) { - memset(conn, 0, sizeof(*conn)); + if ((conn = (struct mg_connection *) MG_CALLOC(1, sizeof(*conn))) != NULL) { conn->sock = INVALID_SOCKET; conn->handler = callback; conn->mgr = mgr; @@ -2105,26 +2017,13 @@ MG_INTERNAL struct mg_connection *mg_create_connection( * doesn't compile with pedantic ansi flags. */ conn->recv_mbuf_limit = ~0; + } else { + MG_SET_PTRPTR(opts.error_string, "failed create connection"); } return conn; } -/* Associate a socket to a connection and and add to the manager. */ -MG_INTERNAL void mg_set_sock(struct mg_connection *nc, sock_t sock) { -#if !defined(MG_CC3200) - /* Can't get non-blocking connect to work. - * TODO(rojer): Figure out why it fails where blocking succeeds. - */ - mg_set_non_blocking_mode(sock); -#endif - mg_set_close_on_exec(sock); - nc->sock = sock; - mg_add_conn(nc->mgr, nc); - - DBG(("%p %d", nc, sock)); -} - /* * Address format: [PROTO://][HOST]:PORT * @@ -2194,53 +2093,6 @@ MG_INTERNAL int mg_parse_address(const char *str, union socket_address *sa, return port < 0xffffUL && str[len] == '\0' ? len : -1; } -/* 'sa' must be an initialized address to bind to */ -static sock_t mg_open_listening_socket(union socket_address *sa, int proto) { - socklen_t sa_len = - (sa->sa.sa_family == AF_INET) ? sizeof(sa->sin) : sizeof(sa->sin6); - sock_t sock = INVALID_SOCKET; -#if !defined(MG_CC3200) && !defined(MG_LWIP) - int on = 1; -#endif - - if ((sock = socket(sa->sa.sa_family, proto, 0)) != INVALID_SOCKET && -#if !defined(MG_CC3200) && \ - !defined(MG_LWIP) /* CC3200 and LWIP don't support either */ -#if defined(_WIN32) && defined(SO_EXCLUSIVEADDRUSE) - /* "Using SO_REUSEADDR and SO_EXCLUSIVEADDRUSE" http://goo.gl/RmrFTm */ - !setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (void *) &on, - sizeof(on)) && -#endif - -#if !defined(_WIN32) || !defined(SO_EXCLUSIVEADDRUSE) - /* - * SO_RESUSEADDR is not enabled on Windows because the semantics of - * SO_REUSEADDR on UNIX and Windows is different. On Windows, - * SO_REUSEADDR allows to bind a socket to a port without error even if - * the port is already open by another program. This is not the behavior - * SO_REUSEADDR was designed for, and leads to hard-to-track failure - * scenarios. Therefore, SO_REUSEADDR was disabled on Windows unless - * SO_EXCLUSIVEADDRUSE is supported and set on a socket. - */ - !setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on)) && -#endif -#endif /* !MG_CC3200 && !MG_LWIP */ - - !bind(sock, &sa->sa, sa_len) && - (proto == SOCK_DGRAM || listen(sock, SOMAXCONN) == 0)) { -#if !defined(MG_CC3200) && !defined(MG_LWIP) /* TODO(rojer): Fix this. */ - mg_set_non_blocking_mode(sock); - /* In case port was set to 0, get the real port number */ - (void) getsockname(sock, &sa->sa, &sa_len); -#endif - } else if (sock != INVALID_SOCKET) { - closesocket(sock); - sock = INVALID_SOCKET; - } - - return sock; -} - #ifdef MG_ENABLE_SSL /* * Certificate generation script is at @@ -2414,54 +2266,39 @@ static int mg_ssl_err(struct mg_connection *conn, int res) { } #endif /* MG_ENABLE_SSL */ -static struct mg_connection *accept_conn(struct mg_connection *ls) { - struct mg_connection *c = NULL; - union socket_address sa; - socklen_t len = sizeof(sa); - sock_t sock = INVALID_SOCKET; - - /* NOTE(lsm): on Windows, sock is always > FD_SETSIZE */ - if ((sock = accept(ls->sock, &sa.sa, &len)) == INVALID_SOCKET) { - } else if ((c = mg_add_sock(ls->mgr, sock, ls->handler)) == NULL) { +void mg_if_accept_tcp_cb(struct mg_connection *lc, sock_t sock, + union socket_address *sa, size_t sa_len) { + struct mg_add_sock_opts opts; + struct mg_connection *nc; + (void) sa_len; + memset(&opts, 0, sizeof(opts)); + nc = mg_create_connection(lc->mgr, lc->handler, opts); + if (nc == NULL) { closesocket(sock); + return; + } + nc->listener = lc; + nc->proto_data = lc->proto_data; + nc->proto_handler = lc->proto_handler; + nc->user_data = lc->user_data; + nc->recv_mbuf_limit = lc->recv_mbuf_limit; + nc->sa = *sa; + mg_set_sock(nc, sock); /* XXX */ + mg_add_conn(nc->mgr, nc); #ifdef MG_ENABLE_SSL - } else if (ls->ssl_ctx != NULL && ((c->ssl = SSL_new(ls->ssl_ctx)) == NULL || - SSL_set_fd(c->ssl, sock) != 1)) { - DBG(("SSL error")); - mg_close_conn(c); - c = NULL; -#endif - } else { - c->listener = ls; - c->proto_data = ls->proto_data; - c->proto_handler = ls->proto_handler; - c->user_data = ls->user_data; - c->recv_mbuf_limit = ls->recv_mbuf_limit; - c->sa = sa; - if (c->ssl == NULL) { /* SSL connections need to perform handshake. */ - mg_call(c, MG_EV_ACCEPT, &sa); + if (lc->ssl_ctx != NULL) { + nc->ssl = SSL_new(lc->ssl_ctx); + if (nc->ssl == NULL || SSL_set_fd(nc->ssl, sock) != 1) { + DBG(("SSL error")); + mg_close_conn(nc); } - DBG(("%p %d %p %p", c, c->sock, c->ssl_ctx, c->ssl)); - } - - return c; -} - -static int mg_is_error(int n) { -#ifdef MG_CC3200 - DBG(("n = %d, errno = %d", n, errno)); - if (n < 0) errno = n; -#endif - return n == 0 || (n < 0 && errno != EINTR && errno != EINPROGRESS && - errno != EAGAIN && errno != EWOULDBLOCK -#ifdef MG_CC3200 - && errno != SL_EALREADY -#endif -#ifdef _WIN32 - && WSAGetLastError() != WSAEINTR && - WSAGetLastError() != WSAEWOULDBLOCK + } else #endif - ); + { + mg_call(nc, MG_EV_ACCEPT, &nc->sa); + } + DBG(("%p %p %d %d, %p %p", lc, nc, nc->sock, (int) nc->flags, lc->ssl_ctx, + nc->ssl)); } static size_t recv_avail_size(struct mg_connection *conn, size_t max) { @@ -2475,6 +2312,7 @@ static size_t recv_avail_size(struct mg_connection *conn, size_t max) { static void mg_ssl_begin(struct mg_connection *nc) { int server_side = nc->listener != NULL; int res = server_side ? SSL_accept(nc->ssl) : SSL_connect(nc->ssl); + DBG(("%p %d res %d %d %d", nc, server_side, res, errno, mg_ssl_err(nc, res))); if (res == 1) { nc->flags |= MG_F_SSL_HANDSHAKE_DONE; @@ -2486,766 +2324,1078 @@ static void mg_ssl_begin(struct mg_connection *nc) { /* In case port was set to 0, get the real port number */ (void) getsockname(nc->sock, &sa.sa, &sa_len); mg_call(nc, MG_EV_ACCEPT, &sa); + } else { + int err = 0; + mg_call(nc, MG_EV_CONNECT, &err); } } else { int ssl_err = mg_ssl_err(nc, res); if (ssl_err != SSL_ERROR_WANT_READ && ssl_err != SSL_ERROR_WANT_WRITE) { nc->flags |= MG_F_CLOSE_IMMEDIATELY; + if (!server_side) { + int err = 0; + mg_call(nc, MG_EV_CONNECT, &err); + } } } } #endif /* MG_ENABLE_SSL */ -static void mg_connect_done(struct mg_connection *conn, int err) { -#ifdef MG_ENABLE_SSL - if (err == 0 && conn->ssl != NULL) { - mg_ssl_begin(conn); +void mg_send(struct mg_connection *nc, const void *buf, int len) { + if (nc->flags & MG_F_UDP) { + mg_if_udp_send(nc, buf, len); + } else { + mg_if_tcp_send(nc, buf, len); + } +#ifndef MG_DISABLE_FILESYSTEM + if (nc->mgr && nc->mgr->hexdump_file != NULL) { + mg_hexdump_connection(nc, nc->mgr->hexdump_file, len, MG_EV_SEND); } #endif - DBG(("%p connect, err=%d", conn, err)); - if (err != 0) { - conn->flags |= MG_F_CLOSE_IMMEDIATELY; +} + +void mg_if_sent_cb(struct mg_connection *nc, int num_sent) { + if (num_sent < 0) { + nc->flags |= MG_F_CLOSE_IMMEDIATELY; + } + mg_call(nc, MG_EV_SEND, &num_sent); +} + +static void mg_recv_common(struct mg_connection *nc, void *buf, int len) { + DBG(("%p %d %u", nc, len, (unsigned int) nc->recv_mbuf.len)); + if (nc->recv_mbuf.len == 0) { + /* Adopt buf as recv_mbuf's backing store. */ + mbuf_free(&nc->recv_mbuf); + nc->recv_mbuf.buf = (char *) buf; + nc->recv_mbuf.size = nc->recv_mbuf.len = len; } else { - conn->flags &= ~MG_F_CONNECTING; + size_t avail = recv_avail_size(nc, len); + len = avail; + mbuf_append(&nc->recv_mbuf, buf, len); + MG_FREE(buf); } - mg_call(conn, MG_EV_CONNECT, &err); - return; + mg_call(nc, MG_EV_RECV, &len); } -static void mg_read_from_socket(struct mg_connection *conn) { - int n = 0; - char buf[MG_READ_BUFFER_SIZE]; -#ifdef MG_ENABLE_SSL - if (conn->ssl != NULL) { - if (conn->flags & MG_F_SSL_HANDSHAKE_DONE) { - /* SSL library may have more bytes ready to read then we ask to read. - * Therefore, read in a loop until we read everything. Without the loop, - * we skip to the next select() cycle which can just timeout. */ - while ((n = SSL_read(conn->ssl, buf, sizeof(buf))) > 0) { - DBG(("%p %d bytes <- %d (SSL)", conn, n, conn->sock)); - mbuf_append(&conn->recv_mbuf, buf, n); - mg_call(conn, MG_EV_RECV, &n); - if (conn->flags & MG_F_CLOSE_IMMEDIATELY) break; - } - mg_ssl_err(conn, n); - } else { - mg_ssl_begin(conn); - return; +void mg_if_recv_tcp_cb(struct mg_connection *nc, void *buf, int len) { + mg_recv_common(nc, buf, len); + mg_if_recved(nc, len); +} + +void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len, + union socket_address *sa, size_t sa_len) { + assert(nc->flags & MG_F_UDP); + DBG(("%p %u", nc, (unsigned int) len)); + if (nc->flags & MG_F_LISTENING) { + struct mg_connection *lc = nc; + /* + * Do we have an existing connection for this source? + * This is very inefficient for long connection lists. + */ + for (nc = mg_next(lc->mgr, NULL); nc != NULL; nc = mg_next(lc->mgr, nc)) { + if (memcmp(&nc->sa.sa, &sa->sa, sa_len) == 0) break; } - } else -#endif - while ((n = (int) MG_RECV_FUNC( - conn->sock, buf, recv_avail_size(conn, sizeof(buf)), 0)) > 0) { - DBG(("%p %d bytes (PLAIN) <- %d", conn, n, conn->sock)); - mbuf_append(&conn->recv_mbuf, buf, n); - mg_call(conn, MG_EV_RECV, &n); - if (conn->flags & MG_F_CLOSE_IMMEDIATELY) break; + if (nc == NULL) { + struct mg_add_sock_opts opts; + memset(&opts, 0, sizeof(opts)); + nc = mg_create_connection(lc->mgr, lc->handler, opts); + } + if (nc != NULL) { + nc->sock = lc->sock; + nc->listener = lc; + nc->sa = *sa; + nc->proto_data = lc->proto_data; + nc->proto_handler = lc->proto_handler; + nc->user_data = lc->user_data; + nc->recv_mbuf_limit = lc->recv_mbuf_limit; + nc->flags = MG_F_UDP; + mg_add_conn(lc->mgr, nc); + mg_call(nc, MG_EV_ACCEPT, &nc->sa); + } else { + DBG(("OOM")); } - DBG(("recv returns %d", n)); - - if (mg_is_error(n)) { - conn->flags |= MG_F_CLOSE_IMMEDIATELY; } + if (nc != NULL) { + mg_recv_common(nc, buf, len); + } else { + /* Drop on the floor. */ + MG_FREE(buf); + } + mg_if_recved(nc, len); } -static void mg_write_to_socket(struct mg_connection *conn) { - struct mbuf *io = &conn->send_mbuf; - int n = 0; - -#ifdef MG_LWIP - /* With LWIP we don't know if the socket is ready */ - if (io->len == 0) return; -#endif +/* + * Schedules an async connect for a resolved address and proto. + * Called from two places: `mg_connect_opt()` and from async resolver. + * When called from the async resolver, it must trigger `MG_EV_CONNECT` event + * with a failure flag to indicate connection failure. + */ +MG_INTERNAL struct mg_connection *mg_do_connect(struct mg_connection *nc, + int proto, + union socket_address *sa) { + DBG(("%p %s://%s:%hu", nc, proto == SOCK_DGRAM ? "udp" : "tcp", + inet_ntoa(sa->sin.sin_addr), ntohs(sa->sin.sin_port))); - assert(io->len > 0); + nc->flags |= MG_F_CONNECTING; + if (proto == SOCK_DGRAM) { + mg_if_connect_udp(nc); + } else { + mg_if_connect_tcp(nc, sa); + } + mg_add_conn(nc->mgr, nc); + return nc; +} +void mg_if_connect_cb(struct mg_connection *nc, int err) { + DBG(("%p connect, err=%d", nc, err)); + nc->flags &= ~MG_F_CONNECTING; + if (err == 0) { #ifdef MG_ENABLE_SSL - if (conn->ssl != NULL) { - if (conn->flags & MG_F_SSL_HANDSHAKE_DONE) { - n = SSL_write(conn->ssl, io->buf, io->len); - if (n <= 0) { - int ssl_err = mg_ssl_err(conn, n); - if (ssl_err == SSL_ERROR_WANT_READ || ssl_err == SSL_ERROR_WANT_WRITE) { - return; /* Call us again */ - } else { - conn->flags |= MG_F_CLOSE_IMMEDIATELY; - } - } else { - /* Successful SSL operation, clear off SSL wait flags */ - conn->flags &= ~(MG_F_WANT_READ | MG_F_WANT_WRITE); - } - } else { - mg_ssl_begin(conn); + if (nc->ssl != NULL) { + SSL_set_fd(nc->ssl, nc->sock); + mg_ssl_begin(nc); return; } - } else #endif - { - n = (int) MG_SEND_FUNC(conn->sock, io->buf, io->len, 0); + } else { + nc->flags |= MG_F_CLOSE_IMMEDIATELY; } + mg_call(nc, MG_EV_CONNECT, &err); +} - DBG(("%p %d bytes -> %d", conn, n, conn->sock)); - - if (mg_is_error(n)) { - conn->flags |= MG_F_CLOSE_IMMEDIATELY; - } else if (n > 0) { -#ifndef MG_DISABLE_FILESYSTEM - /* LCOV_EXCL_START */ - if (conn->mgr->hexdump_file != NULL) { - mg_hexdump_connection(conn, conn->mgr->hexdump_file, n, MG_EV_SEND); - } -/* LCOV_EXCL_STOP */ -#endif - mbuf_remove(io, n); +#ifndef MG_DISABLE_RESOLVER +/* + * Callback for the async resolver on mg_connect_opt() call. + * Main task of this function is to trigger MG_EV_CONNECT event with + * either failure (and dealloc the connection) + * or success (and proceed with connect() + */ +static void resolve_cb(struct mg_dns_message *msg, void *data) { + struct mg_connection *nc = (struct mg_connection *) data; + int i; + int failure = -1; + + if (msg != NULL) { + /* + * Take the first DNS A answer and run... + */ + for (i = 0; i < msg->num_answers; i++) { + if (msg->answers[i].rtype == MG_DNS_A_RECORD) { + /* + * Async resolver guarantees that there is at least one answer. + * TODO(lsm): handle IPv6 answers too + */ + mg_dns_parse_record_data(msg, &msg->answers[i], &nc->sa.sin.sin_addr, + 4); + mg_do_connect(nc, nc->flags & MG_F_UDP ? SOCK_DGRAM : SOCK_STREAM, + &nc->sa); + return; + } + } } - mg_call(conn, MG_EV_SEND, &n); -} -int mg_send(struct mg_connection *conn, const void *buf, int len) { - return (int) mg_out(conn, buf, len); + /* + * If we get there was no MG_DNS_A_RECORD in the answer + */ + mg_call(nc, MG_EV_CONNECT, &failure); + mg_destroy_conn(nc); } +#endif -static void mg_handle_udp(struct mg_connection *ls) { - struct mg_connection nc; - char buf[MG_UDP_RECEIVE_BUFFER_SIZE]; - int n; - socklen_t s_len = sizeof(nc.sa); +struct mg_connection *mg_connect(struct mg_mgr *mgr, const char *address, + mg_event_handler_t callback) { + static struct mg_connect_opts opts; + return mg_connect_opt(mgr, address, callback, opts); +} - memset(&nc, 0, sizeof(nc)); - n = recvfrom(ls->sock, buf, sizeof(buf), 0, &nc.sa.sa, &s_len); - if (n <= 0) { - DBG(("%p recvfrom: %s", ls, strerror(errno))); - } else { - union socket_address sa = nc.sa; - /* Copy all attributes, preserving sender address */ - nc = *ls; +struct mg_connection *mg_connect_opt(struct mg_mgr *mgr, const char *address, + mg_event_handler_t callback, + struct mg_connect_opts opts) { + struct mg_connection *nc = NULL; + int proto, rc; + struct mg_add_sock_opts add_sock_opts; + char host[MG_MAX_HOST_LEN]; - /* Then override some */ - nc.sa = sa; - nc.recv_mbuf.buf = buf; - nc.recv_mbuf.len = nc.recv_mbuf.size = n; - nc.listener = ls; - nc.flags = MG_F_UDP; + MG_COPY_COMMON_CONNECTION_OPTIONS(&add_sock_opts, &opts); - /* Call MG_EV_RECV handler */ - DBG(("%p %d bytes received", ls, n)); - mg_call(&nc, MG_EV_RECV, &n); + if ((nc = mg_create_connection(mgr, callback, add_sock_opts)) == NULL) { + return NULL; + } else if ((rc = mg_parse_address(address, &nc->sa, &proto, host, + sizeof(host))) < 0) { + /* Address is malformed */ + MG_SET_PTRPTR(opts.error_string, "cannot parse address"); + mg_destroy_conn(nc); + return NULL; + } + nc->flags |= opts.flags & _MG_ALLOWED_CONNECT_FLAGS_MASK; + nc->flags |= (proto == SOCK_DGRAM) ? MG_F_UDP : 0; + nc->user_data = opts.user_data; + if (rc == 0) { +#ifndef MG_DISABLE_RESOLVER /* - * See https://github.com/cesanta/mongoose/issues/207 - * mg_call migth set flags. They need to be synced back to ls. + * DNS resolution is required for host. + * mg_parse_address() fills port in nc->sa, which we pass to resolve_cb() */ - ls->flags = nc.flags; + if (mg_resolve_async(nc->mgr, host, MG_DNS_A_RECORD, resolve_cb, nc) != 0) { + MG_SET_PTRPTR(opts.error_string, "cannot schedule DNS lookup"); + mg_destroy_conn(nc); + return NULL; + } + + return nc; +#else + MG_SET_PTRPTR(opts.error_string, "Resolver is disabled"); + mg_destroy_conn(nc); + return NULL; +#endif + } else { + /* Address is parsed and resolved to IP. proceed with connect() */ + return mg_do_connect(nc, proto, &nc->sa); } } -#define _MG_F_FD_CAN_READ 1 -#define _MG_F_FD_CAN_WRITE 1 << 1 -#define _MG_F_FD_ERROR 1 << 2 +struct mg_connection *mg_bind(struct mg_mgr *srv, const char *address, + mg_event_handler_t event_handler) { + static struct mg_bind_opts opts; + return mg_bind_opt(srv, address, event_handler, opts); +} -static void mg_mgr_handle_connection(struct mg_connection *nc, int fd_flags, - time_t now) { - DBG(("%p fd=%d fd_flags=%d nc_flags=%lu rmbl=%d smbl=%d", nc, nc->sock, - fd_flags, nc->flags, (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len)); - if (fd_flags != 0) nc->last_io_time = now; +struct mg_connection *mg_bind_opt(struct mg_mgr *mgr, const char *address, + mg_event_handler_t callback, + struct mg_bind_opts opts) { + union socket_address sa; + struct mg_connection *nc = NULL; + int proto, rc; + struct mg_add_sock_opts add_sock_opts; + char host[MG_MAX_HOST_LEN]; - if (nc->flags & MG_F_CONNECTING) { - if (fd_flags != 0) { - int err = 1; -#if !defined(MG_CC3200) && !defined(MG_ESP8266) - socklen_t len = sizeof(err); - int ret = getsockopt(nc->sock, SOL_SOCKET, SO_ERROR, (char *) &err, &len); - if (ret != 0) err = 1; -#else - /* On CC3200 and ESP8266 we use blocking connect. If we got as far as - * this, - * this means connect() was successful. - * TODO(rojer): Figure out why it fails where blocking succeeds. - */ - mg_set_non_blocking_mode(nc->sock); - err = 0; -#endif - mg_connect_done(nc, err); - } - return; - } + MG_COPY_COMMON_CONNECTION_OPTIONS(&add_sock_opts, &opts); - if (nc->flags & MG_F_LISTENING) { - /* - * We're not looping here, and accepting just one connection at - * a time. The reason is that eCos does not respect non-blocking - * flag on a listening socket and hangs in a loop. - */ - if (fd_flags & _MG_F_FD_CAN_READ) accept_conn(nc); - return; + if (mg_parse_address(address, &sa, &proto, host, sizeof(host)) <= 0) { + MG_SET_PTRPTR(opts.error_string, "cannot parse address"); + return NULL; } - if (fd_flags & _MG_F_FD_CAN_READ) { - if (nc->flags & MG_F_UDP) { - mg_handle_udp(nc); - } else { - mg_read_from_socket(nc); - } - if (nc->flags & MG_F_CLOSE_IMMEDIATELY) return; + nc = mg_create_connection(mgr, callback, add_sock_opts); + if (nc == NULL) { + return NULL; } - if ((fd_flags & _MG_F_FD_CAN_WRITE) && !(nc->flags & MG_F_DONT_SEND) && - !(nc->flags & MG_F_UDP)) { /* Writes to UDP sockets are not buffered. */ - mg_write_to_socket(nc); + nc->sa = sa; + nc->flags |= MG_F_LISTENING; + if (proto == SOCK_DGRAM) { + nc->flags |= MG_F_UDP; + rc = mg_if_listen_udp(nc, &nc->sa); + } else { + rc = mg_if_listen_tcp(nc, &nc->sa); } - - if (!(fd_flags & (_MG_F_FD_CAN_READ | _MG_F_FD_CAN_WRITE))) { - mg_call(nc, MG_EV_POLL, &now); + if (rc != 0) { + DBG(("Failed to open listener: %d", rc)); + MG_SET_PTRPTR(opts.error_string, "failed to open listener"); + mg_destroy_conn(nc); + return NULL; } + mg_add_conn(nc->mgr, nc); - DBG(("%p after fd=%d nc_flags=%lu rmbl=%d smbl=%d", nc, nc->sock, nc->flags, - (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len)); + return nc; } -#ifndef MG_DISABLE_SOCKETPAIR -static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) { +struct mg_connection *mg_next(struct mg_mgr *s, struct mg_connection *conn) { + return conn == NULL ? s->active_connections : conn->next; +} + +void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, + size_t len) { struct ctl_msg ctl_msg; - int len = - (int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0); - size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0); - (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ - if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) { - struct mg_connection *nc; - for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) { - ctl_msg.callback(nc, MG_EV_POLL, ctl_msg.message); - } + /* + * Mongoose manager has a socketpair, `struct mg_mgr::ctl`, + * where `mg_broadcast()` pushes the message. + * `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls + * specified callback for each connection. Thus the callback function executes + * in event manager thread. + */ + if (mgr->ctl[0] != INVALID_SOCKET && data != NULL && + len < sizeof(ctl_msg.message)) { + size_t dummy; + + ctl_msg.callback = cb; + memcpy(ctl_msg.message, data, len); + dummy = MG_SEND_FUNC(mgr->ctl[0], (char *) &ctl_msg, + offsetof(struct ctl_msg, message) + len, 0); + dummy = MG_RECV_FUNC(mgr->ctl[0], (char *) &len, 1, 0); + (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ } } -#endif -#if MG_MGR_EV_MGR == 1 /* epoll() */ +static int isbyte(int n) { + return n >= 0 && n <= 255; +} -#ifndef MG_EPOLL_MAX_EVENTS -#define MG_EPOLL_MAX_EVENTS 100 -#endif +static int parse_net(const char *spec, uint32_t *net, uint32_t *mask) { + int n, a, b, c, d, slash = 32, len = 0; -#define _MG_EPF_EV_EPOLLIN (1 << 0) -#define _MG_EPF_EV_EPOLLOUT (1 << 1) -#define _MG_EPF_NO_POLL (1 << 2) + if ((sscanf(spec, "%d.%d.%d.%d/%d%n", &a, &b, &c, &d, &slash, &n) == 5 || + sscanf(spec, "%d.%d.%d.%d%n", &a, &b, &c, &d, &n) == 4) && + isbyte(a) && isbyte(b) && isbyte(c) && isbyte(d) && slash >= 0 && + slash < 33) { + len = n; + *net = + ((uint32_t) a << 24) | ((uint32_t) b << 16) | ((uint32_t) c << 8) | d; + *mask = slash ? 0xffffffffU << (32 - slash) : 0; + } -static uint32_t mg_epf_to_evflags(unsigned int epf) { - uint32_t result = 0; - if (epf & _MG_EPF_EV_EPOLLIN) result |= EPOLLIN; - if (epf & _MG_EPF_EV_EPOLLOUT) result |= EPOLLOUT; - return result; + return len; } -static void mg_ev_mgr_epoll_set_flags(const struct mg_connection *nc, - struct epoll_event *ev) { - /* NOTE: EPOLLERR and EPOLLHUP are always enabled. */ - ev->events = 0; - if (nc->recv_mbuf.len < nc->recv_mbuf_limit) { - ev->events |= EPOLLIN; - } - if ((nc->flags & MG_F_CONNECTING) || - (nc->send_mbuf.len > 0 && !(nc->flags & MG_F_DONT_SEND))) { - ev->events |= EPOLLOUT; - } -} +int mg_check_ip_acl(const char *acl, uint32_t remote_ip) { + int allowed, flag; + uint32_t net, mask; + struct mg_str vec; -static void mg_ev_mgr_epoll_ctl(struct mg_connection *nc, int op) { - int epoll_fd = (intptr_t) nc->mgr->mgr_data; - struct epoll_event ev; - assert(op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD || EPOLL_CTL_DEL); - if (op != EPOLL_CTL_DEL) { - mg_ev_mgr_epoll_set_flags(nc, &ev); - if (op == EPOLL_CTL_MOD) { - uint32_t old_ev_flags = mg_epf_to_evflags((intptr_t) nc->mgr_data); - if (ev.events == old_ev_flags) return; + /* If any ACL is set, deny by default */ + allowed = (acl == NULL || *acl == '\0') ? '+' : '-'; + + while ((acl = mg_next_comma_list_entry(acl, &vec, NULL)) != NULL) { + flag = vec.p[0]; + if ((flag != '+' && flag != '-') || + parse_net(&vec.p[1], &net, &mask) == 0) { + return -1; } - ev.data.ptr = nc; - } - if (epoll_ctl(epoll_fd, op, nc->sock, &ev) != 0) { - perror("epoll_ctl"); - abort(); - } -} -static void mg_ev_mgr_init(struct mg_mgr *mgr) { - int epoll_fd; - DBG(("%p using epoll()", mgr)); - epoll_fd = epoll_create(MG_EPOLL_MAX_EVENTS /* unused but required */); - if (epoll_fd < 0) { - perror("epoll_ctl"); - abort(); - } - mgr->mgr_data = (void *) ((intptr_t) epoll_fd); - if (mgr->ctl[1] != INVALID_SOCKET) { - struct epoll_event ev; - ev.events = EPOLLIN; - ev.data.ptr = NULL; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, mgr->ctl[1], &ev) != 0) { - perror("epoll_ctl"); - abort(); + if (net == (remote_ip & mask)) { + allowed = flag; } } + + return allowed == '+'; } -static void mg_ev_mgr_free(struct mg_mgr *mgr) { - int epoll_fd = (intptr_t) mgr->mgr_data; - close(epoll_fd); +/* Move data from one connection to another */ +void mg_forward(struct mg_connection *from, struct mg_connection *to) { + mg_send(to, from->recv_mbuf.buf, from->recv_mbuf.len); + mbuf_remove(&from->recv_mbuf, from->recv_mbuf.len); } +#ifdef NS_MODULE_LINES +#line 1 "src/net_if_socket.c" +/**/ +#endif +/* Amalgamated: #include "internal.h" */ -static void mg_ev_mgr_add_conn(struct mg_connection *nc) { - mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_ADD); -} +#define MG_TCP_RECV_BUFFER_SIZE 1024 +#define MG_UDP_RECV_BUFFER_SIZE 1500 -static void mg_ev_mgr_remove_conn(struct mg_connection *nc) { - mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_DEL); -} +static sock_t mg_open_listening_socket(union socket_address *sa, int proto); -time_t mg_mgr_poll(struct mg_mgr *mgr, int timeout_ms) { - int epoll_fd = (intptr_t) mgr->mgr_data; - struct epoll_event events[MG_EPOLL_MAX_EVENTS]; - struct mg_connection *nc, *next; - int num_ev, fd_flags; - time_t now; +void mg_set_non_blocking_mode(sock_t sock) { +#ifdef _WIN32 + unsigned long on = 1; + ioctlsocket(sock, FIONBIO, &on); +#elif defined(MG_CC3200) + cc3200_set_non_blocking_mode(sock); +#else + int flags = fcntl(sock, F_GETFL, 0); + fcntl(sock, F_SETFL, flags | O_NONBLOCK); +#endif +} - num_ev = epoll_wait(epoll_fd, events, MG_EPOLL_MAX_EVENTS, timeout_ms); - now = time(NULL); - DBG(("epoll_wait @ %ld num_ev=%d", (long) now, num_ev)); +int mg_is_error(int n) { +#ifdef MG_CC3200 + DBG(("n = %d, errno = %d", n, errno)); + if (n < 0) errno = n; +#endif + return n == 0 || (n < 0 && errno != EINTR && errno != EINPROGRESS && + errno != EAGAIN && errno != EWOULDBLOCK +#ifdef MG_CC3200 + && errno != SL_EALREADY +#endif +#ifdef _WIN32 + && WSAGetLastError() != WSAEINTR && + WSAGetLastError() != WSAEWOULDBLOCK +#endif + ); +} - while (num_ev-- > 0) { - intptr_t epf; - struct epoll_event *ev = events + num_ev; - nc = (struct mg_connection *) ev->data.ptr; - if (nc == NULL) { - mg_mgr_handle_ctl_sock(mgr); - continue; - } - fd_flags = ((ev->events & (EPOLLIN | EPOLLHUP)) ? _MG_F_FD_CAN_READ : 0) | - ((ev->events & (EPOLLOUT)) ? _MG_F_FD_CAN_WRITE : 0) | - ((ev->events & (EPOLLERR)) ? _MG_F_FD_ERROR : 0); - mg_mgr_handle_connection(nc, fd_flags, now); - epf = (intptr_t) nc->mgr_data; - epf ^= _MG_EPF_NO_POLL; - nc->mgr_data = (void *) epf; +void mg_if_connect_tcp(struct mg_connection *nc, + const union socket_address *sa) { + int rc; + nc->sock = socket(AF_INET, SOCK_STREAM, 0); + if (nc->sock < 0) { + nc->sock = INVALID_SOCKET; + nc->err = errno ? errno : 1; + return; } +#if !defined(MG_CC3200) && !defined(MG_ESP8266) + mg_set_non_blocking_mode(nc->sock); +#endif + rc = connect(nc->sock, &sa->sa, sizeof(sa->sin)); + nc->err = rc == 0 ? 0 : (errno ? errno : 1); + DBG(("%p sock %d err %d", nc, nc->sock, nc->err)); +} - for (nc = mgr->active_connections; nc != NULL; nc = next) { - next = nc->next; - if (!(((intptr_t) nc->mgr_data) & _MG_EPF_NO_POLL)) { - mg_mgr_handle_connection(nc, 0, now); - } else { - intptr_t epf = (intptr_t) nc->mgr_data; - epf ^= _MG_EPF_NO_POLL; - nc->mgr_data = (void *) epf; - } - if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) || - (nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE))) { - mg_close_conn(nc); - } else { - mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_MOD); - } +void mg_if_connect_udp(struct mg_connection *nc) { + nc->sock = socket(AF_INET, SOCK_DGRAM, 0); + if (nc->sock < 0) { + nc->sock = INVALID_SOCKET; + nc->err = errno ? errno : 1; + return; } - - return now; + nc->err = 0; } -#else /* select() */ +int mg_if_listen_tcp(struct mg_connection *nc, union socket_address *sa) { + sock_t sock = mg_open_listening_socket(sa, SOCK_STREAM); + if (sock < 0) return (errno ? errno : 1); + mg_set_sock(nc, sock); + return 0; +} -static void mg_ev_mgr_init(struct mg_mgr *mgr) { - (void) mgr; - DBG(("%p using select()", mgr)); +int mg_if_listen_udp(struct mg_connection *nc, union socket_address *sa) { + sock_t sock = mg_open_listening_socket(sa, SOCK_DGRAM); + if (sock < 0) return (errno ? errno : 1); + mg_set_sock(nc, sock); + return 0; } -static void mg_ev_mgr_free(struct mg_mgr *mgr) { - (void) mgr; +void mg_if_tcp_send(struct mg_connection *nc, const void *buf, size_t len) { + mbuf_append(&nc->send_mbuf, buf, len); } -static void mg_ev_mgr_add_conn(struct mg_connection *nc) { - (void) nc; +void mg_if_udp_send(struct mg_connection *nc, const void *buf, size_t len) { + DBG(("%p %d %d", nc, (int) len, (int) nc->send_mbuf.len)); + mbuf_append(&nc->send_mbuf, buf, len); } -static void mg_ev_mgr_remove_conn(struct mg_connection *nc) { +void mg_if_recved(struct mg_connection *nc, size_t len) { (void) nc; + (void) len; } -static void mg_add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) { - if (sock != INVALID_SOCKET) { - FD_SET(sock, set); - if (*max_fd == INVALID_SOCKET || sock > *max_fd) { - *max_fd = sock; - } +void mg_if_destroy_conn(struct mg_connection *nc) { + if (nc->sock == INVALID_SOCKET) return; +#ifdef MG_ENABLE_SSL + if (nc->ssl != NULL) SSL_free(nc->ssl); + if (nc->ssl_ctx != NULL) SSL_CTX_free(nc->ssl_ctx); +#endif + if (!(nc->flags & MG_F_UDP)) { + closesocket(nc->sock); + } else { + /* Only close outgoing UDP sockets or listeners. */ + if (nc->listener == NULL) closesocket(nc->sock); } + /* + * avoid users accidentally double close a socket + * because it can lead to difficult to debug situations. + * It would happen only if reusing a destroyed mg_connection + * but it's not always possible to run the code through an + * address sanitizer. + */ + nc->sock = INVALID_SOCKET; } -time_t mg_mgr_poll(struct mg_mgr *mgr, int milli) { - time_t now; - struct mg_connection *nc, *tmp; - struct timeval tv; - fd_set read_set, write_set, err_set; - sock_t max_fd = INVALID_SOCKET; - int num_selected; +static void mg_accept_conn(struct mg_connection *lc) { + union socket_address sa; + socklen_t sa_len = sizeof(sa); + /* NOTE(lsm): on Windows, sock is always > FD_SETSIZE */ + sock_t sock = accept(lc->sock, &sa.sa, &sa_len); + if (sock < 0) { + DBG(("%p: failed to accept: %d", lc, errno)); + return; + } + mg_if_accept_tcp_cb(lc, sock, &sa, sa_len); +} - FD_ZERO(&read_set); - FD_ZERO(&write_set); - FD_ZERO(&err_set); - mg_add_to_set(mgr->ctl[1], &read_set, &max_fd); +/* 'sa' must be an initialized address to bind to */ +static sock_t mg_open_listening_socket(union socket_address *sa, int proto) { + socklen_t sa_len = + (sa->sa.sa_family == AF_INET) ? sizeof(sa->sin) : sizeof(sa->sin6); + sock_t sock = INVALID_SOCKET; +#if !defined(MG_CC3200) && !defined(MG_LWIP) + int on = 1; +#endif - for (nc = mgr->active_connections; nc != NULL; nc = tmp) { - tmp = nc->next; + if ((sock = socket(sa->sa.sa_family, proto, 0)) != INVALID_SOCKET && +#if !defined(MG_CC3200) && \ + !defined(MG_LWIP) /* CC3200 and LWIP don't support either */ +#if defined(_WIN32) && defined(SO_EXCLUSIVEADDRUSE) + /* "Using SO_REUSEADDR and SO_EXCLUSIVEADDRUSE" http://goo.gl/RmrFTm */ + !setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (void *) &on, + sizeof(on)) && +#endif - if (!(nc->flags & MG_F_WANT_WRITE) && - nc->recv_mbuf.len < nc->recv_mbuf_limit) { - mg_add_to_set(nc->sock, &read_set, &max_fd); - } +#if !defined(_WIN32) || !defined(SO_EXCLUSIVEADDRUSE) + /* + * SO_RESUSEADDR is not enabled on Windows because the semantics of + * SO_REUSEADDR on UNIX and Windows is different. On Windows, + * SO_REUSEADDR allows to bind a socket to a port without error even if + * the port is already open by another program. This is not the behavior + * SO_REUSEADDR was designed for, and leads to hard-to-track failure + * scenarios. Therefore, SO_REUSEADDR was disabled on Windows unless + * SO_EXCLUSIVEADDRUSE is supported and set on a socket. + */ + !setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on)) && +#endif +#endif /* !MG_CC3200 && !MG_LWIP */ - if (((nc->flags & MG_F_CONNECTING) && !(nc->flags & MG_F_WANT_READ)) || - (nc->send_mbuf.len > 0 && !(nc->flags & MG_F_CONNECTING) && - !(nc->flags & MG_F_DONT_SEND))) { - mg_add_to_set(nc->sock, &write_set, &max_fd); - mg_add_to_set(nc->sock, &err_set, &max_fd); - } + !bind(sock, &sa->sa, sa_len) && + (proto == SOCK_DGRAM || listen(sock, SOMAXCONN) == 0)) { +#if !defined(MG_CC3200) && !defined(MG_LWIP) /* TODO(rojer): Fix this. */ + mg_set_non_blocking_mode(sock); + /* In case port was set to 0, get the real port number */ + (void) getsockname(sock, &sa->sa, &sa_len); +#endif + } else if (sock != INVALID_SOCKET) { + closesocket(sock); + sock = INVALID_SOCKET; } - tv.tv_sec = milli / 1000; - tv.tv_usec = (milli % 1000) * 1000; + return sock; +} - num_selected = select((int) max_fd + 1, &read_set, &write_set, &err_set, &tv); - now = time(NULL); - DBG(("select @ %ld num_ev=%d", (long) now, num_selected)); +static void mg_write_to_socket(struct mg_connection *nc) { + struct mbuf *io = &nc->send_mbuf; + int n = 0; -#ifndef MG_DISABLE_SOCKETPAIR - if (num_selected > 0 && mgr->ctl[1] != INVALID_SOCKET && - FD_ISSET(mgr->ctl[1], &read_set)) { - mg_mgr_handle_ctl_sock(mgr); - } +#ifdef MG_LWIP + /* With LWIP we don't know if the socket is ready */ + if (io->len == 0) return; #endif - for (nc = mgr->active_connections; nc != NULL; nc = tmp) { - int fd_flags = 0; - if (num_selected > 0) { - fd_flags = (FD_ISSET(nc->sock, &read_set) ? _MG_F_FD_CAN_READ : 0) | - (FD_ISSET(nc->sock, &write_set) ? _MG_F_FD_CAN_WRITE : 0) | - (FD_ISSET(nc->sock, &err_set) ? _MG_F_FD_ERROR : 0); - } -#ifdef MG_CC3200 - // CC3200 does not report UDP sockets as writeable. - if (nc->flags & MG_F_UDP && - (nc->send_mbuf.len > 0 || nc->flags & MG_F_CONNECTING)) { - fd_flags |= _MG_F_FD_CAN_WRITE; + assert(io->len > 0); + + if (nc->flags & MG_F_UDP) { + int n = + sendto(nc->sock, io->buf, io->len, 0, &nc->sa.sa, sizeof(nc->sa.sin)); + DBG(("%p %d %d %d %s:%hu", nc, nc->sock, n, errno, + inet_ntoa(nc->sa.sin.sin_addr), ntohs(nc->sa.sin.sin_port))); + if (n > 0) { + mbuf_remove(io, n); } -#endif -#ifdef MG_LWIP - /* With LWIP socket emulation layer, we don't get write events */ - fd_flags |= _MG_F_FD_CAN_WRITE; -#endif - tmp = nc->next; - mg_mgr_handle_connection(nc, fd_flags, now); + mg_if_sent_cb(nc, n); + return; } - for (nc = mgr->active_connections; nc != NULL; nc = tmp) { - tmp = nc->next; - if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) || - (nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE))) { - mg_close_conn(nc); +#ifdef MG_ENABLE_SSL + if (nc->ssl != NULL) { + if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) { + n = SSL_write(nc->ssl, io->buf, io->len); + if (n <= 0) { + int ssl_err = mg_ssl_err(nc, n); + if (ssl_err == SSL_ERROR_WANT_READ || ssl_err == SSL_ERROR_WANT_WRITE) { + return; /* Call us again */ + } else { + nc->flags |= MG_F_CLOSE_IMMEDIATELY; + } + } else { + /* Successful SSL operation, clear off SSL wait flags */ + nc->flags &= ~(MG_F_WANT_READ | MG_F_WANT_WRITE); + } + } else { + mg_ssl_begin(nc); + return; } + } else +#endif + { + n = (int) MG_SEND_FUNC(nc->sock, io->buf, io->len, 0); } - return now; -} - -#endif + DBG(("%p %d bytes -> %d", nc, n, nc->sock)); -/* - * Schedules an async connect for a resolved address and proto. - * Called from two places: `mg_connect_opt()` and from async resolver. - * When called from the async resolver, it must trigger `MG_EV_CONNECT` event - * with a failure flag to indicate connection failure. - */ -MG_INTERNAL struct mg_connection *mg_finish_connect(struct mg_connection *nc, - int proto, - union socket_address *sa, - struct mg_add_sock_opts o) { - sock_t sock = INVALID_SOCKET; - int rc; + if (n > 0) { + mbuf_remove(io, n); + } + mg_if_sent_cb(nc, n); +} - DBG(("%p %s://%s:%hu", nc, proto == SOCK_DGRAM ? "udp" : "tcp", - inet_ntoa(nc->sa.sin.sin_addr), ntohs(nc->sa.sin.sin_port))); +static void mg_read_from_socket(struct mg_connection *conn) { + int n = 0; + char *buf = (char *) MG_MALLOC(MG_TCP_RECV_BUFFER_SIZE); - if ((sock = socket(AF_INET, proto, 0)) == INVALID_SOCKET) { - int failure = errno; - MG_SET_PTRPTR(o.error_string, "cannot create socket"); - if (nc->flags & MG_F_CONNECTING) { - mg_call(nc, MG_EV_CONNECT, &failure); - } - mg_destroy_conn(nc); - return NULL; + if (buf == NULL) { + DBG(("OOM")); + return; } -#if !defined(MG_CC3200) && !defined(MG_ESP8266) - mg_set_non_blocking_mode(sock); +#ifdef MG_ENABLE_SSL + if (conn->ssl != NULL) { + if (conn->flags & MG_F_SSL_HANDSHAKE_DONE) { + /* SSL library may have more bytes ready to read then we ask to read. + * Therefore, read in a loop until we read everything. Without the loop, + * we skip to the next select() cycle which can just timeout. */ + while ((n = SSL_read(conn->ssl, buf, MG_TCP_RECV_BUFFER_SIZE)) > 0) { + DBG(("%p %d bytes <- %d (SSL)", conn, n, conn->sock)); + mg_if_recv_tcp_cb(conn, buf, n); + buf = NULL; + if (conn->flags & MG_F_CLOSE_IMMEDIATELY) break; + /* buf has been freed, we need a new one. */ + buf = (char *) MG_MALLOC(MG_TCP_RECV_BUFFER_SIZE); + if (buf == NULL) break; + } + MG_FREE(buf); + mg_ssl_err(conn, n); + } else { + MG_FREE(buf); + mg_ssl_begin(conn); + return; + } + } else #endif - rc = (proto == SOCK_DGRAM) ? 0 : connect(sock, &sa->sa, sizeof(sa->sin)); - - if (rc != 0 && mg_is_error(rc)) { - MG_SET_PTRPTR(o.error_string, "cannot connect to socket"); - if (nc->flags & MG_F_CONNECTING) { - mg_call(nc, MG_EV_CONNECT, &rc); + { + n = (int) MG_RECV_FUNC(conn->sock, buf, + recv_avail_size(conn, MG_TCP_RECV_BUFFER_SIZE), 0); + if (n > 0) { + DBG(("%p %d bytes (PLAIN) <- %d", conn, n, conn->sock)); + mg_if_recv_tcp_cb(conn, buf, n); + } else { + MG_FREE(buf); + } + if (mg_is_error(n)) { + conn->flags |= MG_F_CLOSE_IMMEDIATELY; } - mg_destroy_conn(nc); - close(sock); - return NULL; } +} - /* Fire MG_EV_CONNECT on next poll. */ - nc->flags |= MG_F_CONNECTING; - - /* No mg_destroy_conn() call after this! */ - mg_set_sock(nc, sock); - -#ifdef MG_ENABLE_SSL - /* - * If we are using async resolver, socket isn't open - * before this place, so - * for SSL connections we have to add socket to SSL fd set - */ - if (nc->ssl != NULL && !(nc->flags & MG_F_LISTENING)) { - SSL_set_fd(nc->ssl, nc->sock); +static int mg_recvfrom(struct mg_connection *nc, union socket_address *sa, + socklen_t *sa_len, char **buf) { + int n; + *buf = (char *) MG_MALLOC(MG_UDP_RECV_BUFFER_SIZE); + if (*buf == NULL) { + DBG(("Out of memory")); + return -ENOMEM; } -#endif + n = recvfrom(nc->sock, *buf, MG_UDP_RECV_BUFFER_SIZE, 0, &sa->sa, sa_len); + if (n <= 0) { + DBG(("%p recvfrom: %s", nc, strerror(errno))); + MG_FREE(*buf); + } + return n; +} - return nc; +static void mg_handle_udp_read(struct mg_connection *nc) { + char *buf = NULL; + union socket_address sa; + socklen_t sa_len = sizeof(sa); + int n = mg_recvfrom(nc, &sa, &sa_len, &buf); + DBG(("%p %d bytes from %s:%d", nc, n, inet_ntoa(nc->sa.sin.sin_addr), + ntohs(nc->sa.sin.sin_port))); + mg_if_recv_udp_cb(nc, buf, n, &sa, sa_len); } -#ifndef MG_DISABLE_RESOLVER -/* - * Callback for the async resolver on mg_connect_opt() call. - * Main task of this function is to trigger MG_EV_CONNECT event with - * either failure (and dealloc the connection) - * or success (and proceed with connect() +#define _MG_F_FD_CAN_READ 1 +#define _MG_F_FD_CAN_WRITE 1 << 1 +#define _MG_F_FD_ERROR 1 << 2 + +void mg_mgr_handle_conn(struct mg_connection *nc, int fd_flags, time_t now) { + DBG(("%p fd=%d fd_flags=%d nc_flags=%lu rmbl=%d smbl=%d", nc, nc->sock, + fd_flags, nc->flags, (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len)); + if (fd_flags != 0) nc->last_io_time = now; + + if (nc->flags & MG_F_CONNECTING) { + if (fd_flags != 0) { + int err = 0; +#if !defined(MG_CC3200) && !defined(MG_ESP8266) + if (!(nc->flags & MG_F_UDP)) { + socklen_t len = sizeof(err); + int ret = + getsockopt(nc->sock, SOL_SOCKET, SO_ERROR, (char *) &err, &len); + if (ret != 0) err = 1; + } +#else +/* On CC3200 and ESP8266 we use blocking connect. If we got as far as + * this, it means connect() was successful. + * TODO(rojer): Figure out why it fails where blocking succeeds. */ -static void resolve_cb(struct mg_dns_message *msg, void *data) { - struct mg_connection *nc = (struct mg_connection *) data; - int i; - int failure = -1; +#endif + mg_if_connect_cb(nc, err); + } else if (nc->err != 0) { + mg_if_connect_cb(nc, nc->err); + } + } - if (msg != NULL) { - /* - * Take the first DNS A answer and run... - */ - for (i = 0; i < msg->num_answers; i++) { - if (msg->answers[i].rtype == MG_DNS_A_RECORD) { - static struct mg_add_sock_opts opts; + if (fd_flags & _MG_F_FD_CAN_READ) { + if (nc->flags & MG_F_UDP) { + mg_handle_udp_read(nc); + } else { + if (nc->flags & MG_F_LISTENING) { /* - * Async resolver guarantees that there is at least one answer. - * TODO(lsm): handle IPv6 answers too + * We're not looping here, and accepting just one connection at + * a time. The reason is that eCos does not respect non-blocking + * flag on a listening socket and hangs in a loop. */ - mg_dns_parse_record_data(msg, &msg->answers[i], &nc->sa.sin.sin_addr, - 4); - /* Make mg_finish_connect() trigger MG_EV_CONNECT on failure */ - nc->flags |= MG_F_CONNECTING; - mg_finish_connect(nc, nc->flags & MG_F_UDP ? SOCK_DGRAM : SOCK_STREAM, - &nc->sa, opts); + if (fd_flags & _MG_F_FD_CAN_READ) mg_accept_conn(nc); return; + } else { + mg_read_from_socket(nc); } } + if (nc->flags & MG_F_CLOSE_IMMEDIATELY) return; } - /* - * If we get there was no MG_DNS_A_RECORD in the answer - */ - mg_call(nc, MG_EV_CONNECT, &failure); - mg_destroy_conn(nc); + if ((fd_flags & _MG_F_FD_CAN_WRITE) && !(nc->flags & MG_F_DONT_SEND) && + nc->send_mbuf.len > 0) { + mg_write_to_socket(nc); + } + + if (!(fd_flags & (_MG_F_FD_CAN_READ | _MG_F_FD_CAN_WRITE))) { + mg_call(nc, MG_EV_POLL, &now); + } + + DBG(("%p after fd=%d nc_flags=%lu rmbl=%d smbl=%d", nc, nc->sock, nc->flags, + (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len)); +} + +#ifndef MG_DISABLE_SOCKETPAIR +static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) { + struct ctl_msg ctl_msg; + int len = + (int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0); + size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0); + (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ + if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) { + struct mg_connection *nc; + for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) { + ctl_msg.callback(nc, MG_EV_POLL, ctl_msg.message); + } + } } #endif -struct mg_connection *mg_connect(struct mg_mgr *mgr, const char *address, - mg_event_handler_t callback) { - static struct mg_connect_opts opts; - return mg_connect_opt(mgr, address, callback, opts); +struct mg_connection *mg_add_sock(struct mg_mgr *s, sock_t sock, + mg_event_handler_t callback) { + static struct mg_add_sock_opts opts; + return mg_add_sock_opt(s, sock, callback, opts); } -struct mg_connection *mg_connect_opt(struct mg_mgr *mgr, const char *address, - mg_event_handler_t callback, - struct mg_connect_opts opts) { - struct mg_connection *nc = NULL; - int proto, rc; - struct mg_add_sock_opts add_sock_opts; - char host[MG_MAX_HOST_LEN]; +struct mg_connection *mg_add_sock_opt(struct mg_mgr *s, sock_t sock, + mg_event_handler_t callback, + struct mg_add_sock_opts opts) { + struct mg_connection *nc = mg_create_connection(s, callback, opts); + if (nc != NULL) { + mg_set_sock(nc, sock); + mg_add_conn(nc->mgr, nc); + } + return nc; +} - MG_COPY_COMMON_CONNECTION_OPTIONS(&add_sock_opts, &opts); +/* Associate a socket to a connection. */ +MG_INTERNAL void mg_set_sock(struct mg_connection *nc, sock_t sock) { + mg_set_non_blocking_mode(sock); + mg_set_close_on_exec(sock); + nc->sock = sock; + DBG(("%p %d", nc, sock)); +} - if ((nc = mg_create_connection(mgr, callback, add_sock_opts)) == NULL) { - return NULL; - } else if ((rc = mg_parse_address(address, &nc->sa, &proto, host, - sizeof(host))) < 0) { - /* Address is malformed */ - MG_SET_PTRPTR(opts.error_string, "cannot parse address"); - mg_destroy_conn(nc); - return NULL; +#if MG_MGR_EV_MGR == 1 /* epoll() */ + +#ifndef MG_EPOLL_MAX_EVENTS +#define MG_EPOLL_MAX_EVENTS 100 +#endif + +#define _MG_EPF_EV_EPOLLIN (1 << 0) +#define _MG_EPF_EV_EPOLLOUT (1 << 1) +#define _MG_EPF_NO_POLL (1 << 2) + +static uint32_t mg_epf_to_evflags(unsigned int epf) { + uint32_t result = 0; + if (epf & _MG_EPF_EV_EPOLLIN) result |= EPOLLIN; + if (epf & _MG_EPF_EV_EPOLLOUT) result |= EPOLLOUT; + return result; +} + +static void mg_ev_mgr_epoll_set_flags(const struct mg_connection *nc, + struct epoll_event *ev) { + /* NOTE: EPOLLERR and EPOLLHUP are always enabled. */ + ev->events = 0; + if ((nc->flags & MG_F_LISTENING) || nc->recv_mbuf.len < nc->recv_mbuf_limit) { + ev->events |= EPOLLIN; } - nc->flags |= opts.flags & _MG_ALLOWED_CONNECT_FLAGS_MASK; - nc->flags |= (proto == SOCK_DGRAM) ? MG_F_UDP : 0; - nc->user_data = opts.user_data; + if ((nc->flags & MG_F_CONNECTING) || + (nc->send_mbuf.len > 0 && !(nc->flags & MG_F_DONT_SEND))) { + ev->events |= EPOLLOUT; + } +} - if (rc == 0) { -#ifndef MG_DISABLE_RESOLVER - /* - * DNS resolution is required for host. - * mg_parse_address() fills port in nc->sa, which we pass to resolve_cb() - */ - if (mg_resolve_async(nc->mgr, host, MG_DNS_A_RECORD, resolve_cb, nc) != 0) { - MG_SET_PTRPTR(opts.error_string, "cannot schedule DNS lookup"); - mg_destroy_conn(nc); - return NULL; +static void mg_ev_mgr_epoll_ctl(struct mg_connection *nc, int op) { + int epoll_fd = (intptr_t) nc->mgr->mgr_data; + struct epoll_event ev; + assert(op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD || EPOLL_CTL_DEL); + DBG(("%p %d %d", nc, nc->sock, op)); + if (nc->sock == INVALID_SOCKET) return; + if (op != EPOLL_CTL_DEL) { + mg_ev_mgr_epoll_set_flags(nc, &ev); + if (op == EPOLL_CTL_MOD) { + uint32_t old_ev_flags = mg_epf_to_evflags((intptr_t) nc->mgr_data); + if (ev.events == old_ev_flags) return; } + ev.data.ptr = nc; + } + if (epoll_ctl(epoll_fd, op, nc->sock, &ev) != 0) { + perror("epoll_ctl"); + abort(); + } +} - return nc; -#else - MG_SET_PTRPTR(opts.error_string, "Resolver is disabled"); - mg_destroy_conn(nc); - return NULL; +static void mg_ev_mgr_init(struct mg_mgr *mgr) { + int epoll_fd; + DBG(("%p using epoll()", mgr)); +#ifndef MG_DISABLE_SOCKETPAIR + do { + mg_socketpair(mgr->ctl, SOCK_DGRAM); + } while (mgr->ctl[0] == INVALID_SOCKET); #endif - } else { - /* Address is parsed and resolved to IP. proceed with connect() */ - return mg_finish_connect(nc, proto, &nc->sa, add_sock_opts); + epoll_fd = epoll_create(MG_EPOLL_MAX_EVENTS /* unused but required */); + if (epoll_fd < 0) { + perror("epoll_ctl"); + abort(); + } + mgr->mgr_data = (void *) ((intptr_t) epoll_fd); + if (mgr->ctl[1] != INVALID_SOCKET) { + struct epoll_event ev; + ev.events = EPOLLIN; + ev.data.ptr = NULL; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, mgr->ctl[1], &ev) != 0) { + perror("epoll_ctl"); + abort(); + } } } -struct mg_connection *mg_bind(struct mg_mgr *srv, const char *address, - mg_event_handler_t event_handler) { - static struct mg_bind_opts opts; - return mg_bind_opt(srv, address, event_handler, opts); +static void mg_ev_mgr_free(struct mg_mgr *mgr) { + int epoll_fd = (intptr_t) mgr->mgr_data; + close(epoll_fd); +} + +static void mg_ev_mgr_add_conn(struct mg_connection *nc) { + if (!(nc->flags & MG_F_UDP) || nc->listener == NULL) { + mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_ADD); + } } -struct mg_connection *mg_bind_opt(struct mg_mgr *mgr, const char *address, - mg_event_handler_t callback, - struct mg_bind_opts opts) { - union socket_address sa; - struct mg_connection *nc = NULL; - int proto; - sock_t sock; - struct mg_add_sock_opts add_sock_opts; - char host[MG_MAX_HOST_LEN]; +static void mg_ev_mgr_remove_conn(struct mg_connection *nc) { + if (!(nc->flags & MG_F_UDP) || nc->listener == NULL) { + mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_DEL); + } +} - MG_COPY_COMMON_CONNECTION_OPTIONS(&add_sock_opts, &opts); +time_t mg_mgr_poll(struct mg_mgr *mgr, int timeout_ms) { + int epoll_fd = (intptr_t) mgr->mgr_data; + struct epoll_event events[MG_EPOLL_MAX_EVENTS]; + struct mg_connection *nc, *next; + int num_ev, fd_flags; + time_t now; - if (mg_parse_address(address, &sa, &proto, host, sizeof(host)) <= 0) { - MG_SET_PTRPTR(opts.error_string, "cannot parse address"); - } else if ((sock = mg_open_listening_socket(&sa, proto)) == INVALID_SOCKET) { - DBG(("Failed to open listener: %d", errno)); - MG_SET_PTRPTR(opts.error_string, "failed to open listener"); - } else if ((nc = mg_add_sock_opt(mgr, sock, callback, add_sock_opts)) == - NULL) { - /* opts.error_string set by mg_add_sock_opt */ - DBG(("Failed to mg_add_sock")); - closesocket(sock); - } else { - nc->sa = sa; - nc->handler = callback; + num_ev = epoll_wait(epoll_fd, events, MG_EPOLL_MAX_EVENTS, timeout_ms); + now = time(NULL); + DBG(("epoll_wait @ %ld num_ev=%d", (long) now, num_ev)); - if (proto == SOCK_DGRAM) { - nc->flags |= MG_F_UDP; - } else { - nc->flags |= MG_F_LISTENING; + while (num_ev-- > 0) { + intptr_t epf; + struct epoll_event *ev = events + num_ev; + nc = (struct mg_connection *) ev->data.ptr; + if (nc == NULL) { + mg_mgr_handle_ctl_sock(mgr); + continue; } + fd_flags = ((ev->events & (EPOLLIN | EPOLLHUP)) ? _MG_F_FD_CAN_READ : 0) | + ((ev->events & (EPOLLOUT)) ? _MG_F_FD_CAN_WRITE : 0) | + ((ev->events & (EPOLLERR)) ? _MG_F_FD_ERROR : 0); + mg_mgr_handle_conn(nc, fd_flags, now); + epf = (intptr_t) nc->mgr_data; + epf ^= _MG_EPF_NO_POLL; + nc->mgr_data = (void *) epf; + } - DBG(("%p sock %d/%d", nc, sock, proto)); + for (nc = mgr->active_connections; nc != NULL; nc = next) { + next = nc->next; + if (!(((intptr_t) nc->mgr_data) & _MG_EPF_NO_POLL)) { + mg_mgr_handle_conn(nc, 0, now); + } else { + intptr_t epf = (intptr_t) nc->mgr_data; + epf ^= _MG_EPF_NO_POLL; + nc->mgr_data = (void *) epf; + } + if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) || + (nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE))) { + mg_close_conn(nc); + } else { + if (!(nc->flags & MG_F_UDP) || nc->listener == NULL) { + mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_MOD); + } else { + /* This is a kludge, but... */ + if (nc->send_mbuf.len > 0) { + mg_mgr_handle_conn(nc, _MG_F_FD_CAN_WRITE, now); + } + } + } } - return nc; + return now; } -struct mg_connection *mg_add_sock(struct mg_mgr *s, sock_t sock, - mg_event_handler_t callback) { - static struct mg_add_sock_opts opts; - return mg_add_sock_opt(s, sock, callback, opts); -} +#else /* select() */ -struct mg_connection *mg_add_sock_opt(struct mg_mgr *s, sock_t sock, - mg_event_handler_t callback, - struct mg_add_sock_opts opts) { - struct mg_connection *nc = mg_create_connection(s, callback, opts); - if (nc != NULL) { - mg_set_sock(nc, sock); - } - return nc; +static void mg_ev_mgr_init(struct mg_mgr *mgr) { + (void) mgr; + DBG(("%p using select()", mgr)); +#ifndef MG_DISABLE_SOCKETPAIR + do { + mg_socketpair(mgr->ctl, SOCK_DGRAM); + } while (mgr->ctl[0] == INVALID_SOCKET); +#endif } -struct mg_connection *mg_next(struct mg_mgr *s, struct mg_connection *conn) { - return conn == NULL ? s->active_connections : conn->next; +static void mg_ev_mgr_free(struct mg_mgr *mgr) { + (void) mgr; } -void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, - size_t len) { - struct ctl_msg ctl_msg; +static void mg_ev_mgr_add_conn(struct mg_connection *nc) { + (void) nc; +} - /* - * Mongoose manager has a socketpair, `struct mg_mgr::ctl`, - * where `mg_broadcast()` pushes the message. - * `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls - * specified callback for each connection. Thus the callback function executes - * in event manager thread. - */ - if (mgr->ctl[0] != INVALID_SOCKET && data != NULL && - len < sizeof(ctl_msg.message)) { - size_t dummy; +static void mg_ev_mgr_remove_conn(struct mg_connection *nc) { + (void) nc; +} - ctl_msg.callback = cb; - memcpy(ctl_msg.message, data, len); - dummy = MG_SEND_FUNC(mgr->ctl[0], (char *) &ctl_msg, - offsetof(struct ctl_msg, message) + len, 0); - dummy = MG_RECV_FUNC(mgr->ctl[0], (char *) &len, 1, 0); - (void) dummy; +static void mg_add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) { + if (sock != INVALID_SOCKET) { + FD_SET(sock, set); + if (*max_fd == INVALID_SOCKET || sock > *max_fd) { + *max_fd = sock; + } } } -static int isbyte(int n) { - return n >= 0 && n <= 255; -} +time_t mg_mgr_poll(struct mg_mgr *mgr, int milli) { + time_t now = time(NULL); + struct mg_connection *nc, *tmp; + struct timeval tv; + fd_set read_set, write_set, err_set; + sock_t max_fd = INVALID_SOCKET; + int num_fds, num_selected; -static int parse_net(const char *spec, uint32_t *net, uint32_t *mask) { - int n, a, b, c, d, slash = 32, len = 0; + FD_ZERO(&read_set); + FD_ZERO(&write_set); + FD_ZERO(&err_set); + mg_add_to_set(mgr->ctl[1], &read_set, &max_fd); - if ((sscanf(spec, "%d.%d.%d.%d/%d%n", &a, &b, &c, &d, &slash, &n) == 5 || - sscanf(spec, "%d.%d.%d.%d%n", &a, &b, &c, &d, &n) == 4) && - isbyte(a) && isbyte(b) && isbyte(c) && isbyte(d) && slash >= 0 && - slash < 33) { - len = n; - *net = - ((uint32_t) a << 24) | ((uint32_t) b << 16) | ((uint32_t) c << 8) | d; - *mask = slash ? 0xffffffffU << (32 - slash) : 0; + for (nc = mgr->active_connections, num_fds = 0; nc != NULL; nc = tmp) { + tmp = nc->next; + + if (nc->sock == INVALID_SOCKET) { + mg_mgr_handle_conn(nc, 0, now); + continue; + } + + num_fds++; + + if (!(nc->flags & MG_F_WANT_WRITE) && + nc->recv_mbuf.len < nc->recv_mbuf_limit && + (!(nc->flags & MG_F_UDP) || nc->listener == NULL)) { + mg_add_to_set(nc->sock, &read_set, &max_fd); + } + + if (((nc->flags & MG_F_CONNECTING) && !(nc->flags & MG_F_WANT_READ)) || + (nc->send_mbuf.len > 0 && !(nc->flags & MG_F_CONNECTING) && + !(nc->flags & MG_F_DONT_SEND))) { + mg_add_to_set(nc->sock, &write_set, &max_fd); + mg_add_to_set(nc->sock, &err_set, &max_fd); + } } - return len; -} + tv.tv_sec = milli / 1000; + tv.tv_usec = (milli % 1000) * 1000; -int mg_check_ip_acl(const char *acl, uint32_t remote_ip) { - int allowed, flag; - uint32_t net, mask; - struct mg_str vec; + num_selected = select((int) max_fd + 1, &read_set, &write_set, &err_set, &tv); + now = time(NULL); + DBG(("select @ %ld num_ev=%d of %d", (long) now, num_selected, num_fds)); - /* If any ACL is set, deny by default */ - allowed = (acl == NULL || *acl == '\0') ? '+' : '-'; +#ifndef MG_DISABLE_SOCKETPAIR + if (num_selected > 0 && mgr->ctl[1] != INVALID_SOCKET && + FD_ISSET(mgr->ctl[1], &read_set)) { + mg_mgr_handle_ctl_sock(mgr); + } +#endif - while ((acl = mg_next_comma_list_entry(acl, &vec, NULL)) != NULL) { - flag = vec.p[0]; - if ((flag != '+' && flag != '-') || - parse_net(&vec.p[1], &net, &mask) == 0) { - return -1; + for (nc = mgr->active_connections; nc != NULL; nc = tmp) { + int fd_flags = 0; + if (num_selected > 0) { + fd_flags = (FD_ISSET(nc->sock, &read_set) ? _MG_F_FD_CAN_READ : 0) | + (FD_ISSET(nc->sock, &write_set) ? _MG_F_FD_CAN_WRITE : 0) | + (FD_ISSET(nc->sock, &err_set) ? _MG_F_FD_ERROR : 0); + } +#ifdef MG_CC3200 + // CC3200 does not report UDP sockets as writeable. + if (nc->flags & MG_F_UDP && + (nc->send_mbuf.len > 0 || nc->flags & MG_F_CONNECTING)) { + fd_flags |= _MG_F_FD_CAN_WRITE; } +#endif +#ifdef MG_LWIP + /* With LWIP socket emulation layer, we don't get write events */ + fd_flags |= _MG_F_FD_CAN_WRITE; +#endif + tmp = nc->next; + mg_mgr_handle_conn(nc, fd_flags, now); + } - if (net == (remote_ip & mask)) { - allowed = flag; + for (nc = mgr->active_connections; nc != NULL; nc = tmp) { + tmp = nc->next; + if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) || + (nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE))) { + mg_close_conn(nc); } } - return allowed == '+'; + return now; } -/* Move data from one connection to another */ -void mg_forward(struct mg_connection *from, struct mg_connection *to) { - mg_send(to, from->recv_mbuf.buf, from->recv_mbuf.len); - mbuf_remove(&from->recv_mbuf, from->recv_mbuf.len); +#endif + +#ifndef MG_DISABLE_SOCKETPAIR +int mg_socketpair(sock_t sp[2], int sock_type) { + union socket_address sa; + sock_t sock; + socklen_t len = sizeof(sa.sin); + int ret = 0; + + sock = sp[0] = sp[1] = INVALID_SOCKET; + + (void) memset(&sa, 0, sizeof(sa)); + sa.sin.sin_family = AF_INET; + sa.sin.sin_port = htons(0); + sa.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ + + if ((sock = socket(AF_INET, sock_type, 0)) == INVALID_SOCKET) { + } else if (bind(sock, &sa.sa, len) != 0) { + } else if (sock_type == SOCK_STREAM && listen(sock, 1) != 0) { + } else if (getsockname(sock, &sa.sa, &len) != 0) { + } else if ((sp[0] = socket(AF_INET, sock_type, 0)) == INVALID_SOCKET) { + } else if (connect(sp[0], &sa.sa, len) != 0) { + } else if (sock_type == SOCK_DGRAM && + (getsockname(sp[0], &sa.sa, &len) != 0 || + connect(sock, &sa.sa, len) != 0)) { + } else if ((sp[1] = (sock_type == SOCK_DGRAM ? sock + : accept(sock, &sa.sa, &len))) == + INVALID_SOCKET) { + } else { + mg_set_close_on_exec(sp[0]); + mg_set_close_on_exec(sp[1]); + if (sock_type == SOCK_STREAM) closesocket(sock); + ret = 1; + } + + if (!ret) { + if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); + if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); + if (sock != INVALID_SOCKET) closesocket(sock); + sock = sp[0] = sp[1] = INVALID_SOCKET; + } + + return ret; } +#endif /* MG_DISABLE_SOCKETPAIR */ #ifdef NS_MODULE_LINES #line 1 "src/multithreading.c" /**/ @@ -7293,7 +7443,7 @@ struct mg_dns_reply mg_dns_create_reply(struct mbuf *io, return rep; } -int mg_dns_send_reply(struct mg_connection *nc, struct mg_dns_reply *r) { +void mg_dns_send_reply(struct mg_connection *nc, struct mg_dns_reply *r) { size_t sent = r->io->len - r->start; mg_dns_insert_header(r->io, r->start, r->msg); if (!(nc->flags & MG_F_UDP)) { @@ -7301,11 +7451,10 @@ int mg_dns_send_reply(struct mg_connection *nc, struct mg_dns_reply *r) { mbuf_insert(r->io, r->start, &len, 2); } - if (&nc->send_mbuf != r->io || nc->flags & MG_F_UDP) { - sent = mg_send(nc, r->io->buf + r->start, r->io->len - r->start); + if (&nc->send_mbuf != r->io) { + mg_send(nc, r->io->buf + r->start, r->io->len - r->start); r->io->len = r->start; } - return sent; } int mg_dns_reply_record(struct mg_dns_reply *reply, @@ -8098,7 +8247,6 @@ uint32_t mg_coap_compose(struct mg_coap_message *cm, struct mbuf *io) { uint32_t mg_coap_send_message(struct mg_connection *nc, struct mg_coap_message *cm) { struct mbuf packet_out; - int send_res; uint32_t compose_res; mbuf_init(&packet_out, 0); @@ -8107,17 +8255,9 @@ uint32_t mg_coap_send_message(struct mg_connection *nc, return compose_res; /* LCOV_EXCL_LINE */ } - send_res = mg_send(nc, packet_out.buf, (int) packet_out.len); + mg_send(nc, packet_out.buf, (int) packet_out.len); mbuf_free(&packet_out); - if (send_res == 0) { - /* - * in case of UDP mg_send tries to send immediately - * and could return an error. - */ - return MG_COAP_NETWORK_ERROR; /* LCOV_EXCL_LINE */ - } - return 0; } diff --git a/mongoose.h b/mongoose.h index 5d9def373..3448f0909 100644 --- a/mongoose.h +++ b/mongoose.h @@ -672,7 +672,8 @@ struct mg_connection { struct mg_connection *listener; /* Set only for accept()-ed connections */ struct mg_mgr *mgr; /* Pointer to containing manager */ - sock_t sock; /* Socket to the remote peer */ + sock_t sock; /* Socket to the remote peer */ + int err; union socket_address sa; /* Remote peer address */ size_t recv_mbuf_limit; /* Max size of recv buffer */ struct mbuf recv_mbuf; /* Received data */ @@ -923,13 +924,11 @@ const char *mg_set_ssl(struct mg_connection *nc, const char *cert, /* * Send data to the connection. * - * Return number of written bytes. Note that sending - * functions do not actually push data to the socket. They just append data - * to the output buffer. The exception is UDP connections. For UDP, data is - * sent immediately, and returned value indicates an actual number of bytes - * sent to the socket. + * Note that sending functions do not actually push data to the socket. + * They just append data to the output buffer. MG_EV_SEND will be delivered when + * the data has actually been pushed out. */ -int mg_send(struct mg_connection *, const void *buf, int len); +void mg_send(struct mg_connection *, const void *buf, int len); /* Enables format string warnings for mg_printf */ #if defined(__GNUC__) @@ -1013,6 +1012,57 @@ enum v7_err mg_enable_javascript(struct mg_mgr *m, struct v7 *v7, #endif /* __cplusplus */ #endif /* MG_NET_HEADER_INCLUDED */ +#ifndef MG_NET_IF_HEADER_INCLUDED +#define MG_NET_IF_HEADER_INCLUDED + +/* + * Internal async networking core interface. + * Consists of calls made by the core, which should not block, + * and callbacks back into the core ("..._cb"). + * Callbacks may (will) cause methods to be invoked from within, + * but methods are not allowed to invoke callbacks inline. + * + * Implementation must ensure that only one callback is invoked at any time. + */ + +/* Request that a TCP connection is made to the specified address. */ +void mg_if_connect_tcp(struct mg_connection *nc, + const union socket_address *sa); +/* Open a UDP socket. Doesn't actually connect anything. */ +void mg_if_connect_udp(struct mg_connection *nc); +/* Callback invoked by connect methods. err = 0 -> ok, != 0 -> error. */ +void mg_if_connect_cb(struct mg_connection *nc, int err); + +/* Set up a listening TCP socket on a given address. rv = 0 -> ok. */ +int mg_if_listen_tcp(struct mg_connection *nc, union socket_address *sa); +/* Deliver a new TCP connection. */ +void mg_if_accept_tcp_cb(struct mg_connection *lc, sock_t sock, + union socket_address *sa, size_t sa_len); + +/* Request that a "listening" UDP socket be created. */ +int mg_if_listen_udp(struct mg_connection *nc, union socket_address *sa); + +/* Send functions for TCP and UDP. Sent data is copied before return. */ +void mg_if_tcp_send(struct mg_connection *nc, const void *buf, size_t len); +void mg_if_udp_send(struct mg_connection *nc, const void *buf, size_t len); +/* Callback that reports that data has been put on the wire. */ +void mg_if_sent_cb(struct mg_connection *nc, int num_sent); + +/* + * Receive callback. + * buf must be heap-allocated and ownership is transferred to the core. + * Core will acknowledge consumption by calling mg_if_recved. + * No more than one chunk of data can be unacknowledged at any time. + */ +void mg_if_recv_tcp_cb(struct mg_connection *nc, void *buf, int len); +void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len, + union socket_address *sa, size_t sa_len); +void mg_if_recved(struct mg_connection *nc, size_t len); + +/* Perform interface-related cleanup on connection before destruction. */ +void mg_if_destroy_conn(struct mg_connection *nc); + +#endif /* MG_NET_IF_HEADER_INCLUDED */ /* * Copyright (c) 2014 Cesanta Software Limited * All rights reserved @@ -2344,7 +2394,7 @@ int mg_dns_reply_record(struct mg_dns_reply *, struct mg_dns_resource_record *, * Once sent, the IO buffer will be trimmed unless the reply IO buffer * is the connection's send buffer and the connection is not in UDP mode. */ -int mg_dns_send_reply(struct mg_connection *, struct mg_dns_reply *); +void mg_dns_send_reply(struct mg_connection *, struct mg_dns_reply *); #ifdef __cplusplus } -- GitLab