Skip to content

Commit

Permalink
refresh ziti_session.edge_routers (#620)
Browse files Browse the repository at this point in the history
Opportunistically refresh ziti_session after ERs changes are detected
Re-work/simplify "first-channel-to-connect" logic: let ziti context mediate pending connections
* mark session for update if list of current ERs changes
* parse session.service_id from json
* fix json parsing to match exact field name
* cleanup conn_req.session
* remove obsolete handling of pending connects in ziti_channel
  • Loading branch information
ekoby committed Feb 13, 2024
1 parent 63856dd commit 4271e3e
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 214 deletions.
3 changes: 2 additions & 1 deletion inc_internal/internal_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ XX(protocols, ziti_er_protocols, none, supportedProtocols, __VA_ARGS__)
XX(token, string, none, token, __VA_ARGS__)\
XX(id, string, none, id, __VA_ARGS__) \
XX(edge_routers, ziti_edge_router, list, edgeRouters, __VA_ARGS__) \
XX(service_id, string, none, NULL, __VA_ARGS__)
XX(service_id, string, none, serviceId, __VA_ARGS__) \
XX(refresh, bool, none, , __VA_ARGS__)

#define ZITI_PROCESS_MODEL(XX, ...) \
XX(path, string, none, path, __VA_ARGS__)
Expand Down
17 changes: 12 additions & 5 deletions inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022-2023. NetFoundry Inc.
// Copyright (c) 2022-2024. NetFoundry Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -102,7 +102,6 @@ typedef struct ziti_channel {
ch_state state;
uint32_t reconnect_count;

LIST_HEAD(conn_reqs, ch_conn_req) conn_reqs;
uint32_t msg_seq;

buffer *incoming;
Expand Down Expand Up @@ -298,6 +297,10 @@ struct ziti_ctx {
// map<id,ziti_conn>
model_map connections;

// map<conn_id,conn_id> -- connections waiting for a suitable channel
// map to make removal easier
model_map waiting_connections;

uint32_t conn_seq;

/* context wide metrics */
Expand All @@ -322,8 +325,7 @@ extern "C" {

bool ziti_is_session_valid(ziti_context ztx, ziti_session *session, const char *service_id, ziti_session_type type);

void
ziti_invalidate_session(ziti_context ztx, ziti_session *session, const char *service_id, ziti_session_type type);
void ziti_invalidate_session(ziti_context ztx, const char *service_id, ziti_session_type type);

void ziti_on_channel_event(ziti_channel_t *ch, ziti_router_status status, ziti_context ztx);

Expand All @@ -335,7 +337,9 @@ bool ziti_channel_is_connected(ziti_channel_t *ch);

uint64_t ziti_channel_latency(ziti_channel_t *ch);

int ziti_channel_connect(ziti_context ztx, const char *name, const char *url, ch_connect_cb, void *ctx);
int ziti_channel_force_connect(ziti_channel_t *ch);

int ziti_channel_connect(ziti_context ztx, const char *name, const char *url);

int ziti_channel_prepare(ziti_channel_t *ch);

Expand Down Expand Up @@ -401,6 +405,9 @@ extern uv_timer_t *new_ztx_timer(ziti_context ztx);

int conn_bridge_info(ziti_connection conn, char *buf, size_t buflen);

void process_connect(struct ziti_conn *conn, ziti_session *session);


#ifdef __cplusplus
}
#endif
Expand Down
104 changes: 42 additions & 62 deletions library/channel.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright (c) 2022-2023. NetFoundry Inc.
// Copyright (c) 2022-2024. NetFoundry Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// You may obtain a copy of the License at
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
Expand Down Expand Up @@ -85,7 +85,7 @@ static void process_inbound(ziti_channel_t *ch);
static void on_tls_close(uv_handle_t *s);

static inline void close_connection(ziti_channel_t *ch) {
if (ch->connection) {
if (ch->connection && ch->connection->close_cb == NULL) {
tlsuv_stream_t *conn = ch->connection;
CH_LOG(DEBUG, "closing TLS[%p]", conn);
tlsuv_stream_close(conn, on_tls_close);
Expand All @@ -95,23 +95,12 @@ static inline void close_connection(ziti_channel_t *ch) {
// global channel sequence
static uint32_t channel_counter = 0;

struct ch_write_req {
uv_buf_t buf;
ziti_channel_t *ch;
};

struct waiter_s {
uint32_t seq;
reply_cb cb;
void *reply_ctx;
};

struct ch_conn_req {
ch_connect_cb cb;
void *ctx;
LIST_ENTRY(ch_conn_req) next;
};

struct msg_receiver {
int id;
void *receiver;
Expand Down Expand Up @@ -160,7 +149,6 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t
snprintf(ch->token, sizeof(ch->token), "ziti-sdk-c[%d]@%*.*s", ch->id, (int) hostlen, (int) hostlen, hostname);

ch->state = Initial;
LIST_INIT(&ch->conn_reqs);

ch->name = NULL;
ch->in_next = NULL;
Expand All @@ -181,6 +169,10 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t
}

void ziti_channel_free(ziti_channel_t *ch) {
if (ch->connection) {
ch->connection->data = NULL;
ch->connection = NULL;
}
free_buffer(ch->incoming);
pool_destroy(ch->in_msg_pool);
ch->in_msg_pool = NULL;
Expand Down Expand Up @@ -212,14 +204,17 @@ int ziti_close_channels(struct ziti_ctx *ztx, int err) {
static void on_tls_close(uv_handle_t *s) {
tlsuv_stream_t *tls = (tlsuv_stream_t *) s;
ziti_channel_t *ch = tls->data;
ch->connection = NULL;

if (ch) {
ch->connection = NULL;
if (ch->reconnect) {
ch->reconnect = false;
reconnect_channel(ch, true);
}
}

tlsuv_stream_free(tls);
free(tls);

if (ch->reconnect) {
reconnect_channel(ch, true);
}
}

int ziti_channel_close(ziti_channel_t *ch, int err) {
Expand Down Expand Up @@ -309,7 +304,23 @@ static void check_connecting_state(ziti_channel_t *ch) {
}
}

int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, ch_connect_cb cb, void *cb_ctx) {
int ziti_channel_force_connect(ziti_channel_t *ch) {
if (ch == NULL) {
return ZITI_INVALID_STATE;
}

if (ch->state == Closed) {
return ZITI_GATEWAY_UNAVAILABLE;
}

if (ch->state == Disconnected) {
reconnect_channel(ch, true);
}

return ZITI_OK;
}

int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url) {
ziti_channel_t *ch = model_map_get(&ztx->channels, url);

if (ch != NULL) {
Expand All @@ -324,29 +335,6 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url,
check_connecting_state(ch);
}

switch (ch->state) {
case Connected:
if (cb) {
cb(ch, cb_ctx, ZITI_OK);
}
break;

case Initial:
case Connecting:
case Disconnected:
if (cb != NULL) {
NEWP(r, struct ch_conn_req);
r->cb = cb;
r->ctx = cb_ctx;
LIST_INSERT_HEAD(&ch->conn_reqs, r, next);
}

break;
default:
CH_LOG(ERROR, "should not be here: %s", ziti_errorstr(ZITI_WTF));
return ZITI_WTF;
}

if (ch->state == Initial || ch->state == Disconnected) {
reconnect_channel(ch, true);
}
Expand Down Expand Up @@ -695,13 +683,6 @@ static void hello_reply_cb(void *ctx, message *msg, int err) {
close_connection(ch);
reconnect_channel(ch, false);
}

while (!LIST_EMPTY(&ch->conn_reqs)) {
struct ch_conn_req *r = LIST_FIRST(&ch->conn_reqs);
LIST_REMOVE(r, next);
r->cb(ch, r->ctx, cb_code);
free(r);
}
}

static void send_hello(ziti_channel_t *ch, ziti_api_session *session) {
Expand Down Expand Up @@ -778,6 +759,12 @@ static void reconnect_channel(ziti_channel_t *ch, bool now) {

uint64_t timeout = 0;
if (!now) {
if (uv_is_active((const uv_handle_t *) ch->timer) &&
ch->timer->timer_cb == reconnect_cb) {
// reconnect is already scheduled
return;
}

ch->reconnect_count++;
int backoff = MIN(ch->reconnect_count, MAX_BACKOFF);

Expand All @@ -786,8 +773,7 @@ static void reconnect_channel(ziti_channel_t *ch, bool now) {

timeout = random % ((1U << backoff) * BACKOFF_TIME);
CH_LOG(INFO, "reconnecting in %" PRIu64 "ms (attempt = %d)", timeout, ch->reconnect_count);
}
else {
} else {
CH_LOG(INFO, "reconnecting NOW");
}
uv_timer_start(ch->timer, reconnect_cb, timeout, 0);
Expand Down Expand Up @@ -894,7 +880,8 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
}

static void on_channel_connect_internal(uv_connect_t *req, int status) {
ziti_channel_t *ch = req->data;
tlsuv_stream_t *tls = (tlsuv_stream_t *)req->handle;
ziti_channel_t *ch = tls->data;

if (status == 0) {
if (ch->ctx->api_session != NULL && ch->ctx->api_session->token != NULL) {
Expand All @@ -908,16 +895,9 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) {
close_connection(ch);
reconnect_channel(ch, false);
}
} else {
} else if (ch != NULL) {
CH_LOG(ERROR, "failed to connect to ER[%s] [%d/%s]", ch->name, status, uv_strerror(status));

while (!LIST_EMPTY(&ch->conn_reqs)) {
struct ch_conn_req *r = LIST_FIRST(&ch->conn_reqs);
LIST_REMOVE(r, next);
r->cb(ch, r->ctx, status);
free(r);
}

if (status != UV_ECANCELED) {
close_connection(ch);
}
Expand Down

0 comments on commit 4271e3e

Please sign in to comment.