From a6c66fb6692453481e36240ba69dfa141ab252f3 Mon Sep 17 00:00:00 2001 From: Alexander Alashkin <alexander.alashkin@cesanta.com> Date: Fri, 26 Feb 2016 00:12:32 +0000 Subject: [PATCH] Implement multipart streaming in MG (edition 2) PUBLISHED_FROM=de89316ca3b53b8eb46ea69e88b5fbd277c80dc2 --- examples/big_upload/Makefile | 1 + examples/big_upload/big_upload.c | 105 ++++--------- examples/examples.mk | 2 +- mongoose.c | 245 ++++++++++++++++++++++++++++++- mongoose.h | 41 +++++- 5 files changed, 314 insertions(+), 80 deletions(-) diff --git a/examples/big_upload/Makefile b/examples/big_upload/Makefile index b362d635c..01b576bb2 100644 --- a/examples/big_upload/Makefile +++ b/examples/big_upload/Makefile @@ -1,2 +1,3 @@ PROG = big_upload +MODULE_CFLAGS = -DMG_ENABLE_HTTP_STREAMING_MULTIPART include ../examples.mk diff --git a/examples/big_upload/big_upload.c b/examples/big_upload/big_upload.c index 2d9a4fbc8..2530df3f2 100644 --- a/examples/big_upload/big_upload.c +++ b/examples/big_upload/big_upload.c @@ -13,12 +13,11 @@ static const char *s_http_port = "8000"; struct file_writer_data { FILE *fp; - size_t bytes_left; + size_t bytes_written; }; static void handle_request(struct mg_connection *nc) { - // This handler gets called only when we don't reset proto_handler in - // handle_recv, so we just return a page unconditionally. + // This handler gets for all endpoints but /upload mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\n" "Content-Type: text/html\r\n" @@ -33,66 +32,40 @@ static void handle_request(struct mg_connection *nc) { nc->flags |= MG_F_SEND_AND_CLOSE; } -static void handle_recv(struct mg_connection *nc) { +static void handle_upload(struct mg_connection *nc, int ev, void *p) { struct file_writer_data *data = (struct file_writer_data *) nc->user_data; + struct mg_http_multipart_part *mp = (struct mg_http_multipart_part*)p; - if (data == NULL) { - // This is a new connection, try to parse HTTP request. 
- struct http_message hm; - int req_len = mg_parse_http(nc->recv_mbuf.buf, nc->recv_mbuf.len, &hm, - 1 /* is_req */); - - if (req_len < 0 || - (req_len == 0 && nc->recv_mbuf.len >= MG_MAX_HTTP_REQUEST_SIZE)) { - nc->flags |= MG_F_CLOSE_IMMEDIATELY; - } else if (req_len == 0) { - // Request is not complete yet, do nothing. - } else if (mg_vcasecmp(&hm.method, "POST") == 0 && - mg_vcmp(&hm.uri, "/upload") == 0) { - // This is the request that we don't want to buffer in memory. - - if (hm.body.len == (size_t) ~0 || hm.body.len == 0) { - mg_printf(nc, "%s", - "HTTP/1.1 411 Content-Length required\r\n" - "Content-Length: 0\r\n\r\n"); - nc->flags |= MG_F_SEND_AND_CLOSE; - return; + switch (ev) { + case MG_EV_HTTP_PART_BEGIN: { + if (data == NULL) { + data = calloc(1, sizeof(struct file_writer_data)); + data->fp = tmpfile(); + data->bytes_written = 0; + + if (data->fp == NULL) { + mg_printf(nc, "%s", + "HTTP/1.1 500 Failed to open a file\r\n" + "Content-Length: 0\r\n\r\n"); + nc->flags |= MG_F_SEND_AND_CLOSE; + return; + } + nc->user_data = (void *) data; } - - // Reset proto_handler so Mongoose's http_handler() won't get confused - // once we start removing data from the buffer. - nc->proto_handler = NULL; - - // Headers will be inaccessible later, so put everything we need into - // user_data. - data = calloc(1, sizeof(struct file_writer_data)); - data->bytes_left = hm.body.len; - data->fp = tmpfile(); - if (data->fp == NULL) { + break; + } + case MG_EV_HTTP_PART_DATA: { + if (fwrite(mp->data.p, 1, mp->data.len, data->fp) != mp->data.len) { mg_printf(nc, "%s", - "HTTP/1.1 500 Failed to open a file\r\n" + "HTTP/1.1 500 Failed to write to a file\r\n" "Content-Length: 0\r\n\r\n"); nc->flags |= MG_F_SEND_AND_CLOSE; return; } - nc->user_data = (void *) data; - - // Remove the headers from the buffer. - mbuf_remove(&nc->recv_mbuf, hm.body.p - nc->recv_mbuf.buf); - - // Invoke itself again to write the piece of the body that is already in - // the buffer. 
- handle_recv(nc); + data->bytes_written += mp->data.len; + break; } - } else { // data != NULL - size_t to_write = data->bytes_left, written = 0; - if (nc->recv_mbuf.len < to_write) to_write = nc->recv_mbuf.len; - written = fwrite(nc->recv_mbuf.buf, 1, to_write, data->fp); - mbuf_remove(&nc->recv_mbuf, written); - data->bytes_left -= written; - if (data->bytes_left <= 0) { - // Request is complete, do something meaningful here. - + case MG_EV_HTTP_PART_END: { mg_printf(nc, "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" @@ -100,22 +73,14 @@ static void handle_recv(struct mg_connection *nc) { "Written %ld of POST data to a temp file\n\n", (long) ftell(data->fp)); nc->flags |= MG_F_SEND_AND_CLOSE; - // handle_close will free the resources. + fclose(data->fp); + free(data); + nc->user_data = NULL; + break; } } } -// Make sure we free all allocated resources -static void handle_close(struct mg_connection *nc) { - struct file_writer_data *data = (struct file_writer_data *) nc->user_data; - - if (data != NULL) { - fclose(data->fp); - free(data); - nc->user_data = NULL; - } -} - static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) { (void) ev_data; switch (ev) { @@ -123,13 +88,6 @@ static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) { // Invoked when the full HTTP request is in the buffer (including body). handle_request(nc); break; - case MG_EV_RECV: - // Invoked every time new data arrives. - handle_recv(nc); - break; - case MG_EV_CLOSE: - handle_close(nc); - break; } } @@ -140,6 +98,7 @@ int main(void) { mg_mgr_init(&mgr, NULL); nc = mg_bind(&mgr, s_http_port, ev_handler); + mg_register_http_endpoint(nc, "/upload", handle_upload); // Set up HTTP server parameters mg_set_protocol_http_websocket(nc); diff --git a/examples/examples.mk b/examples/examples.mk index 443544742..4d5815d74 100644 --- a/examples/examples.mk +++ b/examples/examples.mk @@ -1,5 +1,5 @@ SOURCES = $(PROG).c ../../mongoose.c -CFLAGS = -W -Wall -I../.. 
-Wno-unused-function $(CFLAGS_EXTRA) $(MODULE_CFLAGS) +CFLAGS = -g -W -Wall -I../.. -Wno-unused-function $(CFLAGS_EXTRA) $(MODULE_CFLAGS) all: $(PROG) diff --git a/mongoose.c b/mongoose.c index 4300a704f..75a475ebd 100644 --- a/mongoose.c +++ b/mongoose.c @@ -1987,6 +1987,24 @@ int to_wchar(const char *path, wchar_t *wbuf, size_t wbuf_len) { } #endif /* _WIN32 */ +/* The simplest O(mn) algorithm. Better implementations are GPLed */ +const char *c_strnstr(const char *s, const char *find, size_t slen) { + size_t find_length = strlen(find); + size_t i; + + for (i = 0; i < slen; i++) { + if (i + find_length > slen) { + return NULL; + } + + if (strncmp(&s[i], find, find_length) == 0) { + return &s[i]; + } + } + + return NULL; +} + #endif /* EXCLUDE_COMMON */ #ifdef MG_MODULE_LINES #line 0 "./src/net.c" @@ -2137,6 +2155,10 @@ static void mg_destroy_conn(struct mg_connection *conn) { mbuf_free(&conn->recv_mbuf); mbuf_free(&conn->send_mbuf); mbuf_free(&conn->endpoints); +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART + mbuf_free(&conn->strm_state); +#endif + memset(conn, 0, sizeof(*conn)); MG_FREE(conn); } @@ -4877,6 +4899,42 @@ static mg_event_handler_t get_endpoint_handler(struct mg_connection *nc, return ret; } +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART +struct stream_info { + struct mg_str endpoint; + struct mg_str boundary; +}; + +/* + * Saving/restoring state into a buf is convenient due to the lack of + * protocol/connection parameters in mongoose. + * Once mongoose has a way to store connection/protocol + * related data, these functions can be replaced with usual structs. + * TODO(alashkin): replace once that mechanism is implemented + */ +static void mg_parse_stream_info(struct mbuf *buf, struct stream_info *si) { + char *ptr = buf->buf; + memcpy(&si->endpoint.len, ptr, sizeof(si->endpoint.len)); + ptr += sizeof(si->endpoint.len); + si->endpoint.p = ptr; + ptr += si->endpoint.len; + memcpy(&si->boundary.len, ptr, sizeof(si->boundary.len)); + ptr += sizeof(si->boundary.len); + 
si->boundary.p = ptr; + ptr += si->boundary.len + 1; /* Explicitly zero-terminated */ +} + +static void mg_store_stream_info(struct mbuf *buf, struct stream_info *si) { + char zero = 0; + mbuf_append(buf, &si->endpoint.len, sizeof(si->endpoint.len)); + mbuf_append(buf, si->endpoint.p, si->endpoint.len); + mbuf_append(buf, &si->boundary.len, sizeof(si->boundary.len)); + mbuf_append(buf, si->boundary.p, si->boundary.len); + /* Make boundary zero terminated */ + mbuf_append(buf, &zero, 1); +} +#endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */ + static void mg_call_endpoint_handler(struct mg_connection *nc, int ev, struct http_message *hm) { mg_event_handler_t uri_handler = @@ -4885,6 +4943,16 @@ static void mg_call_endpoint_handler(struct mg_connection *nc, int ev, mg_call(nc, uri_handler ? uri_handler : nc->handler, ev, hm); } +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART +static void mg_multipart_continue(struct mg_connection *nc, struct mbuf *io, + int req_len, int ev, void *ev_data); + +static void mg_multipart_begin(struct mg_connection *nc, + struct http_message *hm, struct mbuf *io, + int req_len); + +#endif + /* * lx106 compiler has a bug (TODO(mkm) report and insert tracking bug here) * If a big structure is declared in a big function, lx106 gcc will make it @@ -4944,7 +5012,12 @@ void http_handler(struct mg_connection *nc, int ev, void *ev_data) { mg_handle_chunked(nc, hm, io->buf + req_len, io->len - req_len); } - if (req_len < 0 || (req_len == 0 && io->len >= MG_MAX_HTTP_REQUEST_SIZE)) { + if ( +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART + nc->strm_state.len == 0 && +#endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */ + (req_len < 0 || + (req_len == 0 && io->len >= MG_MAX_HTTP_REQUEST_SIZE))) { DBG(("invalid request")); nc->flags |= MG_F_CLOSE_IMMEDIATELY; } else if (req_len == 0) { @@ -4976,9 +5049,12 @@ void http_handler(struct mg_connection *nc, int ev, void *ev_data) { mg_call(nc, nc->handler, MG_EV_WEBSOCKET_HANDSHAKE_DONE, NULL); websocket_handler(nc, 
MG_EV_RECV, ev_data); } - } #endif /* MG_DISABLE_HTTP_WEBSOCKET */ - else if (hm->message.len <= io->len) { +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART + } else if (nc->strm_state.len != 0) { + mg_multipart_continue(nc, io, req_len, ev, ev_data); +#endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */ + } else if (hm->message.len <= io->len) { int trigger_ev = nc->listener ? MG_EV_HTTP_REQUEST : MG_EV_HTTP_REPLY; /* Whole HTTP message is fully buffered, call event handler */ @@ -5029,12 +5105,173 @@ void http_handler(struct mg_connection *nc, int ev, void *ev_data) { mg_call_endpoint_handler(nc, trigger_ev, hm); } #else - mg_call_endpoint_handler(nc, trigger_ev, hm); + mg_call_endpoint_handler(nc, trigger_ev, hm); #endif mbuf_remove(io, hm->message.len); +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART + } else { + mg_multipart_begin(nc, hm, io, req_len); +#endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */ + } + } +} + +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART +static void mg_multipart_begin(struct mg_connection *nc, + struct http_message *hm, struct mbuf *io, + int req_len) { + struct mg_str *ct; + const char multipart[] = "multipart"; + char boundary[100]; + int boundary_len; + struct stream_info si; + mg_event_handler_t handler; + + if (nc->listener == NULL) { + /* No streaming for replies now */ + goto exit_mp; + } + + ct = mg_get_http_header(hm, "Content-Type"); + if (ct == NULL) { + /* We need more data - or it isn't multipart mesage */ + goto exit_mp; + } + + /* Content-type should start with "multipart" */ + if (strncmp(ct->p, "multipart", ct->len < sizeof(multipart) - 1 + ? 
ct->len + : sizeof(multipart) - 1) != 0) { + goto exit_mp; + } + + boundary_len = + mg_http_parse_header(ct, "boundary", boundary, sizeof(boundary)); + if (boundary_len == 0) { + /* + * Content type is multipart, but there is no boundary, + * probably malformed request + */ + nc->flags = MG_F_CLOSE_IMMEDIATELY; + DBG(("invalid request")); + goto exit_mp; + } + + /* If we reach this place - that is multipart request */ + + if (nc->strm_state.len != 0) { + /* + * Another streaming request was in progress, + * looks like protocol error + */ + nc->flags |= MG_F_CLOSE_IMMEDIATELY; + mbuf_free(&nc->strm_state); + } else { + si.endpoint = hm->uri; + si.boundary.p = boundary; + si.boundary.len = boundary_len; + + mg_store_stream_info(&nc->strm_state, &si); + handler = get_endpoint_handler(nc->listener, &si.endpoint); + mg_call(nc, handler ? handler : nc->handler, MG_EV_HTTP_MULTIPART_REQUEST, + hm); + + mbuf_remove(io, req_len); + } +exit_mp: + ; +} + +static void mg_multipart_continue(struct mg_connection *nc, struct mbuf *io, + int req_len, int ev, void *ev_data) { + /* Continue to stream multipart */ + struct stream_info si; + mg_event_handler_t handler; + struct mg_http_multipart_part mp; + const char *boundary; + + mg_parse_stream_info(&nc->strm_state, &si); + handler = get_endpoint_handler(nc->listener, &si.endpoint); + memset(&mp, 0, sizeof(mp)); + + boundary = c_strnstr(io->buf, si.boundary.p, io->len); + if (boundary == NULL) { + mp.data.p = io->buf; + mp.data.len = io->len; + mg_call(nc, handler ? 
handler : nc->handler, MG_EV_HTTP_PART_DATA, &mp); + mbuf_remove(io, io->len); + } else { + int has_prefix = 0, has_suffix = 0; + if (boundary - 2 >= io->buf) { + has_prefix = (strncmp(boundary - 2, "--", 2) == 0); + } + if (boundary + si.boundary.len <= io->buf + io->len) { + has_suffix = (strncmp(boundary + si.boundary.len, "--", 2) == 0); + } + if (has_prefix && !has_suffix) { + /* No suffix - not last boundary */ + char varname[100] = {0}, filename[100] = {0}; + const char *data = NULL; + size_t data_len = 0; + /* Send previous part (if any) to callback */ + if (boundary - io->buf != 2) { /* -- */ + mp.data.p = io->buf; + mp.data.len = boundary - io->buf - 4; /* --\r\n */ + mg_call(nc, handler ? handler : nc->handler, MG_EV_HTTP_PART_DATA, &mp); + + mbuf_remove(io, mp.data.len + 2); + } + + mg_parse_multipart(io->buf, io->len, varname, sizeof(varname), filename, + sizeof(filename), &data, &data_len); + mp.file_name = filename; + mp.var_name = varname; + if ((req_len = get_request_len(io->buf, io->len)) > 0) { + const char *tmp; + mg_call(nc, handler ? handler : nc->handler, MG_EV_HTTP_PART_BEGIN, + &mp); + + mbuf_remove(io, req_len); + mp.data.p = io->buf; + + tmp = c_strnstr(io->buf, si.boundary.p, io->len); + if (tmp == NULL) { + mp.data.len = io->len; + } else { + mp.data.len = tmp - io->buf - 2; + } + + if (mp.data.len != 0) { + mg_call(nc, handler ? handler : nc->handler, MG_EV_HTTP_PART_DATA, + &mp); + mbuf_remove(io, mp.data.len); + } + + if (io->len != 0) { + http_handler(nc, ev, ev_data); + } + } /* else wait for data */ + } else if (has_prefix && has_suffix) { + /* Last boundary */ + mp.data.p = io->buf; + mp.data.len = boundary - io->buf - 4; + if (mp.data.len != 0) { + mg_call(nc, handler ? handler : nc->handler, MG_EV_HTTP_PART_DATA, &mp); + } + + mg_call(nc, handler ? 
handler : nc->handler, MG_EV_HTTP_PART_END, &mp); + + /* Skip epilogue (if any) */ + mbuf_remove(io, io->len); + mbuf_free(&nc->strm_state); + } else { + /* Malformed request */ + nc->flags |= MG_F_CLOSE_IMMEDIATELY; + DBG(("invalid request")); } } } +#endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */ void mg_set_protocol_http_websocket(struct mg_connection *nc) { nc->proto_handler = http_handler; diff --git a/mongoose.h b/mongoose.h index 4b198f90b..4b22a5f7b 100644 --- a/mongoose.h +++ b/mongoose.h @@ -816,6 +816,11 @@ extern "C" { int c_snprintf(char *buf, size_t buf_size, const char *format, ...); int c_vsnprintf(char *buf, size_t buf_size, const char *format, va_list ap); +/* + * Find the first occurrence of find in s, where the search is limited to the + * first slen characters of s. + */ +const char *c_strnstr(const char *s, const char *find, size_t slen); #if (!(defined(_XOPEN_SOURCE) && _XOPEN_SOURCE >= 700) && \ !(defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200809L) && \ @@ -829,6 +834,7 @@ size_t strnlen(const char *s, size_t maxlen); #ifdef __cplusplus } #endif + #endif /* * Copyright (c) 2004-2013 Sergey Lyubka <valenok@gmail.com> @@ -1069,6 +1075,9 @@ struct mg_connection { void *priv_2; /* Used by mg_enable_multithreading() */ struct mbuf endpoints; /* Used by mg_register_http_endpoint */ void *mgr_data; /* Implementation-specific event manager's data. */ +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART + struct mbuf strm_state; /* Used by multi-part streaming */ +#endif unsigned long flags; /* Flags set by Mongoose */ #define MG_F_LISTENING (1 << 0) /* This connection is listening */ @@ -1887,6 +1896,12 @@ struct websocket_message { unsigned char flags; }; +struct mg_http_multipart_part { + const char *file_name; + const char *var_name; + struct mg_str data; +}; + /* HTTP and websocket events. void *ev_data is described in a comment. 
*/ #define MG_EV_HTTP_REQUEST 100 /* struct http_message * */ #define MG_EV_HTTP_REPLY 101 /* struct http_message * */ @@ -1898,13 +1913,25 @@ struct websocket_message { #define MG_EV_WEBSOCKET_FRAME 113 /* struct websocket_message * */ #define MG_EV_WEBSOCKET_CONTROL_FRAME 114 /* struct websocket_message * */ +#ifdef MG_ENABLE_HTTP_STREAMING_MULTIPART +#define MG_EV_HTTP_MULTIPART_REQUEST 121 /* struct http_message * */ +#define MG_EV_HTTP_PART_BEGIN 122 /* struct mg_http_multipart_part * */ +#define MG_EV_HTTP_PART_DATA 123 /* struct mg_http_multipart_part * */ +#define MG_EV_HTTP_PART_END 124 /* struct mg_http_multipart_part * */ +#endif + /* * Attach built-in HTTP event handler to the given connection. * User-defined event handler will receive following extra events: * - * - MG_EV_HTTP_REQUEST: HTTP request has arrived. Parsed HTTP request is passed - *as + * - MG_EV_HTTP_REQUEST: HTTP request has arrived. Parsed HTTP request + * is passed as * `struct http_message` through the handler's `void *ev_data` pointer. + * - MG_EV_HTTP_MULTIPART_REQUEST: A multipart POST request has been received. + * This event is sent before the body is parsed. After this the user + * should expect a sequence of MG_EV_HTTP_PART_BEGIN/DATA/END events. + * This is also the last time when headers and other request fields are + * accessible. * - MG_EV_HTTP_REPLY: HTTP reply has arrived. Parsed HTTP reply is passed as * `struct http_message` through the handler's `void *ev_data` pointer. * - MG_EV_HTTP_CHUNK: HTTP chunked-encoding chunk has arrived. @@ -1925,6 +1952,16 @@ struct websocket_message { * `ev_data` is `NULL`. * - MG_EV_WEBSOCKET_FRAME: new websocket frame has arrived. 
`ev_data` is * `struct websocket_message *` + * - MG_EV_HTTP_PART_BEGIN: a new part of a multipart message has started, + * extra parameters are passed in mg_http_multipart_part + * - MG_EV_HTTP_PART_DATA: new portion of data from a multipart message, + * no additional headers are available, only data and data size + * - MG_EV_HTTP_PART_END: final boundary received; may be used to + * find the end of the multipart message + * Note: Mongoose should be compiled with MG_ENABLE_HTTP_STREAMING_MULTIPART + * to enable MG_EV_HTTP_MULTIPART_REQUEST, MG_EV_HTTP_REQUEST_END, + * MG_EV_HTTP_REQUEST_CANCEL, MG_EV_HTTP_PART_BEGIN, MG_EV_HTTP_PART_DATA, + * MG_EV_HTTP_PART_END constants */ void mg_set_protocol_http_websocket(struct mg_connection *nc); -- GitLab