From bd23e5b9f1aa390819be0059d7ab7dcd4c3a2e82 Mon Sep 17 00:00:00 2001 From: Shad Ansari Date: Fri, 6 Dec 2019 21:48:09 +0000 Subject: [PATCH] Fix scaling issues --- enbsim/cell_config.cpp | 14 ++------- enbsim/cell_config.h | 2 +- enbsim/context.cpp | 17 +++++++++- enbsim/context.h | 7 +++-- enbsim/dispatch.cpp | 26 +++++---------- enbsim/dispatch.h | 2 +- enbsim/enbsim.cpp | 36 ++++++++++++--------- enbsim/ue.cpp | 48 +++------------------------- xranc-sb-api-src/Makefile | 2 +- xranc/cell_config.cpp | 4 +-- xranc/client.cpp | 15 +++------ xranc/client.h | 2 ++ xranc/dispatch.cpp | 11 ++++--- xranc/dispatch.h | 2 +- xranc/server.cpp | 66 ++++++++++----------------------------- 15 files changed, 91 insertions(+), 163 deletions(-) diff --git a/enbsim/cell_config.cpp b/enbsim/cell_config.cpp index f5e04ab..0597634 100644 --- a/enbsim/cell_config.cpp +++ b/enbsim/cell_config.cpp @@ -41,7 +41,7 @@ static void make_ecgi(ECGI_t *dest, int enb_index) { dest->eUTRANcellIdentifier.size = 4; } -int cell_config_request(XRANCPDU *req, char *resp_buf, int resp_buf_size, context_t *context) { +void cell_config_request(XRANCPDU *req, context_t *context) { // TODO XRANCPDU *resp; struct Cell cell; @@ -86,17 +86,9 @@ int cell_config_request(XRANCPDU *req, char *resp_buf, int resp_buf_size, contex resp->body.choice.cellConfigReport.max_num_ues_sched_per_tti_ul = 10; resp->body.choice.cellConfigReport.dlfs_sched_enable = true; - //xer_fprint(stdout, &asn_DEF_XRANCPDU, resp); - - asn_enc_rval_t er = asn_encode_to_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, resp, resp_buf, resp_buf_size); - if(er.encoded > resp_buf_size) { - fprintf(stderr, "Buffer of size %d is too small for %s, need %zu\n", - resp_buf_size, asn_DEF_XRANCPDU.name, er.encoded); - } - - ASN_STRUCT_FREE(asn_DEF_XRANCPDU, resp); + ctx_send(resp, context); log_debug("-> CCResp enodeb:{}", context->enb_index); - return er.encoded; + ASN_STRUCT_FREE(asn_DEF_XRANCPDU, resp); } diff --git a/enbsim/cell_config.h b/enbsim/cell_config.h index 2267e76..828320f 100644 --- a/enbsim/cell_config.h +++ b/enbsim/cell_config.h @@ -20,5 +20,5 @@ #include #include "context.h" -int cell_config_request(XRANCPDU *req, char *resp_buf, int resp_buf_size, context_t *context); +void cell_config_request(XRANCPDU *req, context_t *context); #endif diff --git a/enbsim/context.cpp b/enbsim/context.cpp index e420f80..75301e4 100644 --- a/enbsim/context.cpp +++ b/enbsim/context.cpp @@ -18,8 +18,9 @@ #include "context.h" #include "config.h" #include "cell_config.h" +#include "logger.h" -void closecontext(context_t *context) { +void ctx_close(context_t *context) { if (context != NULL) { if (context->fd >= 0) { close(context->fd); @@ -32,3 +33,17 @@ void closecontext(context_t *context) { } } } + +void ctx_send(XRANCPDU *pdu, context_t *context) { + asn_encode_to_new_buffer_result_t res = { NULL, {0, NULL, NULL} }; + + trace_pdu(pdu); + + res = asn_encode_to_new_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, pdu); + + if (bufferevent_write(context->buf_ev, res.buffer, res.result.encoded)) { + log_error("Error sending data to client on fd {}", context->fd); + ctx_close(context); + } + free(res.buffer); +} diff --git a/enbsim/context.h b/enbsim/context.h index c86745d..03e386c 100644 --- a/enbsim/context.h +++ b/enbsim/context.h @@ -53,9 +53,10 @@ typedef struct context { int enb_index; bool connected; struct event *ue_admission_timer; + char data[8192]; + int nbytes; } context_t; -void closecontext(context_t *context); -void context_timers_add(context_t *context); -void context_send(XRANCPDU *pdu, context_t *context); +void ctx_close(context_t *context); +void ctx_send(XRANCPDU *pdu, context_t *context); #endif diff --git a/enbsim/dispatch.cpp b/enbsim/dispatch.cpp index 9b125da..b78b457 100644 --- a/enbsim/dispatch.cpp +++ b/enbsim/dispatch.cpp @@ -47,13 +47,8 @@ static void ue_admission_timeout(int fd, short event, void *arg) evtimer_del(context->ue_admission_timer); } -void dispatch(uint8_t *buffer, size_t buf_size, context_t *context) { +int dispatch(uint8_t *buffer, size_t buf_size, context_t *context) { XRANCPDU *pdu = 0; - - int resp_buf_size = 4096; - char resp_buf[resp_buf_size]; - int nbytes = 0; - size_t remaining = buf_size; size_t consumed = 0; uint8_t *curr = buffer; @@ -61,7 +56,8 @@ void dispatch(uint8_t *buffer, size_t buf_size, context_t *context) { do { consumed = decode(&pdu, curr, remaining); if (!consumed) { - log_error("Error decoding input: remaining={}, consumed={}", remaining, consumed); + //log_debug("bytes remaining={}, consumed={}", remaining, consumed); + return remaining; } remaining -= consumed; curr += consumed; @@ -70,37 +66,29 @@ void dispatch(uint8_t *buffer, size_t buf_size, context_t *context) { switch (pdu->hdr.api_id) { case XRANC_API_ID_cellConfigRequest: - nbytes = cell_config_request(pdu, resp_buf, resp_buf_size, context); + cell_config_request(pdu, context); break; case XRANC_API_ID_uEAdmissionResponse: ue_admission_response(pdu, context); break; default: - printf("Message %lu not handled\n", pdu->hdr.api_id); + log_error("ERROR message not handled, api_id:{}", pdu->hdr.api_id); } ASN_STRUCT_FREE(asn_DEF_XRANCPDU, pdu); pdu = 0; - if (nbytes) { - //struct evbuffer *tmp = evbuffer_new(); - //evbuffer_add(tmp, resp_buf, nbytes); - if (bufferevent_write(context->buf_ev, resp_buf, nbytes)) { - printf("Error sending data to context on fd %d\n", context->fd); - closecontext(context); - } - //evbuffer_free(tmp); - } } while (remaining); if (context->connected == false) { struct timeval tv = {1, 0}; - //printf("starting ues\n"); log_info("starting ue admission timer"); context->connected = true; context->ue_admission_timer = event_new(context->evbase, -1, EV_PERSIST, ue_admission_timeout, context); evtimer_add(context->ue_admission_timer, &tv); } + + return 0; } diff --git a/enbsim/dispatch.h b/enbsim/dispatch.h index 0b55ad4..0577b19 100644 --- a/enbsim/dispatch.h +++ b/enbsim/dispatch.h @@ -19,5 +19,5 @@ #include "context.h" -void dispatch(uint8_t *buffer, size_t buf_size, context_t *context); +int dispatch(uint8_t *buffer, size_t buf_size, context_t *context); #endif diff --git a/enbsim/enbsim.cpp b/enbsim/enbsim.cpp index 36b44a0..4c2e7e8 100644 --- a/enbsim/enbsim.cpp +++ b/enbsim/enbsim.cpp @@ -18,6 +18,7 @@ #include "context.h" #include "dispatch.h" #include "config.h" +#include "logger.h" int64_t total_bytes_read = 0; int64_t total_messages_read = 0; @@ -62,23 +63,26 @@ static int create_bind_socket(int enb_id) { static void readcb(struct bufferevent *bev, void *arg) { context_t *ctx = (context_t *)arg; - char data[4096]; - int nbytes; - int tbytes = 0; - - /* This callback is invoked when there is data to read on bev. */ - struct evbuffer *input = bufferevent_get_input(bev); - while (evbuffer_get_length(input) > 0) { - /* Remove a chunk of data from the input buffer, copying it into our - * local array (data). */ - nbytes = evbuffer_remove(input, data, 4096); - /* Add the chunk of data from our local array (data) to the client's - * output buffer. */ - //evbuffer_add(client->output_buffer, data, nbytes); - tbytes += nbytes; + struct evbuffer *input; + + input = bufferevent_get_input(bev); + + while (evbuffer_get_length(input)) { + int n = evbuffer_remove(input, ctx->data + ctx->nbytes, sizeof(ctx->data) - ctx->nbytes); + log_info("removed {} bytes", n); + ctx->nbytes += n; } - dispatch((uint8_t *)data, tbytes, ctx); + //log_debug("dispatching {} bytes", ctx->nbytes); + + int r = dispatch((uint8_t *)(ctx->data), ctx->nbytes, ctx); + + if (r) { + memmove(ctx->data, ctx->data + ctx->nbytes - r, r); + ctx->nbytes = r; + } else { + ctx->nbytes = 0; + } } static void eventcb(struct bufferevent *bev, short events, void *arg) @@ -109,6 +113,8 @@ static void *worker_function(void *arg) { context->evbase = evbase; context->fd = create_bind_socket(context->enb_index); context->buf_ev = bufferevent_socket_new(context->evbase, context->fd, BEV_OPT_CLOSE_ON_FREE); + //bufferevent_set_max_single_write(context->buf_ev, 8192); + //bufferevent_set_max_single_read(context->buf_ev, 8192); bufferevent_setcb(context->buf_ev, readcb, NULL, eventcb, context); bufferevent_enable(context->buf_ev, EV_READ|EV_WRITE); if (bufferevent_socket_connect(context->buf_ev, (struct sockaddr *)&sin, sizeof(sin)) < 0) { diff --git a/enbsim/ue.cpp b/enbsim/ue.cpp index 66dc30d..d83a563 100644 --- a/enbsim/ue.cpp +++ b/enbsim/ue.cpp @@ -23,10 +23,7 @@ #include "logger.h" void ue_admission_request(context_t *context, uint16_t crnti) { - char *req_buf = (char *)malloc(4096); - int req_buf_size = 4096; XRANCPDU *req = (XRANCPDU *)calloc(1, sizeof(XRANCPDU)); - int nbytes = 0; /* Fill in the version */ req->hdr.ver.buf = (uint8_t *)calloc(1, sizeof(char)); @@ -49,35 +46,15 @@ void ue_admission_request(context_t *context, uint16_t crnti) { req->body.choice.uEAdmissionRequest.adm_est_cause = AdmEstCause_mo_signalling; - //xer_fprint(stdout, &asn_DEF_XRANCPDU, req); + ctx_send(req, context); - asn_enc_rval_t er = asn_encode_to_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, req, req_buf, req_buf_size); - if(er.encoded > req_buf_size) { - fprintf(stderr, "Buffer of size %d is too small for %s, need %zu\n", - req_buf_size, asn_DEF_XRANCPDU.name, er.encoded); - } - - nbytes = er.encoded; - - if (nbytes) { -// struct evbuffer *tmp = evbuffer_new(); -// evbuffer_add(tmp, req_buf, nbytes); - if (bufferevent_write(context->buf_ev, req_buf, nbytes)) { - printf("Error sending data to context on fd %d\n", context->fd); - } -// evbuffer_free(tmp); - } + log_debug("-> UEAdmReq enodeb:{} crnti:{}", context->enb_index, crnti); ASN_STRUCT_FREE(asn_DEF_XRANCPDU, req); - - log_debug("-> UEAdmReq enodeb:{} crnti:{}", context->enb_index, crnti); } void ue_admission_status(context_t *context, uint16_t crnti) { - char *req_buf = (char *)malloc(4096); - int req_buf_size = 4096; XRANCPDU *req = (XRANCPDU *)calloc(1, sizeof(XRANCPDU)); - int nbytes = 0; /* Fill in the version */ req->hdr.ver.buf = (uint8_t *)calloc(1, sizeof(char)); @@ -100,28 +77,11 @@ void ue_admission_status(context_t *context, uint16_t crnti) { req->body.choice.uEAdmissionStatus.adm_est_status = AdmEstStatus_success; - //xer_fprint(stdout, &asn_DEF_XRANCPDU, req); + ctx_send(req, context); - asn_enc_rval_t er = asn_encode_to_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, req, req_buf, req_buf_size); - if(er.encoded > req_buf_size) { - fprintf(stderr, "Buffer of size %d is too small for %s, need %zu\n", - req_buf_size, asn_DEF_XRANCPDU.name, er.encoded); - } - - nbytes = er.encoded; - - if (nbytes) { -// struct evbuffer *tmp = evbuffer_new(); -// evbuffer_add(tmp, req_buf, nbytes); - if (bufferevent_write(context->buf_ev, req_buf, nbytes)) { - printf("Error sending data to context on fd %d\n", context->fd); - } -// evbuffer_free(tmp); - } + log_debug("-> UEAdmStatus enodeb:{} crnti:{}", context->enb_index, crnti); ASN_STRUCT_FREE(asn_DEF_XRANCPDU, req); - - log_debug("-> UEAdmStatus enodeb:{} crnti:{}", context->enb_index, crnti); } void ue_admission_response(XRANCPDU *pdu, context_t *context) { diff --git a/xranc-sb-api-src/Makefile b/xranc-sb-api-src/Makefile index a7daa61..2fcfd0b 100644 --- a/xranc-sb-api-src/Makefile +++ b/xranc-sb-api-src/Makefile @@ -8,7 +8,7 @@ ASN_LIBRARY ?= libasncodec.a #all: $(ASN_PROGRAM) -all: $(ASN_LIBRARY) +all: $(ASN_LIBRARY) #$(ASN_PROGRAM): $(ASN_LIBRARY) $(ASN_PROGRAM_SRCS:.c=.o) # $(CC) $(CFLAGS) $(CPPFLAGS) -o $(ASN_PROGRAM) $(ASN_PROGRAM_SRCS:.c=.o) $(LDFLAGS) $(ASN_LIBRARY) $(LIBS) diff --git a/xranc/cell_config.cpp b/xranc/cell_config.cpp index 57bc32e..7f1f973 100644 --- a/xranc/cell_config.cpp +++ b/xranc/cell_config.cpp @@ -97,8 +97,6 @@ void cell_config_request(client_t *client) { void cell_config_response(XRANCPDU *pdu, client_t *client) { - log_debug("<- CCResp enodeb:{}", client->enb_index); - // TODO - Update information on Redis DB through NBI - gRPC Config* config = Config::Instance(); std::string redisServerInfo = config->redis_ip_addr + ":" + std::to_string(config->redis_port); @@ -150,6 +148,8 @@ void cell_config_response(XRANCPDU *pdu, client_t *client) { log_info("connected to enodeb:{}", client->enb_index); } + log_debug("<- CCResp enodeb:{}", client->enb_index); + gRPCClientImplCellConfigReport reportService(grpc::CreateChannel(redisServerInfo, grpc::InsecureChannelCredentials())); int resultCode = reportService.UpdateCellConfig(cellConfigReport); if (resultCode != 1) { diff --git a/xranc/client.cpp b/xranc/client.cpp index ecc1162..2480ef1 100644 --- a/xranc/client.cpp +++ b/xranc/client.cpp @@ -39,20 +39,15 @@ void client_timers_add(client_t *client) { } void client_send(XRANCPDU *pdu, client_t *client) { - char buffer[4096]; - int buf_size = 4096; - asn_enc_rval_t er; + asn_encode_to_new_buffer_result_t res = { NULL, {0, NULL, NULL} }; trace_pdu(pdu); - er = asn_encode_to_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, pdu, buffer, buf_size); - if(er.encoded > buf_size) { - fprintf(stderr, "Buffer of size %d is too small for %s, need %zu\n", - buf_size, asn_DEF_XRANCPDU.name, er.encoded); - } + res = asn_encode_to_new_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, pdu); - if (bufferevent_write(client->buf_ev, buffer, er.encoded)) { - printf("Error sending data to client on fd %d\n", client->fd); + if (bufferevent_write(client->buf_ev, res.buffer, res.result.encoded)) { + log_error("ERROT sending data, fd {}", client->fd); closeClient(client); } + free(res.buffer); } diff --git a/xranc/client.h b/xranc/client.h index e529774..c1d9ef6 100644 --- a/xranc/client.h +++ b/xranc/client.h @@ -57,6 +57,8 @@ typedef struct client { int enb_index; uint16_t num_ue_admissions; + char data[8192]; + int nbytes; } client_t; void closeClient(client_t *client); diff --git a/xranc/dispatch.cpp b/xranc/dispatch.cpp index 7160853..c37be2a 100644 --- a/xranc/dispatch.cpp +++ b/xranc/dispatch.cpp @@ -38,19 +38,18 @@ static size_t decode(XRANCPDU **pdu, uint8_t *buffer, size_t buf_size) { return rval.consumed; } -void dispatch(uint8_t *buffer, size_t buf_size, client_t *client) { +int dispatch(uint8_t *buffer, size_t buf_size, client_t *client) { XRANCPDU *pdu = 0; - - size_t remaining = buf_size; size_t consumed = 0; uint8_t *curr = buffer; + //log_debug("dispatch: bytes={}", buf_size); do { consumed = decode(&pdu, curr, remaining); if (!consumed) { - log_error("Error decoding input: remaining={}, consumed={}", remaining, consumed); - break; + //log_debug("dispatch: bytes remaining={}/{}", remaining, buf_size); + return remaining; } remaining -= consumed; curr += consumed; @@ -86,4 +85,6 @@ void dispatch(uint8_t *buffer, size_t buf_size, client_t *client) { ASN_STRUCT_FREE(asn_DEF_XRANCPDU, pdu); pdu = 0; } while (remaining); + + return 0; } diff --git a/xranc/dispatch.h b/xranc/dispatch.h index 0c549c6..45043d3 100644 --- a/xranc/dispatch.h +++ b/xranc/dispatch.h @@ -19,5 +19,5 @@ #include "client.h" -void dispatch(uint8_t *buffer, size_t buf_size, client_t *client); +int dispatch(uint8_t *buffer, size_t buf_size, client_t *client); #endif diff --git a/xranc/server.cpp b/xranc/server.cpp index 6178c31..26de4f0 100644 --- a/xranc/server.cpp +++ b/xranc/server.cpp @@ -46,6 +46,7 @@ #include "workqueue.h" #include "cell_config.h" #include "dispatch.h" +#include "logger.h" #define USE_SCTP #ifdef USE_SCTP @@ -100,38 +101,26 @@ static void closeAndFreeClient(client_t *client) { */ void buffered_on_read(struct bufferevent *bev, void *arg) { client_t *client = (client_t *)arg; - char data[4096]; - int nbytes; - int tbytes = 0; - - /* If we have input data, the number of bytes we have is contained in - * bev->input->off. Copy the data from the input buffer to the output - * buffer in 4096-byte chunks. There is a one-liner to do the whole thing - * in one shot, but the purpose of this server is to show actual real-world - * reading and writing of the input and output buffers, so we won't take - * that shortcut here. */ struct evbuffer *input; + input = bufferevent_get_input(bev); - while (evbuffer_get_length(input) > 0) { - /* Remove a chunk of data from the input buffer, copying it into our - * local array (data). */ - nbytes = evbuffer_remove(input, data, 4096); - /* Add the chunk of data from our local array (data) to the client's - * output buffer. */ - //evbuffer_add(client->output_buffer, data, nbytes); - tbytes += nbytes; + + while (evbuffer_get_length(input)) { + int n = evbuffer_remove(input, client->data + client->nbytes, sizeof(client->data) - client->nbytes); + //log_debug("removed {} bytes", n); + client->nbytes += n; } - dispatch((uint8_t *)data, tbytes, client); + //log_debug("disptaching {} bytes", client->nbytes); - /* Send the results to the client. This actually only queues the results - * for sending. Sending will occur asynchronously, handled by libevent. */ - /* - if (bufferevent_write_buffer(bev, client->output_buffer)) { - errorOut("Error sending data to client on fd %d\n", client->fd); - closeClient(client); + int r = dispatch((uint8_t *)(client->data), client->nbytes, client); + + if (r) { + memmove(client->data, client->data + client->nbytes - r, r); + client->nbytes = r; + } else { + client->nbytes = 0; } - */ } /** @@ -210,29 +199,6 @@ void on_accept(evutil_socket_t fd, short ev, void *arg) { return; } - /* Create the buffered event. - * - * The first argument is the file descriptor that will trigger - * the events, in this case the clients socket. - * - * The second argument is the callback that will be called - * when data has been read from the socket and is available to - * the application. - * - * The third argument is a callback to a function that will be - * called when the write buffer has reached a low watermark. - * That usually means that when the write buffer is 0 length, - * this callback will be called. It must be defined, but you - * don't actually have to do anything in this callback. - * - * The fourth argument is a callback that will be called when - * there is a socket error. This is where you will detect - * that the client disconnected or other socket errors. - * - * The fifth and final argument is to store an argument in - * that will be passed to the callbacks. We store the client - * object here. - */ /* Shad - try BEV_OPT_DEFER_CALLBACKS */ client->buf_ev = bufferevent_socket_new(client->evbase, client_fd, BEV_OPT_CLOSE_ON_FREE); @@ -241,6 +207,8 @@ void on_accept(evutil_socket_t fd, short ev, void *arg) { closeAndFreeClient(client); return; } + //bufferevent_set_max_single_write(client->buf_ev, 8192); + //bufferevent_set_max_single_read(client->buf_ev, 8192); bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write, buffered_on_error, client);