Skip to content
This repository has been archived by the owner on Jun 30, 2021. It is now read-only.

How to use multithreading #146

Open
kekxv opened this issue Nov 11, 2019 · 5 comments
Open

How to use multithreading #146

kekxv opened this issue Nov 11, 2019 · 5 comments

Comments

@kekxv
Copy link

kekxv commented Nov 11, 2019

about long-term tasks

I want to use multithreading, but it doesn't work all the time. How can I continue to monitor other client connections when doing long-term tasks.
What should I do?

Example code (if applicable)

#include <evhtp.h>

/*
 * Request callback used in the reproduction (registered via evhtp_set_gencb
 * in main below).
 *
 * It blocks for 30 seconds to simulate a long-running task, then queues the
 * string returned by get_command(req->method) as the response body and sends
 * the reply with EVHTP_RES_OK.
 *
 * req: the request being handled; its method, buffer_out are read/written.
 * arg: callback argument; not used in this body.
 */
void _http_server_recv_callback(evhtp_request_t *req, void *arg) {
    // self->logger->i(TAG, __LINE__, "%s %s%s",
    //                get_command(req->method),
    //                evhtp_kv_find(req->headers_in, "host"),
    //                req->uri->path->full
    //);
    // Reported problem: while this call blocks, other connections wait
    // until it has finished.
    sleep(30);
    const char *_type = get_command(req->method);
    // The body is attached with evbuffer_add_reference and no cleanup
    // callback. NOTE(review): presumably get_command returns a string that
    // outlives the reply (e.g. a static string) -- confirm.
    evbuffer_add_reference(req->buffer_out, _type, strlen(_type), nullptr, nullptr);
    evhtp_send_reply(req, EVHTP_RES_OK);
}
/*
 * Entry point of the reproduction: creates an event base and an evhtp
 * instance, installs _http_server_recv_callback as the generic callback,
 * calls evhtp_use_threads_wexit with `nthreads`, and runs the event loop.
 *
 * NOTE(review): `this` and `nthreads` are not declared anywhere in this
 * snippet, and no call that binds a listening socket is visible -- the code
 * looks abridged from a larger class method; confirm against the full
 * program before drawing conclusions from it.
 */
int
main(int argc, char ** argv)
{
    evbase_t *evbase = event_base_new();
    evhtp_t *htp = evhtp_new(evbase, this);

    evhtp_set_gencb(htp, _http_server_recv_callback, this);
    // The result is stored in `ret` but never checked, so a failure to set
    // up the threads would go unnoticed here.
    int ret = evhtp_use_threads_wexit(htp, nullptr, nullptr, nthreads, this);
    event_base_loop(evbase, 0);
    return 0;
}

master

e200bfa

@Devious2
Copy link

Devious2 commented Nov 11, 2019

Yes, I am facing the same issue too. It runs perfectly with JMeter requests, but not with multiple requests from a browser; the thread pool is not working as expected.

@tturbs
Copy link

tturbs commented Dec 31, 2019

You can create both an event base and an evhtp instance for each thread. Then listen on one socket and let every instance bind to that socket. Alternatively, you can create a separate socket per thread with the reuseport option on the same port.

@kekxv
Copy link
Author

kekxv commented Dec 31, 2019

@tturbs
I want it to work like this:

void read_cb(...){
      // Suppose this is a read callback
      // that sleeps for 1 minute.
}

I cannot accept a new client connection during that minute.
So what should I do if I want to accept new client connections in the meantime?

@azat
Copy link
Contributor

azat commented Feb 17, 2020

I want to use multithreading, but it doesn't work all the time. How can I continue to monitor other client connections when doing long-term tasks.

This should work as long as you have threads that are not busy running your long-running callback.
So if you are seeing that it does not call _http_server_recv_callback, then you can simply increase the number of threads for evhtp (this works up to ~1k, maybe even 10k, but using more is not a great idea; if you need more, you need to limit the amount of that long-running work).

@aflin
Copy link

aflin commented Sep 21, 2020

The problem, as far as my limited experience allows me to see, is that a thread is not considered busy if there are no "messages" from the main thread in its queue. The scheduler tries to use a thread that has no messages waiting, starting with the lowest numbered thread. Unfortunately, if a thread is busy processing a request and no other messages are in its backlog, the same thread is scheduled again.
This was very problematic for my application. So I did a quick hack that appears to fix it.

diff --git a/extern/libevhtp/evhtp.c b/extern/libevhtp/evhtp.c
index adf9beb..29687e6 100644
--- a/extern/libevhtp/evhtp.c
+++ b/extern/libevhtp/evhtp.c
@@ -2005,7 +2005,16 @@ htp__request_parse_fini_(htparser * p)
      *
      */
     if (c->request && c->request->cb) {
-        (c->request->cb)(c->request, c->request->cbarg);
+        if (c->thread)
+        {
+            /* flag that we are in the callback to facilitate
+                a wise choice of threads for next request --ajf */
+            c->thread->busy=1;
+            (c->request->cb)(c->request, c->request->cbarg);
+            c->thread->busy=0;
+        }
+        else
+            (c->request->cb)(c->request, c->request->cbarg);
     }
 
     if (c->flags & EVHTP_CONN_FLAG_PAUSED) {
@@ -2192,7 +2201,11 @@ check_proto:
         size_t len=evbuffer_get_length(request->buffer_out);
         if (len) {
             evbuffer_add_buffer(buf, request->buffer_out);
-
+            /* a compromise for whatever bug causes the major slowdown if
+                we are using ssl and the buffer is not contiguous 
+                https://github.com/criticalstack/libevhtp/issues/160
+                --ajf
+            */
             if (request->conn->htp->ssl_ctx != NULL && len <= 5242880)
                 evbuffer_pullup(buf,-1);
         }
diff --git a/extern/libevhtp/include/evhtp/thread.h b/extern/libevhtp/include/evhtp/thread.h
index 7479aa8..16d224f 100644
--- a/extern/libevhtp/include/evhtp/thread.h
+++ b/extern/libevhtp/include/evhtp/thread.h
@@ -5,6 +5,10 @@
 #ifndef __EVTHR_H__
 #define __EVTHR_H__
 
+#ifndef WIN32
+#include <sys/queue.h>
+#endif
+
 #include <pthread.h>
 #include <event2/event.h>
 #include <evhtp/config.h>
@@ -57,6 +61,27 @@ EVHTP_EXPORT void           evthr_pool_free(evthr_pool_t * pool);
 EVHTP_EXPORT evthr_t      * evthr_wexit_new(evthr_init_cb, evthr_exit_cb, void * shared);
 EVHTP_EXPORT evthr_pool_t * evthr_pool_wexit_new(int nthreads, evthr_init_cb, evthr_exit_cb, void *);

+struct evthr {
+    int             rdr;
+    int             wdr;
+    int             busy;
+    char            err;
+    ev_t          * event;
+    evbase_t      * evbase;
+    pthread_mutex_t lock;
+    pthread_t     * thr;
+    evthr_init_cb   init_cb;
+    evthr_exit_cb   exit_cb;
+    void          * arg;
+    void          * aux;
+
+#ifdef EVTHR_SHARED_PIPE
+    int            pool_rdr;
+    struct event * shared_pool_ev;
+#endif
+    TAILQ_ENTRY(evthr) next;
+};
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/extern/libevhtp/thread.c b/extern/libevhtp/thread.c
index dfddc7e..32031b4 100644
--- a/extern/libevhtp/thread.c
+++ b/extern/libevhtp/thread.c
@@ -37,26 +37,6 @@ struct evthr_pool {
     evthr_pool_slist_t threads;
 };

-struct evthr {
-    int             rdr;
-    int             wdr;
-    char            err;
-    ev_t          * event;
-    evbase_t      * evbase;
-    pthread_mutex_t lock;
-    pthread_t     * thr;
-    evthr_init_cb   init_cb;
-    evthr_exit_cb   exit_cb;
-    void          * arg;
-    void          * aux;
-
-#ifdef EVTHR_SHARED_PIPE
-    int            pool_rdr;
-    struct event * shared_pool_ev;
-#endif
-    TAILQ_ENTRY(evthr) next;
-};
-
 #define _evthr_read(thr, cmd, sock) \
     (recv(sock, cmd, sizeof(evthr_cmd_t), 0) == sizeof(evthr_cmd_t)) ? 1 : 0

@@ -238,6 +218,8 @@ _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args)
     thread->rdr     = fds[0];
     thread->wdr     = fds[1];

+    thread->busy    = 0;
+
     thread->init_cb = init_cb;
     thread->exit_cb = exit_cb;

@@ -354,8 +336,9 @@ get_backlog_(evthr_t * thread)
     int backlog = 0;

     ioctl(thread->rdr, FIONREAD, &backlog);
-
-    return (int)(backlog / sizeof(evthr_cmd_t));
+    /* added thread->busy. If still in callback, don't return 0.
+         Instead, we should use another thread that isn't busy --ajf */
+    return (int)(backlog / sizeof(evthr_cmd_t) + thread->busy);
 }

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants