Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: registry column keyerror + registry df + remote path bugs #301

Merged
merged 10 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ Types of changes

# Latch SDK Changelog

## 2.36.4 - 2023-10-25

### Added

* Added the ability to get a pandas `DataFrame` from a registry table.

## 2.36.3 - 2023-10-25

### Fixed
Expand Down
3 changes: 3 additions & 0 deletions latch/registry/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ def load(self) -> None:
if not c.upstream_type["allowEmpty"]:
vals[k] = InvalidValue("")

# prevent `KeyError`s when accessing columns that don't have a value
vals[k] = None

self._cache.values = vals

# get_table_id
Expand Down
32 changes: 32 additions & 0 deletions latch/registry/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,38 @@ def list_records(self, *, page_size: int = 100) -> Iterator[Dict[str, Record]]:
if len(page) > 0:
yield page

def get_dataframe(self):
    """Get a pandas DataFrame of all records in this table.

    Each row corresponds to one record: one column per key of the mapping
    returned by the record's ``get_values()``, plus a ``"Name"`` column
    holding the record's ``get_name()``. Records whose ``get_values()``
    returns ``None`` are skipped.

    Returns:
        DataFrame representing all records in this table. When no rows were
        collected, the result is an empty DataFrame whose columns are the
        keys of ``get_columns()`` followed by ``"Name"``, or a DataFrame with
        no columns at all if ``get_columns()`` returns ``None``.

    Raises:
        ImportError: If pandas is not installed.
    """

    try:
        import pandas as pd
    except ImportError as err:
        raise ImportError(
            "pandas needs to be installed to use get_dataframe. Install it with"
            " `pip install pandas` or `pip install latch[pandas]`."
        ) from err

    rows = []
    for page in self.list_records():
        for record in page.values():
            values = record.get_values()
            if values is None:
                continue

            # Build a fresh dict instead of writing "Name" into the mapping
            # returned by get_values(), which the record may still hold on to.
            rows.append({**values, "Name": record.get_name()})

    if rows:
        return pd.DataFrame(rows)

    cols = self.get_columns()
    if cols is None:
        return pd.DataFrame()

    # Keep the empty frame's schema consistent with the populated one, which
    # always carries a "Name" column after the value columns.
    names = list(cols.keys())
    if "Name" not in names:
        names.append("Name")

    return pd.DataFrame(columns=names)

@contextmanager
def update(self, *, reload_on_commit: bool = True) -> Iterator["TableUpdate"]:
"""Start an update transaction.
Expand Down
15 changes: 10 additions & 5 deletions latch/types/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing_extensions import Annotated

from latch.types.file import LatchFile
from latch.types.utils import _is_valid_url
from latch.types.utils import format_path, is_valid_url
from latch_cli.utils import urljoins
from latch_cli.utils.path import normalize_path

Expand Down Expand Up @@ -112,7 +112,7 @@ def __init__(

self._path_generated = False

if _is_valid_url(self.path) and remote_path is None:
if is_valid_url(self.path) and remote_path is None:
self._remote_directory = self.path
else:
self._remote_directory = None if remote_path is None else str(remote_path)
Expand Down Expand Up @@ -244,13 +244,18 @@ def remote_path(self) -> Optional[str]:

def __repr__(self):
if self.remote_path is None:
return f'LatchDir("{self.local_path}")'
return f'LatchDir("{self.path}", remote_path="{self.remote_path}")'
return f"LatchDir({repr(format_path(self.local_path))})"

return (
f"LatchDir({repr(self.path)},"
f" remote_path={repr( format_path(self.remote_path))})"
)

def __str__(self):
if self.remote_path is None:
return "LatchDir()"
return f'LatchDir("{self.remote_path}")'

return f"LatchDir({format_path(self.remote_path)})"


LatchOutputDir = Annotated[
Expand Down
16 changes: 9 additions & 7 deletions latch/types/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
from latch_sdk_gql.execute import execute
from typing_extensions import Annotated

from latch.types.utils import _is_valid_url
from latch.types.utils import format_path, is_absolute_node_path, is_valid_url
from latch_cli.utils.path import normalize_path

is_absolute_node_path = re.compile(r"^(latch)?://\d+.node(/)?$")


class LatchFile(FlyteFile):
"""Represents a file object in the context of a task execution.
Expand Down Expand Up @@ -77,7 +75,7 @@ def __init__(

self._path_generated = False

if _is_valid_url(self.path) and remote_path is None:
if is_valid_url(self.path) and remote_path is None:
self._remote_path = str(path)
else:
self._remote_path = None if remote_path is None else str(remote_path)
Expand Down Expand Up @@ -156,13 +154,17 @@ def remote_path(self) -> Optional[str]:

def __repr__(self):
if self.remote_path is None:
return f'LatchFile("{self.local_path}")'
return f'LatchFile("{self.path}", remote_path="{self.remote_path}")'
return f"LatchFile({repr(format_path(self.local_path))})"

return (
f"LatchFile({repr(self.path)},"
f" remote_path={repr(format_path(self.remote_path))})"
)

def __str__(self):
if self.remote_path is None:
return "LatchFile()"
return f'LatchFile("{self.remote_path}")'
return f"LatchFile({format_path(self.remote_path)})"


LatchOutputFile = Annotated[
Expand Down
4 changes: 2 additions & 2 deletions latch/types/glob.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Optional

from latch.types.file import LatchFile
from latch.types.utils import _is_valid_url
from latch.types.utils import is_valid_url


def file_glob(
Expand Down Expand Up @@ -41,7 +41,7 @@ def task():

"""

if not _is_valid_url(remote_directory):
if not is_valid_url(remote_directory):
return []

if target_dir is None:
Expand Down
55 changes: 53 additions & 2 deletions latch/types/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import re
from pathlib import Path
from typing import Union
from typing import Optional, Union
from urllib.parse import urlparse

import gql
from latch_sdk_gql.execute import execute

def _is_valid_url(raw_url: Union[str, Path]) -> bool:

def is_valid_url(raw_url: Union[str, Path]) -> bool:
"""A valid URL (as a source or destination of a LatchFile) must:
* contain a latch or s3 scheme
* contain an absolute path
Expand All @@ -17,3 +21,50 @@ def _is_valid_url(raw_url: Union[str, Path]) -> bool:
if parsed.path != "" and not parsed.path.startswith("/"):
return False
return True


# Matches a URL that addresses a node directly by its numeric ID, e.g.
# ``latch://1234.node`` or ``latch://1234.node/``; the ID is captured in the
# ``node_id`` group. The ``latch`` scheme is optional, so ``://1234.node``
# matches too. The dot is escaped so that only a literal ``.node`` suffix is
# accepted (an unescaped ``.`` would also accept e.g. ``latch://1234xnode``).
is_absolute_node_path = re.compile(r"^(latch)?://(?P<node_id>\d+)\.node(/)?$")
# Matches paths that begin with ``account_root`` or ``mount``; on a match
# exactly one of the two named groups is set.
old_style_path = re.compile(r"^(?:(?P<account_root>account_root)|(?P<mount>mount))")


def format_path(path: str) -> str:
    """Return a readable ``latch://`` URL for a node-ID path.

    Paths that do not match ``is_absolute_node_path`` are returned unchanged.
    Otherwise the node ID is looked up with the ``ldataGetPath`` /
    ``ldataOwner`` GraphQL query and the resolved path is rewritten:

    * a resolved path starting with ``mount`` becomes
      ``latch://<bucket>.mount/<key>``, where the bucket is the second
      ``/``-separated segment and the key is everything after it;
    * a resolved path starting with ``account_root`` becomes
      ``latch://<owner>.account/<key>``, where the key is everything after
      the second segment.

    Whenever the lookup yields no path, a path in neither form, or (for
    account paths) no owner, the input is returned as-is.

    Args:
        path: The path or URL to format.

    Returns:
        The rewritten URL, or ``path`` itself when no rewrite applies.
    """
    node_match = is_absolute_node_path.match(path)
    if node_match is None:
        return path

    query = gql.gql("""
query ldataGetPathQ($id: BigInt!) {
ldataGetPath(argNodeId: $id)
ldataOwner(argNodeId: $id)
}
""")
    response = execute(query, {"id": node_match.group("node_id")})

    resolved: Optional[str] = response["ldataGetPath"]
    if resolved is None:
        return path

    segments = resolved.split("/")

    style = old_style_path.match(resolved)
    if style is None:
        return path

    if style["mount"] is not None:
        # Mount paths carry the bucket name in the second segment.
        bucket = segments[1]
        key = "/".join(segments[2:])
        return f"latch://{bucket}.mount/{key}"

    # Only account-root paths need the owner, so it is read after the mount
    # case has been handled.
    owner: Optional[str] = response["ldataOwner"]
    if owner is None:
        return path

    if style["account_root"] is None:
        return path

    key = "/".join(segments[2:])
    return f"latch://{owner}.account/{key}"
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
"websockets==11.0.3",
"watchfiles==0.19.0",
],
extras_require={"snakemake": ["snakemake>=7.18.0, <7.30.2"]},
extras_require={
"snakemake": ["snakemake>=7.18.0, <7.30.2"],
"pandas": ["pandas>=2.0.0"],
},
classifiers=[
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
Expand Down