Skip to content

Commit

Permalink
change ext interface from udp to tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
KensakuKOMATSU committed Dec 29, 2017
1 parent 8fb0880 commit fec6175
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 71 deletions.
6 changes: 2 additions & 4 deletions conf/janus.plugin.skywayiot.cfg.sample
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
[external-interface]
data_recv_port = 15000
data_recv_addr = 0.0.0.0
data_send_port = 15001
data_send_dest = 127.0.0.1
data_port = 15000
data_addr = 0.0.0.0
media_send_port = 25000
media_send_dest = 127.0.0.1
116 changes: 49 additions & 67 deletions plugins/janus_skywayiot.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,12 @@ static janus_callbacks *gateway = NULL;
static GThread *handler_thread;
static GThread *watchdog;
static void *janus_skywayiot_handler(void *data);
static int create_data_receiver(char *data_recv_addr, int data_recv_port);
static int create_data_sender(char *data_send_dest, int data_send_port);
static int create_ext_data_interface(char *addr, int port);
static int create_media_sender(char *media_recv_addr, int media_recv_port);

static void *thread_receive_external_data(void *data);
static void *thread_receive_ext_data(void *data);

static void relay_extern_data(gpointer handle, gpointer session, gpointer data);
static void relay_ext_to_datachannel(gpointer handle, gpointer session, gpointer data);

typedef struct data_with_handleid {
guint64 handle_id;
Expand Down Expand Up @@ -129,11 +128,10 @@ static GHashTable *sessions;
static GList *old_sessions;
static janus_mutex sessions_mutex;

int data_recv_fd; /* socket for external data stream */
int data_send_fd; /* socket for external data stream */
int media_send_fd; /* socket for external media stream */
int ext_listen_fd = -1; /* socket for listening external tcp */
int ext_fd = -1; /* socket for tcp data */
int media_send_fd; /* socket for external media stream */

struct sockaddr_in g_data_sender;
struct sockaddr_in g_media_sender;

static void janus_skywayiot_message_free(janus_skywayiot_message *msg) {
Expand Down Expand Up @@ -232,27 +230,21 @@ int janus_skywayiot_init(janus_callbacks *callback, const char *config_path) {

JANUS_LOG(LOG_INFO, "config:: name of category '%s'\n", cat->name);

janus_config_item *data_recv_port = janus_config_get_item(cat, "data_recv_port");
janus_config_item *data_recv_addr = janus_config_get_item(cat, "data_recv_addr");

janus_config_item *data_send_port = janus_config_get_item(cat, "data_send_port");
janus_config_item *data_send_dest = janus_config_get_item(cat, "data_send_dest");
janus_config_item *data_port = janus_config_get_item(cat, "data_port");
janus_config_item *data_addr = janus_config_get_item(cat, "data_addr");

janus_config_item *media_send_port = janus_config_get_item(cat, "media_send_port");
janus_config_item *media_send_dest = janus_config_get_item(cat, "media_send_dest");

if(data_recv_port == NULL || data_recv_port->value == NULL
|| data_recv_addr == NULL || data_recv_addr->value == NULL
|| data_send_port == NULL || data_send_port->value == NULL
|| data_send_dest == NULL || data_send_dest->value == NULL
if(data_port == NULL || data_port->value == NULL
|| data_addr == NULL || data_addr->value == NULL
|| media_send_port == NULL || media_send_port->value == NULL
|| media_send_dest == NULL || media_send_dest->value == NULL) {
JANUS_LOG(LOG_WARN, " -- Invalid dataport, mediaport, listenaddr, we'll skip opening '%s'. \n", cat->name);
cl = cl->next;
continue;
} else {
create_data_receiver( (char *)data_recv_addr->value, atoi(data_recv_port->value) );
create_data_sender( (char *)data_send_dest->value, atoi(data_send_port->value) );
create_ext_data_interface( (char *)data_addr->value, atoi(data_port->value) );
create_media_sender( (char *)media_send_dest->value, atoi(media_send_port->value) );

cl = cl->next;
Expand Down Expand Up @@ -546,15 +538,18 @@ void janus_skywayiot_incoming_data(janus_plugin_session *handle, char *buf, int
char* ext_data;
int id_len = sizeof(guint64);
guint64 handle_id = (guint64)handle;
socklen_t addrlen = sizeof(g_data_sender);

ext_data = (char *)malloc( id_len + len );
memcpy(ext_data, &handle_id, id_len);
memcpy(ext_data + id_len, buf, len);

/* todo: change to TCP sender */
if((void *)&g_data_sender != NULL) {
sendto(data_send_fd, ext_data, (id_len + len), 0, (struct sockaddr *)&g_data_sender, addrlen);
int n;
if( ext_fd > 0 ) {
n = write( ext_fd, ext_data, (id_len + len) );

if ( n < 0 ) {
JANUS_LOG(LOG_ERR, "Failed to write data to ``ext_fd``\n");
}
}
g_free(ext_data);
}
Expand Down Expand Up @@ -837,12 +832,11 @@ static void *janus_skywayiot_handler(void *data) {
* create external data receiver interface via TCP. The data received from this interface will
* be relayed to DataChannel
*/
static int create_data_receiver(char *addr, int port) {
/* todo: change interface from UDP to TCP. */
static int create_ext_data_interface(char *addr, int port) {
JANUS_LOG(LOG_INFO, "create data receiver: listener address %s, port %d\n", addr, port);

/* create a UDP socket for data receiver (it will be transfered via WebRTC DataChannel */
if ((data_recv_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
/* create a TCP socket for data receiver (it will be transfered via WebRTC DataChannel */
if ((ext_listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
JANUS_LOG(LOG_WARN, "cannot create socket for data receiver\n");
return -1;
}
Expand All @@ -851,50 +845,26 @@ static int create_data_receiver(char *addr, int port) {
/* bind the socket to any valid IP address and a specific port */
memset((char *)&data_sockaddr, 0, sizeof(data_sockaddr));
data_sockaddr.sin_family = AF_INET;
data_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
data_sockaddr.sin_addr.s_addr = inet_addr(addr);
data_sockaddr.sin_port = htons(port);

if (bind(data_recv_fd, (struct sockaddr *)&data_sockaddr, sizeof(data_sockaddr)) < 0) {
if (bind(ext_listen_fd, (struct sockaddr *)&data_sockaddr, sizeof(data_sockaddr)) < 0) {
JANUS_LOG(LOG_WARN, "bind failed for data receiver\n");
return -1;
}

JANUS_LOG(LOG_INFO, "succeed to create socket for data receiver\n");
JANUS_LOG(LOG_INFO, "succeed to create socket for ext data\n");

/* create thread to receive udp datagram for each channel */
GError *error = NULL;
g_thread_try_new("skywayiot_data_thread", &thread_receive_external_data, NULL, &error);
g_thread_try_new("skywayiot_ext_interface_thread", &thread_receive_ext_data, NULL, &error);
if(error != NULL) {
JANUS_LOG(LOG_WARN, "Got error %d (%s) while launching the data channel ext interface thread...\n", error->code, error->message ? error->message : "??");
return -1;
}
return 0;
}

/* todo: it will be eliminated */
static int create_data_sender(char *addr, int port) {
JANUS_LOG(LOG_INFO, "create data sender: destination address %s, port %d\n", addr, port);

/* create a UDP socket for data sender (it was received via WebRTC DataChannel */
if ((data_send_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
JANUS_LOG(LOG_WARN, "cannot create socket for data sender\n");
return -1;
}

/* bind the socket to any valid IP address and a specific port */
struct hostent *server;

server = gethostbyname(addr);

memset((char *)&g_data_sender, 0, sizeof(g_data_sender));
g_data_sender.sin_family = AF_INET;
bcopy((char *)server->h_addr, (char *)&g_data_sender.sin_addr.s_addr, server->h_length);
g_data_sender.sin_port = htons(port);

JANUS_LOG(LOG_INFO, "succeed to create socket for data sender\n");
return 0;
}

/**
* This channel is used for relay received media data to external UDP media interface
*/
Expand Down Expand Up @@ -924,9 +894,9 @@ static int create_media_sender(char *addr, int port) {
/**
* This thread function will be used to receive data from external TCP interface.
*/
static void *thread_receive_external_data(void *data /* to avoid warning */) {
char buff[1500];
int bytes_recv;
static void *thread_receive_ext_data(void *data /* to avoid warning */) {
char recvBuff[65535];
int n;


guint64 handle_id;
Expand All @@ -941,20 +911,32 @@ static void *thread_receive_external_data(void *data /* to avoid warning */) {
data_len: 0
};

listen( ext_listen_fd, 1 ); /* we only accept 1 TCP client, at the same time */

while(1 /* fixme: detect plugin termination */ ) {
bytes_recv = recvfrom(data_recv_fd, buff, 1500, 0, (struct sockaddr *)&addr, &addr_len);
ext_fd = accept( ext_listen_fd, (struct sockaddr *)&addr, &addr_len);

while( ( n = read( ext_fd, recvBuff, sizeof(recvBuff) - 1 ) ) > 0 ) {
recvBuff[n] = '\0';

if(bytes_recv > handle_id_len) {
memcpy(&handle_id, buff, (size_t)handle_id_len);
if( n > handle_id_len) {
memcpy(&handle_id, recvBuff, (size_t)handle_id_len);

data_len = bytes_recv - handle_id_len;
data_len = n - handle_id_len;

parsed.handle_id = handle_id;
parsed.data = buff + handle_id_len;
parsed.data_len = data_len;
parsed.handle_id = handle_id;
parsed.data = recvBuff + handle_id_len;
parsed.data_len = data_len;

g_hash_table_foreach(sessions, &relay_extern_data, &parsed);
g_hash_table_foreach(sessions, &relay_ext_to_datachannel, &parsed);
}
}

/* socket HANG */
close(ext_fd);
ext_fd = -1;

sleep(1);
}
return NULL;
}
Expand All @@ -964,7 +946,7 @@ static void *thread_receive_external_data(void *data /* to avoid warning */) {
* When handle is ``0xffffffffffffffff``, data will be broadcasted to
* every connected data channel (used for pubsub model).
*/
static void relay_extern_data(gpointer handle, gpointer session, gpointer data) {
static void relay_ext_to_datachannel(gpointer handle, gpointer session, gpointer data) {
data_with_handleid *_data = (data_with_handleid *)data;

guint64 handle_id = (guint64)handle;
Expand Down

0 comments on commit fec6175

Please sign in to comment.