-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: ensure only one instance of the program can run at a time (#88)
* feat: ensure only one instance of the program can run at a time * fix(installer): update shortcuts to avoid hotkey conflict and add parameter to notify running instance * refactor: reorder imports and add type annotations * fix(main): correct temporary directory path and argument parser * refactor(filewatcher): Rename FocusEventHandler to FileWatcher * refactor: Rename file watcher function and relocate constants * refactor: optimize startup process when an instance is already running
- Loading branch information
Showing
9 changed files
with
513 additions
and
311 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from __future__ import annotations | ||
from typing import Callable, TYPE_CHECKING | ||
import os | ||
import time | ||
from watchdog.observers import Observer | ||
from watchdog.events import FileSystemEventHandler | ||
|
||
if TYPE_CHECKING: | ||
from watchdog.observers import BaseObserverSubclassCallable | ||
from watchdog.events import FileSystemEvent | ||
from basilisk.consts import TMP_DIR | ||
|
||
|
||
class FileWatcher(FileSystemEventHandler): | ||
last_modified = {} | ||
|
||
def __init__(self, callback: Callable): | ||
self.callback = callback | ||
|
||
def on_modified(self, event: FileSystemEvent): | ||
if event.src_path == os.path.join(TMP_DIR, "focus_file"): | ||
if event.src_path not in self.last_modified: | ||
self.last_modified[event.src_path] = 0 | ||
elif time.time() - self.last_modified[event.src_path] > 1: | ||
self.last_modified[event.src_path] = time.time() | ||
self.callback() | ||
|
||
|
||
def send_focus_signal(): | ||
with open(os.path.join(TMP_DIR, "focus_file"), 'w') as f: | ||
f.write(str(time.time())) | ||
|
||
|
||
def init_file_watcher(callback: Callable) -> BaseObserverSubclassCallable: | ||
event_handler = FileWatcher(callback) | ||
observer = Observer() | ||
observer.schedule(event_handler, TMP_DIR, recursive=False) | ||
observer.start() | ||
return observer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import logging | ||
import shutil | ||
import sys | ||
import threading | ||
import wx | ||
import basilisk.globalvars as globalvars | ||
import basilisk.config as config | ||
|
||
# don't use relative import here, CxFreeze will fail to find the module | ||
from basilisk.consts import APP_NAME, TMP_DIR | ||
from basilisk.filewatcher import init_file_watcher | ||
from basilisk.localization import init_translation | ||
from basilisk.logger import ( | ||
setup_logging, | ||
logging_uncaught_exceptions, | ||
get_log_file_path, | ||
) | ||
from basilisk.serverthread import ServerThread | ||
from basilisk.soundmanager import initialize_sound_manager | ||
from basilisk.updater import automatic_update_check, automatic_update_download | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class MainApp(wx.App): | ||
def OnInit(self) -> bool: | ||
sys.excepthook = logging_uncaught_exceptions | ||
|
||
self.conf = config.initialize_config() | ||
log_level = ( | ||
globalvars.args.log_level or self.conf.general.log_level.name | ||
) | ||
log.debug(f"args: {globalvars.args}") | ||
setup_logging(log_level) | ||
log.debug(f"config: {self.conf}") | ||
if getattr(sys, "frozen", False): | ||
log.info( | ||
"running frozen application: redirecting stdio to log file" | ||
) | ||
self.RedirectStdio(str(get_log_file_path())) | ||
language = globalvars.args.language or self.conf.general.language | ||
self.locale = init_translation(language) | ||
log.info("translation initialized") | ||
initialize_sound_manager() | ||
log.info("sound manager initialized") | ||
from basilisk.gui.mainframe import MainFrame | ||
|
||
self.frame = MainFrame(None, title=APP_NAME, conf=self.conf) | ||
self.SetTopWindow(self.frame) | ||
self.frame.Show(True) | ||
self.file_watcher = init_file_watcher(self.bring_window_to_focus) | ||
self.server = None | ||
if self.conf.server.enable: | ||
self.server = ServerThread(self.frame, self.conf.server.port) | ||
self.server.start() | ||
self.frame.Show() | ||
self.auto_update = None | ||
if ( | ||
self.conf.general.automatic_update_mode | ||
!= config.AutomaticUpdateModeEnum.OFF | ||
): | ||
self.start_auto_update_thread() | ||
log.info("Application started") | ||
return True | ||
|
||
def bring_window_to_focus(self): | ||
wx.CallAfter(self.frame.toggle_visibility, None) | ||
|
||
def start_auto_update_thread(self): | ||
self.stop_auto_update = False | ||
target_func = ( | ||
automatic_update_check | ||
if self.conf.general.automatic_update_mode | ||
== config.AutomaticUpdateModeEnum.NOTIFY | ||
else automatic_update_download | ||
) | ||
callback_func = ( | ||
self.frame.show_update_notification | ||
if self.conf.general.automatic_update_mode | ||
== config.AutomaticUpdateModeEnum.NOTIFY | ||
else self.frame.show_update_download | ||
) | ||
self.auto_update = threading.Thread( | ||
target=target_func, | ||
args=(self.conf, callback_func, self.stop_auto_update), | ||
daemon=True, | ||
) | ||
self.auto_update.start() | ||
log.info("Automatic update thread started") | ||
|
||
def OnExit(self) -> int: | ||
if self.server: | ||
log.debug("Stopping server") | ||
self.server.stop() | ||
self.server.join() | ||
log.debug("Server stopped") | ||
if self.auto_update and self.auto_update.is_alive(): | ||
self.stop_auto_update = True | ||
self.auto_update.join() | ||
log.info("Automatic update thread stopped") | ||
log.debug("Removing temporary files") | ||
shutil.rmtree(TMP_DIR, ignore_errors=True) | ||
|
||
log.debug("Stopping file watcher") | ||
self.file_watcher.stop() | ||
self.file_watcher.join() | ||
log.debug("File watcher stopped") | ||
|
||
log.info("Application exited") | ||
return 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import atexit | ||
import os | ||
|
||
|
||
class SingletonInstance: | ||
def __init__(self, lock_file: str): | ||
self.lock_file = lock_file | ||
self.lock_handle = None | ||
|
||
def acquire(self) -> bool: | ||
"""Return True if lock was acquired, False otherwise.""" | ||
if os.path.exists(self.lock_file): | ||
return False | ||
self.lock_handle = open(self.lock_file, 'w') | ||
try: | ||
self.lock_handle.write(str(os.getpid())) | ||
self.lock_handle.flush() | ||
except Exception: | ||
self.release() | ||
return False | ||
atexit.register(self.release) | ||
return True | ||
|
||
def release(self): | ||
"""Release the lock.""" | ||
if self.lock_handle: | ||
try: | ||
self.lock_handle.close() | ||
except Exception: | ||
pass | ||
try: | ||
os.remove(self.lock_file) | ||
except Exception: | ||
pass | ||
|
||
def get_existing_pid(self) -> int | None: | ||
"""Return the PID of the existing lock file, or None if it doesn't exist.""" | ||
if os.path.exists(self.lock_file): | ||
with open(self.lock_file, 'r') as f: | ||
return int(f.read().strip()) | ||
return None |
Oops, something went wrong.