From bba29d32ce7d3982bd519898feff3c6b2991d5ea Mon Sep 17 00:00:00 2001
From: Sergey Lyubka <valenok@gmail.com>
Date: Thu, 5 Dec 2013 10:01:11 +0000
Subject: [PATCH] Using socketpair for inter-thread communication

---
 build/src/internal.h |  15 ++---
 build/src/mongoose.c | 105 +++++++++++++-----------------
 build/src/win32.c    |  28 --------
 mongoose.c           | 148 ++++++++++++++-----------------------------
 4 files changed, 94 insertions(+), 202 deletions(-)

diff --git a/build/src/internal.h b/build/src/internal.h
index 932c4e7ce..11c2e85e2 100644
--- a/build/src/internal.h
+++ b/build/src/internal.h
@@ -152,7 +152,6 @@ typedef long off_t;
 #endif // !fileno MINGW #defines fileno
 
 typedef HANDLE pthread_mutex_t;
-typedef struct {HANDLE signal, broadcast;} pthread_cond_t;
 typedef DWORD pthread_t;
 #define pid_t HANDLE // MINGW typedefs pid_t to int. Using #define here.
 
@@ -252,6 +251,8 @@ typedef int SOCKET;
 #define MAX_REQUEST_SIZE 16384
 #define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0]))
 
+typedef SOCKET sock_t;
+
 #ifdef DEBUG_TRACE
 #undef DEBUG_TRACE
 #define DEBUG_TRACE(x)
@@ -407,19 +408,11 @@ struct mg_context {
   char *config[NUM_OPTIONS];      // Mongoose configuration parameters
   mg_event_handler_t event_handler;  // User-defined callback function
   void *user_data;                // User-defined data
-
   struct socket *listening_sockets;
   int num_listening_sockets;
+  int num_threads;    // Number of threads
+  sock_t ctl[2];     // Socket pair for inter-thread communication
 
-  volatile int num_threads;  // Number of threads
-  pthread_mutex_t mutex;     // Protects (max|num)_threads
-  pthread_cond_t  cond;      // Condvar for tracking workers terminations
-
-  struct socket queue[MGSQLEN];   // Accepted sockets
-  volatile int sq_head;      // Head of the socket queue
-  volatile int sq_tail;      // Tail of the socket queue
-  pthread_cond_t sq_full;    // Signaled when socket is produced
-  pthread_cond_t sq_empty;   // Signaled when socket is consumed
 };
 
 struct mg_connection {
diff --git a/build/src/mongoose.c b/build/src/mongoose.c
index 033b5e7a6..8c49ffbbd 100644
--- a/build/src/mongoose.c
+++ b/build/src/mongoose.c
@@ -12,6 +12,38 @@ static int call_user(int type, struct mg_connection *conn, void *p) {
     0 : conn->ctx->event_handler(&conn->event);
 }
 
+static int mg_socketpair(sock_t sp[2]) {
+  struct sockaddr_in sa;
+  sock_t sock, ret = -1;
+  socklen_t len = sizeof(sa);
+
+  sp[0] = sp[1] = INVALID_SOCKET;
+
+  (void) memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_port = htons(0);
+  sa.sin_addr.s_addr = htonl(0x7f000001);
+
+  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
+      !bind(sock, (struct sockaddr *) &sa, len) &&
+      !listen(sock, 1) &&
+      !getsockname(sock, (struct sockaddr *) &sa, &len) &&
+      (sp[0] = socket(AF_INET, SOCK_STREAM, 6)) != -1 &&
+      !connect(sp[0], (struct sockaddr *) &sa, len) &&
+      (sp[1] = accept(sock,(struct sockaddr *) &sa, &len)) != INVALID_SOCKET) {
+    set_close_on_exec(sp[0]);
+    set_close_on_exec(sp[1]);
+    ret = 0;
+  } else {
+    if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
+    if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
+    sp[0] = sp[1] = INVALID_SOCKET;
+  }
+  closesocket(sock);
+
+  return ret;
+}
+
 static FILE *mg_fopen(const char *path, const char *mode) {
 #ifdef _WIN32
   wchar_t wbuf[PATH_MAX], wmode[20];
@@ -1081,31 +1113,7 @@ static void process_new_connection(struct mg_connection *conn) {
 
 // Worker threads take accepted socket from the queue
 static int consume_socket(struct mg_context *ctx, struct socket *sp) {
-  (void) pthread_mutex_lock(&ctx->mutex);
-  DEBUG_TRACE(("going idle"));
-
-  // If the queue is empty, wait. We're idle at this point.
-  while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) {
-    pthread_cond_wait(&ctx->sq_full, &ctx->mutex);
-  }
-
-  // If we're stopping, sq_head may be equal to sq_tail.
-  if (ctx->sq_head > ctx->sq_tail) {
-    // Copy socket from the queue and increment tail
-    *sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)];
-    ctx->sq_tail++;
-    DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock));
-
-    // Wrap pointers if needed
-    while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) {
-      ctx->sq_tail -= ARRAY_SIZE(ctx->queue);
-      ctx->sq_head -= ARRAY_SIZE(ctx->queue);
-    }
-  }
-
-  (void) pthread_cond_signal(&ctx->sq_empty);
-  (void) pthread_mutex_unlock(&ctx->mutex);
-
+  recv(ctx->ctl[1], (void *) sp, sizeof(*sp), 0);
   return !ctx->stop_flag;
 }
 
@@ -1154,11 +1162,7 @@ static void *worker_thread(void *thread_func_param) {
   }
 
   // Signal master that we're done with connection and exiting
-  (void) pthread_mutex_lock(&ctx->mutex);
-  ctx->num_threads--;
-  (void) pthread_cond_signal(&ctx->cond);
-  assert(ctx->num_threads >= 0);
-  (void) pthread_mutex_unlock(&ctx->mutex);
+  send(ctx->ctl[1], "x", 1, 0);
 
   DEBUG_TRACE(("exiting"));
   return NULL;
@@ -1166,23 +1170,7 @@ static void *worker_thread(void *thread_func_param) {
 
 // Master thread adds accepted socket to a queue
 static void produce_socket(struct mg_context *ctx, const struct socket *sp) {
-  (void) pthread_mutex_lock(&ctx->mutex);
-
-  // If the queue is full, wait
-  while (ctx->stop_flag == 0 &&
-         ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) {
-    (void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex);
-  }
-
-  if (ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)) {
-    // Copy socket to the queue and increment head
-    ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp;
-    ctx->sq_head++;
-    DEBUG_TRACE(("queued socket %d", sp->sock));
-  }
-
-  (void) pthread_cond_signal(&ctx->sq_full);
-  (void) pthread_mutex_unlock(&ctx->mutex);
+  send(ctx->ctl[0], (void *) sp, sizeof(*sp), 0);
 }
 
 static int set_sock_timeout(SOCKET sock, int milliseconds) {
@@ -1272,20 +1260,16 @@ static void *master_thread(void *thread_func_param) {
   close_all_listening_sockets(ctx);
 
   // Wakeup workers that are waiting for connections to handle.
-  pthread_cond_broadcast(&ctx->sq_full);
+  for (i = 0; i < ctx->num_threads; i++) {
+    struct socket dummy;
+    send(ctx->ctl[0], (void *) &dummy, sizeof(dummy), 0);
+  }
 
   // Wait until all threads finish
-  (void) pthread_mutex_lock(&ctx->mutex);
-  while (ctx->num_threads > 0) {
-    (void) pthread_cond_wait(&ctx->cond, &ctx->mutex);
+  for (i = 0; i < ctx->num_threads; i++) {
+    char ch;
+    recv(ctx->ctl[0], &ch, 1, 0);
   }
-  (void) pthread_mutex_unlock(&ctx->mutex);
-
-  // All threads exited, no sync is needed. Destroy mutex and condvars
-  (void) pthread_mutex_destroy(&ctx->mutex);
-  (void) pthread_cond_destroy(&ctx->cond);
-  (void) pthread_cond_destroy(&ctx->sq_empty);
-  (void) pthread_cond_destroy(&ctx->sq_full);
 
 #if !defined(NO_SSL)
   uninitialize_ssl(ctx);
@@ -1406,10 +1390,7 @@ struct mg_context *mg_start(const char **options,
   (void) signal(SIGPIPE, SIG_IGN);
 #endif // !_WIN32
 
-  (void) pthread_mutex_init(&ctx->mutex, NULL);
-  (void) pthread_cond_init(&ctx->cond, NULL);
-  (void) pthread_cond_init(&ctx->sq_empty, NULL);
-  (void) pthread_cond_init(&ctx->sq_full, NULL);
+  mg_socketpair(ctx->ctl);
 
   // Start master (listening) thread
   mg_start_thread(master_thread, ctx);
diff --git a/build/src/win32.c b/build/src/win32.c
index 93c7c9628..b03591fa5 100644
--- a/build/src/win32.c
+++ b/build/src/win32.c
@@ -23,34 +23,6 @@ static int pthread_mutex_unlock(pthread_mutex_t *mutex) {
   return ReleaseMutex(*mutex) == 0 ? -1 : 0;
 }
 
-static int pthread_cond_init(pthread_cond_t *cv, const void *unused) {
-  (void) unused;
-  cv->signal = CreateEvent(NULL, FALSE, FALSE, NULL);
-  cv->broadcast = CreateEvent(NULL, TRUE, FALSE, NULL);
-  return cv->signal != NULL && cv->broadcast != NULL ? 0 : -1;
-}
-
-static int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex) {
-  HANDLE handles[] = {cv->signal, cv->broadcast};
-  ReleaseMutex(*mutex);
-  WaitForMultipleObjects(2, handles, FALSE, INFINITE);
-  return WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0? 0 : -1;
-}
-
-static int pthread_cond_signal(pthread_cond_t *cv) {
-  return SetEvent(cv->signal) == 0 ? -1 : 0;
-}
-
-static int pthread_cond_broadcast(pthread_cond_t *cv) {
-  // Implementation with PulseEvent() has race condition, see
-  // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
-  return PulseEvent(cv->broadcast) == 0 ? -1 : 0;
-}
-
-static int pthread_cond_destroy(pthread_cond_t *cv) {
-  return CloseHandle(cv->signal) && CloseHandle(cv->broadcast) ? 0 : -1;
-}
-
 // For Windows, change all slashes to backslashes in path names.
 static void change_slashes_to_backslashes(char *path) {
   int i;
diff --git a/mongoose.c b/mongoose.c
index ae973321e..6a4a52793 100644
--- a/mongoose.c
+++ b/mongoose.c
@@ -152,7 +152,6 @@ typedef long off_t;
 #endif // !fileno MINGW #defines fileno
 
 typedef HANDLE pthread_mutex_t;
-typedef struct {HANDLE signal, broadcast;} pthread_cond_t;
 typedef DWORD pthread_t;
 #define pid_t HANDLE // MINGW typedefs pid_t to int. Using #define here.
 
@@ -252,6 +251,8 @@ typedef int SOCKET;
 #define MAX_REQUEST_SIZE 16384
 #define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0]))
 
+typedef SOCKET sock_t;
+
 #ifdef DEBUG_TRACE
 #undef DEBUG_TRACE
 #define DEBUG_TRACE(x)
@@ -407,19 +408,11 @@ struct mg_context {
   char *config[NUM_OPTIONS];      // Mongoose configuration parameters
   mg_event_handler_t event_handler;  // User-defined callback function
   void *user_data;                // User-defined data
-
   struct socket *listening_sockets;
   int num_listening_sockets;
+  int num_threads;    // Number of threads
+  sock_t ctl[2];     // Socket pair for inter-thread communication
 
-  volatile int num_threads;  // Number of threads
-  pthread_mutex_t mutex;     // Protects (max|num)_threads
-  pthread_cond_t  cond;      // Condvar for tracking workers terminations
-
-  struct socket queue[MGSQLEN];   // Accepted sockets
-  volatile int sq_head;      // Head of the socket queue
-  volatile int sq_tail;      // Tail of the socket queue
-  pthread_cond_t sq_full;    // Signaled when socket is produced
-  pthread_cond_t sq_empty;   // Signaled when socket is consumed
 };
 
 struct mg_connection {
@@ -1537,34 +1530,6 @@ static int pthread_mutex_unlock(pthread_mutex_t *mutex) {
   return ReleaseMutex(*mutex) == 0 ? -1 : 0;
 }
 
-static int pthread_cond_init(pthread_cond_t *cv, const void *unused) {
-  (void) unused;
-  cv->signal = CreateEvent(NULL, FALSE, FALSE, NULL);
-  cv->broadcast = CreateEvent(NULL, TRUE, FALSE, NULL);
-  return cv->signal != NULL && cv->broadcast != NULL ? 0 : -1;
-}
-
-static int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex) {
-  HANDLE handles[] = {cv->signal, cv->broadcast};
-  ReleaseMutex(*mutex);
-  WaitForMultipleObjects(2, handles, FALSE, INFINITE);
-  return WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0? 0 : -1;
-}
-
-static int pthread_cond_signal(pthread_cond_t *cv) {
-  return SetEvent(cv->signal) == 0 ? -1 : 0;
-}
-
-static int pthread_cond_broadcast(pthread_cond_t *cv) {
-  // Implementation with PulseEvent() has race condition, see
-  // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
-  return PulseEvent(cv->broadcast) == 0 ? -1 : 0;
-}
-
-static int pthread_cond_destroy(pthread_cond_t *cv) {
-  return CloseHandle(cv->signal) && CloseHandle(cv->broadcast) ? 0 : -1;
-}
-
 // For Windows, change all slashes to backslashes in path names.
 static void change_slashes_to_backslashes(char *path) {
   int i;
@@ -3918,6 +3883,38 @@ static int call_user(int type, struct mg_connection *conn, void *p) {
     0 : conn->ctx->event_handler(&conn->event);
 }
 
+static int mg_socketpair(sock_t sp[2]) {
+  struct sockaddr_in sa;
+  sock_t sock, ret = -1;
+  socklen_t len = sizeof(sa);
+
+  sp[0] = sp[1] = INVALID_SOCKET;
+
+  (void) memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_port = htons(0);
+  sa.sin_addr.s_addr = htonl(0x7f000001);
+
+  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
+      !bind(sock, (struct sockaddr *) &sa, len) &&
+      !listen(sock, 1) &&
+      !getsockname(sock, (struct sockaddr *) &sa, &len) &&
+      (sp[0] = socket(AF_INET, SOCK_STREAM, 6)) != -1 &&
+      !connect(sp[0], (struct sockaddr *) &sa, len) &&
+      (sp[1] = accept(sock,(struct sockaddr *) &sa, &len)) != INVALID_SOCKET) {
+    set_close_on_exec(sp[0]);
+    set_close_on_exec(sp[1]);
+    ret = 0;
+  } else {
+    if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
+    if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
+    sp[0] = sp[1] = INVALID_SOCKET;
+  }
+  closesocket(sock);
+
+  return ret;
+}
+
 static FILE *mg_fopen(const char *path, const char *mode) {
 #ifdef _WIN32
   wchar_t wbuf[PATH_MAX], wmode[20];
@@ -4987,31 +4984,7 @@ static void process_new_connection(struct mg_connection *conn) {
 
 // Worker threads take accepted socket from the queue
 static int consume_socket(struct mg_context *ctx, struct socket *sp) {
-  (void) pthread_mutex_lock(&ctx->mutex);
-  DEBUG_TRACE(("going idle"));
-
-  // If the queue is empty, wait. We're idle at this point.
-  while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) {
-    pthread_cond_wait(&ctx->sq_full, &ctx->mutex);
-  }
-
-  // If we're stopping, sq_head may be equal to sq_tail.
-  if (ctx->sq_head > ctx->sq_tail) {
-    // Copy socket from the queue and increment tail
-    *sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)];
-    ctx->sq_tail++;
-    DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock));
-
-    // Wrap pointers if needed
-    while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) {
-      ctx->sq_tail -= ARRAY_SIZE(ctx->queue);
-      ctx->sq_head -= ARRAY_SIZE(ctx->queue);
-    }
-  }
-
-  (void) pthread_cond_signal(&ctx->sq_empty);
-  (void) pthread_mutex_unlock(&ctx->mutex);
-
+  recv(ctx->ctl[1], (void *) sp, sizeof(*sp), 0);
   return !ctx->stop_flag;
 }
 
@@ -5060,11 +5033,7 @@ static void *worker_thread(void *thread_func_param) {
   }
 
   // Signal master that we're done with connection and exiting
-  (void) pthread_mutex_lock(&ctx->mutex);
-  ctx->num_threads--;
-  (void) pthread_cond_signal(&ctx->cond);
-  assert(ctx->num_threads >= 0);
-  (void) pthread_mutex_unlock(&ctx->mutex);
+  send(ctx->ctl[1], "x", 1, 0);
 
   DEBUG_TRACE(("exiting"));
   return NULL;
@@ -5072,23 +5041,7 @@ static void *worker_thread(void *thread_func_param) {
 
 // Master thread adds accepted socket to a queue
 static void produce_socket(struct mg_context *ctx, const struct socket *sp) {
-  (void) pthread_mutex_lock(&ctx->mutex);
-
-  // If the queue is full, wait
-  while (ctx->stop_flag == 0 &&
-         ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) {
-    (void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex);
-  }
-
-  if (ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)) {
-    // Copy socket to the queue and increment head
-    ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp;
-    ctx->sq_head++;
-    DEBUG_TRACE(("queued socket %d", sp->sock));
-  }
-
-  (void) pthread_cond_signal(&ctx->sq_full);
-  (void) pthread_mutex_unlock(&ctx->mutex);
+  send(ctx->ctl[0], (void *) sp, sizeof(*sp), 0);
 }
 
 static int set_sock_timeout(SOCKET sock, int milliseconds) {
@@ -5178,20 +5131,16 @@ static void *master_thread(void *thread_func_param) {
   close_all_listening_sockets(ctx);
 
   // Wakeup workers that are waiting for connections to handle.
-  pthread_cond_broadcast(&ctx->sq_full);
+  for (i = 0; i < ctx->num_threads; i++) {
+    struct socket dummy;
+    send(ctx->ctl[0], (void *) &dummy, sizeof(dummy), 0);
+  }
 
   // Wait until all threads finish
-  (void) pthread_mutex_lock(&ctx->mutex);
-  while (ctx->num_threads > 0) {
-    (void) pthread_cond_wait(&ctx->cond, &ctx->mutex);
+  for (i = 0; i < ctx->num_threads; i++) {
+    char ch;
+    recv(ctx->ctl[0], &ch, 1, 0);
   }
-  (void) pthread_mutex_unlock(&ctx->mutex);
-
-  // All threads exited, no sync is needed. Destroy mutex and condvars
-  (void) pthread_mutex_destroy(&ctx->mutex);
-  (void) pthread_cond_destroy(&ctx->cond);
-  (void) pthread_cond_destroy(&ctx->sq_empty);
-  (void) pthread_cond_destroy(&ctx->sq_full);
 
 #if !defined(NO_SSL)
   uninitialize_ssl(ctx);
@@ -5312,10 +5261,7 @@ struct mg_context *mg_start(const char **options,
   (void) signal(SIGPIPE, SIG_IGN);
 #endif // !_WIN32
 
-  (void) pthread_mutex_init(&ctx->mutex, NULL);
-  (void) pthread_cond_init(&ctx->cond, NULL);
-  (void) pthread_cond_init(&ctx->sq_empty, NULL);
-  (void) pthread_cond_init(&ctx->sq_full, NULL);
+  mg_socketpair(ctx->ctl);
 
   // Start master (listening) thread
   mg_start_thread(master_thread, ctx);
-- 
GitLab