From fec61754b2256ab7556ba9f18c5a6a0e30339642 Mon Sep 17 00:00:00 2001 From: Kensaku Komatsu Date: Fri, 29 Dec 2017 02:12:44 +0000 Subject: [PATCH] change ext interface from udp to tcp --- conf/janus.plugin.skywayiot.cfg.sample | 6 +- plugins/janus_skywayiot.c | 116 +++++++++++-------------- 2 files changed, 51 insertions(+), 71 deletions(-) diff --git a/conf/janus.plugin.skywayiot.cfg.sample b/conf/janus.plugin.skywayiot.cfg.sample index 32ea61d..59bbee0 100644 --- a/conf/janus.plugin.skywayiot.cfg.sample +++ b/conf/janus.plugin.skywayiot.cfg.sample @@ -1,7 +1,5 @@ [external-interface] -data_recv_port = 15000 -data_recv_addr = 0.0.0.0 -data_send_port = 15001 -data_send_dest = 127.0.0.1 +data_port = 15000 +data_addr = 0.0.0.0 media_send_port = 25000 media_send_dest = 127.0.0.1 diff --git a/plugins/janus_skywayiot.c b/plugins/janus_skywayiot.c index 8607101..5b4fdc0 100644 --- a/plugins/janus_skywayiot.c +++ b/plugins/janus_skywayiot.c @@ -86,13 +86,12 @@ static janus_callbacks *gateway = NULL; static GThread *handler_thread; static GThread *watchdog; static void *janus_skywayiot_handler(void *data); -static int create_data_receiver(char *data_recv_addr, int data_recv_port); -static int create_data_sender(char *data_send_dest, int data_send_port); +static int create_ext_data_interface(char *addr, int port); static int create_media_sender(char *media_recv_addr, int media_recv_port); -static void *thread_receive_external_data(void *data); +static void *thread_receive_ext_data(void *data); -static void relay_extern_data(gpointer handle, gpointer session, gpointer data); +static void relay_ext_to_datachannel(gpointer handle, gpointer session, gpointer data); typedef struct data_with_handleid { guint64 handle_id; @@ -129,11 +128,10 @@ static GHashTable *sessions; static GList *old_sessions; static janus_mutex sessions_mutex; -int data_recv_fd; /* socket for external data stream */ -int data_send_fd; /* socket for external data stream */ -int media_send_fd; /* socket for external media stream */ +int ext_listen_fd = -1; /* socket for listening external tcp */ +int ext_fd = -1; /* socket for tcp data */ +int media_send_fd; /* socket for external media stream */ -struct sockaddr_in g_data_sender; struct sockaddr_in g_media_sender; static void janus_skywayiot_message_free(janus_skywayiot_message *msg) { @@ -232,27 +230,21 @@ int janus_skywayiot_init(janus_callbacks *callback, const char *config_path) { JANUS_LOG(LOG_INFO, "config:: name of category '%s'\n", cat->name); - janus_config_item *data_recv_port = janus_config_get_item(cat, "data_recv_port"); - janus_config_item *data_recv_addr = janus_config_get_item(cat, "data_recv_addr"); - - janus_config_item *data_send_port = janus_config_get_item(cat, "data_send_port"); - janus_config_item *data_send_dest = janus_config_get_item(cat, "data_send_dest"); + janus_config_item *data_port = janus_config_get_item(cat, "data_port"); + janus_config_item *data_addr = janus_config_get_item(cat, "data_addr"); janus_config_item *media_send_port = janus_config_get_item(cat, "media_send_port"); janus_config_item *media_send_dest = janus_config_get_item(cat, "media_send_dest"); - if(data_recv_port == NULL || data_recv_port->value == NULL - || data_recv_addr == NULL || data_recv_addr->value == NULL - || data_send_port == NULL || data_send_port->value == NULL - || data_send_dest == NULL || data_send_dest->value == NULL + if(data_port == NULL || data_port->value == NULL + || data_addr == NULL || data_addr->value == NULL || media_send_port == NULL || media_send_port->value == NULL || media_send_dest == NULL || media_send_dest->value == NULL) { JANUS_LOG(LOG_WARN, " -- Invalid dataport, mediaport, listenaddr, we'll skip opening '%s'. \n", cat->name); cl = cl->next; continue; } else { - create_data_receiver( (char *)data_recv_addr->value, atoi(data_recv_port->value) ); - create_data_sender( (char *)data_send_dest->value, atoi(data_send_port->value) ); + create_ext_data_interface( (char *)data_addr->value, atoi(data_port->value) ); create_media_sender( (char *)media_send_dest->value, atoi(media_send_port->value) ); cl = cl->next; @@ -546,15 +538,18 @@ void janus_skywayiot_incoming_data(janus_plugin_session *handle, char *buf, int char* ext_data; int id_len = sizeof(guint64); guint64 handle_id = (guint64)handle; - socklen_t addrlen = sizeof(g_data_sender); ext_data = (char *)malloc( id_len + len ); memcpy(ext_data, &handle_id, id_len); memcpy(ext_data + id_len, buf, len); - /* todo: change to TCP sender */ - if((void *)&g_data_sender != NULL) { - sendto(data_send_fd, ext_data, (id_len + len), 0, (struct sockaddr *)&g_data_sender, addrlen); + int n; + if( ext_fd > 0 ) { + n = write( ext_fd, ext_data, (id_len + len) ); + + if ( n < 0 ) { + JANUS_LOG(LOG_ERR, "Failed to write data to ``ext_fd``\n"); + } } g_free(ext_data); } @@ -837,12 +832,11 @@ static void *janus_skywayiot_handler(void *data) { * create external data receiver interface via TCP. The data received from this interface will * be relayed to DataChannel */ -static int create_data_receiver(char *addr, int port) { - /* todo: change interface from UDP to TCP. */ +static int create_ext_data_interface(char *addr, int port) { JANUS_LOG(LOG_INFO, "create data receiver: listener address %s, port %d\n", addr, port); - /* create a UDP socket for data receiver (it will be transfered via WebRTC DataChannel */ - if ((data_recv_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + /* create a TCP socket for data receiver (it will be transfered via WebRTC DataChannel */ + if ((ext_listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { JANUS_LOG(LOG_WARN, "cannot create socket for data receiver\n"); return -1; } @@ -851,19 +845,19 @@ static int create_data_receiver(char *addr, int port) { /* bind the socket to any valid IP address and a specific port */ memset((char *)&data_sockaddr, 0, sizeof(data_sockaddr)); data_sockaddr.sin_family = AF_INET; - data_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY); + data_sockaddr.sin_addr.s_addr = inet_addr(addr); data_sockaddr.sin_port = htons(port); - if (bind(data_recv_fd, (struct sockaddr *)&data_sockaddr, sizeof(data_sockaddr)) < 0) { + if (bind(ext_listen_fd, (struct sockaddr *)&data_sockaddr, sizeof(data_sockaddr)) < 0) { JANUS_LOG(LOG_WARN, "bind failed for data receiver\n"); return -1; } - JANUS_LOG(LOG_INFO, "succeed to create socket for data receiver\n"); + JANUS_LOG(LOG_INFO, "succeed to create socket for ext data\n"); /* create thread to receive udp datagram for each channel */ GError *error = NULL; - g_thread_try_new("skywayiot_data_thread", &thread_receive_external_data, NULL, &error); + g_thread_try_new("skywayiot_ext_interface_thread", &thread_receive_ext_data, NULL, &error); if(error != NULL) { JANUS_LOG(LOG_WARN, "Got error %d (%s) while launching the data channel ext interface thread...\n", error->code, error->message ? error->message : "??"); return -1; @@ -871,30 +865,6 @@ static int create_data_receiver(char *addr, int port) { return 0; } -/* todo: it will be eliminated */ -static int create_data_sender(char *addr, int port) { - JANUS_LOG(LOG_INFO, "create data sender: destination address %s, port %d\n", addr, port); - - /* create a UDP socket for data sender (it was received via WebRTC DataChannel */ - if ((data_send_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { - JANUS_LOG(LOG_WARN, "cannot create socket for data sender\n"); - return -1; - } - - /* bind the socket to any valid IP address and a specific port */ - struct hostent *server; - - server = gethostbyname(addr); - - memset((char *)&g_data_sender, 0, sizeof(g_data_sender)); - g_data_sender.sin_family = AF_INET; - bcopy((char *)server->h_addr, (char *)&g_data_sender.sin_addr.s_addr, server->h_length); - g_data_sender.sin_port = htons(port); - - JANUS_LOG(LOG_INFO, "succeed to create socket for data sender\n"); - return 0; -} - /** * This channel is used for relay received media data to external UDP media interface */ @@ -924,9 +894,9 @@ static int create_media_sender(char *addr, int port) { /** * This thread function will be used to receive data from external TCP interface. */ -static void *thread_receive_external_data(void *data /* to avoid warning */) { - char buff[1500]; - int bytes_recv; +static void *thread_receive_ext_data(void *data /* to avoid warning */) { + char recvBuff[65535]; + int n; guint64 handle_id; @@ -941,20 +911,32 @@ static void *thread_receive_external_data(void *data /* to avoid warning */) { data_len: 0 }; + listen( ext_listen_fd, 1 ); /* we only accept 1 TCP client, at the same time */ + while(1 /* fixme: detect plugin termination */ ) { - bytes_recv = recvfrom(data_recv_fd, buff, 1500, 0, (struct sockaddr *)&addr, &addr_len); + ext_fd = accept( ext_listen_fd, (struct sockaddr *)&addr, &addr_len); + + while( ( n = read( ext_fd, recvBuff, sizeof(recvBuff) - 1 ) ) > 0 ) { + recvBuff[n] = '\0'; - if(bytes_recv > handle_id_len) { - memcpy(&handle_id, buff, (size_t)handle_id_len); + if( n > handle_id_len) { + memcpy(&handle_id, recvBuff, (size_t)handle_id_len); - data_len = bytes_recv - handle_id_len; + data_len = n - handle_id_len; - parsed.handle_id = handle_id; - parsed.data = buff + handle_id_len; - parsed.data_len = data_len; + parsed.handle_id = handle_id; + parsed.data = recvBuff + handle_id_len; + parsed.data_len = data_len; - g_hash_table_foreach(sessions, &relay_extern_data, &parsed); + g_hash_table_foreach(sessions, &relay_ext_to_datachannel, &parsed); + } } + + /* socket HANG */ + close(ext_fd); + ext_fd = -1; + + sleep(1); } return NULL; } @@ -964,7 +946,7 @@ static void *thread_receive_external_data(void *data /* to avoid warning */) { * When handle is ``0xffffffffffffffff``, data will be broadcasted to * every connected data channel (used for pubsub model). */ -static void relay_extern_data(gpointer handle, gpointer session, gpointer data) { +static void relay_ext_to_datachannel(gpointer handle, gpointer session, gpointer data) { data_with_handleid *_data = (data_with_handleid *)data; guint64 handle_id = (guint64)handle;