Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace qprocess by qobject and create new processes per action #88

Merged
merged 11 commits into from
Jul 30, 2024
65 changes: 64 additions & 1 deletion napari_plugin_manager/_tests/test_installer_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ def environment(self, env=None):
return QProcessEnvironment.systemEnvironment()


def test_not_implemented_methods():
tool = AbstractInstallerTool('install', ['requests'])
with pytest.raises(NotImplementedError):
tool.executable()

with pytest.raises(NotImplementedError):
tool.arguments()

with pytest.raises(NotImplementedError):
tool.environment()

with pytest.raises(NotImplementedError):
tool.available()


def test_pip_installer_tasks(qtbot, tmp_virtualenv: 'Session', monkeypatch):
installer = InstallerQueue()
monkeypatch.setattr(
Expand Down Expand Up @@ -165,6 +180,17 @@ def test_installer_failures(qtbot, tmp_virtualenv: 'Session', monkeypatch):
)


def test_cancel_incorrect_job_id(qtbot, tmp_virtualenv: 'Session'):
installer = InstallerQueue()
with qtbot.waitSignal(installer.allFinished, timeout=20000):
job_id = installer.install(
tool=InstallerTools.PIP,
pkgs=['requests'],
)
with pytest.raises(ValueError):
installer.cancel(job_id + 1)


@pytest.mark.skipif(
not CondaInstallerTool.available(), reason="Conda is not available."
)
Expand Down Expand Up @@ -207,7 +233,7 @@ def test_conda_installer(qtbot, tmp_conda_env: Path):
prefix=tmp_conda_env,
)
assert installer.currentJobs() == 2
installer.cancel()
installer.cancel_all()

assert not installer.hasJobs()
assert not list(conda_meta.glob(glob_pat))
Expand Down Expand Up @@ -250,6 +276,38 @@ def test_conda_installer(qtbot, tmp_conda_env: Path):
assert not installer.hasJobs()


def test_installer_error(qtbot, tmp_virtualenv: 'Session', monkeypatch):
installer = InstallerQueue()
monkeypatch.setattr(
PipInstallerTool, "executable", lambda *a: 'not-a-real-executable'
)
with qtbot.waitSignal(installer.allFinished, timeout=600_000):
installer.install(
tool=InstallerTools.PIP,
pkgs=['some-package-that-does-not-exist'],
)


@pytest.mark.skipif(
not CondaInstallerTool.available(), reason="Conda is not available."
)
def test_conda_installer_wait_for_finished(qtbot, tmp_conda_env: Path):
installer = InstallerQueue()

with qtbot.waitSignal(installer.allFinished, timeout=600_000):
installer.install(
tool=InstallerTools.CONDA,
pkgs=['requests'],
prefix=tmp_conda_env,
)
installer.install(
tool=InstallerTools.CONDA,
pkgs=['pyzenhub'],
prefix=tmp_conda_env,
)
installer.waitForFinished(20000)


def test_constraints_are_in_sync():
conda_constraints = sorted(CondaInstallerTool.constraints())
pip_constraints = sorted(PipInstallerTool.constraints())
Expand All @@ -273,3 +331,8 @@ def test_executables():
def test_available():
assert str(CondaInstallerTool.available())
assert PipInstallerTool.available()


def test_unrecognized_tool():
with pytest.raises(ValueError):
InstallerQueue().install(tool='shrug', pkgs=[])
156 changes: 94 additions & 62 deletions napari_plugin_manager/qt_package_installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@
pkgs: Tuple[str, ...]
origins: Tuple[str, ...] = ()
prefix: Optional[str] = None
process: QProcess = None

@property
def ident(self):
return hash((self.action, *self.pkgs, *self.origins, self.prefix))
return hash(
(self.action, *self.pkgs, *self.origins, self.prefix, self.process)
)

# abstract method
@classmethod
Expand Down Expand Up @@ -245,7 +248,7 @@
raise ValueError("Prefix has not been specified!")


class InstallerQueue(QProcess):
class InstallerQueue(QObject):
"""Queue for installation and uninstallation tasks in the plugin manager."""

# emitted when all jobs are finished. Not to be confused with finished,
Expand All @@ -257,22 +260,19 @@
# dict: ProcessFinishedData
processFinished = Signal(dict)

# emitted when each job starts
started = Signal()

def __init__(
self, parent: Optional[QObject] = None, prefix: Optional[str] = None
) -> None:
super().__init__(parent)
self._queue: Deque[AbstractInstallerTool] = deque()
self._current_process: QProcess = None
self._prefix = prefix
self._output_widget = None
self._exit_codes = []

self.setProcessChannelMode(QProcess.MergedChannels)
self.readyReadStandardOutput.connect(self._on_stdout_ready)
self.readyReadStandardError.connect(self._on_stderr_ready)

self.finished.connect(self._on_process_finished)
self.errorOccurred.connect(self._on_error_occurred)

# -------------------------- Public API ------------------------------
def install(
self,
Expand Down Expand Up @@ -308,6 +308,7 @@
pkgs=pkgs,
prefix=prefix,
origins=origins,
process=self._create_process(),
**kwargs,
)
return self._queue_item(item)
Expand Down Expand Up @@ -346,6 +347,7 @@
pkgs=pkgs,
prefix=prefix,
origins=origins,
process=self._create_process(),
**kwargs,
)
return self._queue_item(item)
Expand Down Expand Up @@ -379,42 +381,37 @@
action=InstallerActions.UNINSTALL,
pkgs=pkgs,
prefix=prefix,
process=self._create_process(),
**kwargs,
)
return self._queue_item(item)

def cancel(self, job_id: Optional[JobId] = None):
"""Cancel `job_id` if it is running.
def cancel(self, job_id: JobId):
"""Cancel `job_id` if it is running. If `job_id` does not exist int the queue,
a ValueError is raised.

Parameters
----------
job_id : Optional[JobId], optional
Job ID to cancel. If not provided, cancel all jobs.
job_id : JobId
Job ID to cancel.
"""
if job_id is None:
all_pkgs = []
for item in deque(self._queue):
all_pkgs.extend(item.pkgs)

# cancel all jobs
self._queue.clear()
self._end_process()
self.processFinished.emit(
{
'exit_code': 1,
'exit_status': 0,
'action': InstallerActions.CANCEL_ALL,
'pkgs': all_pkgs,
}
)
return

for i, item in enumerate(deque(self._queue)):
if item.ident == job_id:
if i == 0: # first in queue, currently running
if i == 0:
# first in queue, currently running
self._queue.remove(item)
self._end_process()
else: # still pending, just remove from queue

with contextlib.suppress(RuntimeError):
item.process.finished.disconnect(
self._on_process_finished
)
item.process.errorOccurred.disconnect(
self._on_error_occurred
)

self._end_process(item.process)
else:
# still pending, just remove from queue
self._queue.remove(item)

self.processFinished.emit(
Expand All @@ -425,6 +422,7 @@
'pkgs': item.pkgs,
}
)
self._process_queue()
return

msg = f"No job with id {job_id}. Current queue:\n - "
Expand All @@ -434,18 +432,33 @@
for item in self._queue
]
)
raise ValueError(msg)

def cancel_all(self):
"""Terminate all process in the queue and emit the `processFinished` signal."""
all_pkgs = []
for item in deque(self._queue):
all_pkgs.extend(item.pkgs)
process = item.process

with contextlib.suppress(RuntimeError):
process.finished.disconnect(self._on_process_finished)
process.errorOccurred.disconnect(self._on_error_occurred)

self._end_process(process)

self._queue.clear()
self._current_process = None
self.processFinished.emit(
{
'exit_code': 1,
'exit_status': 0,
'action': InstallerActions.CANCEL,
'pkgs': [],
'action': InstallerActions.CANCEL_ALL,
'pkgs': all_pkgs,
}
)
raise ValueError(msg)

def cancel_all(self):
self.cancel()
self._process_queue()
return

def waitForFinished(self, msecs: int = 10000) -> bool:
"""Block and wait for all jobs to finish.
Expand All @@ -456,7 +469,8 @@
Time to wait, by default 10000
"""
while self.hasJobs():
super().waitForFinished(msecs)
if self._current_process is not None:
self._current_process.waitForFinished(msecs)
return True

def hasJobs(self) -> bool:
Expand All @@ -472,6 +486,15 @@
self._output_widget = output_widget

# -------------------------- Private methods ------------------------------
def _create_process(self) -> QProcess:
process = QProcess(self)
process.setProcessChannelMode(QProcess.MergedChannels)
process.readyReadStandardOutput.connect(self._on_stdout_ready)
process.readyReadStandardError.connect(self._on_stderr_ready)
process.finished.connect(self._on_process_finished)
process.errorOccurred.connect(self._on_error_occurred)
return process

def _log(self, msg: str):
log.debug(msg)
if self._output_widget:
Expand Down Expand Up @@ -513,27 +536,32 @@
return

tool = self._queue[0]
self.setProgram(str(tool.executable()))
self.setProcessEnvironment(tool.environment())
self.setArguments([str(arg) for arg in tool.arguments()])
# this might throw a warning because the same process
# was already running but it's ok
self._log(
trans._(
"Starting '{program}' with args {args}",
program=self.program(),
args=self.arguments(),
process = tool.process

if process.state() != QProcess.Running:
process.setProgram(str(tool.executable()))
process.setProcessEnvironment(tool.environment())
process.setArguments([str(arg) for arg in tool.arguments()])
process.started.connect(self.started)

self._log(
trans._(
"Starting '{program}' with args {args}",
program=process.program(),
args=process.arguments(),
)
)
)
self.start()

def _end_process(self):
process.start()
self._current_process = process

def _end_process(self, process: QProcess):
if os.name == 'nt':
# TODO: this might be too agressive and won't allow rollbacks!
# investigate whether we can also do .terminate()
self.kill()
process.kill()
else:
self.terminate()
process.terminate()

if self._output_widget:
self._output_widget.append(
Expand Down Expand Up @@ -605,14 +633,18 @@
self._process_queue()

def _on_stdout_ready(self):
text = self.readAllStandardOutput().data().decode()
if text:
self._log(text)
if self._current_process is not None:
text = (
self._current_process.readAllStandardOutput().data().decode()
)
if text:
self._log(text)

def _on_stderr_ready(self):
text = self.readAllStandardError().data().decode()
if text:
self._log(text)
if self._current_process is not None:
text = self._current_process.readAllStandardError().data().decode()
if text:
self._log(text)

Check warning on line 647 in napari_plugin_manager/qt_package_installer.py

View check run for this annotation

Codecov / codecov/patch

napari_plugin_manager/qt_package_installer.py#L644-L647

Added lines #L644 - L647 were not covered by tests


def _get_python_exe():
Expand Down
Loading