diff --git a/lavague-core/lavague/core/__init__.py b/lavague-core/lavague/__init__.py similarity index 70% rename from lavague-core/lavague/core/__init__.py rename to lavague-core/lavague/__init__.py index a14c5530..7ee154a9 100644 --- a/lavague-core/lavague/core/__init__.py +++ b/lavague-core/lavague/__init__.py @@ -1,7 +1,7 @@ -from lavague.core.utilities.version_checker import check_latest_version +from lavague.utilities.version_checker import check_latest_version -from lavague.core.agent import WebAgent -from lavague.core.trajectory import Trajectory +from lavague.agent import WebAgent +from lavague.trajectory import Trajectory import os import warnings diff --git a/lavague-core/lavague/core/action/__init__.py b/lavague-core/lavague/action/__init__.py similarity index 58% rename from lavague-core/lavague/core/action/__init__.py rename to lavague-core/lavague/action/__init__.py index c38e5c8f..6157506a 100644 --- a/lavague-core/lavague/core/action/__init__.py +++ b/lavague-core/lavague/action/__init__.py @@ -1,11 +1,12 @@ -from lavague.core.action.base import ( +from lavague.action.base import ( Action, ActionStatus, ActionParser, DEFAULT_PARSER, UnhandledTypeException, + ActionTranslator, ) -from lavague.core.action.navigation import NavigationAction +from lavague.action.navigation import NavigationAction DEFAULT_PARSER.register("navigation", NavigationAction) diff --git a/lavague-core/lavague/action/base.py b/lavague-core/lavague/action/base.py new file mode 100644 index 00000000..73448ba6 --- /dev/null +++ b/lavague-core/lavague/action/base.py @@ -0,0 +1,60 @@ +from typing import Dict, Type, Optional, Callable, TypeVar, Self +from pydantic import BaseModel, validate_call +from enum import Enum + + +class ActionStatus(Enum): + COMPLETED = "completed" + FAILED = "failed" + + +class Action(BaseModel): + """Action performed by the agent.""" + + engine: str + action: str + status: ActionStatus + + @classmethod + def parse(cls, action_dict: Dict) -> "Action": + 
return cls(**action_dict) + + @classmethod + def add_translator(cls, name: str, translator: "ActionTranslator[Self]"): + setattr(cls, name, translator) + + +class ActionParser(BaseModel): + engine_action_builders: Dict[str, Type[Action]] + + def __init__(self): + super().__init__(engine_action_builders={}) + + @validate_call + def register(self, engine: str, action: Type[Action]): + self.engine_action_builders[engine] = action + + def unregister(self, engine: str): + if engine in self.engine_action_builders: + del self.engine_action_builders[engine] + + def parse(self, action_dict: Dict) -> Action: + engine = action_dict.get("engine", "") + target_type: Type[Action] = self.engine_action_builders.get(engine, Action) + try: + return target_type.parse(action_dict) + except UnhandledTypeException: + return Action.parse(action_dict) + + +class UnhandledTypeException(Exception): + pass + + +T = TypeVar("T", bound=Action) + + +ActionTranslator = Callable[[T], Optional[str]] + + +DEFAULT_PARSER = ActionParser() diff --git a/lavague-core/lavague/action/navigation.py b/lavague-core/lavague/action/navigation.py new file mode 100644 index 00000000..3ec24a4a --- /dev/null +++ b/lavague-core/lavague/action/navigation.py @@ -0,0 +1,83 @@ +from lavague.action import Action +from typing import ClassVar, Dict, Type, Optional, TypeVar + +T = TypeVar("T", bound="NavigationAction") + + +class NavigationAction(Action): + """Navigation action performed by the agent.""" + + subtypes: ClassVar[Dict[str, Type["NavigationAction"]]] = {} + + xpath: str + value: Optional[str] = None + + @classmethod + def parse(cls, action_dict: Dict) -> "NavigationAction": + action_name = action_dict.get("action", "") + target_type = cls.subtypes.get(action_name, NavigationAction) + return target_type(**action_dict) + + @classmethod + def register_subtype(cls, subtype: str, action: Type[T]): + cls.subtypes[subtype] = action + return cls + + +def register_navigation(name: str): + def wrapper(cls: Type[T]) -> 
Type[T]: + NavigationAction.register_subtype(name, cls) + return cls + + return wrapper + + +class NavigationWithValueAction(NavigationAction): + """Navigation action performed by the agent with a value.""" + + value: str + + +@register_navigation("click") +class ClickAction(NavigationAction): + pass + + +@register_navigation("hover") +class HoverAction(NavigationAction): + pass + + +@register_navigation("setValue") +class SetValueAction(NavigationWithValueAction): + pass + + +@register_navigation("setValueAndEnter") +class SetValueAndEnterAction(SetValueAction): + pass + + +@register_navigation("dropdownSelect") +class DropdownSelectAction(NavigationWithValueAction): + pass + + +@register_navigation("scroll_down") +class ScrollDownAction(NavigationAction): + pass + + +@register_navigation("scroll_up") +class ScrollUpAction(NavigationAction): + pass + + +@register_navigation("back") +class BackAction(NavigationAction): + pass + + +@register_navigation("switch_tab") +class SwitchTabAction(NavigationAction): + pass diff --git a/lavague-core/lavague/core/agent.py b/lavague-core/lavague/agent.py similarity index 87% rename from lavague-core/lavague/core/agent.py rename to lavague-core/lavague/agent.py index 93cd79ab..9ee2f8f7 100644 --- a/lavague-core/lavague/core/agent.py +++ b/lavague-core/lavague/agent.py @@ -1,9 +1,9 @@ import logging from pydantic import BaseModel from typing import Optional -from lavague.core.trajectory import Trajectory -from lavague.core.client import LaVagueClient -from lavague.core.utilities.config import get_config +from lavague.trajectory import Trajectory +from lavague.client import LaVagueClient +from lavague.utilities.config import get_config logging_print = logging.getLogger(__name__) logging_print.setLevel(logging.INFO) diff --git a/lavague-core/lavague/cli.py b/lavague-core/lavague/cli.py new file mode 100644 index 00000000..86f2a1ab --- /dev/null +++ b/lavague-core/lavague/cli.py @@ -0,0 +1,27 @@ +import click +from lavague import 
WebAgent +import sys +from typing import Optional + + +def run(url: str, objective: str, file: Optional[str] = None): + agent = WebAgent() + trajectory = agent.run(url, objective) + if file: + trajectory.write_to_file(file) + else: + print(trajectory.model_dump_json(indent=2)) + + +@click.command() +@click.argument("url", required=True) +@click.argument("objective", required=True) +@click.option("--file", "-f", required=False) +def cli_run(url: str, objective: str, file: Optional[str]): + run(url, objective, file) + + +if __name__ == "__main__": + url = sys.argv[1] + objective = sys.argv[2] + run(url, objective) diff --git a/lavague-core/lavague/core/client.py b/lavague-core/lavague/client.py similarity index 61% rename from lavague-core/lavague/core/client.py rename to lavague-core/lavague/client.py index 2a69e874..518359e3 100644 --- a/lavague-core/lavague/core/client.py +++ b/lavague-core/lavague/client.py @@ -1,8 +1,7 @@ from pydantic import BaseModel -from lavague.core.utilities.config import get_config, LAVAGUE_API_BASE_URL -from lavague.core.action import ActionParser, DEFAULT_PARSER -from lavague.core.trajectory import Trajectory -from pydantic_core import from_json +from lavague.utilities.config import get_config, is_flag_true, LAVAGUE_API_BASE_URL +from lavague.action import ActionParser, DEFAULT_PARSER +from lavague.trajectory import Trajectory from typing import Any, Optional import requests @@ -14,16 +13,20 @@ class LaVagueClient(BaseModel): api_base_url: str = get_config("LAVAGUE_API_BASE_URL", LAVAGUE_API_BASE_URL) api_key: str = get_config("LAVAGUE_API_KEY") + telemetry: bool = is_flag_true("LAVAGUE_TELEMETRY", True) parser: ActionParser = DEFAULT_PARSER def request_api(self, endpoint: str, method: str, json: Optional[Any]) -> bytes: + headers = { + "Authorization": f"Bearer {self.api_key}", + } + if not self.telemetry: + headers["DNT"] = "1" response = requests.request( method, f"{self.api_base_url}/{endpoint}", json=json, - headers={ - 
"Authorization": f"Bearer {self.api_key}", - }, + headers=headers, ) if response.status_code > 299: raise ApiException(response.text) @@ -31,11 +34,7 @@ def request_api(self, endpoint: str, method: str, json: Optional[Any]) -> bytes: def run(self, url: str, objective: str) -> Trajectory: content = self.request_api("/run", "POST", {"url": url, "objective": objective}) - result = from_json(content) - result_list = result.get("results", []) - actions = [self.parser.parse(action) for action in result_list] - trajectory = Trajectory(**result, actions=actions) - return trajectory + return Trajectory.from_data(content, self.parser) class ApiException(Exception): diff --git a/lavague-core/lavague/core/action/base.py b/lavague-core/lavague/core/action/base.py deleted file mode 100644 index 95b08ecb..00000000 --- a/lavague-core/lavague/core/action/base.py +++ /dev/null @@ -1,104 +0,0 @@ -from PIL import Image -import base64 -import os -from typing import Dict, Type, Optional -from pydantic import BaseModel, validate_call -import time -from enum import Enum - - -class ActionStatus(Enum): - COMPLETED = "completed" - FAILED = "failed" - - -class Action(BaseModel): - """Action performed by the agent.""" - - engine: str - action: str - status: ActionStatus - preaction_screenshot: Optional[str] = None - postaction_screenshot: Optional[str] = None - - @property - def preaction_screenshot_image(self) -> Image: - return ( - Image.open(self.preaction_screenshot) if self.preaction_screenshot else None - ) - - @property - def postaction_screenshot_image(self) -> Image: - return ( - Image.open(self.postaction_screenshot) - if self.postaction_screenshot - else None - ) - - @classmethod - def parse(cls, action_dict: Dict) -> "Action": - return cls(**action_dict) - - -class ActionParser(BaseModel): - engine_action_builders: Dict[str, Type[Action]] - store_images: bool = False - storage_path: str = "./screenshots" - _image_index = 0 - - def __init__(self): - 
super().__init__(engine_action_builders={}) - os.makedirs(self.storage_path, exist_ok=True) - - @validate_call - def register(self, engine: str, action: Type[Action]): - self.engine_action_builders[engine] = action - - def unregister(self, engine: str): - if engine in self.engine_action_builders: - del self.engine_action_builders[engine] - - def parse(self, action_dict: Dict) -> Action: - engine = action_dict.get("engine") - - if self.store_images: - action_dict = action_dict.copy() - action_dict["preaction_screenshot"] = self._store_image( - action_dict.get("preaction_screenshot") - ) - action_dict["postaction_screenshot"] = self._store_image( - action_dict.get("postaction_screenshot") - ) - - target_type: Type[Action] = self.engine_action_builders.get(engine, Action) - try: - return target_type.parse(action_dict) - except UnhandledTypeException: - return Action.parse(action_dict) - - def _store_image(self, image: str) -> str: - """Store image on disk and return absolute path""" - if image is None: - return None - self._image_index += 1 - timestamp = str(int(time.time())) - image_parts = image.split(",", 1) - if len(image_parts) == 2: - header, encoded_data = image_parts - extension = header.split("/")[1].split(";")[0] - image_name = f"{timestamp}_{self._image_index}.{extension}" - else: - encoded_data = image_parts[0] - image_name = f"{timestamp}_{self._image_index}.png" - image_path = os.path.join(self.storage_path, image_name) - image_data = base64.b64decode(encoded_data) - with open(image_path, "wb") as file: - file.write(image_data) - return os.path.abspath(image_path) - - -class UnhandledTypeException(Exception): - pass - - -DEFAULT_PARSER = ActionParser() diff --git a/lavague-core/lavague/core/action/navigation.py b/lavague-core/lavague/core/action/navigation.py deleted file mode 100644 index 5ee34c03..00000000 --- a/lavague-core/lavague/core/action/navigation.py +++ /dev/null @@ -1,92 +0,0 @@ -from lavague.core.action import Action, UnhandledTypeException 
-from enum import Enum -from typing import ClassVar, Dict, Type, Optional - - -class NavigationActionType(Enum): - """Types of navigation actions.""" - - CLICK = "click" - SET_VALUE = "setValue" - SET_VALUE_AND_ENTER = "setValueAndEnter" - DROPDOWN_SELECT = "dropdownSelect" - HOVER = "hover" - SCROLL_DOWN = "scroll_down" - SCROLL_UP = "scroll_up" - BACK = "back" - - -class NavigationAction(Action): - """Navigation action performed by the agent.""" - - subtypes: ClassVar[Dict[str, Type["NavigationAction"]]] = {} - - xpath: str - action: NavigationActionType - value: Optional[str] = None - - @classmethod - def parse(cls, action_dict: Dict) -> "NavigationAction": - action_name = action_dict.get("action") - try: - NavigationActionType(action_name) - except ValueError: - raise UnhandledTypeException(f"Unhandled action type: {action_name}") - - target_type = cls.subtypes.get(action_name, NavigationAction) - return target_type(**action_dict) - - @classmethod - def register_subtype(cls, subtype: str, action: Type["NavigationAction"]): - cls.subtypes[subtype] = action - return cls - - -def register_navigation(name: str): - return lambda cls: NavigationAction.register_subtype(name, cls) - - -class NavigationWithValueAction(NavigationAction): - """Navigation action performed by the agent with a value.""" - - value: str - - -@register_navigation(NavigationActionType.CLICK.value) -class ClickAction(NavigationAction): - pass - - -@register_navigation(NavigationActionType.HOVER.value) -class HoverAction(NavigationAction): - pass - - -@register_navigation(NavigationActionType.SET_VALUE.value) -class SetValueAction(NavigationWithValueAction): - pass - - -@register_navigation(NavigationActionType.SET_VALUE_AND_ENTER.value) -class SetValueAndEnterAction(SetValueAction): - pass - - -@register_navigation(NavigationActionType.DROPDOWN_SELECT.value) -class DropdownSelectAction(NavigationWithValueAction): - pass - - -@register_navigation(NavigationActionType.SCROLL_DOWN.value) -class 
ScrollDownAction(NavigationAction): - pass - - -@register_navigation(NavigationActionType.SCROLL_UP.value) -class ScrollUpAction(NavigationAction): - pass - - -@register_navigation(NavigationActionType.BACK.value) -class BackAction(NavigationAction): - pass \ No newline at end of file diff --git a/lavague-core/lavague/core/cli.py b/lavague-core/lavague/core/cli.py deleted file mode 100644 index 5cf52b86..00000000 --- a/lavague-core/lavague/core/cli.py +++ /dev/null @@ -1,28 +0,0 @@ -import click -from lavague.core.agent import WebAgent -from lavague.core.action import DEFAULT_PARSER -import sys - - -def run(url: str, objective: str, store_images: bool = False): - DEFAULT_PARSER.store_images = store_images - agent = WebAgent() - trajectory = agent.run(url, objective) - for action in trajectory.actions: - print(type(action), action) - print("**********") - - -@click.command() -@click.argument("url", required=True) -@click.argument("objective", required=True) -@click.option("--store-images", is_flag=True) -def cli_run(url: str, objective: str, store_images: bool = False): - run(url=url, objective=objective, store_images=store_images) - - -if __name__ == "__main__": - url = sys.argv[1] - objective = sys.argv[2] - store_images = "--store-images" in sys.argv - run(url=url, objective=objective, store_images=store_images) diff --git a/lavague-core/lavague/core/trajectory.py b/lavague-core/lavague/core/trajectory.py deleted file mode 100644 index acb50982..00000000 --- a/lavague-core/lavague/core/trajectory.py +++ /dev/null @@ -1,23 +0,0 @@ -from typing import Optional, Generator, List -from pydantic import BaseModel -from lavague.core.action.base import Action -from enum import Enum - - -class TrajectoryStatus(Enum): - COMPLETED = "completed" - FAILED = "failed" - - -class Trajectory(BaseModel): - """Observable trajectory of web interactions towards an objective.""" - - url: str - objective: str - actions: List[Action] - status: TrajectoryStatus - final_html: Optional[str] 
- output: Optional[str] - - def __iter__(self) -> Generator[Action, None, None]: - yield from self.actions diff --git a/lavague-core/lavague/core/utilities/config.py b/lavague-core/lavague/core/utilities/config.py deleted file mode 100644 index b85fad1d..00000000 --- a/lavague-core/lavague/core/utilities/config.py +++ /dev/null @@ -1,15 +0,0 @@ -import os -from typing import Optional - -LAVAGUE_API_BASE_URL = "https://cloud.lavague.ai/api" - - -def is_flag_true(flag_name: str) -> bool: - return os.getenv(flag_name, "").lower() in ("true", "1", "y", "yes") - - -def get_config(var_name: str, default: Optional[str] = None, required=True) -> str: - value = os.getenv(var_name, default) - if required and value is None: - raise ValueError(f"Environment variable {var_name} is required") - return value diff --git a/lavague-core/lavague/driver/__init__.py b/lavague-core/lavague/driver/__init__.py new file mode 100644 index 00000000..7130c9a2 --- /dev/null +++ b/lavague-core/lavague/driver/__init__.py @@ -0,0 +1 @@ +from lavague.driver.base import BaseDriver \ No newline at end of file diff --git a/lavague-core/lavague/driver/base.py b/lavague-core/lavague/driver/base.py new file mode 100644 index 00000000..5ad9fd77 --- /dev/null +++ b/lavague-core/lavague/driver/base.py @@ -0,0 +1,281 @@ +from PIL import Image +import re +from typing import Any, Callable, Optional, Mapping, Dict, Set, List, Tuple, Union +from abc import ABC, abstractmethod +from enum import Enum + + +class InteractionType(Enum): + CLICK = "click" + HOVER = "hover" + SCROLL = "scroll" + TYPE = "type" + + +PossibleInteractionsByXpath = Dict[str, Set[InteractionType]] + +r_get_xpaths_from_html = r'xpath=["\'](.*?)["\']' + + +class BaseDriver(ABC): + @abstractmethod + def destroy(self) -> None: + """Cleanly destroy the underlying driver""" + pass + + @abstractmethod + def get_url(self) -> Optional[str]: + """Get the url of the current page""" + pass + + @abstractmethod + def get(self, url: str) -> None: + 
"""Navigate to the url""" + pass + + @abstractmethod + def back(self) -> None: + """Navigate back""" + pass + + @abstractmethod + def get_html(self) -> str: + """ + Returns the HTML of the current page. + If clean is True, We remove unnecessary tags and attributes from the HTML. + Clean HTMLs are easier to process for the LLM. + """ + pass + + @abstractmethod + def get_tabs(self) -> str: + """Return description of the tabs opened with the current tab being focused. + + Example of output: + Tabs opened: + 0 - Overview - OpenAI API + 1 - [CURRENT] Nos destinations Train - SNCF Connect + """ + return "Tabs opened:\n 0 - [CURRENT] tab" + + @abstractmethod + def switch_tab(self, tab_id: int) -> None: + """Switch to the tab with the given id""" + pass + + @abstractmethod + def resolve_xpath(self, xpath) -> "DOMNode": + """ + Return the element for the corresponding xpath, the underlying driver may switch iframe if necessary + """ + pass + + @abstractmethod + def get_possible_interactions( + self, in_viewport=True, foreground_only=True + ) -> PossibleInteractionsByXpath: + """Get elements that can be interacted with as a dictionary mapped by xpath""" + pass + + def check_visibility(self, xpath: str) -> bool: + pass + + @abstractmethod + def get_highlighted_element(self, generated_code: str): + """Return the page elements that generated code interact with""" + pass + + @abstractmethod + def exec_code( + self, + code: str, + globals: dict[str, Any] = None, + locals: Mapping[str, object] = None, + ): + """Exec generated code""" + pass + + @abstractmethod + def scroll_up(self): + pass + + @abstractmethod + def scroll_down(self): + pass + + @abstractmethod + def get_capability(self) -> str: + """Prompt to explain the llm which style of code he should output and which variables and imports he should expect""" + pass + + @abstractmethod + def wait_for_idle(self): + pass + + @abstractmethod + def get_screenshot_as_png(self) -> bytes: + pass + + @abstractmethod + def 
get_shadow_roots(self) -> Dict[str, str]: + return {} + + @abstractmethod + def get_nodes(self, xpaths: List[str]) -> List["DOMNode"]: + raise NotImplementedError("get_nodes not implemented") + + def get_nodes_from_html(self, html: str) -> List["DOMNode"]: + return self.get_nodes(re.findall(r_get_xpaths_from_html, html)) + + def highlight_node_from_xpath( + self, xpath: str, color: str = "red", label=False + ) -> Callable: + return self.highlight_nodes([xpath], color, label) + + def highlight_nodes( + self, xpaths: List[str], color: str = "red", label=False + ) -> Callable: + nodes = self.get_nodes(xpaths) + for n in nodes: + n.highlight(color) + return self._add_highlighted_destructors(lambda: [n.clear() for n in nodes]) + + def highlight_nodes_from_html( + self, html: str, color: str = "blue", label=False + ) -> Callable: + return self.highlight_nodes( + re.findall(r_get_xpaths_from_html, html), color, label + ) + + def remove_highlight(self): + if hasattr(self, "_highlight_destructors"): + for destructor in self._highlight_destructors: + destructor() + delattr(self, "_highlight_destructors") + + def _add_highlighted_destructors( + self, destructors: Union[List[Callable], Callable] + ) -> Callable: + if not hasattr(self, "_highlight_destructors"): + self._highlight_destructors = [] + if isinstance(destructors, Callable): + self._highlight_destructors.append(destructors) + return destructors + + self._highlight_destructors.extend(destructors) + return lambda: [d() for d in destructors] + + def highlight_interactive_nodes( + self, + *with_interactions: tuple[InteractionType], + color: str = "red", + in_viewport=True, + foreground_only=True, + label=False, + ): + if with_interactions is None or len(with_interactions) == 0: + return self.highlight_nodes( + list( + self.get_possible_interactions( + in_viewport=in_viewport, foreground_only=foreground_only + ).keys() + ), + color, + label, + ) + + return self.highlight_nodes( + [ + xpath + for xpath, interactions in 
self.get_possible_interactions( + in_viewport=in_viewport, foreground_only=foreground_only + ).items() + if set(interactions) & set(with_interactions) + ], + color, + label, + ) + + +class DOMNode(ABC): + @property + @abstractmethod + def element(self) -> Any: + pass + + @property + @abstractmethod + def value(self) -> Any: + pass + + @abstractmethod + def highlight(self, color: str = "red", bounding_box=True): + pass + + @abstractmethod + def clear(self): + return self + + @abstractmethod + def take_screenshot(self) -> Image.Image: + pass + + @abstractmethod + def get_html(self) -> str: + pass + + def __str__(self) -> str: + return self.get_html() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class ScrollDirection(Enum): + """Enum for the different scroll directions. Value is (x, y, dimension_index)""" + + LEFT = (-1, 0, 0) + RIGHT = (1, 0, 0) + UP = (0, -1, 1) + DOWN = (0, 1, 1) + + def get_scroll_xy( + self, dimension: List[float], scroll_factor: float = 0.75 + ) -> Tuple[int, int]: + size = dimension[self.value[2]] + return ( + round(self.value[0] * size * scroll_factor), + round(self.value[1] * size * scroll_factor), + ) + + def get_page_script(self, scroll_factor: float = 0.75) -> str: + return f"window.scrollBy({self.value[0] * scroll_factor} * window.innerWidth, {self.value[1] * scroll_factor} * window.innerHeight);" + + def get_script_element_is_scrollable(self) -> str: + match self: + case ScrollDirection.UP: + return "return arguments[0].scrollTop > 0" + case ScrollDirection.DOWN: + return "return arguments[0].scrollTop + arguments[0].clientHeight + 1 < arguments[0].scrollHeight" + case ScrollDirection.LEFT: + return "return arguments[0].scrollLeft > 0" + case ScrollDirection.RIGHT: + return "return arguments[0].scrollLeft + arguments[0].clientWidth + 1 < arguments[0].scrollWidth" + + def get_script_page_is_scrollable(self) -> str: + match self: + case ScrollDirection.UP: + return "return 
window.scrollY > 0" + case ScrollDirection.DOWN: + return "return window.innerHeight + window.scrollY + 1 < document.body.scrollHeight" + case ScrollDirection.LEFT: + return "return window.scrollX > 0" + case ScrollDirection.RIGHT: + return "return window.innerWidth + window.scrollX + 1 < document.body.scrollWidth" + + @classmethod + def from_string(cls, name: str) -> "ScrollDirection": + return cls[name.upper().strip()] diff --git a/lavague-core/lavague/driver/javascript.py b/lavague-core/lavague/driver/javascript.py new file mode 100644 index 00000000..a3b79e8e --- /dev/null +++ b/lavague-core/lavague/driver/javascript.py @@ -0,0 +1,244 @@ +JS_SETUP_GET_EVENTS = """ +(function() { + if (window && !window.getEventListeners) { + const targetProto = EventTarget.prototype; + targetProto._addEventListener = Element.prototype.addEventListener; + targetProto.addEventListener = function(a,b,c) { + this._addEventListener(a,b,c); + if(!this.eventListenerList) this.eventListenerList = {}; + if(!this.eventListenerList[a]) this.eventListenerList[a] = []; + this.eventListenerList[a].push(b); + }; + targetProto._removeEventListener = Element.prototype.removeEventListener; + targetProto.removeEventListener = function(a, b, c) { + this._removeEventListener(a, b, c); + if(this.eventListenerList && this.eventListenerList[a]) { + const index = this.eventListenerList[a].indexOf(b); + if (index > -1) { + this.eventListenerList[a].splice(index, 1); + if (!this.eventListenerList[a].length) { + delete this.eventListenerList[a]; + } + } + } + }; + window.getEventListeners = function(e) { + return (e && e.eventListenerList) || []; + } + } +})();""" + +JS_GET_INTERACTIVES = """ +const windowHeight = (window.innerHeight || document.documentElement.clientHeight); +const windowWidth = (window.innerWidth || document.documentElement.clientWidth); + +return (function(inViewport, foregroundOnly) { + function getInteractions(e) { + const tag = e.tagName.toLowerCase(); + if (!e.checkVisibility() || 
e.hasAttribute('disabled') || e.hasAttribute('readonly') + || (tag === 'input' && e.getAttribute('type') === 'hidden') || tag === 'body') { + return []; + } + const rect = e.getBoundingClientRect(); + if (rect.width + rect.height < 5) { + return []; + } + const style = getComputedStyle(e) || {}; + if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') { + return []; + } + const events = window && typeof window.getEventListeners === 'function' ? window.getEventListeners(e) : []; + const role = e.getAttribute('role'); + const clickableInputs = ['submit', 'checkbox', 'radio', 'color', 'file', 'image', 'reset']; + function hasEvent(n) { + return events[n]?.length || e.hasAttribute('on' + n); + } + const evts = []; + if (hasEvent('keydown') || hasEvent('keyup') || hasEvent('keypress') || hasEvent('keydown') || hasEvent('input') || e.isContentEditable + || ( + (tag === 'input' || tag === 'textarea' || role === 'searchbox' || role === 'input') + ) && !clickableInputs.includes(e.getAttribute('type')) + ) { + evts.push('TYPE'); + } + if (['a', 'button', 'select'].includes(tag) || ['button', 'checkbox', 'select'].includes(role) + || hasEvent('click') || hasEvent('mousedown') || hasEvent('mouseup') || hasEvent('dblclick') + || style.cursor === 'pointer' + || e.hasAttribute('aria-haspopup') + || (tag === 'input' && clickableInputs.includes(e.getAttribute('type'))) + || (tag === 'label' && document.getElementById(e.getAttribute('for'))) + ) { + evts.push('CLICK'); + } + if (hasEvent('scroll') || hasEvent('wheel')|| e.scrollHeight > e.clientHeight || e.scrollWidth > e.clientWidth) { + //evts.push('SCROLL'); + } + + if (inViewport) { + let rect = e.getBoundingClientRect(); + let iframe = e.ownerDocument.defaultView.frameElement; + while (iframe) { + const iframeRect = iframe.getBoundingClientRect(); + rect = { + top: rect.top + iframeRect.top, + left: rect.left + iframeRect.left, + bottom: rect.bottom + iframeRect.bottom, + right: rect.right + 
iframeRect.right, + width: rect.width, + height: rect.height + } + iframe = iframe.ownerDocument.defaultView.frameElement; + } + const elemCenter = { + x: Math.round(rect.left + rect.width / 2), + y: Math.round(rect.top + rect.height / 2) + }; + if (elemCenter.x < 0) return []; + if (elemCenter.x > windowWidth) return []; + if (elemCenter.y < 0) return []; + if (elemCenter.y > windowHeight) return []; + if (!foregroundOnly) return evts; // whenever to check for elements above + let pointContainer = document.elementFromPoint(elemCenter.x, elemCenter.y); + iframe = e.ownerDocument.defaultView.frameElement; + while (iframe) { + const iframeRect = iframe.getBoundingClientRect(); + pointContainer = iframe.contentDocument.elementFromPoint( + elemCenter.x - iframeRect.left, + elemCenter.y - iframeRect.top + ); + iframe = iframe.ownerDocument.defaultView.frameElement; + } + do { + if (pointContainer === e) return evts; + if (pointContainer == null) return evts; + } while (pointContainer = pointContainer.parentNode); + return []; + } + + return evts; + } + + const results = {}; + function traverse(node, xpath) { + if (node.nodeType === Node.ELEMENT_NODE) { + const interactions = getInteractions(node); + if (interactions.length > 0) { + results[xpath] = interactions; + } + } + const countByTag = {}; + for (let child = node.firstChild; child; child = child.nextSibling) { + let tag = child.nodeName.toLowerCase(); + if (tag.includes(":")) continue; //namespace + let isLocal = ['svg'].includes(tag); + if (isLocal) { + tag = `*[local-name() = '${tag}']`; + } + countByTag[tag] = (countByTag[tag] || 0) + 1; + let childXpath = xpath + '/' + tag; + if (countByTag[tag] > 1) { + childXpath += '[' + countByTag[tag] + ']'; + } + if (tag === 'iframe') { + try { + traverse(child.contentWindow.document.body, childXpath + '/html/body'); + } catch (e) { + console.warn("iframe access blocked", child, e); + } + } else if (!isLocal) { + traverse(child, childXpath); + if (child.shadowRoot) { + 
traverse(child.shadowRoot, childXpath + '/'); + } + } + } + } + traverse(document.body, '/html/body'); + return results; +})(arguments?.[0], arguments?.[1]); +""" + +JS_WAIT_DOM_IDLE = """ +return new Promise(resolve => { + const timeout = arguments[0] || 10000; + const stabilityThreshold = arguments[1] || 100; + + let mutationObserver; + let timeoutId = null; + + const waitForIdle = () => { + if (timeoutId) clearTimeout(timeoutId); + timeoutId = setTimeout(() => resolve(true), stabilityThreshold); + }; + mutationObserver = new MutationObserver(waitForIdle); + mutationObserver.observe(document.body, { + childList: true, + attributes: true, + subtree: true, + }); + waitForIdle(); + + setTimeout(() => { + resolve(false); + mutationObserver.disconnect(); + mutationObserver = null; + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = null; + } + }, timeout); +}); +""" + +JS_GET_SCROLLABLE_PARENT = """ +let element = arguments[0]; +while (element) { + const style = window.getComputedStyle(element); + + // Check if the element is scrollable + if (style.overflow === 'auto' || style.overflow === 'scroll' || + style.overflowX === 'auto' || style.overflowX === 'scroll' || + style.overflowY === 'auto' || style.overflowY === 'scroll') { + + // Check if the element has a scrollable area + if (element.scrollHeight > element.clientHeight || + element.scrollWidth > element.clientWidth) { + return element; + } + } + element = element.parentElement; +} +return null; +""" + +JS_GET_SHADOW_ROOTS = """ +const results = {}; +function traverse(node, xpath) { + if (node.shadowRoot) { + results[xpath] = node.shadowRoot.getHTML(); + } + const countByTag = {}; + for (let child = node.firstChild; child; child = child.nextSibling) { + let tag = child.nodeName.toLowerCase(); + countByTag[tag] = (countByTag[tag] || 0) + 1; + let childXpath = xpath + '/' + tag; + if (countByTag[tag] > 1) { + childXpath += '[' + countByTag[tag] + ']'; + } + if (child.shadowRoot) { + 
traverse(child.shadowRoot, childXpath + '/'); + } + if (tag === 'iframe') { + try { + traverse(child.contentWindow.document.body, childXpath + '/html/body'); + } catch (e) { + console.warn("iframe access blocked", child, e); + } + } else { + traverse(child, childXpath); + } + } +} +traverse(document.body, '/html/body'); +return results; +""" \ No newline at end of file diff --git a/lavague-core/lavague/exporter/__init__.py b/lavague-core/lavague/exporter/__init__.py new file mode 100644 index 00000000..24066ebc --- /dev/null +++ b/lavague-core/lavague/exporter/__init__.py @@ -0,0 +1,8 @@ +from lavague.exporter.base import ( + TrajectoryExporter, + ActionTranslator, + ActionWrapper, + method_action_translator, + wrap_action_translator, + translate_action, +) \ No newline at end of file diff --git a/lavague-core/lavague/exporter/base.py b/lavague-core/lavague/exporter/base.py new file mode 100644 index 00000000..70e5af9c --- /dev/null +++ b/lavague-core/lavague/exporter/base.py @@ -0,0 +1,88 @@ +from lavague.trajectory import Trajectory +from lavague.action import Action, ActionTranslator +from typing import List, Optional, Self, Protocol, TypeVar, Iterable +from abc import ABC, abstractmethod +import copy + + +class ActionWrapper(Protocol): + def __call__(self, action: Action, code: str) -> str: ... 
class TrajectoryExporter(ABC):
    """Base class for exporting a trajectory as code for a target framework.

    Subclasses implement :meth:`translate_action`. ``generate_setup`` and
    ``generate_teardown`` are optional hooks whose output is placed before
    and after the translated actions.
    """

    def generate_setup(self, trajectory: Trajectory) -> Optional[str]:
        """Generate setup code (imports, configurations, etc.).

        The default implementation emits nothing.
        """
        return None

    def generate_teardown(self, trajectory: Trajectory) -> Optional[str]:
        """Generate teardown code (cleanup, final assertions, etc.).

        The default implementation emits nothing.
        """
        return None

    @abstractmethod
    def translate_action(self, action: Action) -> Optional[str]:
        """Translate a single action to target framework code.

        Returns:
            The generated code, or ``None`` when nothing should be emitted
            for this action.
        """

    def merge_code(self, *codes: str | None) -> str:
        """Concatenate code fragments into a single string.

        ``None`` fragments are treated as empty strings. Fragments are joined
        without a separator, so each one must carry its own trailing newline
        if one is wanted.
        """
        return "".join(code or "" for code in codes)

    def export(self, trajectory: Trajectory) -> str:
        """Return setup + every translated action + teardown as one string."""
        setup = self.generate_setup(trajectory)
        teardown = self.generate_teardown(trajectory)
        actions = [self.translate_action(action) for action in trajectory.actions]
        return self.merge_code(setup, *actions, teardown)

    def __call__(self, trajectory: Trajectory) -> str:
        """Shorthand for :meth:`export`."""
        return self.export(trajectory)

    def export_to_file(self, trajectory: Trajectory, file_path: str):
        """Export ``trajectory`` and write the result to ``file_path`` as UTF-8."""
        exported = self.export(trajectory)
        with open(file_path, "w", encoding="utf-8") as file:
            file.write(exported)

    def with_wrapper(self, wrapper: ActionWrapper, clone: bool = True) -> Self:
        """Return an exporter whose translations are post-processed by ``wrapper``.

        Args:
            wrapper: Callable ``(action, code) -> str`` applied to every
                non-empty translation. Actions that translate to ``None`` or
                an empty string are not passed to it and yield ``None``.
            clone: When true (the default) a shallow copy of ``self`` is
                wrapped and ``self`` is left untouched. When false ``self``
                is modified in place and returned.

        Returns:
            The wrapped exporter (the copy, or ``self`` when ``clone`` is
            false).
        """
        # Bind the current translator *before* it is shadowed on the instance.
        # Resolving it lazily through ``self`` would, with ``clone=False``,
        # find the wrapper itself and recurse until RecursionError.
        inner = self.translate_action
        instance = copy.copy(self) if clone else self

        def wrapped(action: Action) -> Optional[str]:
            code = inner(action)
            return wrapper(action, code) if code else None

        instance.translate_action = wrapped
        return instance

    @classmethod
    def from_translator(
        cls, action_translator: ActionTranslator
    ) -> "TrajectoryExporter":
        """Build an exporter that delegates ``translate_action`` to a callable.

        Args:
            action_translator: Callable taking an action and returning its
                code, or ``None``.
        """

        class DynamicExporter(cls):
            def translate_action(self, action: Action) -> Optional[str]:
                return action_translator(action)

        return DynamicExporter()

    @classmethod
    def from_method(cls, method_name: str) -> "TrajectoryExporter":
        """Build an exporter that calls ``method_name`` on each action.

        Actions that do not have such an attribute translate to ``None``.
        """
        return cls.from_translator(method_action_translator(method_name))
def translate_action(action: Action, method_name: str) -> Optional[str]:
    """Call ``action.<method_name>()`` and return its result.

    Returns ``None`` when the action has no attribute called ``method_name``.
    """
    try:
        method = getattr(action, method_name)
    except AttributeError:
        return None
    return method()


def method_action_translator(name: str) -> ActionTranslator[Action]:
    """Return a translator that invokes the method called ``name`` on an action."""

    def translator(action: Action) -> Optional[str]:
        return translate_action(action, name)

    return translator


# Forward reference to the ``Action`` base class.
T = TypeVar("T", bound="Action")


def wrap_action_translator(
    action_translator: ActionTranslator[T],
    wrapper: ActionWrapper,
) -> ActionTranslator[T]:
    """Compose ``action_translator`` with ``wrapper``.

    The returned translator passes ``(action, code)`` to ``wrapper`` for every
    non-empty translation. When the underlying translator yields ``None`` or
    an empty string, ``wrapper`` is skipped and ``None`` is returned.
    """

    def translate_then_wrap(action: T) -> Optional[str]:
        generated = action_translator(action)
        if not generated:
            return None
        return wrapper(action, generated)

    return translate_then_wrap
def is_flag_true(flag_name: str, default: bool = False) -> bool:
    """Return whether the environment flag ``flag_name`` is switched on.

    Args:
        flag_name: Name of the environment variable to read.
        default: Value returned when the variable is unset or empty.

    Returns:
        ``True`` when the value is one of ``true``, ``1``, ``y`` or ``yes``.
        The comparison ignores letter case and surrounding whitespace, so
        ``TRUE`` or ``" Yes "`` also enable the flag. Any other non-empty
        value yields ``False``.
    """
    value = os.getenv(flag_name, "")
    if value == "":
        return default
    # Normalise before comparing: shells and .env files commonly carry
    # "True"/"TRUE"/"Yes", which an exact match would silently treat as off.
    return value.strip().lower() in ("true", "1", "y", "yes")


def get_config(var_name: str, default: Optional[str] = None) -> str:
    """Return the value of environment variable ``var_name``.

    Args:
        var_name: Name of the environment variable to read.
        default: Fallback used when the variable is unset.

    Raises:
        ValueError: If the variable is unset and no ``default`` was given.
    """
    value = os.getenv(var_name, default)
    if value is None:
        raise ValueError(f"Environment variable {var_name} is required")
    return value