Skip to content
This repository has been archived by the owner on Nov 9, 2022. It is now read-only.

Commit

Permalink
Fix semaphore implementation on non-Android Unix. (#67)
Browse files Browse the repository at this point in the history
The current implementation uses sem_open(), which creates semaphores in
a global system namespace; if the caller asks for a process-private
semaphore, it appends a random number to the name to avoid collisions.
Unlike on Windows, these semaphores are not destroyed when the last
handle is closed; they stay around until someone explicitly calls
sem_unlink().  Unfortunately, this means that if the hosting app
crashes, it'll leak semaphores until the system reboots.

On Android, the code uses sem_init() instead.  This is another standard
POSIX function, which has the capability to create true process-private
semaphores, but is not implemented on some platforms, such as macOS
where it just returns ENOSYS - presumably the reason the code path is
limited to Android.

As an entirely separate issue, on macOS, POSIX semaphore names are
limited to only 31 characters.  One of the semaphores created is named
"/ConnectionThreadShutdown", which is 25 characters by itself; after
adding an underscore and a random number, it'll only fit if the random
number happens to be <= 99999!

This commit abandons POSIX semaphores entirely in favor of emulating
process-private semaphores using pthread condition variables.  This
avoids the leak issue and should also be faster.

As for global semaphores, well - nothing in the FTL SDK actually uses
them, so I decided to just remove the is_global argument from
os_semaphore_create() and only support process-private semaphores.
  • Loading branch information
comex authored and QuinnDamerell-MS committed Jul 18, 2018
1 parent b89829c commit 98289f7
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 115 deletions.
2 changes: 1 addition & 1 deletion libftl/ftl-sdk.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ FTL_API ftl_status_t ftl_ingest_create(ftl_handle_t *ftl_handle, ftl_ingest_para
os_init_mutex(&ftl->disconnect_mutex);
os_init_mutex(&ftl->status_q.mutex);

if (os_semaphore_create(&ftl->status_q.sem, "/StatusQueue", O_CREAT, 0, FALSE) < 0) {
if (os_semaphore_create(&ftl->status_q.sem, "/StatusQueue", O_CREAT, 0) < 0) {
ret_status = FTL_MALLOC_FAILURE;
break;
}
Expand Down
4 changes: 2 additions & 2 deletions libftl/handshake.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ ftl_status_t _ingest_connect(ftl_stream_configuration_private_t *ftl) {

ftl_set_state(ftl, FTL_CONNECTED);

if (os_semaphore_create(&ftl->connection_thread_shutdown, "/ConnectionThreadShutdown", O_CREAT, 0, FALSE) < 0) {
if (os_semaphore_create(&ftl->connection_thread_shutdown, "/ConnectionThreadShutdown", O_CREAT, 0) < 0) {
response_code = FTL_MALLOC_FAILURE;
break;
}

if (os_semaphore_create(&ftl->keepalive_thread_shutdown, "/KeepAliveThreadShutdown", O_CREAT, 0, FALSE) < 0) {
if (os_semaphore_create(&ftl->keepalive_thread_shutdown, "/KeepAliveThreadShutdown", O_CREAT, 0) < 0) {
response_code = FTL_MALLOC_FAILURE;
break;
}
Expand Down
8 changes: 4 additions & 4 deletions libftl/media.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ ftl_status_t media_init(ftl_stream_configuration_private_t *ftl) {
break;
}

if (os_semaphore_create(&ftl->video.media_component.pkt_ready, "/VideoPkt", O_CREAT, 0, FALSE) < 0) {
if (os_semaphore_create(&ftl->video.media_component.pkt_ready, "/VideoPkt", O_CREAT, 0) < 0) {
status = FTL_MALLOC_FAILURE;
break;
}

if (os_semaphore_create(&ftl->audio.media_component.pkt_ready, "/AudioPkt", O_CREAT, 0, FALSE) < 0) {
if (os_semaphore_create(&ftl->audio.media_component.pkt_ready, "/AudioPkt", O_CREAT, 0) < 0) {
status = FTL_MALLOC_FAILURE;
break;
}
Expand All @@ -190,7 +190,7 @@ ftl_status_t media_init(ftl_stream_configuration_private_t *ftl) {
break;
}

if (os_semaphore_create(&media->ping_thread_shutdown, "/PingThreadShutdown", O_CREAT, 0, FALSE) < 0) {
if (os_semaphore_create(&media->ping_thread_shutdown, "/PingThreadShutdown", O_CREAT, 0) < 0) {
status = FTL_MALLOC_FAILURE;
break;
}
Expand Down Expand Up @@ -1628,7 +1628,7 @@ FTL_API ftl_status_t ftl_adaptive_bitrate_thread(ftl_handle_t* ftl_handle, void*
thread_params->min_encoding_bitrate = min_encoding_bitrate;
thread_params->context = context;

if (os_semaphore_create(&ftl->bitrate_thread_shutdown, "/BitrateThreadShutdown", O_CREAT, 0, FALSE) < 0)
if (os_semaphore_create(&ftl->bitrate_thread_shutdown, "/BitrateThreadShutdown", O_CREAT, 0) < 0)
{
ret_status = FTL_MALLOC_FAILURE;
break;
Expand Down
138 changes: 50 additions & 88 deletions libftl/posix/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,118 +66,80 @@ int os_delete_mutex(OS_MUTEX *mutex) {
return 0;
}

int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned int value, BOOL is_global) {
int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned int value) {

int retval = 0;

sem->name = NULL;
sem->sem = NULL;
if (pthread_mutex_init(&sem->mutex, NULL))
return -2;

do {
if (name == NULL || name[0] != '/') {
retval = -1;
break;
}

//if the semaphore is intended to only be used by the same process and not across processes, give it unique name
if(!is_global) {
int name_len = strlen(name);

if ((sem->name = (char*)malloc( (name_len + 20) * sizeof(char))) == NULL) {
retval = -2;
break;
}

sprintf(sem->name, "%s_%d", name, (unsigned int)rand());
}
else {
if ((sem->name = strdup(name)) == NULL) {
return -2;
}
}

#ifdef __ANDROID__
if ((sem->sem = (sem_t*)malloc(sizeof(sem_t))) == NULL) {
retval = -4;
break;
}

if (sem_init(sem->sem, 0 /* pshared */, 0 /* value */)) {
#else
if ((sem->sem = sem_open(sem->name, oflag, 0644, value)) == SEM_FAILED) {
#endif
retval = -3;
break;
}

return retval;
}while(0);

if(sem->name != NULL){
free(sem->name);
if (pthread_cond_init(&sem->cond, NULL)) {
pthread_mutex_destroy(&sem->mutex);
return -3;
}

if(sem->sem != NULL){
free(sem->sem);
}
sem->value = value;

return retval;
return 0;
}

int os_semaphore_pend(OS_SEMAPHORE *sem, int ms_timeout) {

if (ms_timeout < 0) {
return sem_wait(sem->sem);
}
else {
#ifdef __APPLE__
int sleep_interval = 50;
int retval;
//TODO find a better solution
/*OSX doesnt have a timedwait so this is an ugly polling solution since this SDK doesnt currently use timedwait for performance critical things*/
while (ms_timeout > 0) {
if ((retval = sem_trywait(sem->sem)) == 0) {
break;
}
sleep_ms(sleep_interval);
ms_timeout -= sleep_interval;
}
int retval = 0;

return retval;
#else
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
timespec_add_ms(&ts, ms_timeout);
return sem_timedwait(sem->sem, &ts);
if (pthread_mutex_lock(&sem->mutex))
return -1;

#endif
while (1) {
if (sem->value > 0) {
sem->value--;
break;
} else {
if (ms_timeout < 0) {
if (pthread_cond_wait(&sem->cond, &sem->mutex)) {
retval = -2;
break;
}
} else {
struct timespec ts;
if (clock_gettime(CLOCK_REALTIME, &ts)) {
retval = -3;
break;
}
timespec_add_ms(&ts, ms_timeout);
if (pthread_cond_timedwait(&sem->cond, &sem->mutex, &ts)) {
retval = -4;
break;
}
}
continue;
}
}
}

int os_semaphore_post(OS_SEMAPHORE *sem) {
return sem_post(sem->sem);
pthread_mutex_unlock(&sem->mutex);
return retval;
}

int os_semaphore_delete(OS_SEMAPHORE *sem) {

int os_semaphore_post(OS_SEMAPHORE *sem) {
int retval = 0;

#ifdef __ANDROID__
retval = sem_destroy(sem->sem);
free(sem->sem);

#else
if ( (retval = sem_close(sem->sem)) == 0) {

retval = sem_unlink(sem->name);
}
#endif
if (pthread_mutex_lock(&sem->mutex))
return -1;

free(sem->name);
sem->value++;
if (pthread_cond_broadcast(&sem->cond))
retval = -2;

pthread_mutex_unlock(&sem->mutex);
return retval;
}

int os_semaphore_delete(OS_SEMAPHORE *sem) {
pthread_mutex_destroy(&sem->mutex);
pthread_cond_destroy(&sem->cond);
return 0;
}

void sleep_ms(int ms)
{
usleep(ms * 1000);
Expand Down
7 changes: 4 additions & 3 deletions libftl/posix/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ typedef void* OS_THREAD_START_ROUTINE;
typedef void OS_THREAD_ATTRIBS; //todo implement attributes

typedef struct {
sem_t *sem;
char *name;
pthread_mutex_t mutex;
pthread_cond_t cond;
unsigned int value;
}OS_SEMAPHORE;

int os_init();
Expand All @@ -59,7 +60,7 @@ int os_trylock_mutex(OS_MUTEX *mutex);
int os_unlock_mutex(OS_MUTEX *mutex);
int os_delete_mutex(OS_MUTEX *mutex);

int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned int value, BOOL is_global);
int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned int value);
int os_semaphore_pend(OS_SEMAPHORE *sem, int ms_timeout);
int os_semaphore_post(OS_SEMAPHORE *sem);
int os_semaphore_delete(OS_SEMAPHORE *sem);
Expand Down
24 changes: 8 additions & 16 deletions libftl/win32/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ int os_delete_mutex(OS_MUTEX *mutex) {
return 0;
}

int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned int value, BOOL is_global) {
int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned int value) {

char *internal_name = NULL;
int retval = 0;
Expand All @@ -99,23 +99,15 @@ int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned
}

//if the semaphore is intended to only be used by the same process and not across processes, give it unique name
if(!is_global) {
size_t name_len = strlen(name);
size_t max_name = name_len + 20;
size_t name_len = strlen(name);
size_t max_name = name_len + 20;

if ((internal_name = (char*)malloc(max_name * sizeof(char))) == NULL) {
retval = -2;
break;
}

sprintf_s(internal_name, max_name, "%s_%d", name, (unsigned int)rand());
if ((internal_name = (char*)malloc(max_name * sizeof(char))) == NULL) {
retval = -2;
break;
}
else {
if ((internal_name = _strdup(name)) == NULL) {
retval = -2;
break;
}
}

sprintf_s(internal_name, max_name, "%s_%d", name, (unsigned int)rand());

if ( (*sem = CreateSemaphoreA(NULL, value, MAX_SEM_COUNT, (LPCSTR)internal_name)) == NULL){
retval = -3;
Expand Down
2 changes: 1 addition & 1 deletion libftl/win32/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ int os_trylock_mutex(OS_MUTEX *mutex);
int os_unlock_mutex(OS_MUTEX *mutex);
int os_delete_mutex(OS_MUTEX *mutex);

int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned int value, BOOL is_global);
int os_semaphore_create(OS_SEMAPHORE *sem, const char *name, int oflag, unsigned int value);
int os_semaphore_pend(OS_SEMAPHORE *sem, int ms_timeout);
int os_semaphore_post(OS_SEMAPHORE *sem);
int os_semaphore_delete(OS_SEMAPHORE *sem);
Expand Down

0 comments on commit 98289f7

Please sign in to comment.