Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scaling issues #30

Merged
merged 2 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions enbsim/cell_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static void make_ecgi(ECGI_t *dest, int enb_index) {
dest->eUTRANcellIdentifier.size = 4;
}

int cell_config_request(XRANCPDU *req, char *resp_buf, int resp_buf_size, context_t *context) {
void cell_config_request(XRANCPDU *req, context_t *context) {
// TODO
XRANCPDU *resp;
struct Cell cell;
Expand Down Expand Up @@ -86,17 +86,9 @@ int cell_config_request(XRANCPDU *req, char *resp_buf, int resp_buf_size, contex
resp->body.choice.cellConfigReport.max_num_ues_sched_per_tti_ul = 10;
resp->body.choice.cellConfigReport.dlfs_sched_enable = true;

//xer_fprint(stdout, &asn_DEF_XRANCPDU, resp);

asn_enc_rval_t er = asn_encode_to_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, resp, resp_buf, resp_buf_size);
if(er.encoded > resp_buf_size) {
fprintf(stderr, "Buffer of size %d is too small for %s, need %zu\n",
resp_buf_size, asn_DEF_XRANCPDU.name, er.encoded);
}

ASN_STRUCT_FREE(asn_DEF_XRANCPDU, resp);
ctx_send(resp, context);

log_debug("-> CCResp enodeb:{}", context->enb_index);

return er.encoded;
ASN_STRUCT_FREE(asn_DEF_XRANCPDU, resp);
}
2 changes: 1 addition & 1 deletion enbsim/cell_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
#include <XRANCPDU.h>
#include "context.h"

int cell_config_request(XRANCPDU *req, char *resp_buf, int resp_buf_size, context_t *context);
void cell_config_request(XRANCPDU *req, context_t *context);
#endif
17 changes: 16 additions & 1 deletion enbsim/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
#include "context.h"
#include "config.h"
#include "cell_config.h"
#include "logger.h"

void closecontext(context_t *context) {
void ctx_close(context_t *context) {
if (context != NULL) {
if (context->fd >= 0) {
close(context->fd);
Expand All @@ -32,3 +33,17 @@ void closecontext(context_t *context) {
}
}
}

void ctx_send(XRANCPDU *pdu, context_t *context) {
asn_encode_to_new_buffer_result_t res = { NULL, {0, NULL, NULL} };

trace_pdu(pdu);

res = asn_encode_to_new_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, pdu);

if (bufferevent_write(context->buf_ev, res.buffer, res.result.encoded)) {
log_error("Error sending data to client on fd {}", context->fd);
ctx_close(context);
}
free(res.buffer);
}
7 changes: 4 additions & 3 deletions enbsim/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ typedef struct context {
int enb_index;
bool connected;
struct event *ue_admission_timer;
char data[8192];
int nbytes;
} context_t;

void closecontext(context_t *context);
void context_timers_add(context_t *context);
void context_send(XRANCPDU *pdu, context_t *context);
void ctx_close(context_t *context);
void ctx_send(XRANCPDU *pdu, context_t *context);
#endif
26 changes: 7 additions & 19 deletions enbsim/dispatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,17 @@ static void ue_admission_timeout(int fd, short event, void *arg)
evtimer_del(context->ue_admission_timer);
}

void dispatch(uint8_t *buffer, size_t buf_size, context_t *context) {
int dispatch(uint8_t *buffer, size_t buf_size, context_t *context) {
XRANCPDU *pdu = 0;

int resp_buf_size = 4096;
char resp_buf[resp_buf_size];
int nbytes = 0;

size_t remaining = buf_size;
size_t consumed = 0;
uint8_t *curr = buffer;

do {
consumed = decode(&pdu, curr, remaining);
if (!consumed) {
log_error("Error decoding input: remaining={}, consumed={}", remaining, consumed);
//log_debug("bytes remaining={}, consumed={}", remaining, consumed);
return remaining;
}
remaining -= consumed;
curr += consumed;
Expand All @@ -70,37 +66,29 @@ void dispatch(uint8_t *buffer, size_t buf_size, context_t *context) {

switch (pdu->hdr.api_id) {
case XRANC_API_ID_cellConfigRequest:
nbytes = cell_config_request(pdu, resp_buf, resp_buf_size, context);
cell_config_request(pdu, context);
break;
case XRANC_API_ID_uEAdmissionResponse:
ue_admission_response(pdu, context);
break;
default:
printf("Message %lu not handled\n", pdu->hdr.api_id);
log_error("ERROR message not handled, api_id:{}", pdu->hdr.api_id);
}

ASN_STRUCT_FREE(asn_DEF_XRANCPDU, pdu);
pdu = 0;

if (nbytes) {
//struct evbuffer *tmp = evbuffer_new();
//evbuffer_add(tmp, resp_buf, nbytes);
if (bufferevent_write(context->buf_ev, resp_buf, nbytes)) {
printf("Error sending data to context on fd %d\n", context->fd);
closecontext(context);
}
//evbuffer_free(tmp);
}
} while (remaining);

if (context->connected == false) {
struct timeval tv = {1, 0};

//printf("starting ues\n");
log_info("starting ue admission timer");
context->connected = true;

context->ue_admission_timer = event_new(context->evbase, -1, EV_PERSIST, ue_admission_timeout, context);
evtimer_add(context->ue_admission_timer, &tv);
}

return 0;
}
2 changes: 1 addition & 1 deletion enbsim/dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

#include "context.h"

void dispatch(uint8_t *buffer, size_t buf_size, context_t *context);
int dispatch(uint8_t *buffer, size_t buf_size, context_t *context);
#endif
36 changes: 21 additions & 15 deletions enbsim/enbsim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "context.h"
#include "dispatch.h"
#include "config.h"
#include "logger.h"

int64_t total_bytes_read = 0;
int64_t total_messages_read = 0;
Expand Down Expand Up @@ -62,23 +63,26 @@ static int create_bind_socket(int enb_id) {
static void readcb(struct bufferevent *bev, void *arg)
{
context_t *ctx = (context_t *)arg;
char data[4096];
int nbytes;
int tbytes = 0;

/* This callback is invoked when there is data to read on bev. */
struct evbuffer *input = bufferevent_get_input(bev);
while (evbuffer_get_length(input) > 0) {
/* Remove a chunk of data from the input buffer, copying it into our
* local array (data). */
nbytes = evbuffer_remove(input, data, 4096);
/* Add the chunk of data from our local array (data) to the client's
* output buffer. */
//evbuffer_add(client->output_buffer, data, nbytes);
tbytes += nbytes;
struct evbuffer *input;

input = bufferevent_get_input(bev);

while (evbuffer_get_length(input)) {
int n = evbuffer_remove(input, ctx->data + ctx->nbytes, sizeof(ctx->data) - ctx->nbytes);
log_info("removed {} bytes", n);
ctx->nbytes += n;
}

dispatch((uint8_t *)data, tbytes, ctx);
//log_debug("dispatching {} bytes", ctx->nbytes);

int r = dispatch((uint8_t *)(ctx->data), ctx->nbytes, ctx);

if (r) {
memmove(ctx->data, ctx->data + ctx->nbytes - r, r);
ctx->nbytes = r;
} else {
ctx->nbytes = 0;
}
}

static void eventcb(struct bufferevent *bev, short events, void *arg)
Expand Down Expand Up @@ -109,6 +113,8 @@ static void *worker_function(void *arg) {
context->evbase = evbase;
context->fd = create_bind_socket(context->enb_index);
context->buf_ev = bufferevent_socket_new(context->evbase, context->fd, BEV_OPT_CLOSE_ON_FREE);
//bufferevent_set_max_single_write(context->buf_ev, 8192);
//bufferevent_set_max_single_read(context->buf_ev, 8192);
bufferevent_setcb(context->buf_ev, readcb, NULL, eventcb, context);
bufferevent_enable(context->buf_ev, EV_READ|EV_WRITE);
if (bufferevent_socket_connect(context->buf_ev, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
Expand Down
48 changes: 4 additions & 44 deletions enbsim/ue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
#include "logger.h"

void ue_admission_request(context_t *context, uint16_t crnti) {
char *req_buf = (char *)malloc(4096);
int req_buf_size = 4096;
XRANCPDU *req = (XRANCPDU *)calloc(1, sizeof(XRANCPDU));
int nbytes = 0;

/* Fill in the version */
req->hdr.ver.buf = (uint8_t *)calloc(1, sizeof(char));
Expand All @@ -49,35 +46,15 @@ void ue_admission_request(context_t *context, uint16_t crnti) {

req->body.choice.uEAdmissionRequest.adm_est_cause = AdmEstCause_mo_signalling;

//xer_fprint(stdout, &asn_DEF_XRANCPDU, req);
ctx_send(req, context);

asn_enc_rval_t er = asn_encode_to_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, req, req_buf, req_buf_size);
if(er.encoded > req_buf_size) {
fprintf(stderr, "Buffer of size %d is too small for %s, need %zu\n",
req_buf_size, asn_DEF_XRANCPDU.name, er.encoded);
}

nbytes = er.encoded;

if (nbytes) {
// struct evbuffer *tmp = evbuffer_new();
// evbuffer_add(tmp, req_buf, nbytes);
if (bufferevent_write(context->buf_ev, req_buf, nbytes)) {
printf("Error sending data to context on fd %d\n", context->fd);
}
// evbuffer_free(tmp);
}
log_debug("-> UEAdmReq enodeb:{} crnti:{}", context->enb_index, crnti);

ASN_STRUCT_FREE(asn_DEF_XRANCPDU, req);

log_debug("-> UEAdmReq enodeb:{} crnti:{}", context->enb_index, crnti);
}

void ue_admission_status(context_t *context, uint16_t crnti) {
char *req_buf = (char *)malloc(4096);
int req_buf_size = 4096;
XRANCPDU *req = (XRANCPDU *)calloc(1, sizeof(XRANCPDU));
int nbytes = 0;

/* Fill in the version */
req->hdr.ver.buf = (uint8_t *)calloc(1, sizeof(char));
Expand All @@ -100,28 +77,11 @@ void ue_admission_status(context_t *context, uint16_t crnti) {

req->body.choice.uEAdmissionStatus.adm_est_status = AdmEstStatus_success;

//xer_fprint(stdout, &asn_DEF_XRANCPDU, req);
ctx_send(req, context);

asn_enc_rval_t er = asn_encode_to_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, req, req_buf, req_buf_size);
if(er.encoded > req_buf_size) {
fprintf(stderr, "Buffer of size %d is too small for %s, need %zu\n",
req_buf_size, asn_DEF_XRANCPDU.name, er.encoded);
}

nbytes = er.encoded;

if (nbytes) {
// struct evbuffer *tmp = evbuffer_new();
// evbuffer_add(tmp, req_buf, nbytes);
if (bufferevent_write(context->buf_ev, req_buf, nbytes)) {
printf("Error sending data to context on fd %d\n", context->fd);
}
// evbuffer_free(tmp);
}
log_debug("-> UEAdmStatus enodeb:{} crnti:{}", context->enb_index, crnti);

ASN_STRUCT_FREE(asn_DEF_XRANCPDU, req);

log_debug("-> UEAdmStatus enodeb:{} crnti:{}", context->enb_index, crnti);
}

void ue_admission_response(XRANCPDU *pdu, context_t *context) {
Expand Down
2 changes: 1 addition & 1 deletion xranc-sb-api-src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ASN_LIBRARY ?= libasncodec.a

#all: $(ASN_PROGRAM)

all: $(ASN_LIBRARY)
all: $(ASN_LIBRARY)

#$(ASN_PROGRAM): $(ASN_LIBRARY) $(ASN_PROGRAM_SRCS:.c=.o)
# $(CC) $(CFLAGS) $(CPPFLAGS) -o $(ASN_PROGRAM) $(ASN_PROGRAM_SRCS:.c=.o) $(LDFLAGS) $(ASN_LIBRARY) $(LIBS)
Expand Down
4 changes: 2 additions & 2 deletions xranc/cell_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ void cell_config_request(client_t *client) {

void cell_config_response(XRANCPDU *pdu, client_t *client) {

log_debug("<- CCResp enodeb:{}", client->enb_index);

// TODO - Update information on Redis DB through NBI - gRPC
Config* config = Config::Instance();
std::string redisServerInfo = config->redis_ip_addr + ":" + std::to_string(config->redis_port);
Expand Down Expand Up @@ -150,6 +148,8 @@ void cell_config_response(XRANCPDU *pdu, client_t *client) {
log_info("connected to enodeb:{}", client->enb_index);
}

log_debug("<- CCResp enodeb:{}", client->enb_index);

gRPCClientImplCellConfigReport reportService(grpc::CreateChannel(redisServerInfo, grpc::InsecureChannelCredentials()));
int resultCode = reportService.UpdateCellConfig(cellConfigReport);
if (resultCode != 1) {
Expand Down
15 changes: 5 additions & 10 deletions xranc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,15 @@ void client_timers_add(client_t *client) {
}

void client_send(XRANCPDU *pdu, client_t *client) {
char buffer[4096];
int buf_size = 4096;
asn_enc_rval_t er;
asn_encode_to_new_buffer_result_t res = { NULL, {0, NULL, NULL} };

trace_pdu(pdu);

er = asn_encode_to_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, pdu, buffer, buf_size);
if(er.encoded > buf_size) {
fprintf(stderr, "Buffer of size %d is too small for %s, need %zu\n",
buf_size, asn_DEF_XRANCPDU.name, er.encoded);
}
res = asn_encode_to_new_buffer(0, ATS_BER, &asn_DEF_XRANCPDU, pdu);

if (bufferevent_write(client->buf_ev, buffer, er.encoded)) {
printf("Error sending data to client on fd %d\n", client->fd);
if (bufferevent_write(client->buf_ev, res.buffer, res.result.encoded)) {
log_error("ERROT sending data, fd {}", client->fd);
closeClient(client);
}
free(res.buffer);
}
2 changes: 2 additions & 0 deletions xranc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ typedef struct client {
int enb_index;

uint16_t num_ue_admissions;
char data[8192];
int nbytes;
} client_t;

void closeClient(client_t *client);
Expand Down
11 changes: 6 additions & 5 deletions xranc/dispatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,18 @@ static size_t decode(XRANCPDU **pdu, uint8_t *buffer, size_t buf_size) {
return rval.consumed;
}

void dispatch(uint8_t *buffer, size_t buf_size, client_t *client) {
int dispatch(uint8_t *buffer, size_t buf_size, client_t *client) {
XRANCPDU *pdu = 0;


size_t remaining = buf_size;
size_t consumed = 0;
uint8_t *curr = buffer;

//log_debug("dispatch: bytes={}", buf_size);
do {
consumed = decode(&pdu, curr, remaining);
if (!consumed) {
log_error("Error decoding input: remaining={}, consumed={}", remaining, consumed);
break;
//log_debug("dispatch: bytes remaining={}/{}", remaining, buf_size);
return remaining;
}
remaining -= consumed;
curr += consumed;
Expand Down Expand Up @@ -86,4 +85,6 @@ void dispatch(uint8_t *buffer, size_t buf_size, client_t *client) {
ASN_STRUCT_FREE(asn_DEF_XRANCPDU, pdu);
pdu = 0;
} while (remaining);

return 0;
}
2 changes: 1 addition & 1 deletion xranc/dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

#include "client.h"

void dispatch(uint8_t *buffer, size_t buf_size, client_t *client);
int dispatch(uint8_t *buffer, size_t buf_size, client_t *client);
#endif
Loading