
Add a Dataset Class to Load Datasets from Snowflake #71

Open
kds1010 wants to merge 1 commit into main from feat/seiya/add-sf-dataset

Conversation

@kds1010 (Collaborator) commented Aug 27, 2024

⚠️ This PR is still in Draft while we discuss a better way to implement the dataset class.

1. Overview

This PR adds a Dataset class (SFDataset) to load data from Snowflake.

2. Implementation details

  • Add a Dataset class SFDataset to load data from Snowflake
  • Add a codec to decode VARIANT fields to ndarray
  • Add a SchemaField SfNumpyField that handles VARIANT columns with the codec

The main difficulty is mapping types between Snowflake and Wicker, especially ObjectField values (e.g. ndarray).
Wicker handles these values by encoding them, dumping them into __COLUMN_CONCATENATED_FILES__, and storing a pointer to them as bytes in each column's ObjectField.
Snowflake stores these labels (e.g. bbox coordinates) as a VARIANT type, which behaves like a JSON string, so we need to parse it with np.array(json.loads(value)). Since the input is a str, it is rejected by the following code.

    def process_object_field(self, field: schema.ObjectField) -> Optional[Any]:
        data = validation.validate_field_type(self._current_data, bytes, field.required, self._current_path)
        if data is None:
            return data
        return field.codec.decode_object(data)

So, is it better to add a new field type to handle str, or to expand the existing one to accept str, given that the decoding logic is up to the implementation of the Codec class?
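
For illustration, a minimal sketch of the kind of codec this implies, parsing the VARIANT JSON string into an ndarray (the class name SfNumpyCodec and the fixed float32 dtype are illustrative choices, not necessarily what this PR implements):

    import json

    import numpy as np


    class SfNumpyCodec:
        """Illustrative codec: decode a Snowflake VARIANT value (JSON text) into an ndarray."""

        def decode_object(self, data: str) -> np.ndarray:
            # e.g. data == "[[10.0, 20.0, 30.0, 40.0]]" for a bbox stored as VARIANT
            return np.array(json.loads(data), dtype="float32")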

@kds1010 self-assigned this Aug 27, 2024
@@ -58,3 +58,4 @@ typing-extensions==3.10.0.2
urllib3==1.26.7
wandb==0.12.21
tqdm
snowflake-connector-python[pandas]==3.12.1
kds1010 (Collaborator, Author) commented:

We need to add this library to communicate with Snowflake.

Comment on lines 609 to 654
@abc.abstractmethod
def process_sf_variant_field(self, field: ObjectField) -> _T:
pass

kds1010 (Collaborator, Author) commented:

Is it better to add a new function to handle the new field type, or to expand the existing function to handle str instances?

@@ -58,7 +58,7 @@ def validate_and_encode_object(self, obj: Any) -> bytes:
pass

@abc.abstractmethod
def decode_object(self, data: bytes) -> Any:
def decode_object(self, data: bytes | str) -> Any:
kds1010 (Collaborator, Author) commented:

Alternative approach: accept str values in decode_object.
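
A rough sketch of how a codec could handle both inputs under the widened signature (simplified; Wicker's real bytes path resolves a column-file pointer, which is omitted here):

    import json
    from typing import Union

    import numpy as np


    def decode_object(data: Union[bytes, str]) -> np.ndarray:
        # Simplified sketch: treat bytes as UTF-8 encoded JSON for illustration;
        # the real bytes path in Wicker resolves a column-file pointer instead.
        text = data.decode("utf-8") if isinstance(data, bytes) else data
        return np.array(json.loads(text))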

@kds1010 force-pushed the feat/seiya/add-sf-dataset branch 7 times, most recently from 8ef2cc5 to 3aeb118 on September 3, 2024 at 07:13
dataset_version: str,
dataset_partition_name: str,
table_name: str,
connection_parameters: Optional[Dict[str, str]] = None,
Collaborator commented:

From a consistency perspective, we probably need to move this to the Wicker config, as the S3 dataset does.
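
For example, a hypothetical sketch of reading these settings from a config file (the config path convention and the snowflake_config section are assumptions, not the actual Wicker config schema):

    import json
    import os

    # Hypothetical: load connection parameters from the Wicker config file instead
    # of passing them to the dataset constructor. Section and key names are made up.
    config_path = os.environ.get("WICKER_CONFIG_PATH", os.path.expanduser("~/wickerconfig.json"))
    with open(config_path) as f:
        snowflake_config = json.load(f)["snowflake_config"]

    connection_parameters = {
        "account": snowflake_config["account"],
        "user": snowflake_config["user"],
        "warehouse": snowflake_config["warehouse"],
    }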

for name, type in zip(schema_table["lowercase_name"], schema_table["type"]):
schema = self._get_schema_type(type.as_py())
if schema == SfNumpyField:
schema_instance = schema(name=name.as_py(), shape=(1, -1), dtype="float32") # type: ignore
@zhenyu (Collaborator) commented Sep 4, 2024:

why only float32?

@@ -318,3 +318,6 @@ def process_object_field(self, field: schema.ObjectField) -> Any:
return data
cbf_info = ColumnBytesFileLocationV1.from_bytes(data)
return self.cbf_cache.read(cbf_info)

def process_sf_variant_field(self, field: schema.VariantField) -> Any:
Collaborator commented:

Do you want to store the reference in the S3-backed column files? I don't think we should store numpy arrays in the column files; that would generate tons of small files, which hurts both data governance and loading performance.

)

@property
def connection(self):
Collaborator commented:

I am assuming this property should be private, WDYT?

return self._schema

def arrow_table(self) -> pyarrow.Table:
"""Returns a table of the dataset as pyarrow table.
Collaborator commented:

Generally speaking, I am not sure whether we should wrap the Snowflake table as a pyarrow table.

  1. Loading performance: every data-loading process needs its own copy of this dataset, i.e. a connection to Snowflake and a remote Arrow table load. Even though Snowflake is designed for OLAP rather than OLTP, the DB service overhead should theoretically be much higher than a direct S3 connection.
  2. Data volume: pyarrow loading from a local file (downloaded from S3) uses mmap and can theoretically handle data volumes larger than memory, but your approach loads everything into memory, with no local file-based cache.

The design principle of Wicker is to use an ETL stage (the data dumping pipeline) to transform the format, here Parquet plus column files (i.e. heavy-duty fields, which should only be used for large fields like images), so that it is more IO-friendly for the GPU instances, shortening loading time and saving money.
Based on the above, I will request changes on this PR; let's discuss it face to face.
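
To make the contrast concrete, a rough sketch (file path, table name, and credentials are placeholders):

    import pyarrow.parquet as pq
    import snowflake.connector

    # Local Parquet downloaded from S3: pyarrow can memory-map the file, so the
    # working set may exceed RAM and lean on the OS page cache.
    local_table = pq.read_table("/tmp/dataset_partition.parquet", memory_map=True)

    # Remote Snowflake fetch: the whole result set is materialized in memory as an
    # Arrow table, with no file-backed cache to fall back on.
    conn = snowflake.connector.connect(account="...", user="...", password="...")
    remote_table = conn.cursor().execute("SELECT * FROM my_table").fetch_arrow_all()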

@@ -404,6 +404,147 @@ def __init__(
)


class VariantField(SchemaField):
Collaborator commented:

The introduction of the schema is for clear data type definitions, so personally I don't think we should add a new VariantField.

@@ -87,6 +87,12 @@ def process_object_field(self, field: schema.ObjectField) -> Optional[Any]:
return data
return field.codec.decode_object(data)

def process_sf_variant_field(self, field: schema.VariantField) -> Optional[Any]:
Collaborator commented:

The field type should not be tied to the storage backend (here, Snowflake). I would suggest adding more abstraction:
we can add a backend attribute to the schema definition, just like the existing heavy-duty flag: default s3, optionally sf. If sf, we would call the Snowflake-specific decoder even for the same data type, i.e. numpy.
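
A rough sketch of what that abstraction might look like (the field and decoder names below are hypothetical, not part of the current schema module):

    import json
    from dataclasses import dataclass

    import numpy as np


    @dataclass
    class NumpyFieldSketch:
        """Hypothetical field: the data type is fixed (ndarray) and a backend tag
        decides which decoder runs at load time."""

        name: str
        backend: str = "s3"  # "s3" -> column files (default), "sf" -> Snowflake VARIANT


    def decode_numpy(field: NumpyFieldSketch, raw) -> np.ndarray:
        if field.backend == "sf":
            # Snowflake VARIANT path: JSON text -> ndarray
            return np.array(json.loads(raw))
        # Default S3 path: raw would be a bytes pointer into the column files,
        # resolved by the existing ObjectField/codec machinery (omitted here).
        raise NotImplementedError("S3 path is handled by the existing column-file reader")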

Collaborator commented:

@kds1010, could we close this PR?
