diff --git a/jupyverse_api/jupyverse_api/contents/__init__.py b/jupyverse_api/jupyverse_api/contents/__init__.py index 4296e29b..8edbd615 100644 --- a/jupyverse_api/jupyverse_api/contents/__init__.py +++ b/jupyverse_api/jupyverse_api/contents/__init__.py @@ -1,7 +1,7 @@ import asyncio from abc import ABC, abstractmethod from pathlib import Path -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union from fastapi import APIRouter, Depends, Request, Response @@ -15,6 +15,7 @@ class FileIdManager(ABC): stop_watching_files: asyncio.Event stopped_watching_files: asyncio.Event + Change: Any @abstractmethod async def get_path(self, file_id: str) -> str: diff --git a/plugins/contents/fps_contents/fileid.py b/plugins/contents/fps_contents/fileid.py index f489c59d..b6af9f2e 100644 --- a/plugins/contents/fps_contents/fileid.py +++ b/plugins/contents/fps_contents/fileid.py @@ -35,9 +35,11 @@ class FileIdManager(metaclass=Singleton): initialized: asyncio.Event watchers: Dict[str, List[Watcher]] lock: asyncio.Lock + Change = Change - def __init__(self, db_path: str = ".fileid.db"): + def __init__(self, db_path: str = ".fileid.db", root_dir: str = "."): self.db_path = db_path + self.root_dir = root_dir self.initialized = asyncio.Event() self.watchers = {} self.watch_files_task = asyncio.create_task(self.watch_files()) @@ -90,7 +92,7 @@ async def watch_files(self): # index files async with self.lock: async with aiosqlite.connect(self.db_path) as db: - async for path in Path().rglob("*"): + async for path in Path(self.root_dir).rglob("*"): idx = uuid4().hex mtime = (await path.stat()).st_mtime await db.execute( @@ -99,14 +101,16 @@ async def watch_files(self): await db.commit() self.initialized.set() - async for changes in awatch(".", stop_event=self.stop_watching_files): + async for changes in awatch(self.root_dir, stop_event=self.stop_watching_files): async with self.lock: async with aiosqlite.connect(self.db_path) as db: deleted_paths = set() added_paths = set() for change, changed_path in changes: # get relative path - changed_path = Path(changed_path).relative_to(await Path().absolute()) + changed_path = Path(changed_path).relative_to( + await Path(self.root_dir).absolute() + ) changed_path_str = str(changed_path) if change == Change.deleted: @@ -156,9 +160,16 @@ async def watch_files(self): for change in changes: changed_path = change[1] # get relative path - relative_changed_path = str(Path(changed_path).relative_to(await Path().absolute())) + relative_changed_path = Path(changed_path).relative_to( + await Path(self.root_dir).absolute() + ) relative_change = (change[0], relative_changed_path) - for watcher in self.watchers.get(relative_changed_path, []): + all_watchers = [] + for path, watchers in self.watchers.items(): + p = Path(path) + if p == relative_changed_path or p in relative_changed_path.parents: + all_watchers += watchers + for watcher in all_watchers: watcher.notify(relative_change) self.stopped_watching_files.set() diff --git a/plugins/yjs/fps_yjs/ydocs/ydrive.py b/plugins/yjs/fps_yjs/ydocs/ydrive.py new file mode 100644 index 00000000..d7066b3d --- /dev/null +++ b/plugins/yjs/fps_yjs/ydocs/ydrive.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from contextlib import AsyncExitStack +from functools import partial +from pathlib import Path +from typing import Any, Callable + +from anyio import create_task_group +from anyio.abc import TaskGroup +from pycrdt import Doc, Map, MapEvent + +from jupyverse_api.auth import User +from jupyverse_api.contents import Contents + +from .ybasedoc import YBaseDoc + + +class YDrive(YBaseDoc): + _starting: bool + _task_group: TaskGroup | None + + def __init__( + self, + contents: Contents, + ydoc: Doc | None = None, + root_dir: Path | str | None = None, + ): + super().__init__(ydoc) + self._root_dir = Path() if root_dir is None else Path(root_dir) + self._ydoc["content"] = self._ycontent = self._new_dir_content() + self._ycontent.observe_deep(self._callback) + self._user = User() + self._starting = False + self._task_group = None + self._contents = contents + self._watcher = contents.file_id_manager.watch(".") + + async def __aenter__(self) -> YDrive: + if self._task_group is not None: + raise RuntimeError("YDrive already running") + + async with AsyncExitStack() as exit_stack: + tg = create_task_group() + self._task_group = await exit_stack.enter_async_context(tg) + self._exit_stack = exit_stack.pop_all() + + assert self._task_group is not None + self._task_group.start_soon(self._process_file_changes) + + return self + + async def _process_file_changes(self): + async for change in self._watcher: + change_, path = change + if change_ == self._contents.file_id_manager.Change.deleted: + parent_content = self._get(path.parent) + del parent_content["content"][path.name] + + async def __aexit__(self, exc_type, exc_value, exc_tb): + if self._task_group is None: + raise RuntimeError("YDrive not running") + + self._task_group.cancel_scope.cancel() + self._task_group = None + return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb) + + def _callback(self, events): + for event in events: + if isinstance(event, MapEvent): + current = self._ycontent + for path in event.path: + current = current[path] + for key, val in event.keys.items(): + if val.get("action") == "delete": + path = "/".join(event.path[1::2] + [key]) + self._task_group.start_soon(self._contents.delete_content, path, self._user) + + @property + def version(self) -> str: + return "1.0.0" + + def _new_dir_content(self) -> Map: + return Map({"is_dir": True, "content": None}) + + def _new_file_content(self, size: int) -> Map: + return Map({"is_dir": False, "size": size}) + + def _get_directory_content(self, path: Path) -> Map: + res = {} + for entry in (self._root_dir / path).iterdir(): + if entry.is_dir(): + res[entry.name] = self._new_dir_content() + else: + stat = entry.stat() + res[entry.name] = self._new_file_content( + size=stat.st_size, + ) + return Map(res) + + def _maybe_populate_dir(self, path: Path, content: Map): + if content["content"] is None: + content["content"] = self._get_directory_content(path) + + def _get(self, path: Path | str | None = None) -> Map: + path = Path() if path is None else Path(path) + current_content = self._ycontent + self._maybe_populate_dir(path, self._ycontent) + cwd = Path() + last_idx = len(path.parts) - 1 + for idx, part in enumerate(path.parts): + try: + current_content = current_content["content"][part] + except KeyError: + raise FileNotFoundError(f'No entry "{part}" in "{cwd}".') + if current_content["is_dir"]: + cwd /= part + self._maybe_populate_dir(cwd, current_content) + elif idx < last_idx: + raise RuntimeError(f'Entry "{part}" in "{cwd}" is not a directory.') + return current_content + + def get(self, path: Path | str | None = None) -> dict: + return dict(self._get(path)) + + def delete(self, path: Path | str): + path = Path(path) if isinstance(path, str) else path + if not path.parts: + raise RuntimeError("Cannot delete root directory") + parent_content = self._get(path.parent) + del parent_content["content"][path.name] + + def set(self, value) -> None: + raise RuntimeError("Cannot set a YDrive") + + def observe(self, callback: Callable[[str, Any], None]) -> None: + self.unobserve() + self._subscriptions[self._ystate] = self._ystate.observe(partial(callback, "state")) + self._subscriptions[self._ycontent] = self._ycontent.observe_deep(partial(callback, "content")) diff --git a/plugins/yjs/pyproject.toml b/plugins/yjs/pyproject.toml index d395ae81..b9ed2871 100644 --- a/plugins/yjs/pyproject.toml +++ b/plugins/yjs/pyproject.toml @@ -8,10 +8,18 @@ description = "An FPS plugin for the Yjs API" keywords = [ "jupyter", "server", "fastapi", "plugins" ] requires-python = ">=3.8" dependencies = [ - "pycrdt >=0.3.4,<0.4.0", + "anyio >=3.6.2,<5", + "pycrdt >=0.7.2,<0.8.0", "jupyverse-api >=0.1.2,<1", ] dynamic = [ "version",] + +[project.optional-dependencies] +test = [ + "pytest", + "fps-contents", +] + [[project.authors]] name = "Jupyter Development Team" email = "jupyter@googlegroups.com" diff --git a/plugins/yjs/tests/conftest.py b/plugins/yjs/tests/conftest.py new file mode 100644 index 00000000..af7e4799 --- /dev/null +++ b/plugins/yjs/tests/conftest.py @@ -0,0 +1,6 @@ +import pytest + + +@pytest.fixture +def anyio_backend(): + return "asyncio" diff --git a/plugins/yjs/tests/fake_contents.py b/plugins/yjs/tests/fake_contents.py new file mode 100644 index 00000000..1abbc66a --- /dev/null +++ b/plugins/yjs/tests/fake_contents.py @@ -0,0 +1,14 @@ +from anyio import create_memory_object_stream +from anyio.streams.stapled import StapledObjectStream +from fps_contents.fileid import FileIdManager + + +class Contents: + def __init__(self, db_path, root_dir): + send_stream, recv_stream = create_memory_object_stream[str]() + self.event_stream = StapledObjectStream(send_stream, recv_stream) + self.file_id_manager = FileIdManager(db_path=db_path, root_dir=root_dir) + self.watcher = self.file_id_manager.watch(".") + + async def delete_content(self, path, user): + await self.event_stream.send(f"delete {path}") diff --git a/plugins/yjs/tests/test_ydocs.py b/plugins/yjs/tests/test_ydocs.py new file mode 100644 index 00000000..a38780ec --- /dev/null +++ b/plugins/yjs/tests/test_ydocs.py @@ -0,0 +1,69 @@ +import tempfile +from pathlib import Path + +import pytest +from anyio import sleep +from fake_contents import Contents +from fps_yjs.ydocs.ydrive import YDrive + + +@pytest.mark.anyio +async def test_ydrive(): + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_dir = Path(tmp_dir) + (tmp_dir / "file0").write_text(" " * 1) + (tmp_dir / "file1").write_text(" " * 2) + (tmp_dir / "dir0").mkdir() + (tmp_dir / "dir0" / "file2").write_text(" " * 3) + (tmp_dir / "dir1").mkdir() + (tmp_dir / "dir1" / "dir2").mkdir() + (tmp_dir / "dir1" / "dir2" / "file3").write_text(" " * 4) + (tmp_dir / "dir1" / "dir2" / "file4").write_text(" " * 5) + + contents = Contents(db_path=str(tmp_dir / ".fileid.db"), root_dir=str(tmp_dir)) + + async with YDrive(contents=contents, root_dir=tmp_dir) as ydrive: + + with pytest.raises(FileNotFoundError): + ydrive.get("doesnt_exist") + + root_dir = ydrive.get() + assert len(root_dir["content"]) == 4 + assert "file0" in root_dir["content"] + assert "file1" in root_dir["content"] + assert "dir0" in root_dir["content"] + assert "dir1" in root_dir["content"] + + dir0 = ydrive.get("dir0") + assert len(dir0["content"]) == 1 + assert "file2" in dir0["content"] + + dir1 = ydrive.get("dir1") + assert len(dir1["content"]) == 1 + assert "dir2" in dir1["content"] + + dir2 = ydrive.get("dir1/dir2") + assert len(dir2["content"]) == 2 + assert "file3" in dir2["content"] + assert "file4" in dir2["content"] + assert dict(dir1["content"]["dir2"]["content"]["file3"]) == {"is_dir": False, "size": 4} + + # the fake contents actually doesn't delete files + path = "file0" + ydrive.delete(path) + assert await contents.event_stream.receive() == f"delete {path}" + path = "dir1/dir2/file3" + ydrive.delete(path) + assert await contents.event_stream.receive() == f"delete {path}" + + await contents.file_id_manager.initialized.wait() + await sleep(0.1) + assert "file1" in root_dir["content"] + (tmp_dir / "file1").unlink() + await sleep(0.2) + assert "file1" not in root_dir["content"] + + assert "file4" in dir2["content"] + (tmp_dir / "dir1" / "dir2" / "file4").unlink() + await sleep(0.1) + assert "file4" not in dir2["content"]