Skip to content

Commit

Permalink
let ziti sdk hold UDP payloads that arrive before ziti_dial completes
Browse files Browse the repository at this point in the history
  • Loading branch information
scareything committed Aug 23, 2024
1 parent 7a67e6a commit b9c539f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 60 deletions.
67 changes: 14 additions & 53 deletions lib/ziti-tunnel/tunnel_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,7 @@ static void to_ziti(struct io_ctx_s *io, struct pbuf *p) {
return;
}

struct pbuf *recv_data = NULL;
if (io->tnlr_io->udp.queued != NULL) {
if (p != NULL) {
pbuf_cat(io->tnlr_io->udp.queued, p);
}
recv_data = io->tnlr_io->udp.queued;
io->tnlr_io->udp.queued = NULL;
} else {
recv_data = p;
}

struct pbuf *recv_data = p;
if (recv_data == NULL) {
TNL_LOG(TRACE, "no data to write");
return;
Expand All @@ -64,7 +54,7 @@ static void to_ziti(struct io_ctx_s *io, struct pbuf *p) {
io->tnlr_io->client, io->tnlr_io->intercepted, io->tnlr_io->service_name);
struct write_ctx_s *wr_ctx = calloc(1, sizeof(struct write_ctx_s));
wr_ctx->pbuf = recv_data;
wr_ctx->udp = io->tnlr_io->udp.pcb;
wr_ctx->udp = io->tnlr_io->udp;
wr_ctx->ack = tunneler_udp_ack;

recv_data = recv_data->next;
Expand All @@ -73,7 +63,8 @@ static void to_ziti(struct io_ctx_s *io, struct pbuf *p) {
if (s == ERR_WOULDBLOCK) {
tunneler_udp_ack(wr_ctx);
free(wr_ctx);
TNL_LOG(DEBUG, "ziti_write stalled: dropping UDP packet service=%s, client=%s, ret=%ld", io->tnlr_io->service_name, io->tnlr_io->client, s);
TNL_LOG(WARN, "ziti_write stalled: dropping UDP packet service=%s, client=%s, ret=%ld", io->tnlr_io->service_name, io->tnlr_io->client, s);
break;
} else if (s < 0) {
tunneler_udp_ack(wr_ctx);
free(wr_ctx);
Expand All @@ -84,27 +75,6 @@ static void to_ziti(struct io_ctx_s *io, struct pbuf *p) {
} while (recv_data != NULL);
}

/** called by lwip when a packet arrives from a connected client and the ziti service is not yet connected */
void on_udp_client_data_enqueue(void *io_context, struct udp_pcb *pcb, struct pbuf *p, const ip_addr_t *addr, u16_t port) {
if (io_context == NULL) {
TNL_LOG(DEBUG, "null io_context");
return;
}
struct io_ctx_s *io_ctx = io_context;
tunneler_io_context tnlr_io_ctx = io_ctx->tnlr_io;
if (tnlr_io_ctx == NULL) {
TNL_LOG(INFO, "null tnlr_io_context");
return;
}
if (tnlr_io_ctx->udp.queued == NULL) {
tnlr_io_ctx->udp.queued = p;
} else {
pbuf_chain(tnlr_io_ctx->udp.queued, p);
}
TNL_LOG(VERBOSE, "queued %d bytes src[%s] dst[%s] service[%s]", tnlr_io_ctx->udp.queued->len,
tnlr_io_ctx->client, tnlr_io_ctx->intercepted, tnlr_io_ctx->service_name);
}

/** called by lwip when a packet arrives from a connected client and the ziti service is connected */
void on_udp_client_data(void *io_context, struct udp_pcb *pcb, struct pbuf *p, const ip_addr_t *addr, u16_t port) {
if (io_context == NULL) {
Expand All @@ -113,6 +83,13 @@ void on_udp_client_data(void *io_context, struct udp_pcb *pcb, struct pbuf *p, c
}
TNL_LOG(VERBOSE, "%d bytes from %s:%d", p->len, ipaddr_ntoa(addr), port);

struct io_ctx_s *io = io_context;
if (!io->tnlr_io->conn_timer) {
io->tnlr_io->conn_timer = calloc(1, sizeof(uv_timer_t));
io->tnlr_io->conn_timer->data = io;
uv_timer_init(io->tnlr_io->tnlr_ctx->loop, io->tnlr_io->conn_timer);
}

to_ziti(io_context, p);
}

Expand All @@ -126,26 +103,11 @@ int tunneler_udp_close(struct udp_pcb *pcb) {
TNL_LOG(DEBUG, "closing src[%s] dst[%s] service[%s]",
tnlr_io_ctx->client, tnlr_io_ctx->intercepted, tnlr_io_ctx->service_name);
udp_remove(pcb);
if (tnlr_io_ctx->udp.queued != NULL) {
pbuf_free(tnlr_io_ctx->udp.queued);
tnlr_io_ctx->udp.queued = NULL;
}
return 0;
}

void tunneler_udp_dial_completed(struct io_ctx_s *io, bool ok) {
struct udp_pcb *pcb = io->tnlr_io->udp.pcb;
/* change recv callback to send packets that arrive instead of queuing */
udp_recv(pcb, on_udp_client_data, io);

/* send any data that was queued while waiting for the dial to complete */
if (ok) {
io->tnlr_io->conn_timer = calloc(1, sizeof(uv_timer_t));
io->tnlr_io->conn_timer->data = io;
uv_timer_init(io->tnlr_io->tnlr_ctx->loop, io->tnlr_io->conn_timer);

to_ziti(io, NULL);
} else {
if (!ok) {
ziti_tunneler_close(io->tnlr_io);
}
}
Expand Down Expand Up @@ -259,8 +221,7 @@ u8_t recv_udp(void *tnlr_ctx_arg, struct raw_pcb *pcb, struct pbuf *p, const ip_
io->tnlr_io->service_name = strdup(intercept_ctx->service_name);
snprintf(io->tnlr_io->client, sizeof(io->tnlr_io->client), "udp:%s:%d", src_str, src_p);
snprintf(io->tnlr_io->intercepted, sizeof(io->tnlr_io->intercepted), "udp:%s:%d", dst_str, dst_p);
io->tnlr_io->udp.pcb = npcb;
io->tnlr_io->udp.queued = NULL;
io->tnlr_io->udp = npcb;
io->ziti_ctx = intercept_ctx->app_intercept_ctx;
io->write_fn = intercept_ctx->write_fn ? intercept_ctx->write_fn : tnlr_ctx->opts.ziti_write;
io->close_fn = intercept_ctx->close_fn ? intercept_ctx->close_fn : tnlr_ctx->opts.ziti_close;
Expand All @@ -269,7 +230,7 @@ u8_t recv_udp(void *tnlr_ctx_arg, struct raw_pcb *pcb, struct pbuf *p, const ip_
TNL_LOG(DEBUG, "intercepted address[%s] client[%s] service[%s]", io->tnlr_io->intercepted, io->tnlr_io->client,
intercept_ctx->service_name);

udp_recv(npcb, on_udp_client_data_enqueue, io);
udp_recv(npcb, on_udp_client_data, io);

void *ziti_io_ctx = zdial(intercept_ctx->app_intercept_ctx, io);
if (ziti_io_ctx == NULL) {
Expand Down
6 changes: 3 additions & 3 deletions lib/ziti-tunnel/ziti_tunnel.c
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ ssize_t ziti_tunneler_write(tunneler_io_context tnlr_io_ctx, const void *data, s
r = tunneler_tcp_write(tnlr_io_ctx->tcp, data, len);
break;
case tun_udp:
r = tunneler_udp_write(tnlr_io_ctx->udp.pcb, data, len);
r = tunneler_udp_write(tnlr_io_ctx->udp, data, len);
break;
}

Expand All @@ -439,8 +439,8 @@ int ziti_tunneler_close(tunneler_io_context tnlr_io_ctx) {
tnlr_io_ctx->tcp = NULL;
break;
case tun_udp:
tunneler_udp_close(tnlr_io_ctx->udp.pcb);
tnlr_io_ctx->udp.pcb = NULL;
tunneler_udp_close(tnlr_io_ctx->udp);
tnlr_io_ctx->udp = NULL;
break;
default:
TNL_LOG(ERR, "unknown proto %d", tnlr_io_ctx->proto);
Expand Down
5 changes: 1 addition & 4 deletions lib/ziti-tunnel/ziti_tunnel_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,7 @@ struct tunneler_io_ctx_s {
tunneler_proto_type proto;
union {
struct tcp_pcb *tcp;
struct {
struct udp_pcb *pcb;
struct pbuf *queued;
} udp;
struct udp_pcb *udp;
};
uv_timer_t *conn_timer;
uint32_t idle_timeout;
Expand Down

0 comments on commit b9c539f

Please sign in to comment.