Skip to content

Commit

Permalink
feat: use keep alive connection pool for JSONRPC remote machines
Browse files Browse the repository at this point in the history
  • Loading branch information
edubart committed Apr 30, 2024
1 parent 1837a0c commit 366e867
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 25 deletions.
5 changes: 2 additions & 3 deletions src/jsonrpc-remote-machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,6 @@ static json jsonrpc_fork_handler(const json &j, mg_connection *con, http_handler
/// \details Changes the address the server is listening to.
/// After this call, all new connections should be established using the new server address.
static json jsonrpc_rebind_handler(const json &j, mg_connection *con, http_handler_data *h) {
(void) con;
static const char *param_name[] = {"address"};
auto args = parse_args<std::string>(j, param_name);
const std::string new_server_address = std::get<0>(args);
Expand All @@ -678,6 +677,8 @@ static json jsonrpc_rebind_handler(const json &j, mg_connection *con, http_handl
if (!new_listen_connection) {
return jsonrpc_response_server_error(j, "rebind failed listening on "s + new_server_address);
}
// Mark connection to be drained
con->is_draining = 1;
// Mark previous listen connection to be closed
h->listen_connection->is_closing = 1;
// Set the new listen connection
Expand Down Expand Up @@ -1784,7 +1785,6 @@ static json jsonrpc_machine_verify_send_cmio_response_state_transition_handler(c
/// \param j JSON response object
void jsonrpc_http_reply(mg_connection *con, http_handler_data *h, const json &j) {
SLOG(trace) << h->server_address << " response is " << j.dump().data();
con->is_draining = 1;
return mg_http_reply(con, 200, "Access-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n", "%s",
j.dump().data());
}
Expand All @@ -1793,7 +1793,6 @@ void jsonrpc_http_reply(mg_connection *con, http_handler_data *h, const json &j)
/// \param con Mongoose connection
void jsonrpc_send_empty_reply(mg_connection *con, http_handler_data *h) {
SLOG(trace) << h->server_address << " response is empty";
con->is_draining = 1;
return mg_http_reply(con, 200, "Access-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n", "");
}

Expand Down
134 changes: 112 additions & 22 deletions src/jsonrpc-virtual-machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#include "json-util.h"
#include "jsonrpc-mg-mgr.h"

// We need to keep 2 connections alive to save one reconnection in snapshot/commit
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage,modernize-macro-to-enum)
#define MAX_KEEP_ALIVE_POOL 2

using namespace std::string_literals;
using json = nlohmann::json;

Expand All @@ -54,6 +58,7 @@ struct http_request_data {
std::string status_code;
std::string reason_phrase;
std::string entity_body;
bool keep_alive;
bool done;
};

Expand All @@ -74,48 +79,133 @@ static void setup_client_socket(struct mg_connection *c) {
#endif
}

// Sends an HTTP POST to connection socket
static bool send_http_post(struct mg_connection *c, const http_request_data *data) {
const struct mg_str host = mg_url_host(data->url.c_str());
// Write HTTP header
if (mg_printf(c,
"POST %s HTTP/1.1\r\n"
"Host: %.*s\r\n"
"Connection: keep-alive\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"\r\n",
mg_url_uri(data->url.c_str()), static_cast<int>(host.len), host.ptr, data->post_data.size()) <= 0) {
return false;
}
// Write HTTP body
if (!mg_send(c, data->post_data.data(), data->post_data.size())) {
return false;
}
return true;
}

// Returns the amount of connections in the keep alive connection pool
static int get_keepalive_count(const struct mg_mgr *mgr) {
int count = 0;
for (struct mg_connection *c = mgr->conns; c != nullptr; c = c->next) {
if (c->is_client && // is a client connection
!c->is_closing && !c->is_draining && // not closing
c->fn_data == nullptr // has no ongoing request
) {
count++;
}
}
return count;
}

// Print HTTP response and signal that we're done
static void json_post_fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
http_request_data *data = static_cast<http_request_data *>(fn_data);
if (!data) {
// Event on a keep alive connection with no request data associated,
// we have nothing to do.
return;
}
if (ev == MG_EV_CONNECT) {
setup_client_socket(c);
const struct mg_str host = mg_url_host(data->url.c_str());
mg_printf(c,
"POST %s HTTP/1.0\r\n"
"Host: %.*s\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"\r\n",
mg_url_uri(data->url.c_str()), static_cast<int>(host.len), host.ptr, data->post_data.size());
mg_send(c, data->post_data.data(), data->post_data.size());
if (!send_http_post(c, data)) {
data->entity_body.clear();
data->status_code.clear();
data->reason_phrase = "http post send failed";
data->done = true;
c->is_closing = 1;
c->fn_data = nullptr;
}
} else if (ev == MG_EV_HTTP_MSG) {
struct mg_http_message *hm = static_cast<struct mg_http_message *>(ev_data);
data->entity_body = std::string_view(hm->body.ptr, hm->body.len);
data->status_code = std::string_view(hm->uri.ptr, hm->uri.len);
data->reason_phrase = std::string_view(hm->proto.ptr, hm->proto.len);
c->is_closing = 1;
data->done = true;
c->fn_data = nullptr;
// Keep the connection alive only if:
// - The HTTP request is successful (status 200)
// - The request itself can use HTTP pipelining (shutdown/rebind cannot)
// - We have not exhausted the size of our keep alive connection pool.
if (data->status_code != "200" || !data->keep_alive || get_keepalive_count(c->mgr) > MAX_KEEP_ALIVE_POOL) {
// Marking the connection to be closed, so it's out of the keep alive connection pool
c->is_closing = 1;
}
} else if (ev == MG_EV_ERROR) {
data->entity_body.clear();
data->status_code = "503";
data->reason_phrase = static_cast<char *>(ev_data);
data->done = true;
c->fn_data = nullptr;
} else if (ev == MG_EV_CLOSE && !data->done) {
data->entity_body.clear();
data->status_code.clear();
data->reason_phrase = "connection closed";
data->done = true;
c->fn_data = nullptr;
}
}

static std::string json_post(struct mg_mgr &mgr, const std::string &url, const std::string &post_data) {
http_request_data data{url, post_data, "", "", "", false};
if (!mg_http_connect(&mgr, url.c_str(), json_post_fn, &data)) {
throw std::runtime_error("connection to '"s + url + "' failed"s);
static struct mg_connection *find_keepalive_conn(struct mg_mgr &mgr, const std::string &url) {
// Resolve remote address from URL
struct mg_addr rem {};
rem.port = mg_htons(mg_url_port(url.c_str()));
if (!mg_aton(mg_url_host(url.c_str()), &rem)) {
throw std::runtime_error("failed to resolve remote address for '"s + url + "'"s);
}
// Try to find an already connected keep alive connection for the URL
for (struct mg_connection *c = mgr.conns; c != nullptr; c = c->next) {
if (c->is_client && // is a client connection
!c->is_closing && !c->is_draining && // not closing
c->fn_data == nullptr && // has no ongoing request
memcmp(&rem, &c->rem, sizeof(struct mg_addr)) == 0 // has same remote address
) {
return c;
}
}
return nullptr;
}

static std::string json_post(struct mg_mgr &mgr, const std::string &url, const std::string &post_data,
bool keep_alive) {
http_request_data data{url, post_data, "", "", "", keep_alive, false};
// Try to reused a keep alive connection, otherwise we have to create a new connection
mg_connection *c = find_keepalive_conn(mgr, url);
if (c) { // Reusing previous keep alive connection
// Set next request data in the connection
c->fn_data = &data;
// Send request
if (!send_http_post(c, &data)) {
throw std::runtime_error("failed to send http for '"s + url + "' in keep alive connection"s);
}
} else {
// No keep alive connection found, open a new one
c = mg_http_connect(&mgr, url.c_str(), json_post_fn, &data);
if (!c) {
throw std::runtime_error("connection to '"s + url + "' failed"s);
}
}
// Wait request to complete
while (!data.done) {
mg_mgr_poll(&mgr, 1000);
}
// Process response
if (data.status_code.empty()) {
throw std::runtime_error("http error: "s + data.reason_phrase);
}
Expand All @@ -127,11 +217,11 @@ static std::string json_post(struct mg_mgr &mgr, const std::string &url, const s

template <typename R, typename... Ts>
void jsonrpc_request(struct mg_mgr &mgr, const std::string &url, const std::string &method, const std::tuple<Ts...> &tp,
R &result) {
R &result, bool keep_alive = true) {
auto request = jsonrpc_post_data(method, tp);
json response;
try {
response = json::parse(json_post(mgr, url, request));
response = json::parse(json_post(mgr, url, request, keep_alive));
} catch (std::exception &x) {
throw std::runtime_error("jsonrpc server error: invalid response ("s + x.what() + ")"s);
}
Expand Down Expand Up @@ -234,11 +324,11 @@ void jsonrpc_mg_mgr::commit() {

// To commit, we kill the parent server and replace its address with the child's
bool result = false;
jsonrpc_request(get_mgr(), get_remote_parent_address(), "shutdown", std::tie(), result);
jsonrpc_request(get_mgr(), get_remote_parent_address(), "shutdown", std::tie(), result, false);

// Rebind the remote server to continue listening in the original port
result = false;
jsonrpc_request(get_mgr(), get_remote_address(), "rebind", std::tie(m_address[0]), result);
jsonrpc_request(get_mgr(), get_remote_address(), "rebind", std::tie(m_address[0]), result, false);
m_address.pop_back();
}

Expand All @@ -250,7 +340,7 @@ void jsonrpc_mg_mgr::rollback() {

// To rollback, we kill the child and expose the parent server
bool result = false;
jsonrpc_request(get_mgr(), get_remote_address(), "shutdown", std::tie(), result);
jsonrpc_request(get_mgr(), get_remote_address(), "shutdown", std::tie(), result, false);
m_address.pop_back();
}

Expand All @@ -261,9 +351,9 @@ bool jsonrpc_mg_mgr::is_forked(void) const {
void jsonrpc_mg_mgr::shutdown(void) {
bool result = false;
if (is_forked()) {
jsonrpc_request(get_mgr(), get_remote_parent_address(), "shutdown", std::tie(), result);
jsonrpc_request(get_mgr(), get_remote_parent_address(), "shutdown", std::tie(), result, false);
}
jsonrpc_request(get_mgr(), get_remote_address(), "shutdown", std::tie(), result);
jsonrpc_request(get_mgr(), get_remote_address(), "shutdown", std::tie(), result, false);
m_address.clear();
}

Expand Down Expand Up @@ -403,7 +493,7 @@ std::string jsonrpc_virtual_machine::fork(const jsonrpc_mg_mgr_ptr &mgr) {

void jsonrpc_virtual_machine::rebind(const jsonrpc_mg_mgr_ptr &mgr, const std::string &address) {
bool result = false;
jsonrpc_request(mgr->get_mgr(), mgr->get_remote_address(), "rebind", std::tie(address), result);
jsonrpc_request(mgr->get_mgr(), mgr->get_remote_address(), "rebind", std::tie(address), result, false);
}

uint64_t jsonrpc_virtual_machine::do_read_f(int i) const {
Expand Down

0 comments on commit 366e867

Please sign in to comment.