rebase heartbeat branch #428

Open
wants to merge 2 commits into master
Conversation


@charsyam commented Nov 3, 2015

It had a bug when redis was loading, but after the patch ef45313 (handle loading state as error) I think it is fixed.

In my tests, this works fine :)

@CLAassistant commented Nov 16, 2019

CLA assistant check: all committers have signed the CLA.

@TysonAndre (Collaborator) commented Apr 28, 2021

  1. When a server is misconfigured or rejecting connections, this allocates a placeholder client connection for the heartbeat that doesn't get freed (seen with -v 7 in the malloc/free logging), because the request is discarded before the response is "sent". Left alone, this would eventually hit the client connection limit; the patch below fixes it.
  2. add_failed_server should be called before updating the ketama distribution, not after it - otherwise the most recently failed host doesn't get removed from the pool.
  3. One more place in the ketama update routine should be updated to avoid an inconsistent definition of what a dead server is - as-is, the PR could violate the precondition expected by ketama_dispatch that a pool with auto_eject_hosts: true still has at least one live server.

I think the following patch should help fix that (parts of it may conflict with or depend on other patches; I haven't checked yet).

I'm still not 100% sure of correctness, and the heartbeat notes are guesses - some other patches may be causing the behaviors I'm seeing.

diff --git a/notes/heartbeat.md b/notes/heartbeat.md
new file mode 100644
index 0000000..95fdc48
--- /dev/null
+++ b/notes/heartbeat.md
@@ -0,0 +1,55 @@
+Heartbeat Patches
+=================
+
+These apply to all server groups that use ketama when they set `auto_eject_hosts: true`.
+These are unconditionally enabled.
+
+Server status
+-------------
+
+A server (`struct server`, a single connection to a single backend server) can be in one of three states:
+
+1. `FAIL_STATUS_NORMAL` (0): The server is healthy.
+2. `FAIL_STATUS_ERR_TRY_CONNECT` (1): The server has failed and twemproxy will asynchronously send a heartbeat later on.
+
+   All incoming and outgoing requests and responses are removed from the server before putting it into this state (`server_failure`).
+3. `FAIL_STATUS_ERR_TRY_HEARTBEAT` (2): The server is in the process of connecting and sending a heartbeat to determine whether it has recovered.
+
+This is different from the pre-heartbeat implementation, where the ability to reconnect was determined by whether `server->next_retry` had elapsed (based on the time when the server failed plus `server_retry_timeout`).
+In the pre-heartbeat implementation, servers would be reintroduced into the pool once that timeout had elapsed. See notes/recommendation.md for outdated documentation.
+
+
+The heartbeat is a memcache/redis get command which is marked as `swallow` so that nutcracker does not attempt to forward the response to a placeholder client (`send_heartbeat`).
+If the server sends a well-formed non-error response, the server's status goes back to `FAIL_STATUS_NORMAL` in `server_restore_from_heartbeat`.
+
+Failed server lists
+-------------------
+
+See `retry_connection`, which is specific to the heartbeat patch.
+This is called in response to events or timeouts (the stats computation timeout interval is 30 seconds), even events such as requests/responses for a different pool.
+
+The heartbeat patch alternates between two variable-sized `array`s of failed servers to ensure that in any loop over servers, a failing server is processed at most once.
+Processing a failed server calls `server_reconnect` to begin connecting and sending a heartbeat that checks whether the server is healthy after the reconnect.
+
+- If/when reconnection or the heartbeat fails, the server gets added to the `ctx->failed_servers` list at the opposite index, to be retried again later.
+
+Bugs
+----
+
+When `auto_eject_hosts` is false, it seems like the heartbeat is sent redundantly and isn't sent ahead of time.
+
+- The heartbeat message gets sent more than once over the same connection if pipelined requests trigger the reconnect.
+- The heartbeat message gets sent after the requests get sent.
+
+(this is different from `auto_eject_hosts: true`, where heartbeats get triggered by attempts to reintroduce a host to the pool)
+
+If a pool is timing out or rejecting connections, new connection attempts triggered by proxied requests will still be made and time out (possibly not a bug).
+
+It may be possible to improve on that and immediately reject requests for hosts that have been in a known bad state for multiple retries.
+
+TODOs
+-----
+
+Add an additional timer for more consistent `server_retry_timeout` behavior on the main pool. Note that memcached requests and responses on any pool will also trigger an event leading to the heartbeat code attempting to reconnect to failed servers, so this only matters when there is a low volume of requests.
+The stats timeout is 30000 ms and currently also triggers automatic reconnect attempts as a side effect when there are no other events.
diff --git a/notes/recommendation.md b/notes/recommendation.md
index 1328929..d768331 100644
--- a/notes/recommendation.md
+++ b/notes/recommendation.md
@@ -34,6 +34,8 @@ Failures are a fact of life, especially when things are distributed. To be resil
 
 Enabling `auto_eject_hosts:` ensures that a dead server can be ejected out of the hash ring after `server_failure_limit:` consecutive failures have been encountered on that said server. A non-zero `server_retry_timeout:` ensures that we don't incorrectly mark a server as dead forever especially when the failures were really transient. The combination of `server_retry_timeout:` and `server_failure_limit:` controls the tradeoff between resiliency to permanent and transient failures.
 
+- **NOTE: The heartbeat patch changes this behavior from the upstream for pools configured with `auto_eject_hosts: true`.** See [heartbeat.md](./heartbeat.md)
+
 Note that an ejected server will not be included in the hash ring for any requests until the retry timeout passes. This will lead to data partitioning as keys originally on the ejected server will now be written to a server still in the pool.
 
 To ensure that requests always succeed in the face of server ejections (`auto_eject_hosts:` is enabled), some form of retry must be implemented at the client layer since nutcracker itself does not retry a request. This client-side retry count must be greater than `server_failure_limit:` value, which ensures that the original request has a chance to make it to a live server.
@@ -151,7 +153,7 @@ You can also graph the timestamp at which any given server was ejected by graphi
 
 ## server_connections: > 1
 
-By design, twemproxy multiplexes several client connections over few server connections. It is important to note that **"read my last write"** constraint doesn't necessarily hold true when twemproxy is configured with `server_connections: > 1`. 
+By design, twemproxy multiplexes several client connections over few server connections. It is important to note that **"read my last write"** constraint doesn't necessarily hold true when twemproxy is configured with `server_connections: > 1`.
 
 To illustrate this, consider a scenario where twemproxy is configured with `server_connections: 2`. If a client makes pipelined requests with the first request in pipeline being `set foo 0 0 3\r\nbar\r\n` (write) and the second request being `get foo\r\n` (read), the expectation is that the read of key `foo` would return the value `bar`. However, with configuration of two server connections it is possible that write and read request are sent on different server connections which would mean that their completion could race with one another. In summary, if the client expects "read my last write" constraint, you either configure twemproxy to use `server_connections:1` or use clients that only make synchronous requests to twemproxy.
 
diff --git a/src/hashkit/nc_ketama.c b/src/hashkit/nc_ketama.c
index d34f1b1..fea0ad5 100644
--- a/src/hashkit/nc_ketama.c
+++ b/src/hashkit/nc_ketama.c
@@ -100,7 +100,7 @@ ketama_update(struct server_pool *pool)
         ASSERT(server->weight > 0);
 
         /* count weight only for live servers */
-        if (!pool->auto_eject_hosts || server->fail == 0) {
+        if (!pool->auto_eject_hosts || server->fail == FAIL_STATUS_NORMAL) {
             total_weight += server->weight;
         }
     }
@@ -150,7 +150,7 @@ ketama_update(struct server_pool *pool)
 
         server = array_get(&pool->server, server_index);
 
-        if (pool->auto_eject_hosts && server->next_retry > now) {
+        if (pool->auto_eject_hosts && server->fail != FAIL_STATUS_NORMAL) {
             continue;
         }
 
@@ -190,6 +190,8 @@ ketama_update(struct server_pool *pool)
         pointer_counter += pointer_per_server;
     }
 
+    /* The continuum should have only been regenerated if there was at least one live server */
+    ASSERT(pointer_counter > 0);
     pool->ncontinuum = pointer_counter;
     qsort(pool->continuum, pool->ncontinuum, sizeof(*pool->continuum),
           ketama_item_cmp);
diff --git a/src/hashkit/nc_modula.c b/src/hashkit/nc_modula.c
index 7faa694..0939727 100644
--- a/src/hashkit/nc_modula.c
+++ b/src/hashkit/nc_modula.c
@@ -112,7 +112,7 @@ modula_update(struct server_pool *pool)
     for (server_index = 0; server_index < nserver; server_index++) {
         struct server *server = array_get(&pool->server, server_index);
 
-        if (pool->auto_eject_hosts && server->next_retry > now) {
+        if (pool->auto_eject_hosts && server->fail != FAIL_STATUS_NORMAL) {
             continue;
         }
 
diff --git a/src/hashkit/nc_random.c b/src/hashkit/nc_random.c
index 1afe3c0..200bbdc 100644
--- a/src/hashkit/nc_random.c
+++ b/src/hashkit/nc_random.c
@@ -104,7 +104,7 @@ random_update(struct server_pool *pool)
     for (server_index = 0; server_index < nserver; server_index++) {
         struct server *server = array_get(&pool->server, server_index);
 
-        if (pool->auto_eject_hosts && server->next_retry > now) {
+        if (pool->auto_eject_hosts && server->fail != FAIL_STATUS_NORMAL) {
             continue;
         }
 
diff --git a/src/nc_server.c b/src/nc_server.c
index bd74afe..1e93dda 100644
--- a/src/nc_server.c
+++ b/src/nc_server.c
@@ -356,12 +356,14 @@ server_failure(struct context *ctx, struct server *server)
 
     server->failure_count = 0;
 
+    /* BEFORE updating the ketama/modulo/random distribution to remove failed servers, mark the server as failed. */
+    add_failed_server(ctx, server);
+
     status = server_pool_run(pool);
     if (status != NC_OK) {
         log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx,
                   pool->name.len, pool->name.data, strerror(errno));
     }
-    add_failed_server(ctx, server);
 }
 
 static void
@@ -420,7 +422,7 @@ server_close(struct context *ctx, struct conn *conn)
     for (msg = TAILQ_FIRST(&conn->imsg_q); msg != NULL; msg = nmsg) {
         nmsg = TAILQ_NEXT(msg, s_tqe);
 
-        /* dequeue the message (request) from server inq */
+        /* dequeue the message (request) from server inq - it hasn't been sent yet */
         conn->dequeue_inq(ctx, conn, msg);
 
         /*
@@ -431,6 +433,16 @@ server_close(struct context *ctx, struct conn *conn)
         if (msg->swallow || msg->noreply) {
             log_debug(LOG_INFO, "close s %d swallow req %"PRIu64" len %"PRIu32
                       " type %d", conn->sd, msg->id, msg->mlen, msg->type);
+            /* Assumes that the server status is always set to FAIL_STATUS_ERR_TRY_HEARTBEAT AFTER the server is closed */
+            if (msg->swallow && ((struct server*)conn->owner)->fail == FAIL_STATUS_ERR_TRY_HEARTBEAT) {
+                c_conn = msg->owner;
+                ASSERT(c_conn->client);
+                ASSERT(!c_conn->proxy);
+                log_debug(LOG_INFO, "closing fake connection %d to %d for heartbeat, req %"PRIu64" len %"PRIu32
+                      " type %d", c_conn->sd, conn->sd, msg->id, msg->mlen, msg->type);
+                c_conn->unref(c_conn);
+                conn_put(c_conn);
+            }
             req_put(msg);
         } else {
             c_conn = msg->owner;
@@ -459,12 +471,21 @@ server_close(struct context *ctx, struct conn *conn)
     for (msg = TAILQ_FIRST(&conn->omsg_q); msg != NULL; msg = nmsg) {
         nmsg = TAILQ_NEXT(msg, s_tqe);
 
-        /* dequeue the message (request) from server outq */
+        /* dequeue the message (request) from server outq - it was sent and has an unprocessed response */
         conn->dequeue_outq(ctx, conn, msg);
 
         if (msg->swallow) {
             log_debug(LOG_INFO, "close s %d swallow req %"PRIu64" len %"PRIu32
                       " type %d", conn->sd, msg->id, msg->mlen, msg->type);
+            if (((struct server*)conn->owner)->fail == FAIL_STATUS_ERR_TRY_HEARTBEAT) {
+                c_conn = msg->owner;
+                ASSERT(c_conn->client);
+                ASSERT(!c_conn->proxy);
+                log_debug(LOG_INFO, "closing fake connection %d to %d for heartbeat, req %"PRIu64" len %"PRIu32
+                      " type %d", c_conn->sd, conn->sd, msg->id, msg->mlen, msg->type);
+                c_conn->unref(c_conn);
+                conn_put(c_conn);
+            }
             req_put(msg);
         } else {
             c_conn = msg->owner;
@@ -1076,7 +1097,7 @@ set_heartbeat_command(struct mbuf *mbuf, int redis)
     return n;
 }
 
-/* Send a heartbeat command to a backend server */
+/* Send a heartbeat command to a backend server. See notes/heartbeat.md. */
 static rstatus_t
 send_heartbeat(struct context *ctx, struct conn *conn, struct server *server)
 {
@@ -1182,7 +1203,7 @@ add_failed_server(struct context *ctx, struct server *server)
     *pserver = server;
 }
 
-/* Called when a response to a heartbeat command is received */
+/* Called when a response to a heartbeat command is received. See notes/heartbeat.md. */
 void
 server_restore_from_heartbeat(struct server *server, struct conn *conn)
 {
diff --git a/src/nc_server.h b/src/nc_server.h
index 3773bfe..e15d0c2 100644
--- a/src/nc_server.h
+++ b/src/nc_server.h
@@ -59,6 +59,7 @@
  *            //
  */
 
+/* See notes/heartbeat.md */
 #define     FAIL_STATUS_NORMAL              0
 #define     FAIL_STATUS_ERR_TRY_CONNECT     1
 #define     FAIL_STATUS_ERR_TRY_HEARTBEAT   2

        }

        /* skip over req that are in-error or done */

        if (msg->error || msg->done) {
            msg_tmo_delete(msg);
-           continue;
+           break;

What's the intent of changing this from a continue to a break? It seems to work either way, but if there are a large number of timeouts, should this keep processing them first (by continuing) for efficiency, so the timed-out connections get closed?

Should the return below also be a break;?

(It seems like in the typical case msg is null or gets reprocessed because it's removed before it times out, so this doesn't matter too much.)
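For reference, this is roughly the loop shape I'm reading the change against - a trimmed sketch based on upstream core_timeout() in nc_core.c, not this fork's exact code:

/* approximate upstream shape of core_timeout(); sketch only */
for (;;) {
    msg = msg_tmo_min();                /* request with the earliest deadline */
    if (msg == NULL) {
        ctx->timeout = ctx->max_timeout;
        return;                         /* nothing left in the timeout rbtree */
    }

    /* skip over req that are in-error or done */
    if (msg->error || msg->done) {
        msg_tmo_delete(msg);
        continue;                       /* upstream: keep draining finished requests */
        /* break;                          this PR: stop and leave the rest for a
                                           later event-loop tick */
    }

    /* ...otherwise the request really timed out: close its server connection... */
}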


This also seems possibly concerning - what would happen if these still belonged to a connection (e.g. multiple heartbeats), and we retried a connection before processing some requests that had also errored on that connection?

I guess that break; may make it less likely to call retry_connection on the same server twice, though that should probably be a separate check.

I see that retry_connection will call add_failed_server. Should add_failed_server iterate over the array to check whether something already marked the server as failed (e.g. if multiple heartbeats were sent and each got an error, or a server accepted the connection but immediately returned a protocol error)?
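A minimal sketch of the duplicate check I have in mind (untested; assumes ctx->fails stays an array of struct server * as in add_failed_server below, and failed_server_already_queued is just a hypothetical helper name):

/* sketch: true if `server` is already queued for a retry, so that several
 * failed heartbeats on one connection only enqueue it once */
static bool
failed_server_already_queued(struct context *ctx, struct server *server)
{
    uint32_t i, n;

    n = array_n(ctx->fails);
    for (i = 0; i < n; i++) {
        struct server **pserver = array_get(ctx->fails, i);
        if (*pserver == server) {
            return true;
        }
    }

    return false;
}

add_failed_server could then return early when this is true.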

ctx->failed_idx = idx;
ctx->fails = &(ctx->failed_servers[idx]);

now = nc_usec_now();

nit: this can move below the if (nsize == 0) check to avoid the gettimeofday call when no servers have failed
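Roughly what I mean (sketch only; nsize == 0 is the existing empty check referenced above, whichever array it guards):

ctx->failed_idx = idx;
ctx->fails = &(ctx->failed_servers[idx]);

if (nsize == 0) {
    return;
}

now = nc_usec_now();    /* gettimeofday is only paid when some server actually failed */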

return;
}

send_heartbeat(ctx, conn, server);

This will be sent both for FAIL_STATUS_ERR_TRY_HEARTBEAT and FAIL_STATUS_ERR_TRY_CONNECT - for the former, it will send redundant heartbeats even if a heartbeat is already being sent out?
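If FAIL_STATUS_ERR_TRY_HEARTBEAT is supposed to mean a heartbeat is already in flight, maybe a guard along these lines would be enough (sketch, untested):

if (server->fail != FAIL_STATUS_ERR_TRY_HEARTBEAT) {
    /* only kick off a heartbeat when one is not already outstanding */
    send_heartbeat(ctx, conn, server);
}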

void
add_failed_server(struct context *ctx, struct server *server)
{
    struct server **pserver;

Note to self: the comment below can/should be ignored if this code were combined with a future "failover" pool patch that moves failing hosts to a separate pool, in combination with auto_eject_hosts: false.

  • But even with that, if there's no failover pool, there wouldn't be much need to send a heartbeat, since commands would be sent to the server ordinarily anyway.

EDIT: never mind - the fork I'm using has a patch that changes the meaning of auto_eject_hosts slightly and is similar but not exactly the same as twitter/twemproxy.


Should the implementation or the caller of add_failed_server also check whether auto_eject_hosts is true?
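If the answer is yes, a rough sketch of the guard (untested; assumes the upstream meaning of auto_eject_hosts, which per the EDIT above may not match this fork, and that server->owner is the struct server_pool as elsewhere in nc_server.c):

void
add_failed_server(struct context *ctx, struct server *server)
{
    struct server_pool *pool = server->owner;

    if (!pool->auto_eject_hosts) {
        /* requests keep being forwarded to this server anyway */
        return;
    }

    /* ... existing body: push server onto ctx->fails ... */
}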

server_close() calls server_failure for connection failures such as timeouts

void
server_close(struct context *ctx, struct conn *conn)
{
    rstatus_t status;
    struct msg *msg, *nmsg; /* current and next message */
    struct conn *c_conn;    /* peer client connection */

    ASSERT(!conn->client && !conn->proxy);

    server_close_stats(ctx, conn->owner, conn->err, conn->eof,
                       conn->connected);

    conn->connected = false;

    if (conn->sd < 0) {
        server_failure(ctx, conn->owner);


TysonAndre commented Apr 28, 2021

It might also be useful to assert that the client connection count is 0 after shutdown for debug builds (conn_ncurr_cconn())

EDIT: clean shutdown doesn't seem possible currently, the installed signal handler will exit immediately.

It may be useful to also report curr_client_connections (conn_ncurr_cconn) in stats, similar to conn_ncurr_conn (curr_connections).
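The debug-build check I have in mind, for whenever a clean-shutdown path exists (sketch; conn_ncurr_cconn() is the counter mentioned above):

/* after a (currently hypothetical) clean shutdown has torn down all pools and drained the event loop */
ASSERT(conn_ncurr_cconn() == 0);    /* every placeholder heartbeat client conn was conn_put() */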

is_reconnect = (server->fail != FAIL_STATUS_NORMAL) ? true : false;

if (is_reconnect) {
    add_failed_server(ctx, server);

Would it make more sense to remove server_failure_limit entirely (i.e. same as always 1)? Is it still needed with a heartbeat patch?

Also, I see that add_failed_server will set server->fail = FAIL_STATUS_ERR_TRY_HEARTBEAT and server->next_retry is also set.

This branch won't call server_pool_run to update the distribution (e.g. ketama), but if something unrelated triggers server_pool_run (e.g. a different memcache server in the same pool changes state), then this server would be ejected from the pool.
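If the intent is for the host to be ejected right away rather than on the next unrelated update, maybe something along these lines (sketch, untested; assumes pool and status locals are available here the way they are in server_failure):

if (is_reconnect) {
    add_failed_server(ctx, server);

    /* rebuild the ketama/modula/random distribution now, instead of waiting
     * for an unrelated server_pool_run() triggered by another server */
    status = server_pool_run(pool);
    if (status != NC_OK) {
        log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx,
                  pool->name.len, pool->name.data, strerror(errno));
    }
}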


status = server_pool_run(pool);
if (status != NC_OK) {
    log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx,
              pool->name.len, pool->name.data, strerror(errno));
}
add_failed_server(ctx, server);

I think the server should be marked as failing before calling server_pool_run, because add_failed_server sets server->fail, and ketama_update (etc.) depends on server->fail to count unhealthy hosts.
