Skip to content

Commit

Permalink
Message add msguuid (#613)
Browse files Browse the repository at this point in the history
* add msg UUID for data flow messages

this will allow to trace end-to-end flow of individual messages
UUID is constructed with:
- timestamp: current loop time
- msg_seq
- abbreviated hash slug of payload

* add more crypto diagnostics

* reduce flushing noise
  • Loading branch information
ekoby committed Jan 30, 2024
1 parent 9297dce commit 548fcf4
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 19 deletions.
1 change: 1 addition & 0 deletions inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ struct ziti_conn {
char marker[MARKER_CHAR_LEN];

uint32_t edge_msg_seq;
uint32_t in_msg_seq;

ziti_channel_t *channel;
ziti_data_cb data_cb;
Expand Down
2 changes: 1 addition & 1 deletion library/bind.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ static void process_dial(struct binding_s *b, message *msg) {

if (!peer_key_sent && conn->encrypted) {
ZITI_LOG(ERROR, "failed to establish crypto for encrypted service: did not receive peer key");
reject_dial_request(0, b->ch, msg->header.seq, "did not receive peer crypto key");
reject_dial_request(conn->conn_id, b->ch, msg->header.seq, "did not receive peer crypto key");
return;
}

Expand Down
120 changes: 103 additions & 17 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,29 @@ static const char *conn_state_str[] = {
conn_states(state_str)
};

struct msg_uuid {
union {
uint8_t raw[16];
struct {
uint32_t slug;
uint32_t seq;
uint64_t ts;
};
};
};

struct local_hash {
union {
uint8_t hash[32];
uint32_t i32[8];
};
};

#define UUID_FMT "%08x:%08x:%llx"
#define UUID_FMT_ARG(u) ((u)->slug),((u)->seq),(long long)((u)->ts)
#define HASH_FMT "%08x:%08x:%08x:%08x:%08x:%08x:%08x:%08x"
#define HASH_FMT_ARG(lh) (lh).i32[0],(lh).i32[1],(lh).i32[2],(lh).i32[3], \
(lh).i32[4],(lh).i32[5],(lh).i32[6],(lh).i32[7]

struct ziti_conn_req {
ziti_session_type session_type;
Expand Down Expand Up @@ -241,6 +264,11 @@ void on_write_completed(struct ziti_conn *conn, struct ziti_write_req_s *req, in
message *create_message(struct ziti_conn *conn, uint32_t content, size_t body_len) {
int32_t conn_id = htole32(conn->conn_id);
int32_t msg_seq = htole32(conn->edge_msg_seq++);
struct msg_uuid uuid = {
.ts = uv_now(conn->ziti_ctx->loop),
.seq = msg_seq,
};
int hcount = (content == ContentTypeData) ? 3 : 2;
hdr_t headers[] = {
{
.header_id = ConnIdHeader,
Expand All @@ -251,13 +279,35 @@ message *create_message(struct ziti_conn *conn, uint32_t content, size_t body_le
.header_id = SeqHeader,
.length = sizeof(msg_seq),
.value = (uint8_t *) &msg_seq
},
{ // allocate space for UUID, it will be populated in send_message
.header_id = UUIDHeader,
.length = sizeof(uuid.raw),
.value = uuid.raw
}
};
return message_new(NULL, content, headers, 2, body_len);
return message_new(NULL, content, headers, hcount, body_len);
}

static int send_message(struct ziti_conn *conn, message *m, struct ziti_write_req_s *wr) {
ziti_channel_t *ch = conn->channel;
if (m->header.content == ContentTypeData) {
struct msg_uuid *uuid = NULL;
size_t len;
message_get_bytes_header(m, UUIDHeader, (uint8_t**)&uuid, &len);

if (uuid) {
assert(len == sizeof(*uuid));
struct local_hash h = {0};
crypto_hash_sha256(h.hash, m->body, m->header.body_len);
int32_t seq;
message_get_int32_header(m, SeqHeader, &seq);

uuid->slug = htole32(h.i32[0]);
CONN_LOG(TRACE, "=> ct[%0X] uuid[" UUID_FMT "] edge_seq[%d] len[%d] hash[" HASH_FMT "]",
m->header.content, UUID_FMT_ARG(uuid), seq, m->header.body_len, HASH_FMT_ARG(h));
}
}
return ziti_channel_send_message(ch, m, wr);
}

Expand Down Expand Up @@ -574,8 +624,8 @@ static void ziti_write_req(struct ziti_write_req_s *req) {
m = create_message(conn, ContentTypeData, total_len);

if (conn->encrypted) {
crypto_secretstream_xchacha20poly1305_push(&conn->crypt_o, m->body, NULL, req->buf, req->len, NULL, 0,
0);
crypto_secretstream_xchacha20poly1305_push(&conn->crypt_o, m->body, NULL,
req->buf, req->len, NULL, 0, 0);
} else {
memcpy(m->body, req->buf, req->len);
}
Expand All @@ -589,7 +639,7 @@ static void on_disconnect(ziti_connection conn, ssize_t status, void *ctx) {
conn_set_state(conn, conn->close ? Closed : Disconnected);
ziti_channel_t *ch = conn->channel;
if (ch) {
ziti_channel_rem_receiver(ch, conn->conn_id);
ziti_channel_rem_receiver(ch, (int)conn->conn_id);
conn->channel = NULL;
}
}
Expand Down Expand Up @@ -711,7 +761,7 @@ static void on_flush(uv_idle_t *fl) {
}

static void flush_connection(ziti_connection conn) {
if (conn->flusher) {
if (conn->flusher && !uv_is_active((const uv_handle_t *) conn->flusher)) {
CONN_LOG(TRACE, "starting flusher");
uv_idle_start(conn->flusher, on_flush);
}
Expand All @@ -729,14 +779,16 @@ static bool flush_to_service(ziti_connection conn) {
struct ziti_write_req_s *req = TAILQ_FIRST(&conn->wreqs);
TAILQ_REMOVE(&conn->wreqs, req, _next);

if (conn->state == Connected) {
if (conn->state == Connected || req->close) {
if (req->conn) {
TAILQ_INSERT_TAIL(&conn->pending_wreqs, req, _next);
}
ziti_write_req(req);
count++;
} else {
CONN_LOG(DEBUG, "got write req in invalid state[%s]", conn_state_str[conn->state]);
CONN_LOG(DEBUG, "got write msg{ct[%0X]} in invalid state[%s]",
req->message ? req->message->header.content : -1,
conn_state_str[conn->state]);

if (req->cb) {
req->cb(conn, ZITI_INVALID_STATE, req->ctx);
Expand Down Expand Up @@ -815,11 +867,31 @@ void conn_inbound_data_msg(ziti_connection conn, message *msg) {
unsigned char tag;
if (msg->header.body_len > 0) {
plain_text = malloc(msg->header.body_len - crypto_secretstream_xchacha20poly1305_ABYTES);
assert(plain_text != NULL);
CONN_LOG(VERBOSE, "decrypting %d bytes", msg->header.body_len);
TRY(crypto, crypto_secretstream_xchacha20poly1305_pull(&conn->crypt_i,
plain_text, &plain_len, &tag,
msg->body, msg->header.body_len, NULL, 0));
CONN_LOG(VERBOSE, "decrypted %lld bytes", plain_len);
int crypto_rc = crypto_secretstream_xchacha20poly1305_pull(&conn->crypt_i,
plain_text, &plain_len, &tag,
msg->body, msg->header.body_len, NULL, 0);
if (crypto_rc != 0) {
// try to figure out the cause of crypto error
struct msg_uuid *uuid;
size_t uuid_len;
struct local_hash h;
crypto_hash_sha256(h.hash, msg->body, msg->header.body_len);

if (message_get_bytes_header(msg, UUIDHeader, (uint8_t **)&uuid, &uuid_len)) {
CONN_LOG(ERROR, "uuid[" UUID_FMT "] %s corruption hash[" HASH_FMT "]",
UUID_FMT_ARG(uuid),
uuid->slug != htole32(h.i32[0]) ? "payload" : "crypto state",
HASH_FMT_ARG(h));
} else {
CONN_LOG(ERROR, "message/state corruption hash[" HASH_FMT "]",
HASH_FMT_ARG(h));
}

TRY(crypto, crypto_rc);
}
CONN_LOG(VERBOSE, "decrypted %lld bytes tag[%x]", plain_len, (int)tag);
buffer_append(conn->inbound, plain_text, plain_len);
metrics_rate_update(&conn->ziti_ctx->down_rate, (int64_t) plain_len);
conn->received += plain_len;
Expand Down Expand Up @@ -1133,7 +1205,6 @@ int ziti_write(ziti_connection conn, uint8_t *data, size_t length, ziti_write_cb

static int send_fin_message(ziti_connection conn, struct ziti_write_req_s *wr) {
CONN_LOG(DEBUG, "sending FIN");
ziti_channel_t *ch = conn->channel;
int32_t conn_id = htole32(conn->conn_id);
int32_t msg_seq = htole32(conn->edge_msg_seq++);
int32_t flags = htole32(EDGE_FIN);
Expand All @@ -1155,7 +1226,7 @@ static int send_fin_message(ziti_connection conn, struct ziti_write_req_s *wr) {
},
};
message *m = message_new(NULL, ContentTypeData, headers, 3, 0);
return ziti_channel_send_message(ch, m, wr);
return send_message(conn, m, wr);
}

int ziti_close(ziti_connection conn, ziti_close_cb close_cb) {
Expand Down Expand Up @@ -1249,7 +1320,7 @@ static void queue_edge_message(struct ziti_conn *conn, message *msg, int code) {
conn->conn_id, conn->marker, conn->service, BOOL_STR(conn->close), BOOL_STR(conn->encrypted),
BOOL_STR(conn->fin_recv), BOOL_STR(conn->fin_sent));
message *reply = new_inspect_result(msg->header.seq, conn->conn_id, ConnTypeDial, conn_info, ci_len);
ziti_channel_send_message(conn->channel, reply, NULL);
send_message(conn, reply, NULL);
pool_return_obj(msg);
return;
}
Expand All @@ -1259,18 +1330,33 @@ static void queue_edge_message(struct ziti_conn *conn, message *msg, int code) {
}

static void process_edge_message(struct ziti_conn *conn, message *msg) {

int rc;
int32_t seq;
int32_t conn_id;
struct msg_uuid *uuid;
size_t uuid_len;
bool has_seq = message_get_int32_header(msg, SeqHeader, &seq);
bool has_conn_id = message_get_int32_header(msg, ConnIdHeader, &conn_id);
assert(has_conn_id && conn_id == conn->conn_id);

if (message_get_bytes_header(msg, UUIDHeader, (uint8_t **) &uuid, &uuid_len)) {
struct local_hash h;
crypto_hash_sha256(h.hash, msg->body, msg->header.body_len);
CONN_LOG(TRACE, "<= ct[%04X] uuid[" UUID_FMT "] edge_seq[%d] len[%d] ",
msg->header.content, UUID_FMT_ARG(uuid), seq, msg->header.body_len);

if (uuid->seq != conn->in_msg_seq + 1) {
CONN_LOG(WARN, "unexpected msg_seq[%d] previous[%d]", uuid->seq, conn->in_msg_seq);
}
conn->in_msg_seq = uuid->seq;
} else {
CONN_LOG(TRACE, "<= ct[%04X] edge_seq[%d] len[%d]", msg->header.content, seq, msg->header.body_len);
}

CONN_LOG(TRACE, "<= ct[%04X] edge_seq[%d] body[%d]", msg->header.content, seq, msg->header.body_len);

switch (msg->header.content) {
case ContentTypeStateClosed:
CONN_LOG(DEBUG, "connection status[%X] conn_id[%d] seq[%d] err[%.*s]", msg->header.content, conn_id, seq,
CONN_LOG(DEBUG, "connection status[%X] seq[%d] err[%.*s]", msg->header.content, seq,
msg->header.body_len, msg->body);
bool retry_connect = false;

Expand Down
12 changes: 11 additions & 1 deletion library/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ bool message_get_uint64_header(message *m, int header_id, uint64_t *v) {
}

bool message_get_bytes_header(message *m, int header_id, uint8_t **v, size_t *len) {
*v = NULL;
*len = 0;

hdr_t *h = find_header(m, header_id);
if (h != NULL) {
*len = h->length;
Expand Down Expand Up @@ -219,11 +222,18 @@ message *message_new(pool_t *pool, uint32_t content, const hdr_t *hdrs, int nhdr
// write header
header_to_buffer(&m->header, m->msgbufp);

// write headers
// write/populate headers
m->hdrs = calloc(nhdrs, sizeof(hdr_t));
m->nhdrs = nhdrs;
m->headers = m->msgbufp + HEADER_SIZE;
m->body = m->headers + m->header.headers_len;
uint8_t *p = m->headers;
for (int i = 0; i < nhdrs; i++) {
m->hdrs[i] = (hdr_t){
.header_id = hdrs[i].header_id,
.length = hdrs[i].length,
.value = p + 2 * sizeof(uint32_t),
};
p = write_hdr(&hdrs[i], p);
}

Expand Down

0 comments on commit 548fcf4

Please sign in to comment.