Skip to content

Commit

Permalink
Creates a dialog to answer messages
Browse files Browse the repository at this point in the history
  • Loading branch information
hbcarlos committed Jul 13, 2023
1 parent b56202d commit 50f9c43
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 17 deletions.
2 changes: 1 addition & 1 deletion jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class YDocExtension(ExtensionApp):
)

document_cleanup_delay = Float(
60,
10,
allow_none=True,
config=True,
help="""The delay in seconds to keep a document in memory in the back-end after all clients
Expand Down
3 changes: 1 addition & 2 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
JUPYTER_COLLABORATION_EVENTS_URI,
LogLevel,
MessageType,
RoomMessages,
decode_file_path,
)
from .websocketserver import JupyterWebsocketServer
Expand Down Expand Up @@ -320,7 +319,7 @@ async def _clean_room(self) -> None:
file = self._file_loaders[file_id]
if file.number_of_subscriptions == 0:
self.log.info("Deleting file %s", file.path)
del self._file_loaders[file_id]
await self._file_loaders.remove(file_id)
self._emit(LogLevel.INFO, "clean", "Loader deleted.")

def check_origin(self, origin):
Expand Down
3 changes: 2 additions & 1 deletion jupyter_collaboration/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ async def maybe_save_content(self, model: dict[str, Any]) -> dict[str, Any]:
)

if model["last_modified"] == m["last_modified"]:
return await self.save_content(model)
self._log.info("Saving file: %s", path)
return await ensure_async(self._contents_manager.save(model, path))

else:
# file changed on disk, raise an error
Expand Down
22 changes: 14 additions & 8 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
self._save_delay = save_delay

self._update_lock = asyncio.Lock()
self._outofband_lock = asyncio.Lock()
self._initialization_lock = asyncio.Lock()
self._cleaner: asyncio.Task | None = None
self._saving_document: asyncio.Task | None = None
Expand Down Expand Up @@ -160,7 +161,7 @@ async def initialize(self) -> None:

async def handle_msg(self, data: bytes) -> None:
msg_type = data[0]
msg_id = data[1:].decode()
msg_id = data[2:].decode()

# Use a lock to prevent handling responses from multiple clients
# at the same time
Expand All @@ -171,9 +172,9 @@ async def handle_msg(self, data: bytes) -> None:

try:
ans = None
if msg_type == RoomMessages.RESTORE:
if msg_type == RoomMessages.RELOAD:
# Restore the room with the content from disk
await self._restore()
await self._load_document()
ans = RoomMessages.DOC_OVERWRITTEN

elif msg_type == RoomMessages.OVERWRITE:
Expand All @@ -185,7 +186,8 @@ async def handle_msg(self, data: bytes) -> None:
# Remove the lock and broadcast the resolution
self._messages.pop(msg_id)
data = msg_id.encode()
self._broadcast_msg(
self._outofband_lock.release()
await self._broadcast_msg(
bytes([MessageType.ROOM, ans]) + write_var_uint(len(data)) + data
)

Expand Down Expand Up @@ -230,6 +232,9 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:
event (str): Type of change.
args (dict): A dictionary with format, type, last_modified.
"""
if self._outofband_lock.locked():
return

if event == "metadata" and (
self._last_modified is None or self._last_modified < args["last_modified"]
):
Expand All @@ -238,8 +243,9 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:

msg_id = str(uuid.uuid4())
self._messages[msg_id] = asyncio.Lock()
await self._outofband_lock.acquire()
data = msg_id.encode()
self._broadcast_msg(
await self._broadcast_msg(
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED])
+ write_var_uint(len(data))
+ data
Expand Down Expand Up @@ -270,7 +276,7 @@ def _on_document_change(self, target: str, event: Any) -> None:

self._saving_document = asyncio.create_task(self._maybe_save_document())

async def _restore(self) -> None:
async def _load_document(self) -> None:
try:
model = await self._file.load_content(self._file_format, self._file_type, True)
except Exception as e:
Expand Down Expand Up @@ -362,9 +368,9 @@ async def _maybe_save_document(self) -> None:
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)

def _broadcast_msg(self, msg) -> None:
async def _broadcast_msg(self, msg: bytes) -> None:
for client in self.clients:
client.send(msg)
await client.send(msg)


class TransientRoom(YRoom):
Expand Down
2 changes: 1 addition & 1 deletion jupyter_collaboration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class MessageType(IntEnum):


class RoomMessages(IntEnum):
RESTORE = 0
RELOAD = 0
OVERWRITE = 1
FILE_CHANGED = 2
FILE_OVERWRITTEN = 3
Expand Down
5 changes: 1 addition & 4 deletions packages/docprovider/src/awareness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ import * as decoding from 'lib0/decoding';
import * as encoding from 'lib0/encoding';
import { WebsocketProvider } from 'y-websocket';

import { MessageType } from './utils';
import { IAwarenessProvider } from './tokens';

export enum MessageType {
CHAT = 125
}

export interface IContent {
type: string;
body: string;
Expand Down
17 changes: 17 additions & 0 deletions packages/docprovider/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* -----------------------------------------------------------------------------
| Copyright (c) Jupyter Development Team.
| Distributed under the terms of the Modified BSD License.
|----------------------------------------------------------------------------*/

export enum MessageType {
ROOM = 124,
CHAT = 125
}

export enum RoomMessage {
RELOAD = 0,
OVERWRITE = 1,
FILE_CHANGED = 2,
FILE_OVERWRITTEN = 3,
DOC_OVERWRITTEN = 4
}
68 changes: 68 additions & 0 deletions packages/docprovider/src/yprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import { Signal } from '@lumino/signaling';

import { DocumentChange, YDocument } from '@jupyter/ydoc';

import * as decoding from 'lib0/decoding';
import * as encoding from 'lib0/encoding';
import { Awareness } from 'y-protocols/awareness';
import { WebsocketProvider as YWebsocketProvider } from 'y-websocket';

import { requestDocSession } from './requests';
import { MessageType, RoomMessage } from './utils';

/**
* An interface for a document provider.
Expand Down Expand Up @@ -111,6 +114,18 @@ export class WebSocketProvider implements IDocumentProvider {

this._yWebsocketProvider.on('sync', this._onSync);
this._yWebsocketProvider.on('connection-close', this._onConnectionClosed);

this._yWebsocketProvider.messageHandlers[MessageType.ROOM] = (
encoder,
decoder,
provider,
emitSynced,
messageType
) => {
const msgType = decoding.readVarUint(decoder);
const data = decoding.readVarString(decoder);
this._handleRoomMessage(msgType, data);
};
}

private _onUserChanged(user: User.IManager): void {
Expand Down Expand Up @@ -138,6 +153,59 @@ export class WebSocketProvider implements IDocumentProvider {
}
};

private _handleRoomMessage(type: number, data: string): void {
switch (type) {
case RoomMessage.FILE_CHANGED:
this._handleFileChanged(data);
break;

case RoomMessage.DOC_OVERWRITTEN:
case RoomMessage.FILE_OVERWRITTEN:
if (this._dialog) {
this._dialog.close();
this._dialog = null;
}
break;
}
}

private _handleFileChanged(data: string): void {
this._dialog = new Dialog({
title: this._trans.__('File changed'),
body: this._trans.__('Do you want to overwrite the file or reload it?'),
buttons: [
Dialog.okButton({ label: 'Reload' }),
Dialog.warnButton({ label: 'Overwrite' })
],
hasClose: false
});

this._dialog.launch().then(resp => {
if (resp.button.label === 'Reload') {
this._sendReloadMsg(data);
} else if (resp.button.label === 'Overwrite') {
this._sendOverwriteMsg(data);
}
});
}

private _sendReloadMsg(data: string): void {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, MessageType.ROOM);
encoding.writeVarUint(encoder, RoomMessage.RELOAD);
encoding.writeVarString(encoder, data);
this._yWebsocketProvider?.ws!.send(encoding.toUint8Array(encoder));
}

private _sendOverwriteMsg(data: string): void {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, MessageType.ROOM);
encoding.writeVarUint(encoder, RoomMessage.OVERWRITE);
encoding.writeVarString(encoder, data);
this._yWebsocketProvider?.ws!.send(encoding.toUint8Array(encoder));
}

private _dialog: Dialog<any> | null = null;
private _awareness: Awareness;
private _contentType: string;
private _format: string;
Expand Down

0 comments on commit 50f9c43

Please sign in to comment.