Skip to content

Commit

Permalink
fix blocking during persist backup
Browse files Browse the repository at this point in the history
Signed-off-by: Flávio Tapajós <[email protected]>
  • Loading branch information
ftapajos committed Sep 13, 2024
1 parent 57f0e17 commit f60d10a
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 25 deletions.
1 change: 1 addition & 0 deletions config.mk
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ ifeq ($(WITH_THREADING),yes)
LIB_LDFLAGS:=$(LIB_LDFLAGS) -pthread
LIB_CPPFLAGS:=$(LIB_CPPFLAGS) -DWITH_THREADING
CLIENT_CPPFLAGS:=$(CLIENT_CPPFLAGS) -DWITH_THREADING
BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -DWITH_THREADING
STATIC_LIB_DEPS:=$(STATIC_LIB_DEPS) -pthread
endif

Expand Down
1 change: 1 addition & 0 deletions lib/dummypthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#define pthread_mutex_init(A, B)
#define pthread_mutex_destroy(A)
#define pthread_mutex_lock(A)
#define pthread_mutex_trylock(A) false
#define pthread_mutex_unlock(A)

#endif
8 changes: 4 additions & 4 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#endif
#include <stdlib.h>

#if defined(WITH_THREADING) && !defined(WITH_BROKER)
#if defined(WITH_THREADING)
# include <pthread.h>
#else
# include <dummypthread.h>
Expand Down Expand Up @@ -203,9 +203,9 @@ struct mosquitto_msg_data{
#else
struct mosquitto_message_all *inflight;
int queue_len;
# ifdef WITH_THREADING
#endif
#ifdef WITH_THREADING
pthread_mutex_t mutex;
# endif
#endif
int inflight_quota;
uint16_t inflight_maximum;
Expand Down Expand Up @@ -272,7 +272,7 @@ struct mosquitto {
enum mosquitto__keyform tls_keyform;
#endif
bool want_write;
#if defined(WITH_THREADING) && !defined(WITH_BROKER)
#if defined(WITH_THREADING)
pthread_mutex_t callback_mutex;
pthread_mutex_t log_callback_mutex;
pthread_mutex_t msgtime_mutex;
Expand Down
87 changes: 66 additions & 21 deletions src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,63 @@ static void queue_plugin_msgs(void)
}


#ifdef WITH_PERSISTENCE
void check_persistence(time_t *last_backup){
if(db.config->persistence && db.config->autosave_interval){
if(db.config->autosave_on_changes){
if(db.persistence_changes >= db.config->autosave_interval){
persist__backup(false);
db.persistence_changes = 0;
}
}else{
if(*last_backup + db.config->autosave_interval < db.now_s){
persist__backup(false);
*last_backup = db.now_s;
}
}
}

if(flag_db_backup){
flag_db_backup = false;
persist__backup(false);
}
}

void threaded_persistence(bool *run)
{
time_t last_backup = mosquitto_time();
pthread_mutex_t persistence_mutex;
pthread_mutex_init(&persistence_mutex, NULL);

while(*run){

if(!pthread_mutex_trylock(&persistence_mutex)){
check_persistence(&last_backup);

pthread_mutex_unlock(&persistence_mutex);
}
#ifdef WIN32
Sleep(1000);
#else
sleep(1);
#endif
}

pthread_mutex_destroy(&persistence_mutex);
}
#endif

int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count)
{
#ifdef WITH_SYS_TREE
time_t start_time = mosquitto_time();
#endif
#ifdef WITH_PERSISTENCE
# ifdef WITH_THREADING
pthread_t thread__persist_id;
# else
time_t last_backup = mosquitto_time();
# endif
#endif
#ifdef WITH_WEBSOCKETS
int i;
Expand All @@ -186,6 +236,10 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
if(rc) return rc;
#endif

#if defined(WITH_PERSISTENCE) && defined(WITH_THREADING)
pthread_create(&thread__persist_id, NULL, (void*) &threaded_persistence, &run);
#endif

while(run){
queue_plugin_msgs();
context__free_disused();
Expand All @@ -202,32 +256,15 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
#endif

rc = mux__handle(listensock, listensock_count);
if(rc) return rc;
if(rc) goto error;

session_expiry__check();
will_delay__check();
#ifdef WITH_PERSISTENCE
if(db.config->persistence && db.config->autosave_interval){
if(db.config->autosave_on_changes){
if(db.persistence_changes >= db.config->autosave_interval){
persist__backup(false);
db.persistence_changes = 0;
}
}else{
if(last_backup + db.config->autosave_interval < db.now_s){
persist__backup(false);
last_backup = db.now_s;
}
}
}
#endif

#ifdef WITH_PERSISTENCE
if(flag_db_backup){
persist__backup(false);
flag_db_backup = false;
}
#if defined(WITH_PERSISTENCE) && !defined(WITH_THREADING)
check_persistence(&last_backup);
#endif

if(flag_reload){
log__printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
config__read(db.config, true);
Expand Down Expand Up @@ -270,6 +307,14 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens

mux__cleanup();

error:

#if defined(WITH_PERSISTENCE) && defined(WITH_THREADING)
run = false;
pthread_join(thread__persist_id, NULL);
#endif
if(rc) return rc;

return MOSQ_ERR_SUCCESS;
}

Expand Down

0 comments on commit f60d10a

Please sign in to comment.