From 4e6c2867da36ec8c911b980accb3166dc4ca9052 Mon Sep 17 00:00:00 2001 From: Deomid Ryabkov <rojer@cesanta.com> Date: Thu, 9 Feb 2017 01:34:28 +0800 Subject: [PATCH] UDP fixes: move recv callback into event loop, ... ...and fix a leak where UDP connections with SEND_AND_CLOSE set would not be closed PUBLISHED_FROM=d60f1fc037e8e57671c7d3146fabc1f068bbb514 --- mongoose.c | 109 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 78 insertions(+), 31 deletions(-) diff --git a/mongoose.c b/mongoose.c index 4b4da8235..5c116dc12 100644 --- a/mongoose.c +++ b/mongoose.c @@ -13535,6 +13535,8 @@ struct mg_lwip_conn_state { size_t rx_offset; /* Offset within the first pbuf (if partially consumed) */ /* Last SSL write size, for retries. */ int last_ssl_write_size; + int recv_pending; /* Whether MG_SIG_RECV is already pending for this + connection */ }; enum mg_sig_type { @@ -13611,6 +13613,8 @@ void mg_lwip_if_add_conn(struct mg_connection *nc); void mg_lwip_if_remove_conn(struct mg_connection *nc); time_t mg_lwip_if_poll(struct mg_iface *iface, int timeout_ms); +static void mg_lwip_recv_common(struct mg_connection *nc, struct pbuf *p); + #if LWIP_TCP_KEEPALIVE void mg_lwip_set_keepalive_params(struct mg_connection *nc, int idle, int interval, int count) { @@ -13688,27 +13692,23 @@ static err_t mg_lwip_tcp_recv_cb(void *arg, struct tcp_pcb *tpcb, for (; q != NULL; q = q->next) pbuf_ref(q); } if (cs->rx_chain == NULL) { - cs->rx_chain = p; cs->rx_offset = 0; - } else { - if (pbuf_clen(cs->rx_chain) >= 4) { - /* ESP SDK has a limited pool of 5 pbufs. We must not hog them all or RX - * will be completely blocked. We already have at least 4 in the chain, - * this one is, so we have to make a copy and release this one. */ - struct pbuf *np = pbuf_alloc(PBUF_RAW, p->tot_len, PBUF_RAM); - if (np != NULL) { - pbuf_copy(np, p); - pbuf_free(p); - p = np; - } - } - pbuf_chain(cs->rx_chain, p); - } - mg_lwip_post_signal(MG_SIG_RECV, nc); + } else if (pbuf_clen(cs->rx_chain) >= 4) { + /* ESP SDK has a limited pool of 5 pbufs. We must not hog them all or RX + * will be completely blocked. We already have at least 4 in the chain, + * this one is, so we have to make a copy and release this one. */ + struct pbuf *np = pbuf_alloc(PBUF_RAW, p->tot_len, PBUF_RAM); + if (np != NULL) { + pbuf_copy(np, p); + pbuf_free(p); + p = np; + } + } + mg_lwip_recv_common(nc, p); return ERR_OK; } -static void mg_lwip_handle_recv(struct mg_connection *nc) { +static void mg_lwip_handle_recv_tcp(struct mg_connection *nc) { struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock; #if MG_ENABLE_SSL @@ -13799,21 +13799,61 @@ static void mg_lwip_udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p, #endif { struct mg_connection *nc = (struct mg_connection *) arg; - size_t len = p->len; - char *data = (char *) malloc(len); - union socket_address sa; - (void) pcb; - DBG(("%p %s:%u %u", nc, IPADDR_NTOA(addr), port, p->len)); - if (data == NULL) { - DBG(("OOM")); + LOG(LL_INFO, + ("%p %s:%u %p %u %u", nc, IPADDR_NTOA(addr), port, p, p->ref, p->len)); + /* Put address in a separate pbuf and tack it onto the packet. */ + struct pbuf *sap = + pbuf_alloc(PBUF_RAW, sizeof(union socket_address), PBUF_RAM); + if (sap == NULL) { pbuf_free(p); return; } - sa.sin.sin_addr.s_addr = addr->addr; - sa.sin.sin_port = htons(port); - pbuf_copy_partial(p, data, len, 0); - pbuf_free(p); - mg_if_recv_udp_cb(nc, data, len, &sa, sizeof(sa.sin)); + union socket_address *sa = (union socket_address *) sap->payload; + sa->sin.sin_addr.s_addr = addr->addr; + sa->sin.sin_port = htons(port); + /* Logic in the recv handler requires that there be exactly one data pbuf. */ + p = pbuf_coalesce(p, PBUF_RAW); + pbuf_chain(sap, p); + mg_lwip_recv_common(nc, sap); + (void) pcb; +} + +static void mg_lwip_recv_common(struct mg_connection *nc, struct pbuf *p) { + struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock; + if (cs->rx_chain == NULL) { + cs->rx_chain = p; + } else { + pbuf_chain(cs->rx_chain, p); + } + if (!cs->recv_pending) { + cs->recv_pending = 1; + mg_lwip_post_signal(MG_SIG_RECV, nc); + } +} + +static void mg_lwip_handle_recv_udp(struct mg_connection *nc) { + struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock; + /* + * For UDP, RX chain consists of interleaved address and packet bufs: + * Address pbuf followed by exactly one data pbuf (recv_cb took care of that). + */ + while (cs->rx_chain != NULL) { + struct pbuf *sap = cs->rx_chain; + struct pbuf *p = sap->next; + cs->rx_chain = pbuf_dechain(p); + size_t data_len = p->len; + char *data = (char *) malloc(data_len); + if (data != NULL) { + pbuf_copy_partial(p, data, data_len, 0); + pbuf_free(p); + mg_if_recv_udp_cb(nc, data, data_len, + (union socket_address *) sap->payload, sap->len); + pbuf_free(sap); + } else { + pbuf_free(p); + pbuf_free(sap); + } + } } void mg_lwip_if_connect_udp(struct mg_connection *nc) { @@ -14168,7 +14208,12 @@ void mg_ev_mgr_lwip_process_signals(struct mg_mgr *mgr) { break; } case MG_SIG_RECV: { - mg_lwip_handle_recv(nc); + cs->recv_pending = 0; + if (nc->flags & MG_F_UDP) { + mg_lwip_handle_recv_udp(nc); + } else { + mg_lwip_handle_recv_tcp(nc); + } break; } case MG_SIG_SENT_CB: { @@ -14231,7 +14276,9 @@ time_t mg_lwip_if_poll(struct mg_iface *iface, int timeout_ms) { struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock; tmp = nc->next; n++; - if (nc->flags & MG_F_CLOSE_IMMEDIATELY) { + if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) || + ((nc->flags & MG_F_SEND_AND_CLOSE) && (nc->flags & MG_F_UDP) && + (nc->send_mbuf.len == 0))) { mg_close_conn(nc); continue; } -- GitLab