Skip to content

Commit

Permalink
Merge pull request #803 from openziti/avoid-bind-crypto-errors
Browse files Browse the repository at this point in the history
Avoid bind crypto errors
  • Loading branch information
ekoby authored Jan 8, 2025
2 parents 9a0c55e + 06c1248 commit 47cd62d
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 77 deletions.
2 changes: 1 addition & 1 deletion inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ struct ziti_conn {
ziti_client_cb client_cb;

bool srv_routers_api_missing;
ziti_edge_router_array routers;
model_list routers;
char *token;
ziti_session *session;
model_map bindings;
Expand Down
162 changes: 86 additions & 76 deletions library/bind.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,24 @@

#define DEFAULT_MAX_BINDINGS 3
#define REBIND_DELAY 1000
#define REFRESH_DELAY (60 * 5 * 1000)

#define CONN_LOG(lvl, fmt, ...) \
ZITI_LOG(lvl, "server[%u.%u](%s) " fmt, \
conn->ziti_ctx->id, conn->conn_id, conn->service, ##__VA_ARGS__)

enum bind_state {
st_unbound,
st_binding,
st_bound,
st_unbinding,
};

struct binding_s {
struct ziti_conn *conn;
uint32_t conn_id;
ziti_channel_t *ch;
struct key_pair key_pair;
bool bound;
enum bind_state state;
struct waiter_s *waiter;
};

Expand All @@ -50,11 +56,11 @@ static void get_service_cb(ziti_context, const ziti_service *service, int status

static int dispose(ziti_connection server);

static void start_binding(struct binding_s *b, ziti_channel_t *ch);
static int start_binding(struct binding_s *b, ziti_channel_t *ch);

static void stop_binding(struct binding_s *b);

static void schedule_rebind(struct ziti_conn *conn, bool now);
static void schedule_rebind(struct ziti_conn *conn);

static void session_cb(ziti_session *session, const ziti_error *err, void *ctx);

Expand Down Expand Up @@ -110,21 +116,25 @@ static struct binding_s* new_binding(struct ziti_conn *conn) {
NEWP(b, struct binding_s);
b->conn_id = conn->conn_id;
b->conn = conn;
b->state = st_unbound;
init_key_pair(&b->key_pair);
return b;
}

void process_bindings(struct ziti_conn *conn) {
// return number of active(bound or binding) bindings
int process_bindings(struct ziti_conn *conn) {
if (conn->server.token == NULL) {
return;
return 0;
}

int active = 0;
struct ziti_ctx *ztx = conn->ziti_ctx;

size_t target = MIN(conn->server.max_bindings, model_map_size(&ztx->channels));
size_t target = MIN(conn->server.max_bindings,
model_list_size(&conn->server.routers));

for(int idx = 0; conn->server.routers && conn->server.routers[idx]; idx++) {
ziti_edge_router *er = conn->server.routers[idx];
ziti_edge_router *er;
MODEL_LIST_FOREACH(er, conn->server.routers) {
CONN_LOG(DEBUG, "checking router[%s]", er->name);
ziti_channel_t *ch = ztx_get_channel(ztx, er);
if (ch == NULL || !ziti_channel_is_connected(ch)) {
Expand All @@ -133,64 +143,61 @@ void process_bindings(struct ziti_conn *conn) {
}

struct binding_s *b = model_map_get(&conn->server.bindings, er->name);
if (b != NULL) {
if (b->bound) {
target--;
} else {
start_binding(b, ch);
target--;
}
} else {
if (b == NULL) {
b = new_binding(conn);
model_map_set(&conn->server.bindings, er->name, b);
start_binding(b, ch);
target--;
}

if (target <= 0) break;
active += start_binding(b, ch);
if (active >= target) break;
}
return active;
}

void update_bindings(ziti_connection conn) {
if (conn->type != Server) return;

int target = MIN(conn->server.max_bindings, model_map_size(&conn->ziti_ctx->channels));
if (target > model_map_size(&conn->server.bindings)) {
process_bindings(conn);
int target = MIN(conn->server.max_bindings,
model_list_size(&conn->server.routers));

int active = 0;
const char *n;
struct binding_s *b;
MODEL_MAP_FOREACH(n, b, &conn->server.bindings) {
if (b->state == st_bound || b->state == st_binding) active++;
}

if (target > active) {
active = process_bindings(conn);
}

CONN_LOG(DEBUG, "bindings: active[%d] target[%d]", active, target);
// if we're still below target we may need to refresh service.routers
if (target > model_map_size(&conn->server.bindings)) {
schedule_rebind(conn, true);
if (target > active) {
schedule_rebind(conn);
} else {
// target bindings achieved, reset backoff
conn->server.attempt = 0;
uv_timer_stop(conn->server.timer);
}
}

static void schedule_rebind(struct ziti_conn *conn, bool now) {
static void schedule_rebind(struct ziti_conn *conn) {
if (!ziti_is_enabled(conn->ziti_ctx)) {
uv_timer_stop(conn->server.timer);
return;
}

uint64_t delay = REFRESH_DELAY;

if (now) {
int backoff = 1 << MIN(conn->server.attempt, 5);
uint32_t random;
uv_random(conn->ziti_ctx->loop, NULL, &random, sizeof(random), 0, NULL);
delay = (uint64_t) (random % (backoff * REBIND_DELAY));
conn->server.attempt++;
CONN_LOG(DEBUG, "scheduling re-bind(attempt=%d) in %" PRIu64 ".%" PRIu64 "s",
conn->server.attempt, delay / 1000, delay % 1000);

} else {
conn->server.attempt = 0;
CONN_LOG(DEBUG, "scheduling re-bind in %" PRIu64 ".%" PRIu64 "s", delay / 1000, delay % 1000);
}
int backoff = 1 << MIN(conn->server.attempt, 5);
uint32_t random;
uv_random(conn->ziti_ctx->loop, NULL, &random, sizeof(random), 0, NULL);
uint64_t delay = (uint64_t) (random % (backoff * REBIND_DELAY));
conn->server.attempt++;
CONN_LOG(DEBUG, "scheduling re-bind(attempt=%d) in %" PRIu64 ".%" PRIu64 "s",
conn->server.attempt, delay / 1000, delay % 1000);

uv_timer_start(conn->server.timer, rebind_delay_cb, delay, 0);
}


static void session_cb(ziti_session *session, const ziti_error *err, void *ctx) {
struct ziti_conn *conn = ctx;
int e = err ? (int)err->err : ZITI_OK;
Expand All @@ -204,13 +211,10 @@ static void session_cb(ziti_session *session, const ziti_error *err, void *ctx)
conn->server.session = session;

if (conn->server.srv_routers_api_missing) {
free_ziti_edge_router_array(&conn->server.routers);
conn->server.routers = calloc(model_list_size(&session->edge_routers) + 1,
sizeof(ziti_edge_router*));
model_list_clear(&conn->server.routers, (void (*)(void *)) free_ziti_edge_router_ptr);
ziti_edge_router *er;
int idx = 0;
MODEL_LIST_FOREACH(er, session->edge_routers) {
conn->server.routers[idx++] = er;
model_list_append(&conn->server.routers, er);
}
model_list_clear(&session->edge_routers, NULL);
}
Expand All @@ -235,7 +239,7 @@ static void session_cb(ziti_session *session, const ziti_error *err, void *ctx)
FREE(conn->server.token);
free_ziti_session_ptr(conn->server.session);
conn->server.session = NULL;
schedule_rebind(conn, true);
schedule_rebind(conn);
} else {
// here if we could not create Bind session
notify_status(conn, ZITI_SERVICE_UNAVAILABLE);
Expand All @@ -244,7 +248,7 @@ static void session_cb(ziti_session *session, const ziti_error *err, void *ctx)

default:
CONN_LOG(WARN, "failed to get session for service[%s]: %d/%s", conn->service, (int)err->err, err->code);
schedule_rebind(conn, true);
schedule_rebind(conn);
}
}

Expand All @@ -260,12 +264,12 @@ static void list_routers_cb(ziti_service_routers *srv_routers, const ziti_error
}

if (srv_routers) {
free_ziti_edge_router_array(&conn->server.routers);
conn->server.routers = srv_routers->routers;
model_list_clear(&conn->server.routers, (void (*)(void *)) free_ziti_edge_router_ptr);

ziti_edge_router *er;
FOR(er, srv_routers->routers) {
CONN_LOG(DEBUG, "%s/%s", er->name, er->protocols.tls);
model_list_append(&conn->server.routers, er);
}
}
free(srv_routers);
Expand All @@ -286,7 +290,7 @@ static void get_service_cb(ziti_context ztx, const ziti_service *service, int st

if (status != ZITI_OK) {
CONN_LOG(WARN, "failed to get service[%s] details, scheduling re-try", conn->service);
schedule_rebind(conn, true);
schedule_rebind(conn);
return;
}

Expand Down Expand Up @@ -349,9 +353,9 @@ static int dispose(ziti_connection server) {
model_map_iter it = model_map_iterator(&server->server.bindings);
while(it) {
struct binding_s *b = model_map_it_value(it);
if (!b->bound && b->waiter == NULL) {
if (b->state == st_unbound) {
it = model_map_it_remove(it);
free(b);
free_binding(b);
} else {
it = model_map_it_next(it);
}
Expand All @@ -375,7 +379,7 @@ static int dispose(ziti_connection server) {

FREE(server->server.token);
free_ziti_session_ptr(server->server.session);
free_ziti_edge_router_array(&server->server.routers);
model_list_clear(&server->server.routers, (void (*)(void *)) free_ziti_edge_router_ptr);
free(server->service);
free(server);
return 1;
Expand Down Expand Up @@ -457,14 +461,13 @@ static void on_message(struct binding_s *b, message *msg, int code) {
struct ziti_conn *conn = b->conn;
if (code != ZITI_OK) {
ZITI_LOG(WARN, "binding failed: %d/%s", code, ziti_errorstr(code));
b->bound = false;
b->ch = NULL;
stop_binding(b);
if (code == ZITI_DISABLED) {
uv_timer_stop(conn->server.timer);
notify_status(conn, code);
} else {
schedule_rebind(conn, true);
schedule_rebind(conn);
}
} else {
ZITI_LOG(DEBUG, "received msg ct[%x] code[%d] from %s", msg->header.content, code, b->ch->name);
Expand All @@ -473,7 +476,7 @@ static void on_message(struct binding_s *b, message *msg, int code) {
CONN_LOG(DEBUG, "binding[%s] was closed: %.*s", b->ch->url, msg->header.body_len, msg->body);
FREE(conn->server.token);
stop_binding(b);
schedule_rebind(conn, true);
schedule_rebind(conn);
break;
case ContentTypeDial:
process_dial(b, msg);
Expand All @@ -499,21 +502,35 @@ static void bind_reply_cb(void *ctx, message *msg, int code) {
CONN_LOG(DEBUG, "bound successfully on router[%s]", b->ch->name);
ziti_channel_add_receiver(b->ch, conn->conn_id, b,
(void (*)(void *, message *, int)) on_message);
b->bound = true;
b->state = st_bound;
} else {
CONN_LOG(DEBUG, "failed to bind on router[%s]", b->ch->name);
b->bound = false;
ziti_channel_rem_receiver(b->ch, conn->conn_id);
b->ch = NULL;
b->state = st_unbound;
}
}

void start_binding(struct binding_s *b, ziti_channel_t *ch) {
int start_binding(struct binding_s *b, ziti_channel_t *ch) {
switch(b->state) {
case st_unbound:
break;

case st_binding: // already active
case st_bound:
return 1;

case st_unbinding: // let it complete unbind
return 0;
}

struct ziti_conn *conn = b->conn;
char *token = conn->server.token;
CONN_LOG(DEBUG, "requesting BIND on ch[%s]", ch->name);
CONN_LOG(TRACE, "ch[%d] => Edge Bind request token[%s]", ch->id, token);

b->ch = ch;
b->state = st_binding;

uint8_t true_val = 1;
int32_t conn_id = htole32(conn->conn_id);
Expand Down Expand Up @@ -553,14 +570,15 @@ void start_binding(struct binding_s *b, ziti_channel_t *ch) {
headers, nheaders,
token, strlen(token), bind_reply_cb,
b);
return 1;
}

void on_unbind(void *ctx, message *m, int code) {
struct binding_s *b = ctx;
b->waiter = NULL;

if (m) {
ZITI_LOG(TRACE, "binding[%d.%s] unbind resp: ct[%X] %.*s", b->conn_id,
ZITI_LOG(DEBUG, "binding[%d.%s] unbind resp: ct[%X] %.*s", b->conn_id,
b->ch->name, m->header.content, m->header.body_len, m->body);
int32_t conn_id = htole32(b->conn_id);
hdr_t headers[] = {
Expand All @@ -569,26 +587,17 @@ void on_unbind(void *ctx, message *m, int code) {
message *close_msg = message_new(NULL, ContentTypeStateClosed, headers, 1, 0);
ziti_channel_send_message(b->ch, close_msg, NULL);
} else {
ZITI_LOG(TRACE, "binding[%d.%s] failed to receive unbind response because channel was disconnected: %d/%s",
ZITI_LOG(DEBUG, "binding[%d.%s] failed to receive unbind response because channel was disconnected: %d/%s",
b->conn_id, b->ch->name, code, ziti_errorstr(code));
}
ziti_channel_rem_receiver(b->ch, b->conn_id);
b->bound = false;
b->state = st_unbound;
b->ch = NULL;
free_binding(b);
}

static void stop_binding(struct binding_s *b) {
struct ziti_conn *conn = b->conn;

const char *n;
struct binding_s *bind;
MODEL_MAP_FOREACH(n, bind, &conn->server.bindings) {
if (b == bind) {
model_map_remove(&conn->server.bindings, n);
}
}

// stop accepting incoming requests
ziti_channel_rem_receiver(b->ch, b->conn_id);
ziti_channel_remove_waiter(b->ch, b->waiter);
Expand All @@ -597,10 +606,12 @@ static void stop_binding(struct binding_s *b) {
// no need to send unbind message
if (b->ch == NULL || !ziti_channel_is_connected(b->ch) || token == NULL) {
b->ch = NULL;
free_binding(b);
b->state = st_unbound;
return;
}

CONN_LOG(DEBUG, "requesting UNBIND on ch[%s]", b->ch->name);
b->state = st_unbinding;
int32_t conn_id = htole32(b->conn_id);
hdr_t headers[] = {
var_header(ConnIdHeader, conn_id),
Expand All @@ -610,7 +621,6 @@ static void stop_binding(struct binding_s *b) {
headers, 2,
token, strlen(token),
on_unbind, b);
b->bound = false;
}

int ziti_close_server(struct ziti_conn *conn) {
Expand Down
Loading

0 comments on commit 47cd62d

Please sign in to comment.