From 09e4254718300644ccc90c28aa0a32975d49237b Mon Sep 17 00:00:00 2001 From: Our CI Bot <ourci@cesanta.com> Date: Thu, 24 Aug 2017 11:29:02 +0100 Subject: [PATCH] Add mqtt_over_websocket_server example Fixed bug in websocket handshake: now adding Sec-WebSocket-Protocol header in handshake response. PUBLISHED_FROM=e4a71ff9dc4aeec63db40cb7f356dc5b25c1215a --- examples/mqtt_over_websocket_server/Makefile | 4 + .../mqtt_over_websocket_server.c | 124 ++++++++++++++++++ mongoose.c | 40 ++++-- mongoose.h | 1 + 4 files changed, 156 insertions(+), 13 deletions(-) create mode 100644 examples/mqtt_over_websocket_server/Makefile create mode 100644 examples/mqtt_over_websocket_server/mqtt_over_websocket_server.c diff --git a/examples/mqtt_over_websocket_server/Makefile b/examples/mqtt_over_websocket_server/Makefile new file mode 100644 index 000000000..7f6be7c6c --- /dev/null +++ b/examples/mqtt_over_websocket_server/Makefile @@ -0,0 +1,4 @@ +PROG = mqtt_over_websocket_server +MODULE_CFLAGS = -DMG_ENABLE_MQTT_BROKER=1 +#SSL_LIB=mbedtls +include ../examples.mk diff --git a/examples/mqtt_over_websocket_server/mqtt_over_websocket_server.c b/examples/mqtt_over_websocket_server/mqtt_over_websocket_server.c new file mode 100644 index 000000000..885921ce9 --- /dev/null +++ b/examples/mqtt_over_websocket_server/mqtt_over_websocket_server.c @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2014 Cesanta Software Limited + * All rights reserved + * This software is dual-licensed: you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. For the terms of this + * license, see <http://www.gnu.org/licenses/>. + * + * You are free to use this software under the terms of the GNU General + * Public License, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * Alternatively, you can license this software under a commercial + * license, as set out in <https://www.cesanta.com/license>. + */ + +#include "mongoose.h" + +static const char *s_mqtt_address = "0.0.0.0:1883"; +static const char *s_http_address = "0.0.0.0:8080"; + +static void unproxy(struct mg_connection *c) { + struct mg_connection *pc = (struct mg_connection *) c->user_data; + if (pc != NULL) { + pc->flags |= MG_F_CLOSE_IMMEDIATELY; + pc->user_data = NULL; + c->user_data = NULL; + } + printf("Closing connection %p\n", c); +} + +static void proxy_handler(struct mg_connection *c, int ev, void *ev_data) { + if (ev == MG_EV_POLL) return; + printf("%p %s EVENT %d %p\n", c, __func__, ev, ev_data); + switch (ev) { + case MG_EV_CLOSE: { + unproxy(c); + break; + } + case MG_EV_RECV: { + struct mg_connection *pc = (struct mg_connection *) c->user_data; + if (pc != NULL) { + mg_send_websocket_frame(pc, WEBSOCKET_OP_BINARY, c->recv_mbuf.buf, + c->recv_mbuf.len); + mbuf_remove(&c->recv_mbuf, c->recv_mbuf.len); + } + break; + } + } +} + +static void http_handler(struct mg_connection *c, int ev, void *ev_data) { + struct mg_connection *pc = (struct mg_connection *) c->user_data; + if (ev == MG_EV_POLL) return; + printf("%p %s EVENT %d %p\n", c, __func__, ev, ev_data); + /* Do your custom event processing here */ + switch (ev) { + case MG_EV_WEBSOCKET_HANDSHAKE_DONE: { + pc = mg_connect(c->mgr, s_mqtt_address, proxy_handler); + pc->user_data = c; + c->user_data = pc; + printf("Created proxy connection %p\n", pc); + break; + } + case MG_EV_WEBSOCKET_FRAME: { + struct websocket_message *wm = (struct websocket_message *) ev_data; + if (pc != NULL) { + printf("Forwarding %d bytes\n", (int) wm->size); + mg_send(pc, wm->data, wm->size); + } + break; + } + case MG_EV_CLOSE: { + unproxy(c); + break; + } + } +} + +static void mqtt_handler(struct mg_connection *c, int ev, void *ev_data) { + if (ev == MG_EV_POLL) return; + printf("%p %s EVENT %d %p\n", c, __func__, ev, ev_data); + /* Do your custom event processing here */ + switch (ev) { + case MG_EV_CLOSE: + printf("Closing MQTT connection %p\n", c); + break; + } + mg_mqtt_broker(c, ev, ev_data); +} + +static void start_mqtt_server(struct mg_mgr *mgr, const char *addr) { + struct mg_connection *c; + static struct mg_mqtt_broker brk; // static is important - must not perish + if ((c = mg_bind(mgr, addr, mqtt_handler)) == NULL) { + fprintf(stderr, "Cannot start MQTT server on port [%s]\n", addr); + exit(EXIT_FAILURE); + } + mg_mqtt_broker_init(&brk, NULL); + c->user_data = &brk; + mg_set_protocol_mqtt(c); + printf("MQTT server started on %s\n", addr); +} + +static void start_http_server(struct mg_mgr *mgr, const char *addr) { + struct mg_connection *c; + if ((c = mg_bind(mgr, addr, http_handler)) == NULL) { + fprintf(stderr, "Cannot start HTTP server on port [%s]\n", addr); + exit(EXIT_FAILURE); + } + mg_set_protocol_http_websocket(c); + printf("HTTP server started on %s\n", addr); +} + +int main(void) { + struct mg_mgr mgr; + mg_mgr_init(&mgr, NULL); + start_http_server(&mgr, s_http_address); + start_mqtt_server(&mgr, s_mqtt_address); + for (;;) { + mg_mgr_poll(&mgr, 1000); + } +} diff --git a/mongoose.c b/mongoose.c index a765594a1..eac8391dc 100644 --- a/mongoose.c +++ b/mongoose.c @@ -34,9 +34,9 @@ #define MG_DISABLE_PFS #endif -/* Amalgamated: #include "mongoose/src/net.h" */ -/* Amalgamated: #include "mongoose/src/http.h" */ /* Amalgamated: #include "common/cs_dbg.h" */ +/* Amalgamated: #include "mongoose/src/http.h" */ +/* Amalgamated: #include "mongoose/src/net.h" */ #define MG_CTL_MSG_MESSAGE_SIZE 8192 @@ -140,7 +140,8 @@ MG_INTERNAL void mg_handle_put(struct mg_connection *nc, const char *path, MG_INTERNAL void mg_ws_handler(struct mg_connection *nc, int ev, void *ev_data MG_UD_ARG(void *user_data)); MG_INTERNAL void mg_ws_handshake(struct mg_connection *nc, - const struct mg_str *key); + const struct mg_str *key, + struct http_message *); #endif #endif /* MG_ENABLE_HTTP */ @@ -6021,7 +6022,7 @@ void mg_http_handler(struct mg_connection *nc, int ev, hm); if (!(nc->flags & (MG_F_CLOSE_IMMEDIATELY | MG_F_SEND_AND_CLOSE))) { if (nc->send_mbuf.len == 0) { - mg_ws_handshake(nc, vec); + mg_ws_handshake(nc, vec, hm); } mg_call(nc, nc->handler, nc->user_data, MG_EV_WEBSOCKET_HANDSHAKE_DONE, NULL); @@ -9164,8 +9165,9 @@ static int mg_deliver_websocket_data(struct mg_connection *nc) { unsigned char *p = (unsigned char *) nc->recv_mbuf.buf, *buf = p, *e = p + buf_len; unsigned *sizep = (unsigned *) &p[1]; /* Size ptr for defragmented frames */ - int ok, reass = buf_len > 0 && mg_is_ws_fragment(p[0]) && - !(nc->flags & MG_F_WEBSOCKET_NO_DEFRAG); + int ok; + int reass = buf_len > 0 && mg_is_ws_fragment(p[0]) && + !(nc->flags & MG_F_WEBSOCKET_NO_DEFRAG); /* If that's a continuation frame that must be reassembled, handle it */ if (reass && !mg_is_ws_first_fragment(p[0]) && @@ -9418,21 +9420,28 @@ extern void mg_hash_sha1_v(size_t num_msgs, const uint8_t *msgs[], #endif MG_INTERNAL void mg_ws_handshake(struct mg_connection *nc, - const struct mg_str *key) { + const struct mg_str *key, + struct http_message *hm) { static const char *magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; const uint8_t *msgs[2] = {(const uint8_t *) key->p, (const uint8_t *) magic}; const size_t msg_lens[2] = {key->len, 36}; unsigned char sha[20]; char b64_sha[30]; + struct mg_str *s; mg_hash_sha1_v(2, msgs, msg_lens, sha); mg_base64_encode(sha, sizeof(sha), b64_sha); - mg_printf(nc, "%s%s%s", + mg_printf(nc, "%s", "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Sec-WebSocket-Accept: ", - b64_sha, "\r\n\r\n"); + "Connection: Upgrade\r\n"); + + s = mg_get_http_header(hm, "Sec-WebSocket-Protocol"); + if (s != NULL) { + mg_printf(nc, "Sec-WebSocket-Protocol: %.*s\r\n", (int) s->len, s->p); + } + mg_printf(nc, "Sec-WebSocket-Accept: %s%s", b64_sha, "\r\n\r\n"); + DBG(("%p %.*s %s", nc, (int) key->len, key->p, b64_sha)); } @@ -9996,7 +10005,8 @@ MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm) { break; } - return end - io->buf; + mm->len = end - io->buf; + return mm->len; } static void mqtt_handler(struct mg_connection *nc, int ev, @@ -10008,6 +10018,9 @@ static void mqtt_handler(struct mg_connection *nc, int ev, nc->handler(nc, ev, ev_data MG_UD_ARG(user_data)); switch (ev) { + case MG_EV_ACCEPT: + if (nc->proto_data == NULL) mg_set_protocol_mqtt(nc); + break; case MG_EV_RECV: { /* There can be multiple messages in the buffer, process them all. */ while (1) { @@ -10027,6 +10040,7 @@ static void mqtt_handler(struct mg_connection *nc, int ev, LOG(LL_DEBUG, ("Send PINGREQ")); mg_mqtt_ping(nc); } + break; } } } @@ -10438,7 +10452,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) { switch (ev) { case MG_EV_ACCEPT: - mg_set_protocol_mqtt(nc); + if (nc->proto_data == NULL) mg_set_protocol_mqtt(nc); nc->user_data = NULL; /* Clear up the inherited pointer to broker */ break; case MG_EV_MQTT_CONNECT: diff --git a/mongoose.h b/mongoose.h index a94142741..a7689ba6c 100644 --- a/mongoose.h +++ b/mongoose.h @@ -5087,6 +5087,7 @@ int mg_http_create_digest_auth_header(char *buf, size_t buf_len, struct mg_mqtt_message { int cmd; int qos; + int len; /* message length in the IO buffer */ struct mg_str topic; struct mg_str payload; -- GitLab