Skip to content

Commit

Permalink
HA posture: format and send posture protobuf to edge routers
Browse files Browse the repository at this point in the history
  • Loading branch information
ekoby committed Jan 23, 2025
1 parent 23c40a2 commit fcd0868
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 24 deletions.
2 changes: 2 additions & 0 deletions inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ int ziti_channel_force_connect(ziti_channel_t *ch);

int ziti_channel_update_token(ziti_channel_t *ch);

int ziti_channel_update_posture(ziti_channel_t *ch, const uint8_t *data, size_t len);

int ziti_channel_connect(ziti_context ztx, const char *name, const char *url);

int ziti_channel_prepare(ziti_channel_t *ch);
Expand Down
18 changes: 18 additions & 0 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -977,3 +977,21 @@ static const char *get_timeout_cb(ziti_channel_t *ch) {

return "unknown";
}

static void on_posture_update_reply(void *ctx, message *m, int status) {
ziti_channel_t *ch = ctx;
if (status != ZITI_OK) {
CH_LOG(ERROR, "failed to update posture: %d[%s]", status, ziti_errorstr(status));
} else {
CH_LOG(INFO, "received ContentType[%04x]", m->header.content);
}
}

int ziti_channel_update_posture(ziti_channel_t *ch, const uint8_t *data, size_t len) {
if (ch->state == Connected) {
ziti_channel_send(ch, ContentTypePostureResponse, NULL, 0, data, len, NULL);
return ZITI_OK;
}

return ZITI_GATEWAY_UNAVAILABLE;
}
212 changes: 190 additions & 22 deletions library/posture.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const bool IS_NOT_ERRORED = false;
#define PC_OS_TYPE ziti_posture_query_types.name(ziti_posture_query_types.PC_OS)
#define PC_MAC_TYPE ziti_posture_query_types.name(ziti_posture_query_types.PC_MAC)

#define s_strdup(s) (s ? strdup(s) : NULL)

struct query_info {
ziti_service *service;
ziti_posture_query_set *query_set;
Expand Down Expand Up @@ -119,6 +121,10 @@ static int hash_sha512(ziti_context ztx, uv_loop_t *loop, const char *path, unsi

static bool check_running(uv_loop_t *loop, const char *path);

static void send_posture_legacy(ziti_context ztx, model_list *send_prs);

static void send_posture_ha(ziti_context ztx, model_list *send_prs);

static void ziti_pr_free_pr_info(pr_info *info) {
FREE(info->id);
free_ziti_pr(info->obj);
Expand Down Expand Up @@ -580,51 +586,213 @@ static void ziti_pr_send_bulk(ziti_context ztx) {
return;
}

size_t body_len = 0;
char *body;

bool send = false;
__attribute__((unused)) const char *key;
const char *key;
pr_info *info;
model_list send_prs = {};
MODEL_MAP_FOREACH(key, info, &checks->responses) {
ZTX_LOG(VERBOSE, "%s posture response [%s], pending = %s",
info->should_send ? "sending" : "not sending",
info->id,
info->pending ? "true" : "false");
if (info->should_send) {
send = true;
break;
model_list_append(&send_prs, info);
}
}

if (!send) {
ZTX_LOG(VERBOSE, "no change in posture data, not sending");
return; //nothing to send
}
if (model_list_size(&send_prs) > 0) {
ZTX_LOG(DEBUG, "sending posture responses [%zd]", model_list_size(&send_prs));

model_list json_list = {};
MODEL_MAP_FOREACH(key, info, &checks->responses) {
if (info->should_send) {
char *json = ziti_pr_to_json(info->obj);
model_list_append(&json_list, json);
ZTX_LOG(VERBOSE, "sending posture response [%s], should_send = true: %s", info->id, json);
info->should_send = false;
if (ztx->ctrl.is_ha) {
send_posture_ha(ztx, &send_prs);
} else {
ZTX_LOG(VERBOSE, "not sending posture response [%s], should_send = false, pending = %s",
info->id, info->pending ? "true" : "false");
send_posture_legacy(ztx, &send_prs);
}
} else {
ZTX_LOG(VERBOSE, "no change in posture data, not sending");
}
model_list_clear(&send_prs, NULL);
}

ZTX_LOG(DEBUG, "sending posture responses [%zd]", model_list_size(&json_list));
static void send_posture_legacy(ziti_context ztx, model_list *send_prs) {
model_list json_list = {};
pr_info *info;
MODEL_LIST_FOREACH(info, *send_prs) {
char *json = ziti_pr_to_json(info->obj);
model_list_append(&json_list, json);
info->should_send = false;
}

string_buf_t buf;
string_buf_init(&buf);
model_list_fmt_to_json(&buf, &json_list, get_json_meta(), 0, 0);
model_list_clear(&json_list, free);

body = string_buf_to_string(&buf, &body_len);
size_t body_len;
char *body = string_buf_to_string(&buf, &body_len);
ZTX_LOG(TRACE, "bulk posture response: %s", body);

ziti_pr_post_bulk(ztx_get_controller(ztx), body, body_len, ziti_pr_post_bulk_cb, ztx);
string_buf_free(&buf);
}

static void send_posture_ha(ziti_context ztx, model_list *send_prs) {
pr_info *info;
Ziti__EdgeClient__Pb__PostureResponse *pr_resp;
model_list process_list = {};
model_list pb_list = {};
MODEL_LIST_FOREACH(info, *send_prs) {
switch (info->obj->typeId) {
case ziti_posture_query_type_Unknown:
break;
case ziti_posture_query_type_PC_Domain: {
ziti_pr_domain_req *req = (ziti_pr_domain_req *) info->obj;

pr_resp = calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse));
ziti__edge_client__pb__posture_response__init(pr_resp);
Ziti__EdgeClient__Pb__PostureResponse__Domain * d = calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse__Domain));
ziti__edge_client__pb__posture_response__domain__init(d);
d->name = s_strdup(req->domain);
pr_resp->type_case = ZITI__EDGE_CLIENT__PB__POSTURE_RESPONSE__TYPE_DOMAIN;
pr_resp->domain = d;
model_list_append(&pb_list, pr_resp);
break;
}
case ziti_posture_query_type_PC_OS: {
ziti_pr_os_req *req = (ziti_pr_os_req *) info->obj;
Ziti__EdgeClient__Pb__PostureResponse__OperatingSystem *os =
calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse__OperatingSystem));
ziti__edge_client__pb__posture_response__operating_system__init(os);
os->type = s_strdup(req->type);
os->version = s_strdup(req->version);
os->build = s_strdup(req->build);

pr_resp = calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse));
ziti__edge_client__pb__posture_response__init(pr_resp);
pr_resp->type_case = ZITI__EDGE_CLIENT__PB__POSTURE_RESPONSE__TYPE_OS;
pr_resp->os = os;
model_list_append(&pb_list, pr_resp);
break;
}
case ziti_posture_query_type_PC_Process: {
ziti_pr_process_req *req = (ziti_pr_process_req *) info->obj;
Ziti__EdgeClient__Pb__PostureResponse__Process *proc =
calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse__Process));
ziti__edge_client__pb__posture_response__process__init(proc);
proc->path = s_strdup(req->path);
proc->hash = s_strdup(req->hash);
proc->isrunning = req->is_running;
model_list_append(&process_list, proc);
break;
}
case ziti_posture_query_type_PC_Process_Multi:
break;
case ziti_posture_query_type_PC_MAC: {
ziti_pr_mac_req *req = (ziti_pr_mac_req *) info->obj;
Ziti__EdgeClient__Pb__PostureResponse__Macs *mac =
calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse__Macs));
ziti__edge_client__pb__posture_response__macs__init(mac);
mac->n_addresses = model_list_size(&req->mac_addresses);
mac->addresses = calloc(mac->n_addresses, sizeof(mac->addresses[0]));
int i = 0;
const char* addr;
MODEL_LIST_FOREACH(addr, req->mac_addresses) {
mac->addresses[i++] = s_strdup(addr);
}

pr_resp = calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse));
ziti__edge_client__pb__posture_response__init(pr_resp);
pr_resp->type_case = ZITI__EDGE_CLIENT__PB__POSTURE_RESPONSE__TYPE_MACS;
pr_resp->macs = mac;
model_list_append(&pb_list, pr_resp);
break;
}
case ziti_posture_query_type_PC_MFA:
break;
case ziti_posture_query_type_PC_Endpoint_State: {
ziti_pr_endpoint_state_req *req = (ziti_pr_endpoint_state_req *) info->obj;
uv_timeval64_t now;
uv_gettimeofday(&now);

if (req->unlocked) {
Ziti__EdgeClient__Pb__PostureResponse__Unlocked *unlocked = calloc(1, sizeof(*unlocked));
ziti__edge_client__pb__posture_response__unlocked__init(unlocked);
unlocked->time = calloc(1, sizeof(*unlocked->time));
google__protobuf__timestamp__init(unlocked->time);
unlocked->time->seconds = now.tv_sec;

pr_resp = calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse));
ziti__edge_client__pb__posture_response__init(pr_resp);
pr_resp->type_case = ZITI__EDGE_CLIENT__PB__POSTURE_RESPONSE__TYPE_UNLOCKED;
pr_resp->unlocked = unlocked;
model_list_append(&pb_list, pr_resp);
}

if (req->woken) {
Ziti__EdgeClient__Pb__PostureResponse__Woken *woken = calloc(1, sizeof(*woken));
ziti__edge_client__pb__posture_response__woken__init(woken);
woken->time = calloc(1, sizeof(*woken->time));
google__protobuf__timestamp__init(woken->time);
woken->time->seconds = now.tv_sec;

pr_resp = calloc(1, sizeof(Ziti__EdgeClient__Pb__PostureResponse));
ziti__edge_client__pb__posture_response__init(pr_resp);
pr_resp->type_case = ZITI__EDGE_CLIENT__PB__POSTURE_RESPONSE__TYPE_WOKEN;
pr_resp->woken = woken;
model_list_append(&pb_list, pr_resp);
}

break;
}
}
}

if (model_list_size(&process_list) > 0) {
pr_resp = calloc(1, sizeof(*pr_resp));
ziti__edge_client__pb__posture_response__init(pr_resp);
Ziti__EdgeClient__Pb__PostureResponse__ProcessList *pl = calloc(1, sizeof(*pl));
ziti__edge_client__pb__posture_response__process_list__init(pl);
pl->n_processes = model_list_size(&process_list);
pl->processes = calloc(pl->n_processes, sizeof(pl->processes[0]));
int i = 0;
Ziti__EdgeClient__Pb__PostureResponse__Process *proc;
MODEL_LIST_FOREACH(proc, process_list) {
pl->processes[i++] = proc;
}
model_list_clear(&process_list, NULL);
pr_resp->type_case = ZITI__EDGE_CLIENT__PB__POSTURE_RESPONSE__TYPE_PROCESS_LIST;
pr_resp->processlist = pl;
model_list_append(&pb_list, pr_resp);
}

if (model_list_size(&pb_list) > 0) {
Ziti__EdgeClient__Pb__PostureResponses *resp = calloc(1, sizeof(*resp));
ziti__edge_client__pb__posture_responses__init(resp);

resp->n_responses = model_list_size(&pb_list);
resp->responses = calloc(resp->n_responses, sizeof(resp->responses[0]));

int idx = 0;
MODEL_LIST_FOREACH(pr_resp, pb_list) {
resp->responses[idx++] = pr_resp;
}
model_list_clear(&pb_list, NULL);

uint8_t pad[128];
ProtobufCBufferSimple buffer = PROTOBUF_C_BUFFER_SIMPLE_INIT(pad);
ziti__edge_client__pb__posture_responses__pack_to_buffer(resp, (ProtobufCBuffer *) &buffer);
ziti__edge_client__pb__posture_responses__free_unpacked(resp, NULL);
ZTX_LOG(VERBOSE, "posture protobuf size = %zd", buffer.len);

ziti_channel_t *ch;
const char *n;
MODEL_MAP_FOREACH(n, ch, &ztx->channels) {
ziti_channel_update_posture(ch, buffer.data, buffer.len);
}

PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer);
}
}

static void ziti_pr_handle_mac(ziti_context ztx, const char *id, char **mac_addresses, int num_mac) {
ziti_pr_mac_req *mac_req = alloc_ziti_pr_mac_req();
*mac_req = (ziti_pr_mac_req){
Expand Down
4 changes: 3 additions & 1 deletion library/ziti.c
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,9 @@ void ziti_dump(ziti_context ztx, int (*printer)(void *arg, const char *fmt, ...)

for (int idx = 0; pq_set->posture_queries[idx] != NULL; idx++) {
pq = pq_set->posture_queries[idx];
printer(ctx, "\t\t\tquery_id[%s] type[%s] is_passing[%s] timeout[%d] timeoutRemaining[%d]\n", pq->id, pq->query_type, pq->is_passing ? "true" : "false", pq->timeout, *pq->timeoutRemaining);
printer(ctx, "\t\t\tquery_id[%s] type[%s] is_passing[%s] timeout[%d] timeoutRemaining[%d]\n",
pq->id, ziti_posture_query_types.name(pq->query_type),
pq->is_passing ? "true" : "false", pq->timeout, *pq->timeoutRemaining);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion programs/ziti-prox-c/proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ static void on_ziti_event(ziti_context ztx, const ziti_event_t *event) {
}
} else if (event->ctx.ctrl_status == ZITI_DISABLED) {
ZITI_LOG(INFO, "ziti is shutdown");
uv_close((uv_handle_t *) &shutdown_timer, NULL);
if (shutdown_timer.type == UV_TIMER && !uv_is_closing((const uv_handle_t *) &shutdown_timer))
uv_close((uv_handle_t *) &shutdown_timer, NULL);
} else {
ZITI_LOG(ERROR, "controller is not available: %s/%s", ziti_errorstr(event->ctx.ctrl_status),
event->ctx.err);
Expand Down

0 comments on commit fcd0868

Please sign in to comment.