Skip to content

Commit

Permalink
H2O: Optimize the implementation further (#2356)
Browse files Browse the repository at this point in the history
  • Loading branch information
knewmanTE committed Nov 14, 2016
2 parents 1008d96 + b25a29a commit 75e735a
Show file tree
Hide file tree
Showing 16 changed files with 253 additions and 196 deletions.
2 changes: 1 addition & 1 deletion frameworks/C/h2o/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ run_curl()

run_h2o_app()
{
"$1/h2o_app" -a2 -f "$2/template/fortunes.mustache" -m8 "$3" "$4" \
"$1/h2o_app" -a1 -f "$2/template/fortunes.mustache" -m5 "$3" "$4" \
-d "host=$DBHOST dbname=hello_world user=benchmarkdbuser password=benchmarkdbpass" &
}

Expand Down
9 changes: 4 additions & 5 deletions frameworks/C/h2o/src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,7 @@ static void start_database_connect(thread_context_t *ctx, db_conn_t *db_conn)
goto error;
}

const char * const conninfo =
ctx->global_data->config->db_host ? ctx->global_data->config->db_host : "";
const char * const conninfo = ctx->config->db_host ? ctx->config->db_host : "";

db_conn->conn = PQconnectStart(conninfo);

Expand Down Expand Up @@ -441,7 +440,7 @@ static void stop_database_write_polling(db_conn_t *db_conn)

void connect_to_database(thread_context_t *ctx)
{
for (size_t i = ctx->db_state.db_conn_num; i < ctx->global_data->config->max_db_conn_num; i++)
for (size_t i = ctx->db_state.db_conn_num; i < ctx->config->max_db_conn_num; i++)
start_database_connect(ctx, NULL);
}

Expand Down Expand Up @@ -475,13 +474,13 @@ int execute_query(thread_context_t *ctx, db_query_param_t *param)
db_conn->param = param;
do_execute_query(db_conn);
}
else if (ctx->db_state.query_num < ctx->global_data->config->max_query_num) {
else if (ctx->db_state.query_num < ctx->config->max_query_num) {
param->l.next = NULL;
*ctx->db_state.queries.tail = &param->l;
ctx->db_state.queries.tail = &param->l.next;
ctx->db_state.query_num++;

if (ctx->db_state.db_conn_num < ctx->global_data->config->max_db_conn_num)
if (ctx->db_state.db_conn_num < ctx->config->max_db_conn_num)
start_database_connect(ctx, NULL);
}
else
Expand Down
25 changes: 11 additions & 14 deletions frameworks/C/h2o/src/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/syscall.h>

#include "error.h"
#include "event_loop.h"
Expand Down Expand Up @@ -57,7 +56,7 @@ static void accept_connection(h2o_socket_t *listener, const char *err)
sock->on_close.cb = on_close_connection;
sock->on_close.data = &ctx->event_loop.conn_num;
h2o_accept(&ctx->event_loop.h2o_accept_ctx, sock);
} while (++accepted < ctx->global_data->config->max_accept);
} while (++accepted < ctx->config->max_accept);
}
}
}
Expand Down Expand Up @@ -96,11 +95,11 @@ static void process_messages(h2o_multithread_receiver_t *receiver, h2o_linklist_
{
IGNORE_FUNCTION_PARAMETER(messages);

thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
event_loop.h2o_receiver,
receiver);
global_thread_data_t * const global_thread_data = H2O_STRUCT_FROM_MEMBER(global_thread_data_t,
h2o_receiver,
receiver);

h2o_socket_read_stop(ctx->event_loop.h2o_socket);
h2o_socket_read_stop(global_thread_data->ctx->event_loop.h2o_socket);
}

static void shutdown_server(h2o_socket_t *listener, const char *err)
Expand All @@ -113,30 +112,28 @@ static void shutdown_server(h2o_socket_t *listener, const char *err)
ctx->global_data->shutdown = true;
h2o_socket_read_stop(ctx->event_loop.h2o_socket);

for (size_t i = 1; i < ctx->global_data->config->thread_num; i++)
h2o_multithread_send_message(&ctx[i].event_loop.h2o_receiver, NULL);
for (size_t i = 1; i < ctx->config->thread_num; i++)
h2o_multithread_send_message(&ctx->global_thread_data[i].h2o_receiver, NULL);
}
}

void event_loop(thread_context_t *ctx)
{
ctx->tid = syscall(SYS_gettid);
ctx->random_seed = ctx->tid;

while (!ctx->global_data->shutdown || ctx->event_loop.conn_num)
h2o_evloop_run(ctx->event_loop.h2o_ctx.loop);
}

void free_event_loop(event_loop_t *event_loop)
void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver)
{
h2o_multithread_unregister_receiver(event_loop->h2o_ctx.queue, &event_loop->h2o_receiver);
h2o_multithread_unregister_receiver(event_loop->h2o_ctx.queue, h2o_receiver);
h2o_socket_close(event_loop->h2o_socket);
h2o_socket_close(event_loop->epoll_socket);
h2o_context_dispose(&event_loop->h2o_ctx);
}

void initialize_event_loop(bool is_main_thread,
global_data_t *global_data,
h2o_multithread_receiver_t *h2o_receiver,
event_loop_t *loop)
{
memset(loop, 0, sizeof(*loop));
Expand Down Expand Up @@ -165,7 +162,7 @@ void initialize_event_loop(bool is_main_thread,
loop->h2o_socket->data = loop;
h2o_socket_read_start(loop->h2o_socket, accept_connection);
h2o_multithread_register_receiver(loop->h2o_ctx.queue,
&loop->h2o_receiver,
h2o_receiver,
process_messages);
// libh2o's event loop does not support write polling unless it
// controls sending the data as well, so do read polling on the
Expand Down
4 changes: 2 additions & 2 deletions frameworks/C/h2o/src/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ typedef struct {
int epoll_fd;
h2o_accept_ctx_t h2o_accept_ctx;
h2o_context_t h2o_ctx;
h2o_multithread_receiver_t h2o_receiver;
} event_loop_t;

void event_loop(thread_context_t *ctx);
void free_event_loop(event_loop_t *event_loop);
void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver);
void initialize_event_loop(bool is_main_thread,
global_data_t *global_data,
h2o_multithread_receiver_t *h2o_receiver,
event_loop_t *loop);
int start_write_polling(int fd,
void (**on_write_ready)(void *),
Expand Down
65 changes: 37 additions & 28 deletions frameworks/C/h2o/src/fortune.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ static uintmax_t add_iovec(mustache_api_t *api,
void *userdata,
const char *buffer,
uintmax_t buffer_size);
static void cleanup_fortunes(void *data);
static int compare_fortunes(const list_t *x, const list_t *y);
static void complete_fortunes(struct st_h2o_generator_t *self, h2o_req_t *req);
static list_t *get_sorted_sublist(list_t *head);
Expand Down Expand Up @@ -109,6 +110,22 @@ static uintmax_t add_iovec(mustache_api_t *api,
return ret;
}

static void cleanup_fortunes(void *data)
{
fortune_ctx_t * const fortune_ctx = data;
const list_t *iter = fortune_ctx->result;

if (iter)
do {
const fortune_t * const fortune = H2O_STRUCT_FROM_MEMBER(fortune_t, l, iter);

if (fortune->data)
PQclear(fortune->data);

iter = iter->next;
} while (iter);
}

static int compare_fortunes(const list_t *x, const list_t *y)
{
const fortune_t * const f1 = H2O_STRUCT_FROM_MEMBER(fortune_t, l, x);
Expand Down Expand Up @@ -142,12 +159,7 @@ static list_t *get_sorted_sublist(list_t *head)
if (head) {
head = head->next;

while (head && compare_fortunes(tail, head) < 0) {
tail = head;
head = head->next;
}

while (head && !compare_fortunes(tail, head)) {
while (head && compare_fortunes(tail, head) <= 0) {
tail = head;
head = head->next;
}
Expand Down Expand Up @@ -209,40 +221,33 @@ static result_return_t on_fortune_result(db_query_param_t *param, PGresult *resu
ret = SUCCESS;

for (size_t i = 0; i < num_rows; i++) {
const char * const message_data = PQgetvalue(result, i, 1);
h2o_iovec_t message = h2o_htmlescape(&fortune_ctx->req->pool,
message_data,
PQgetlength(result, i, 1));
const size_t id_len = PQgetlength(result, i, 0);
const size_t fortune_size = offsetof(fortune_t, data) + id_len +
(message_data == message.base ? message.len : 0);
fortune_t * const fortune = h2o_mem_alloc_pool(&fortune_ctx->req->pool,
fortune_size);
sizeof(*fortune));

if (fortune) {
memset(fortune, 0, offsetof(fortune_t, data));
memcpy(fortune->data, PQgetvalue(result, i, 0), id_len);
fortune->id.base = fortune->data;
fortune->id.len = id_len;

if (message_data == message.base) {
message.base = fortune->data + id_len;
memcpy(message.base, message_data, message.len);
}

fortune->message = message;
memset(fortune, 0, sizeof(*fortune));
fortune->id.base = PQgetvalue(result, i, 0);
fortune->id.len = PQgetlength(result, i, 0);
fortune->message = h2o_htmlescape(&fortune_ctx->req->pool,
PQgetvalue(result, i, 1),
PQgetlength(result, i, 1));
fortune->l.next = fortune_ctx->result;
fortune_ctx->result = &fortune->l;
fortune_ctx->num_result++;

if (!i)
fortune->data = result;
}
else {
send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, fortune_ctx->req);
ret = DONE;

if (!i)
PQclear(result);

break;
}
}

PQclear(result);
}
else if (result) {
PQclear(result);
Expand Down Expand Up @@ -365,7 +370,9 @@ int fortunes(struct st_h2o_handler_t *self, h2o_req_t *req)
thread_context_t * const ctx = H2O_STRUCT_FROM_MEMBER(thread_context_t,
event_loop.h2o_ctx,
req->conn->ctx);
fortune_ctx_t * const fortune_ctx = h2o_mem_alloc_pool(&req->pool, sizeof(*fortune_ctx));
fortune_ctx_t * const fortune_ctx = h2o_mem_alloc_shared(&req->pool,
sizeof(*fortune_ctx),
cleanup_fortunes);

if (fortune_ctx) {
fortune_t * const fortune = h2o_mem_alloc_pool(&req->pool, sizeof(*fortune));
Expand All @@ -390,6 +397,8 @@ int fortunes(struct st_h2o_handler_t *self, h2o_req_t *req)
if (execute_query(ctx, &fortune_ctx->param))
send_service_unavailable_error(DB_REQ_ERROR, req);
}
else
send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
}
else
send_error(INTERNAL_SERVER_ERROR, MEM_ALLOC_ERR_MSG, req);
Expand Down
2 changes: 1 addition & 1 deletion frameworks/C/h2o/src/fortune.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@

typedef struct {
list_t l;
PGresult *data;
h2o_iovec_t id;
h2o_iovec_t message;
char data[];
} fortune_t;

int fortunes(struct st_h2o_handler_t *self, h2o_req_t *req);
Expand Down
35 changes: 25 additions & 10 deletions frameworks/C/h2o/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <netdb.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -44,6 +45,7 @@
#include "tls.h"
#include "utility.h"

#define DEFAULT_CACHE_LINE_SIZE 128
#define DEFAULT_TCP_FASTOPEN_QUEUE_LEN 4096
#define USAGE_MESSAGE \
"Usage:\n%s [-a <max connections accepted simultaneously>] [-b <bind address>] " \
Expand All @@ -63,8 +65,12 @@ static void setup_process(void);

static void free_global_data(global_data_t *global_data)
{
if (global_data->ctx)
free_thread_contexts(global_data);
if (global_data->global_thread_data) {
for (size_t i = 1; i < global_data->global_thread_data->config->thread_num; i++)
CHECK_ERROR(pthread_join, global_data->global_thread_data[i].thread, NULL);

free(global_data->global_thread_data);
}

if (global_data->file_logger)
global_data->file_logger->dispose(global_data->file_logger);
Expand Down Expand Up @@ -177,9 +183,7 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
sigset_t signals;

memset(global_data, 0, sizeof(*global_data));
global_data->config = config;
global_data->memory_alignment = get_maximum_cache_line_size();
assert(global_data->memory_alignment <= DEFAULT_CACHE_LINE_SIZE);
CHECK_ERRNO(sigemptyset, &signals);
#ifdef NDEBUG
CHECK_ERRNO(sigaddset, &signals, SIGINT);
Expand All @@ -194,7 +198,7 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
goto error;

if (config->cert && config->key)
initialize_openssl(global_data);
initialize_openssl(config, global_data);

const h2o_iovec_t host = h2o_iovec_init(H2O_STRLIT("default"));
h2o_hostconf_t * const hostconf = h2o_config_register_host(&global_data->h2o_config,
Expand All @@ -220,10 +224,14 @@ static int initialize_global_data(const config_t *config, global_data_t *global_
global_data->file_logger = h2o_access_log_register(pathconf, log_handle);
}

global_data->ctx = initialize_thread_contexts(global_data);
global_data->global_thread_data = initialize_global_thread_data(config, global_data);

if (global_data->ctx)
if (global_data->global_thread_data) {
printf("Number of processors: %zu\nMaximum cache line size: %zu\n",
h2o_numproc(),
global_data->memory_alignment);
return EXIT_SUCCESS;
}

error:
free_global_data(global_data);
Expand Down Expand Up @@ -353,10 +361,17 @@ int main(int argc, char *argv[])
global_data_t global_data;

if (initialize_global_data(&config, &global_data) == EXIT_SUCCESS) {
thread_context_t ctx;

setup_process();
start_threads(global_data.ctx);
connect_to_database(global_data.ctx);
event_loop(global_data.ctx);
start_threads(global_data.global_thread_data);
initialize_thread_context(global_data.global_thread_data, true, &ctx);
connect_to_database(&ctx);
event_loop(&ctx);
// Even though this is global data, we need to close
// it before the associated event loop is cleaned up.
h2o_socket_close(global_data.signals);
free_thread_context(&ctx);
free_global_data(&global_data);
rc = EXIT_SUCCESS;
}
Expand Down

0 comments on commit 75e735a

Please sign in to comment.