Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid bind crypto errors #803

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ struct ziti_conn {
ziti_client_cb client_cb;

bool srv_routers_api_missing;
ziti_edge_router_array routers;
model_list routers;
char *token;
ziti_session *session;
model_map bindings;
Expand Down
162 changes: 86 additions & 76 deletions library/bind.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,24 @@

#define DEFAULT_MAX_BINDINGS 3
#define REBIND_DELAY 1000
#define REFRESH_DELAY (60 * 5 * 1000)

#define CONN_LOG(lvl, fmt, ...) \
ZITI_LOG(lvl, "server[%u.%u](%s) " fmt, \
conn->ziti_ctx->id, conn->conn_id, conn->service, ##__VA_ARGS__)

enum bind_state {
st_unbound,
st_binding,
st_bound,
st_unbinding,
};

struct binding_s {
struct ziti_conn *conn;
uint32_t conn_id;
ziti_channel_t *ch;
struct key_pair key_pair;
bool bound;
enum bind_state state;
struct waiter_s *waiter;
};

Expand All @@ -50,11 +56,11 @@ static void get_service_cb(ziti_context, const ziti_service *service, int status

static int dispose(ziti_connection server);

static void start_binding(struct binding_s *b, ziti_channel_t *ch);
static int start_binding(struct binding_s *b, ziti_channel_t *ch);

static void stop_binding(struct binding_s *b);

static void schedule_rebind(struct ziti_conn *conn, bool now);
static void schedule_rebind(struct ziti_conn *conn);

static void session_cb(ziti_session *session, const ziti_error *err, void *ctx);

Expand Down Expand Up @@ -110,21 +116,25 @@ static struct binding_s* new_binding(struct ziti_conn *conn) {
NEWP(b, struct binding_s);
b->conn_id = conn->conn_id;
b->conn = conn;
b->state = st_unbound;
init_key_pair(&b->key_pair);
return b;
}

void process_bindings(struct ziti_conn *conn) {
// return number of active(bound or binding) bindings
int process_bindings(struct ziti_conn *conn) {
if (conn->server.token == NULL) {
return;
return 0;
}

int active = 0;
struct ziti_ctx *ztx = conn->ziti_ctx;

size_t target = MIN(conn->server.max_bindings, model_map_size(&ztx->channels));
size_t target = MIN(conn->server.max_bindings,
model_list_size(&conn->server.routers));

for(int idx = 0; conn->server.routers && conn->server.routers[idx]; idx++) {
ziti_edge_router *er = conn->server.routers[idx];
ziti_edge_router *er;
MODEL_LIST_FOREACH(er, conn->server.routers) {
ekoby marked this conversation as resolved.
Show resolved Hide resolved
CONN_LOG(DEBUG, "checking router[%s]", er->name);
ziti_channel_t *ch = ztx_get_channel(ztx, er);
if (ch == NULL || !ziti_channel_is_connected(ch)) {
Expand All @@ -133,64 +143,61 @@ void process_bindings(struct ziti_conn *conn) {
}

struct binding_s *b = model_map_get(&conn->server.bindings, er->name);
if (b != NULL) {
if (b->bound) {
target--;
} else {
start_binding(b, ch);
target--;
}
} else {
if (b == NULL) {
b = new_binding(conn);
model_map_set(&conn->server.bindings, er->name, b);
start_binding(b, ch);
target--;
}

if (target <= 0) break;
active += start_binding(b, ch);
if (active >= target) break;
}
return active;
}

void update_bindings(ziti_connection conn) {
if (conn->type != Server) return;

int target = MIN(conn->server.max_bindings, model_map_size(&conn->ziti_ctx->channels));
if (target > model_map_size(&conn->server.bindings)) {
process_bindings(conn);
int target = MIN(conn->server.max_bindings,
model_list_size(&conn->server.routers));

int active = 0;
const char *n;
struct binding_s *b;
MODEL_MAP_FOREACH(n, b, &conn->server.bindings) {
if (b->state == st_bound || b->state == st_binding) active++;
}

if (target > active) {
active = process_bindings(conn);
}

CONN_LOG(DEBUG, "bindings: active[%d] target[%d]", active, target);
// if we're still below target we may need to refresh service.routers
if (target > model_map_size(&conn->server.bindings)) {
schedule_rebind(conn, true);
if (target > active) {
schedule_rebind(conn);
} else {
// target bindings achieved, reset backoff
conn->server.attempt = 0;
uv_timer_stop(conn->server.timer);
}
}

static void schedule_rebind(struct ziti_conn *conn, bool now) {
static void schedule_rebind(struct ziti_conn *conn) {
if (!ziti_is_enabled(conn->ziti_ctx)) {
uv_timer_stop(conn->server.timer);
return;
}

uint64_t delay = REFRESH_DELAY;

if (now) {
int backoff = 1 << MIN(conn->server.attempt, 5);
uint32_t random;
uv_random(conn->ziti_ctx->loop, NULL, &random, sizeof(random), 0, NULL);
delay = (uint64_t) (random % (backoff * REBIND_DELAY));
conn->server.attempt++;
CONN_LOG(DEBUG, "scheduling re-bind(attempt=%d) in %" PRIu64 ".%" PRIu64 "s",
conn->server.attempt, delay / 1000, delay % 1000);

} else {
conn->server.attempt = 0;
CONN_LOG(DEBUG, "scheduling re-bind in %" PRIu64 ".%" PRIu64 "s", delay / 1000, delay % 1000);
}
int backoff = 1 << MIN(conn->server.attempt, 5);
uint32_t random;
uv_random(conn->ziti_ctx->loop, NULL, &random, sizeof(random), 0, NULL);
uint64_t delay = (uint64_t) (random % (backoff * REBIND_DELAY));
conn->server.attempt++;
CONN_LOG(DEBUG, "scheduling re-bind(attempt=%d) in %" PRIu64 ".%" PRIu64 "s",
conn->server.attempt, delay / 1000, delay % 1000);

uv_timer_start(conn->server.timer, rebind_delay_cb, delay, 0);
}


static void session_cb(ziti_session *session, const ziti_error *err, void *ctx) {
struct ziti_conn *conn = ctx;
int e = err ? (int)err->err : ZITI_OK;
Expand All @@ -204,13 +211,10 @@ static void session_cb(ziti_session *session, const ziti_error *err, void *ctx)
conn->server.session = session;

if (conn->server.srv_routers_api_missing) {
free_ziti_edge_router_array(&conn->server.routers);
conn->server.routers = calloc(model_list_size(&session->edge_routers) + 1,
sizeof(ziti_edge_router*));
model_list_clear(&conn->server.routers, (void (*)(void *)) free_ziti_edge_router_ptr);
ziti_edge_router *er;
int idx = 0;
MODEL_LIST_FOREACH(er, session->edge_routers) {
conn->server.routers[idx++] = er;
model_list_append(&conn->server.routers, er);
}
model_list_clear(&session->edge_routers, NULL);
}
Expand All @@ -235,7 +239,7 @@ static void session_cb(ziti_session *session, const ziti_error *err, void *ctx)
FREE(conn->server.token);
free_ziti_session_ptr(conn->server.session);
conn->server.session = NULL;
schedule_rebind(conn, true);
schedule_rebind(conn);
} else {
// here if we could not create Bind session
notify_status(conn, ZITI_SERVICE_UNAVAILABLE);
Expand All @@ -244,7 +248,7 @@ static void session_cb(ziti_session *session, const ziti_error *err, void *ctx)

default:
CONN_LOG(WARN, "failed to get session for service[%s]: %d/%s", conn->service, (int)err->err, err->code);
schedule_rebind(conn, true);
schedule_rebind(conn);
}
}

Expand All @@ -260,12 +264,12 @@ static void list_routers_cb(ziti_service_routers *srv_routers, const ziti_error
}

if (srv_routers) {
free_ziti_edge_router_array(&conn->server.routers);
conn->server.routers = srv_routers->routers;
model_list_clear(&conn->server.routers, (void (*)(void *)) free_ziti_edge_router_ptr);

ziti_edge_router *er;
FOR(er, srv_routers->routers) {
CONN_LOG(DEBUG, "%s/%s", er->name, er->protocols.tls);
model_list_append(&conn->server.routers, er);
}
}
free(srv_routers);
Expand All @@ -286,7 +290,7 @@ static void get_service_cb(ziti_context ztx, const ziti_service *service, int st

if (status != ZITI_OK) {
CONN_LOG(WARN, "failed to get service[%s] details, scheduling re-try", conn->service);
schedule_rebind(conn, true);
schedule_rebind(conn);
return;
}

Expand Down Expand Up @@ -349,9 +353,9 @@ static int dispose(ziti_connection server) {
model_map_iter it = model_map_iterator(&server->server.bindings);
while(it) {
struct binding_s *b = model_map_it_value(it);
if (!b->bound && b->waiter == NULL) {
if (b->state == st_unbound) {
it = model_map_it_remove(it);
free(b);
free_binding(b);
} else {
it = model_map_it_next(it);
}
Expand All @@ -375,7 +379,7 @@ static int dispose(ziti_connection server) {

FREE(server->server.token);
free_ziti_session_ptr(server->server.session);
free_ziti_edge_router_array(&server->server.routers);
model_list_clear(&server->server.routers, (void (*)(void *)) free_ziti_edge_router_ptr);
free(server->service);
free(server);
return 1;
Expand Down Expand Up @@ -457,14 +461,13 @@ static void on_message(struct binding_s *b, message *msg, int code) {
struct ziti_conn *conn = b->conn;
if (code != ZITI_OK) {
ZITI_LOG(WARN, "binding failed: %d/%s", code, ziti_errorstr(code));
b->bound = false;
b->ch = NULL;
stop_binding(b);
if (code == ZITI_DISABLED) {
uv_timer_stop(conn->server.timer);
notify_status(conn, code);
} else {
schedule_rebind(conn, true);
schedule_rebind(conn);
}
} else {
ZITI_LOG(DEBUG, "received msg ct[%x] code[%d] from %s", msg->header.content, code, b->ch->name);
Expand All @@ -473,7 +476,7 @@ static void on_message(struct binding_s *b, message *msg, int code) {
CONN_LOG(DEBUG, "binding[%s] was closed: %.*s", b->ch->url, msg->header.body_len, msg->body);
FREE(conn->server.token);
stop_binding(b);
schedule_rebind(conn, true);
schedule_rebind(conn);
break;
case ContentTypeDial:
process_dial(b, msg);
Expand All @@ -499,21 +502,35 @@ static void bind_reply_cb(void *ctx, message *msg, int code) {
CONN_LOG(DEBUG, "bound successfully on router[%s]", b->ch->name);
ziti_channel_add_receiver(b->ch, conn->conn_id, b,
(void (*)(void *, message *, int)) on_message);
b->bound = true;
b->state = st_bound;
} else {
CONN_LOG(DEBUG, "failed to bind on router[%s]", b->ch->name);
b->bound = false;
ziti_channel_rem_receiver(b->ch, conn->conn_id);
b->ch = NULL;
b->state = st_unbound;
}
}

void start_binding(struct binding_s *b, ziti_channel_t *ch) {
int start_binding(struct binding_s *b, ziti_channel_t *ch) {
switch(b->state) {
case st_unbound:
break;

case st_binding: // already active
case st_bound:
return 1;

case st_unbinding: // let it complete unbind
return 0;
}

struct ziti_conn *conn = b->conn;
char *token = conn->server.token;
CONN_LOG(DEBUG, "requesting BIND on ch[%s]", ch->name);
CONN_LOG(TRACE, "ch[%d] => Edge Bind request token[%s]", ch->id, token);

b->ch = ch;
b->state = st_binding;

uint8_t true_val = 1;
int32_t conn_id = htole32(conn->conn_id);
Expand Down Expand Up @@ -553,14 +570,15 @@ void start_binding(struct binding_s *b, ziti_channel_t *ch) {
headers, nheaders,
token, strlen(token), bind_reply_cb,
b);
return 1;
}

void on_unbind(void *ctx, message *m, int code) {
struct binding_s *b = ctx;
b->waiter = NULL;

if (m) {
ZITI_LOG(TRACE, "binding[%d.%s] unbind resp: ct[%X] %.*s", b->conn_id,
ZITI_LOG(DEBUG, "binding[%d.%s] unbind resp: ct[%X] %.*s", b->conn_id,
b->ch->name, m->header.content, m->header.body_len, m->body);
int32_t conn_id = htole32(b->conn_id);
hdr_t headers[] = {
Expand All @@ -569,26 +587,17 @@ void on_unbind(void *ctx, message *m, int code) {
message *close_msg = message_new(NULL, ContentTypeStateClosed, headers, 1, 0);
ziti_channel_send_message(b->ch, close_msg, NULL);
} else {
ZITI_LOG(TRACE, "binding[%d.%s] failed to receive unbind response because channel was disconnected: %d/%s",
ZITI_LOG(DEBUG, "binding[%d.%s] failed to receive unbind response because channel was disconnected: %d/%s",
b->conn_id, b->ch->name, code, ziti_errorstr(code));
}
ziti_channel_rem_receiver(b->ch, b->conn_id);
b->bound = false;
b->state = st_unbound;
b->ch = NULL;
free_binding(b);
}

static void stop_binding(struct binding_s *b) {
struct ziti_conn *conn = b->conn;

const char *n;
struct binding_s *bind;
MODEL_MAP_FOREACH(n, bind, &conn->server.bindings) {
if (b == bind) {
model_map_remove(&conn->server.bindings, n);
}
}

// stop accepting incoming requests
ziti_channel_rem_receiver(b->ch, b->conn_id);
ziti_channel_remove_waiter(b->ch, b->waiter);
Expand All @@ -597,10 +606,12 @@ static void stop_binding(struct binding_s *b) {
// no need to send unbind message
if (b->ch == NULL || !ziti_channel_is_connected(b->ch) || token == NULL) {
b->ch = NULL;
free_binding(b);
b->state = st_unbound;
return;
}

CONN_LOG(DEBUG, "requesting UNBIND on ch[%s]", b->ch->name);
b->state = st_unbinding;
int32_t conn_id = htole32(b->conn_id);
hdr_t headers[] = {
var_header(ConnIdHeader, conn_id),
Expand All @@ -610,7 +621,6 @@ static void stop_binding(struct binding_s *b) {
headers, 2,
token, strlen(token),
on_unbind, b);
b->bound = false;
}

int ziti_close_server(struct ziti_conn *conn) {
Expand Down
Loading
Loading