From aed6d964c646f4798b984049ad7375b5e6d99031 Mon Sep 17 00:00:00 2001 From: Ivan Shcheklein Date: Sun, 22 Dec 2024 10:01:32 -0800 Subject: [PATCH] cleanup a few from_json methods from DC (#727) --- examples/get_started/json-csv-reader.py | 59 +++++------- src/datachain/lib/dc.py | 115 ++---------------------- src/datachain/lib/meta_formats.py | 80 ++++++++--------- tests/func/test_datachain.py | 2 +- tests/func/test_meta_formats.py | 8 +- tests/unit/lib/test_datachain.py | 6 +- 6 files changed, 72 insertions(+), 198 deletions(-) diff --git a/examples/get_started/json-csv-reader.py b/examples/get_started/json-csv-reader.py index a344c2791..693b5443b 100644 --- a/examples/get_started/json-csv-reader.py +++ b/examples/get_started/json-csv-reader.py @@ -4,6 +4,7 @@ from datachain import C, DataChain from datachain.lib.data_model import ModelStore +from datachain.lib.meta_formats import gen_datamodel_code # Sample model for static JSON model @@ -28,71 +29,51 @@ class ChatDialog(BaseModel): def main(): - print() - print("========================================================================") - print("Dynamic JSONl schema from 2 objects") - print("========================================================================") + # Dynamic JSONl schema from 2 objects uri = "gs://datachain-demo/jsonl/object.jsonl" - jsonl_ds = DataChain.from_json(uri, meta_type="jsonl", print_schema=True) + jsonl_ds = DataChain.from_json(uri, format="jsonl", anon="True") jsonl_ds.show() - print() - print("========================================================================") - print("Dynamic JSON schema from 200 OpenImage json-pairs with validation errors") - print("========================================================================") + # Dynamic JSON schema from 200 OpenImage json-pairs with validation errors uri = "gs://datachain-demo/openimages-v6-test-jsonpairs/*json" schema_uri = ( "gs://datachain-demo/openimages-v6-test-jsonpairs/08392c290ecc9d2a.json" ) json_pairs_ds = DataChain.from_json( - uri, schema_from=schema_uri, jmespath="@", model_name="OpenImage" + uri, schema_from=schema_uri, jmespath="@", model_name="OpenImage", anon="True" ) json_pairs_ds.show() uri = "gs://datachain-demo/coco2017/annotations_captions/" - print() - print("========================================================================") - print("Reading JSON schema from main COCO annotation") - print("========================================================================") - chain = ( - DataChain.from_storage(uri) - .filter(C("file.path").glob("*.json")) - .print_json_schema(jmespath="@", model_name="Coco") + # Print JSON schema in Pydantic format from main COCO annotation + chain = DataChain.from_storage(uri, anon="True").filter( + C("file.path").glob("*.json") ) - chain.save() + file = next(chain.limit(1).collect("file")) + print(gen_datamodel_code(file, jmespath="@", model_name="Coco")) - print() - print("========================================================================") - print("static JSON schema test parsing 3/7 objects") - print("========================================================================") + # Static JSON schema test parsing 3/7 objects static_json_ds = DataChain.from_json( - uri, jmespath="licenses", spec=LicenseFeature, nrows=3 + uri, jmespath="licenses", spec=LicenseFeature, nrows=3, anon="True" ) static_json_ds.show() - print() - print("========================================================================") - print("dynamic JSON schema test parsing 5K objects") - print("========================================================================") - dynamic_json_ds = DataChain.from_json(uri, jmespath="images", print_schema=True) + # Dynamic JSON schema test parsing 5K objects + dynamic_json_ds = DataChain.from_json(uri, jmespath="images", anon="True") print(dynamic_json_ds.to_pandas()) + # Static CSV with header schema test parsing 3.5K objects uri = "gs://datachain-demo/chatbot-csv/" - print() - print("========================================================================") - print("static CSV with header schema test parsing 3.5K objects") - print("========================================================================") - static_csv_ds = DataChain.from_csv(uri, output=ChatDialog, object_name="chat") + static_csv_ds = DataChain.from_csv( + uri, output=ChatDialog, object_name="chat", anon="True" + ) static_csv_ds.print_schema() static_csv_ds.show() + # Dynamic CSV with header schema test parsing 3/3M objects uri = "gs://datachain-demo/laion-aesthetics-csv/laion_aesthetics_1024_33M_1.csv" - print() - print("========================================================================") - print("dynamic CSV with header schema test parsing 3/3M objects") - print("========================================================================") - dynamic_csv_ds = DataChain.from_csv(uri, object_name="laion", nrows=3) + dynamic_csv_ds = DataChain.from_csv(uri, object_name="laion", nrows=3, anon="True") dynamic_csv_ds.print_schema() dynamic_csv_ds.show() diff --git a/src/datachain/lib/dc.py b/src/datachain/lib/dc.py index a3278b232..c47968a3f 100644 --- a/src/datachain/lib/dc.py +++ b/src/datachain/lib/dc.py @@ -41,7 +41,7 @@ parse_listing_uri, ) from datachain.lib.listing_info import ListingInfo -from datachain.lib.meta_formats import read_meta, read_schema +from datachain.lib.meta_formats import read_meta from datachain.lib.model_store import ModelStore from datachain.lib.settings import Settings from datachain.lib.signal_schema import SignalSchema @@ -554,8 +554,7 @@ def from_json( jmespath: Optional[str] = None, object_name: Optional[str] = "", model_name: Optional[str] = None, - print_schema: Optional[bool] = False, - meta_type: Optional[str] = "json", + format: Optional[str] = "json", nrows=None, **kwargs, ) -> "DataChain": @@ -564,12 +563,12 @@ def from_json( Parameters: path : storage URI with directory. URI must start with storage prefix such as `s3://`, `gs://`, `az://` or "file:///" - type : read file as "binary", "text", or "image" data. Default is "binary". + type : read file as "binary", "text", or "image" data. Default is "text". spec : optional Data Model schema_from : path to sample to infer spec (if schema not provided) object_name : generated object column name model_name : optional generated model name - print_schema : print auto-generated schema + format: "json", "jsonl" jmespath : optional JMESPATH expression to reduce JSON nrows : optional row limit for jsonl and JSON arrays @@ -594,75 +593,14 @@ def jmespath_to_name(s: str): if (not object_name) and jmespath: object_name = jmespath_to_name(jmespath) if not object_name: - object_name = meta_type + object_name = format chain = DataChain.from_storage(uri=path, type=type, **kwargs) signal_dict = { object_name: read_meta( schema_from=schema_from, - meta_type=meta_type, + format=format, spec=spec, model_name=model_name, - print_schema=print_schema, - jmespath=jmespath, - nrows=nrows, - ) - } - return chain.gen(**signal_dict) # type: ignore[misc, arg-type] - - @classmethod - def from_jsonl( - cls, - path, - type: Literal["binary", "text", "image"] = "text", - spec: Optional[DataType] = None, - schema_from: Optional[str] = "auto", - jmespath: Optional[str] = None, - object_name: Optional[str] = "", - model_name: Optional[str] = None, - print_schema: Optional[bool] = False, - meta_type: Optional[str] = "jsonl", - nrows=None, - **kwargs, - ) -> "DataChain": - """Get data from JSON lines. It returns the chain itself. - - Parameters: - path : storage URI with directory. URI must start with storage prefix such - as `s3://`, `gs://`, `az://` or "file:///" - type : read file as "binary", "text", or "image" data. Default is "binary". - spec : optional Data Model - schema_from : path to sample to infer spec (if schema not provided) - object_name : generated object column name - model_name : optional generated model name - print_schema : print auto-generated schema - jmespath : optional JMESPATH expression to reduce JSON - nrows : optional row limit for jsonl and JSON arrays - - Example: - infer JSONl schema from data, limit parsing to 1 row - ```py - chain = DataChain.from_jsonl("gs://myjsonl", nrows=1) - ``` - """ - if schema_from == "auto": - schema_from = path - - def jmespath_to_name(s: str): - name_end = re.search(r"\W", s).start() if re.search(r"\W", s) else len(s) # type: ignore[union-attr] - return s[:name_end] - - if (not object_name) and jmespath: - object_name = jmespath_to_name(jmespath) - if not object_name: - object_name = meta_type - chain = DataChain.from_storage(uri=path, type=type, **kwargs) - signal_dict = { - object_name: read_meta( - schema_from=schema_from, - meta_type=meta_type, - spec=spec, - model_name=model_name, - print_schema=print_schema, jmespath=jmespath, nrows=nrows, ) @@ -793,47 +731,6 @@ def listings( **{object_name: catalog.listings()}, # type: ignore[arg-type] ) - def print_json_schema( # type: ignore[override] - self, jmespath: Optional[str] = None, model_name: Optional[str] = None - ) -> "Self": - """Print JSON data model and save it. It returns the chain itself. - - Parameters: - jmespath : JMESPATH expression to reduce JSON - model_name : generated model name - - Example: - print JSON schema and save to column "meta_from": - ```py - uri = "gs://datachain-demo/coco2017/annotations_captions/" - chain = DataChain.from_storage(uri) - chain = chain.print_json_schema() - chain.save() - ``` - """ - return self.map( - meta_schema=lambda file: read_schema( - file, data_type="json", expr=jmespath, model_name=model_name - ), - output=str, - ) - - def print_jsonl_schema( # type: ignore[override] - self, jmespath: Optional[str] = None, model_name: Optional[str] = None - ) -> "Self": - """Print JSON data model and save it. It returns the chain itself. - - Parameters: - jmespath : JMESPATH expression to reduce JSON - model_name : generated model name - """ - return self.map( - meta_schema=lambda file: read_schema( - file, data_type="jsonl", expr=jmespath, model_name=model_name - ), - output=str, - ) - def save( # type: ignore[override] self, name: Optional[str] = None, version: Optional[int] = None, **kwargs ) -> "Self": diff --git a/src/datachain/lib/meta_formats.py b/src/datachain/lib/meta_formats.py index d2fc40613..93d888089 100644 --- a/src/datachain/lib/meta_formats.py +++ b/src/datachain/lib/meta_formats.py @@ -38,38 +38,41 @@ def process_json(data_string, jmespath): return json_dict -# Print a dynamic datamodel-codegen output from JSON or CSV on stdout -def read_schema(source_file, data_type="csv", expr=None, model_name=None): +def gen_datamodel_code( + source_file, format="json", jmespath=None, model_name=None +) -> str: + """Generates Python code with Pydantic models that corresponds + to the provided JSON, CSV, or JSONL file. + It support root JSON arrays (samples the first entry). + """ data_string = "" # using uiid to get around issue #1617 if not model_name: # comply with Python class names uid_str = str(generate_uuid()).replace("-", "") - model_name = f"Model{data_type}{uid_str}" - try: - with source_file.open() as fd: # CSV can be larger than memory - if data_type == "csv": - data_string += fd.readline().replace("\r", "") - data_string += fd.readline().replace("\r", "") - elif data_type == "jsonl": - data_string = fd.readline().replace("\r", "") - else: - data_string = fd.read() # other meta must fit into RAM - except OSError as e: - print(f"An unexpected file error occurred: {e}") - return - if data_type in ("json", "jsonl"): - json_object = process_json(data_string, expr) - if data_type == "json" and isinstance(json_object, list): + model_name = f"Model{format}{uid_str}" + + with source_file.open() as fd: # CSV can be larger than memory + if format == "csv": + data_string += fd.readline().replace("\r", "") + data_string += fd.readline().replace("\r", "") + elif format == "jsonl": + data_string = fd.readline().replace("\r", "") + else: + data_string = fd.read() # other meta must fit into RAM + + if format in ("json", "jsonl"): + json_object = process_json(data_string, jmespath) + if format == "json" and isinstance(json_object, list): json_object = json_object[0] # sample the 1st object from JSON array - if data_type == "jsonl": - data_type = "json" # treat json line as plain JSON in auto-schema + if format == "jsonl": + format = "json" # treat json line as plain JSON in auto-schema data_string = json.dumps(json_object) import datamodel_code_generator input_file_types = {i.value: i for i in datamodel_code_generator.InputFileType} - input_file_type = input_file_types[data_type] + input_file_type = input_file_types[format] with tempfile.TemporaryDirectory() as tmpdir: output = Path(tmpdir) / "model.py" datamodel_code_generator.generate( @@ -95,36 +98,29 @@ def read_schema(source_file, data_type="csv", expr=None, model_name=None): def read_meta( # noqa: C901 spec=None, schema_from=None, - meta_type="json", + format="json", jmespath=None, - print_schema=False, model_name=None, nrows=None, ) -> Callable: from datachain.lib.dc import DataChain if schema_from: - chain = ( - DataChain.from_storage(schema_from, type="text") - .limit(1) - .map( # dummy column created (#1615) - meta_schema=lambda file: read_schema( - file, data_type=meta_type, expr=jmespath, model_name=model_name - ), - output=str, - ) + file = next( + DataChain.from_storage(schema_from, type="text").limit(1).collect("file") ) - (model_output,) = chain.collect("meta_schema") - assert isinstance(model_output, str) - if print_schema: - print(f"{model_output}") + model_code = gen_datamodel_code( + file, format=format, jmespath=jmespath, model_name=model_name + ) + assert isinstance(model_code, str) + # Below 'spec' should be a dynamically converted DataModel from Pydantic if not spec: gl = globals() - exec(model_output, gl) # type: ignore[arg-type] # noqa: S102 + exec(model_code, gl) # type: ignore[arg-type] # noqa: S102 spec = gl["spec"] - if not (spec) and not (schema_from): + if not spec and not schema_from: raise ValueError( "Must provide a static schema in spec: or metadata sample in schema_from:" ) @@ -136,7 +132,7 @@ def read_meta( # noqa: C901 def parse_data( file: File, data_model=spec, - meta_type=meta_type, + format=format, jmespath=jmespath, nrows=nrows, ) -> Iterator[spec]: @@ -148,7 +144,7 @@ def validator(json_object: dict, nrow=0) -> spec: except ValidationError as e: print(f"Validation error occurred in row {nrow} file {file.name}:", e) - if meta_type == "csv": + if format == "csv": with ( file.open() as fd ): # TODO: if schema is statically given, should allow CSV without headers @@ -156,7 +152,7 @@ def validator(json_object: dict, nrow=0) -> spec: for row in reader: # CSV can be larger than memory yield from validator(row) - if meta_type == "json": + if format == "json": try: with file.open() as fd: # JSON must fit into RAM data_string = fd.read() @@ -174,7 +170,7 @@ def validator(json_object: dict, nrow=0) -> spec: return yield from validator(json_dict, nrow) - if meta_type == "jsonl": + if format == "jsonl": try: nrow = 0 with file.open() as fd: diff --git a/tests/func/test_datachain.py b/tests/func/test_datachain.py index 597c825ac..79824598f 100644 --- a/tests/func/test_datachain.py +++ b/tests/func/test_datachain.py @@ -1585,7 +1585,7 @@ def test_to_from_jsonl_remote(cloud_test_catalog_upload): dc_to = DataChain.from_pandas(df, session=ctc.session) dc_to.to_jsonl(path) - dc_from = DataChain.from_jsonl(path, session=ctc.session) + dc_from = DataChain.from_json(path, format="jsonl", session=ctc.session) df1 = dc_from.select("jsonl.first_name", "jsonl.age", "jsonl.city").to_pandas() df1 = df1["jsonl"] assert df_equal(df1, df) diff --git a/tests/func/test_meta_formats.py b/tests/func/test_meta_formats.py index f28fa21f2..e7fd67da1 100644 --- a/tests/func/test_meta_formats.py +++ b/tests/func/test_meta_formats.py @@ -3,7 +3,7 @@ import pytest from datachain.lib.file import TextFile -from datachain.lib.meta_formats import read_meta, read_schema +from datachain.lib.meta_formats import gen_datamodel_code, read_meta example = { "id": "1", @@ -23,7 +23,7 @@ @pytest.mark.filterwarnings("ignore::pydantic.warnings.PydanticDeprecatedSince20") -def test_read_schema(tmp_dir, catalog): +def test_gen_datamodel_code(tmp_dir, catalog): (tmp_dir / "valid.json").write_text(json.dumps(example), encoding="utf8") file = TextFile(path=tmp_dir / "valid.json") file._set_stream(catalog) @@ -59,7 +59,7 @@ class Image(UserModel): DataModel.register(Image) spec = Image""" - actual = read_schema(file, data_type="json", model_name="Image") + actual = gen_datamodel_code(file, format="json", model_name="Image") actual = "\n".join(actual.splitlines()[4:]) # remove header assert actual == expected @@ -72,7 +72,7 @@ def test_read_meta(tmp_dir, catalog): parser = read_meta( schema_from=str(tmp_dir / "valid.json"), - meta_type="jsonl", + format="jsonl", model_name="Image", ) rows = list(parser(file)) diff --git a/tests/unit/lib/test_datachain.py b/tests/unit/lib/test_datachain.py index 2535935df..30d6e644b 100644 --- a/tests/unit/lib/test_datachain.py +++ b/tests/unit/lib/test_datachain.py @@ -1482,7 +1482,7 @@ def test_to_from_jsonl(tmp_dir, test_session): for n, a, c in zip(DF_DATA["first_name"], DF_DATA["age"], DF_DATA["city"]) ] - dc_from = DataChain.from_jsonl(path.as_uri(), session=test_session) + dc_from = DataChain.from_json(path.as_uri(), format="jsonl", session=test_session) df1 = dc_from.select("jsonl.first_name", "jsonl.age", "jsonl.city").to_pandas() df1 = df1["jsonl"] assert df_equal(df1, df) @@ -1504,8 +1504,8 @@ def test_from_jsonl_jmespath(tmp_dir, test_session): ) f.write("\n") - dc_from = DataChain.from_jsonl( - path.as_uri(), jmespath="value", session=test_session + dc_from = DataChain.from_json( + path.as_uri(), format="jsonl", jmespath="value", session=test_session ) df1 = dc_from.select("value.first_name", "value.age", "value.city").to_pandas() df1 = df1["value"]