From 3a8f47befd68ac6236cf2f38aa016cafdf552c23 Mon Sep 17 00:00:00 2001
From: Sergey Lyubka <lsm@cesanta.com>
Date: Thu, 27 Oct 2016 20:43:56 +0300
Subject: [PATCH] Mqtt: parsing auth info in the connect message

PUBLISHED_FROM=017e707ea2bd7b1d1972fbb92e5d98c74e17e9d0
---
 docs/c-api/mqtt.h/intro.md                    |   1 -
 .../mqtt.h/struct_mg_mqtt_topic_expression.md |  13 --
 examples/mqtt_client/mqtt_client.c            |   6 +-
 mongoose.c                                    | 119 +++++++++++-------
 mongoose.h                                    |  16 ++-
 5 files changed, 94 insertions(+), 61 deletions(-)
 delete mode 100644 docs/c-api/mqtt.h/struct_mg_mqtt_topic_expression.md

diff --git a/docs/c-api/mqtt.h/intro.md b/docs/c-api/mqtt.h/intro.md
index 77175619e..7c5201346 100644
--- a/docs/c-api/mqtt.h/intro.md
+++ b/docs/c-api/mqtt.h/intro.md
@@ -20,7 +20,6 @@ items:
   - { name: mg_send_mqtt_handshake.md }
   - { name: mg_send_mqtt_handshake_opt.md }
   - { name: mg_set_protocol_mqtt.md }
-  - { name: struct_mg_mqtt_topic_expression.md }
 ---
 
 
diff --git a/docs/c-api/mqtt.h/struct_mg_mqtt_topic_expression.md b/docs/c-api/mqtt.h/struct_mg_mqtt_topic_expression.md
deleted file mode 100644
index 8ae14b2d1..000000000
--- a/docs/c-api/mqtt.h/struct_mg_mqtt_topic_expression.md
+++ /dev/null
@@ -1,13 +0,0 @@
----
-title: "struct mg_mqtt_topic_expression"
-decl_name: "struct mg_mqtt_topic_expression"
-symbol_kind: "struct"
-signature: |
-  struct mg_mqtt_topic_expression {
-    const char *topic;
-    uint8_t qos;
-  };
----
-
-puback 
-
diff --git a/examples/mqtt_client/mqtt_client.c b/examples/mqtt_client/mqtt_client.c
index 4be13b495..75cd1ee44 100644
--- a/examples/mqtt_client/mqtt_client.c
+++ b/examples/mqtt_client/mqtt_client.c
@@ -63,10 +63,10 @@ static void ev_handler(struct mg_connection *nc, int ev, void *p) {
 #if 0
         char hex[1024] = {0};
         mg_hexdump(nc->recv_mbuf.buf, msg->payload.len, hex, sizeof(hex));
-        printf("Got incoming message %s:\n%s", msg->topic, hex);
+        printf("Got incoming message %.*s:\n%s", (int)msg->topic.len, msg->topic.p, hex);
 #else
-      printf("Got incoming message %s: %.*s\n", msg->topic,
-             (int) msg->payload.len, msg->payload.p);
+      printf("Got incoming message %.*s: %.*s\n", (int)msg->topic.len,
+             msg->topic.p,  (int) msg->payload.len, msg->payload.p);
 #endif
 
       printf("Forwarding to /test\n");
diff --git a/mongoose.c b/mongoose.c
index ab5d5e46e..7911cc243 100644
--- a/mongoose.c
+++ b/mongoose.c
@@ -8246,72 +8246,100 @@ MG_INTERNAL int mg_get_errno(void) {
 /* Amalgamated: #include "mongoose/src/internal.h" */
 /* Amalgamated: #include "mongoose/src/mqtt.h" */
 
+static const char *scanto(const char *p, struct mg_str *s) {
+  s->len = ntohs(*(uint16_t *) p);
+  s->p = p + 2;
+  return p + 2 + s->len;
+}
+
 MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm) {
   uint8_t header;
-  int cmd;
   size_t len = 0;
-  int var_len = 0;
-  char *vlen = &io->buf[1];
+  int cmd;
+  const char *p = &io->buf[1], *end;
 
   if (io->len < 2) return -1;
-
   header = io->buf[0];
   cmd = header >> 4;
 
   /* decode mqtt variable length */
   do {
-    len += (*vlen & 127) << 7 * (vlen - &io->buf[1]);
-  } while ((*vlen++ & 128) != 0 && ((size_t)(vlen - io->buf) <= io->len));
+    len += (*p & 127) << 7 * (p - &io->buf[1]);
+  } while ((*p++ & 128) != 0 && ((size_t)(p - io->buf) <= io->len));
 
-  if (len != 0 && io->len < (size_t)(len - 1)) return -1;
+  end = p + len;
+  if (end > io->buf + io->len + 1) {
+    return -1;
+  }
 
-  mbuf_remove(io, 1 + (vlen - &io->buf[1]));
   mm->cmd = cmd;
   mm->qos = MG_MQTT_GET_QOS(header);
 
   switch (cmd) {
-    case MG_MQTT_CMD_CONNECT:
-      /* TODO(mkm): parse keepalive and will */
+    case MG_MQTT_CMD_CONNECT: {
+      p = scanto(p, &mm->protocol_name);
+      mm->protocol_version = *(uint8_t *) p++;
+      mm->connect_flags = *(uint8_t *) p++;
+      mm->keep_alive_timer = ntohs(*(uint16_t *) p);
+      p += 2;
+      if (p < end) p = scanto(p, &mm->client_id);
+      if (p < end && (mm->connect_flags & MG_MQTT_HAS_WILL))
+        p = scanto(p, &mm->will_topic);
+      if (p < end && (mm->connect_flags & MG_MQTT_HAS_WILL))
+        p = scanto(p, &mm->will_message);
+      if (p < end && (mm->connect_flags & MG_MQTT_HAS_USER_NAME))
+        p = scanto(p, &mm->user_name);
+      if (p < end && (mm->connect_flags & MG_MQTT_HAS_PASSWORD))
+        p = scanto(p, &mm->password);
+
+      LOG(LL_DEBUG,
+          ("%d %2x %d proto [%.*s] client_id [%.*s] will_topic [%.*s] "
+           "will_msg [%.*s] user_name [%.*s] password [%.*s]",
+           len, (int) mm->connect_flags, (int) mm->keep_alive_timer,
+           (int) mm->protocol_name.len, mm->protocol_name.p,
+           (int) mm->client_id.len, mm->client_id.p, (int) mm->will_topic.len,
+           mm->will_topic.p, (int) mm->will_message.len, mm->will_message.p,
+           (int) mm->user_name.len, mm->user_name.p, (int) mm->password.len,
+           mm->password.p));
       break;
+    }
     case MG_MQTT_CMD_CONNACK:
-      mm->connack_ret_code = io->buf[1];
-      var_len = 2;
+      mm->connack_ret_code = p[1];
       break;
     case MG_MQTT_CMD_PUBACK:
     case MG_MQTT_CMD_PUBREC:
     case MG_MQTT_CMD_PUBREL:
     case MG_MQTT_CMD_PUBCOMP:
     case MG_MQTT_CMD_SUBACK:
-      mm->message_id = ntohs(*(uint16_t *) io->buf);
-      var_len = 2;
+      mm->message_id = ntohs(*(uint16_t *) p);
       break;
     case MG_MQTT_CMD_PUBLISH: {
-      uint16_t topic_len = ntohs(*(uint16_t *) io->buf);
-      mm->topic = (char *) MG_MALLOC(topic_len + 1);
-      mm->topic[topic_len] = 0;
-      strncpy(mm->topic, io->buf + 2, topic_len);
-      var_len = topic_len + 2;
-
       if (MG_MQTT_GET_QOS(header) > 0) {
         mm->message_id = ntohs(*(uint16_t *) io->buf);
-        var_len += 2;
+        p += 2;
       }
-    } break;
+      p = scanto(p, &mm->topic);
+
+      mm->payload.p = p;
+      mm->payload.len = end - p;
+      break;
+    }
     case MG_MQTT_CMD_SUBSCRIBE:
+      mm->message_id = ntohs(*(uint16_t *) p);
+      p += 2;
       /*
        * topic expressions are left in the payload and can be parsed with
        * `mg_mqtt_next_subscribe_topic`
        */
-      mm->message_id = ntohs(*(uint16_t *) io->buf);
-      var_len = 2;
+      mm->payload.p = p;
+      mm->payload.len = end - p;
       break;
     default:
       /* Unhandled command */
       break;
   }
 
-  mbuf_remove(io, var_len);
-  return len - var_len;
+  return end - io->buf;
 }
 
 static void mqtt_handler(struct mg_connection *nc, int ev, void *ev_data) {
@@ -8326,15 +8354,8 @@ static void mqtt_handler(struct mg_connection *nc, int ev, void *ev_data) {
     case MG_EV_RECV:
       len = parse_mqtt(io, &mm);
       if (len == -1) break; /* not fully buffered */
-      mm.payload.p = io->buf;
-      mm.payload.len = len;
-
       nc->handler(nc, MG_MQTT_EVENT_BASE + mm.cmd, &mm);
-
-      if (mm.topic) {
-        MG_FREE(mm.topic);
-      }
-      mbuf_remove(io, mm.payload.len);
+      mbuf_remove(io, len);
       break;
   }
 }
@@ -8463,6 +8484,7 @@ void mg_mqtt_subscribe(struct mg_connection *nc,
 int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg,
                                  struct mg_str *topic, uint8_t *qos, int pos) {
   unsigned char *buf = (unsigned char *) msg->payload.p + pos;
+
   if ((size_t) pos >= msg->payload.len) {
     return -1;
   }
@@ -8610,7 +8632,7 @@ void mg_mqtt_broker_init(struct mg_mqtt_broker *brk, void *user_data) {
 
 static void mg_mqtt_broker_handle_connect(struct mg_mqtt_broker *brk,
                                           struct mg_connection *nc) {
-  struct mg_mqtt_session *s = (struct mg_mqtt_session *) malloc(sizeof *s);
+  struct mg_mqtt_session *s = (struct mg_mqtt_session *) calloc(1, sizeof *s);
   if (s == NULL) {
     /* LCOV_EXCL_START */
     mg_mqtt_connack(nc, MG_EV_MQTT_CONNACK_SERVER_UNAVAILABLE);
@@ -8630,6 +8652,7 @@ static void mg_mqtt_broker_handle_connect(struct mg_mqtt_broker *brk,
 
 static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
                                             struct mg_mqtt_message *msg) {
+
   struct mg_mqtt_session *ss = (struct mg_mqtt_session *) nc->user_data;
   uint8_t qoss[512];
   size_t qoss_len = 0;
@@ -8643,6 +8666,7 @@ static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
     qoss[qoss_len++] = qos;
   }
 
+
   ss->subscriptions = (struct mg_mqtt_topic_expression *) realloc(
       ss->subscriptions, sizeof(*ss->subscriptions) * qoss_len);
   for (pos = 0;
@@ -8664,13 +8688,17 @@ static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
  *
  * Returns 1 if it matches; 0 otherwise.
  */
-static int mg_mqtt_match_topic_expression(const char *exp, const char *topic) {
+static int mg_mqtt_match_topic_expression(const char *exp,
+                                          const struct mg_str *topic) {
   /* TODO(mkm): implement real matching */
-  int len = strlen(exp);
+  size_t len = strlen(exp);
   if (strchr(exp, '#')) {
     len -= 2;
+    if (topic->len < len) {
+      len = topic->len;
+    }
   }
-  return strncmp(exp, topic, len) == 0;
+  return strncmp(topic->p, exp, len) == 0;
 }
 
 static void mg_mqtt_broker_handle_publish(struct mg_mqtt_broker *brk,
@@ -8681,9 +8709,16 @@ static void mg_mqtt_broker_handle_publish(struct mg_mqtt_broker *brk,
   for (s = mg_mqtt_next(brk, NULL); s != NULL; s = mg_mqtt_next(brk, s)) {
     for (i = 0; i < s->num_subscriptions; i++) {
       if (mg_mqtt_match_topic_expression(s->subscriptions[i].topic,
-                                         msg->topic)) {
-        mg_mqtt_publish(s->nc, msg->topic, 0, 0, msg->payload.p,
-                        msg->payload.len);
+                                         &msg->topic)) {
+        char buf[100], *p = buf;
+        mg_asprintf(&p, sizeof(buf), "%.*s", (int) msg->topic.len, msg->topic.p);
+        if (p == NULL) {
+          return;
+        }
+        mg_mqtt_publish(s->nc, p, 0, 0, msg->payload.p, msg->payload.len);
+        if (p != buf) {
+          MG_FREE(p);
+        }
         break;
       }
     }
@@ -8714,7 +8749,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) {
       mg_mqtt_broker_handle_publish(brk, msg);
       break;
     case MG_EV_CLOSE:
-      if (nc->listener) {
+      if (nc->listener && nc->user_data != NULL) {
         mg_mqtt_close_session((struct mg_mqtt_session *) nc->user_data);
       }
       break;
diff --git a/mongoose.h b/mongoose.h
index 336c2eba2..c0148dc47 100644
--- a/mongoose.h
+++ b/mongoose.h
@@ -3624,11 +3624,23 @@ int mg_http_create_digest_auth_header(char *buf, size_t buf_len,
 
 struct mg_mqtt_message {
   int cmd;
-  struct mg_str payload;
   int qos;
+  struct mg_str topic;
+  struct mg_str payload;
+
   uint8_t connack_ret_code; /* connack */
   uint16_t message_id;      /* puback */
-  char *topic;
+
+  /* connect */
+  uint8_t protocol_version;
+  uint8_t connect_flags;
+  uint16_t keep_alive_timer;
+  struct mg_str protocol_name;
+  struct mg_str client_id;
+  struct mg_str will_topic;
+  struct mg_str will_message;
+  struct mg_str user_name;
+  struct mg_str password;
 };
 
 struct mg_mqtt_topic_expression {
-- 
GitLab