Add functionality to specify schema #41

Merged: 39 commits, Sep 1, 2022
Changes from 27 commits

Commits
b626cd7
Only adding auto-detected schema if no schema was provided in the rep…
joosthooz Aug 11, 2022
a83d800
Do not store schemas in metadata for parquet datasets
joosthooz Aug 11, 2022
0db0571
Added basic schema handling
joosthooz Aug 12, 2022
1a5011d
Removed file extension from table names in tables metadata
joosthooz Aug 12, 2022
a742722
Fixed: removing existing file with unlink instead of rmdir
joosthooz Aug 15, 2022
da20871
Now using getattr instead of a name -> function mapping table
joosthooz Aug 15, 2022
406cec0
Created separate function to create arrow datatypes from json
joosthooz Aug 15, 2022
3c87dcd
Added support for types with parameters
joosthooz Aug 15, 2022
0a67b29
Added pytest for parsing schema
joosthooz Aug 15, 2022
50186e5
Fixed bug spotted by review
joosthooz Aug 19, 2022
4a9e3ac
Removed outdated comment
joosthooz Aug 19, 2022
b139a4f
Gathered a set of TODOs in 1 place
joosthooz Aug 19, 2022
1e00a48
Removed TODO, ordering should be preserved since Python 3.7
joosthooz Aug 25, 2022
e6dfd39
Added test for arrow_type_function_lookup
joosthooz Aug 25, 2022
1129d10
Added some tests for parsing arrow datatypes and schemas
joosthooz Aug 25, 2022
477a036
Changed "name" and "args" to "type_name" and "arguments", added more …
joosthooz Aug 26, 2022
860a4d3
Renamed tmpfile to testfile
joosthooz Aug 26, 2022
b1e862c
Added code to generate random data for a dataset with a column of eac…
joosthooz Aug 26, 2022
3ac13ea
Fixed issue with timezone in tests
joosthooz Aug 29, 2022
b775bed
Fix None vs empty string comparison
joosthooz Aug 30, 2022
8885e3f
If a schema is known, don't write out a CSV header line and don't ove…
joosthooz Aug 30, 2022
df615e5
Added csv partitioning conversion test (failing!)
joosthooz Aug 30, 2022
6d395db
Added parquet partitioning conversion test
joosthooz Aug 30, 2022
a2f938d
Removed commented code
joosthooz Aug 30, 2022
84ed050
Sorting table after multi->single partition conversion because datase…
joosthooz Aug 30, 2022
389a548
Added some flags for handling timestamps in parquet
joosthooz Aug 30, 2022
7fcfa3e
Added parquet partitioning conversion test
joosthooz Aug 30, 2022
274a490
Separated out inferred vs user-specified schema, added header-line pro…
joosthooz Aug 31, 2022
cf6d998
Updated Arrow dependency to 9.0.0 because of https://issues.apache.or…
joosthooz Aug 31, 2022
0d5c06b
Fixed cache entry pruning
joosthooz Aug 31, 2022
99f8391
Writing header-line when converting parquet to csv
joosthooz Aug 31, 2022
b09f261
Removed TODO
joosthooz Aug 31, 2022
14941f2
Processed review comments
joosthooz Aug 31, 2022
fae88cb
Formatting
joosthooz Aug 31, 2022
ba487c6
Added info about schema specification to README
joosthooz Aug 31, 2022
a7cd552
Reformatted schema example
joosthooz Aug 31, 2022
c7c6008
Reformatted example schema attempt 3
joosthooz Aug 31, 2022
57157b2
Reformatted example schema attempt 4
joosthooz Aug 31, 2022
b645261
Reformatted example schema attempt 5
joosthooz Aug 31, 2022
204 changes: 168 additions & 36 deletions datalogistik/util.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import datetime
import gzip
import hashlib
@@ -20,6 +21,8 @@
import pathlib
import shutil
import time
from collections import OrderedDict
from collections.abc import Mapping

import pyarrow as pa
import urllib3
@@ -54,7 +57,7 @@ def file_visitor(written_file):
# Construct a path to a dataset entry in the cache (possibly not existing yet)
def create_cached_dataset_path(name, scale_factor, format, partitioning_nrows):
local_cache_location = config.get_cache_location()
scale_factor = f"scalefactor_{scale_factor}" if scale_factor != "" else ""
scale_factor = f"scalefactor_{scale_factor}" if scale_factor else ""
partitioning_nrows = f"partitioning_{partitioning_nrows}"
return pathlib.Path(
local_cache_location, name, scale_factor, format, partitioning_nrows
@@ -284,16 +287,108 @@ def schema_to_dict(schema):
return field_dict


def convert_arrow_alias(type_name):
aliases = {
"bool": "bool_",
"halffloat": "float16",
"float": "float32",
"double": "float64",
"decimal": "decimal128",
}
for (alias, func_name) in aliases.items():
if type_name == alias:
return func_name
# no alias was found
return type_name
austin3dickey marked this conversation as resolved.


# Create an instance of the pyarrow datatype with the given name
def arrow_type_function_lookup(function_name):
if isinstance(function_name, str):
function_name = convert_arrow_alias(function_name)
pa_type_func = getattr(pa, function_name)
return pa_type_func

# The argument was not a pyarrow type (maybe a nested structure?)
return None


# Convert a given item (string or dict) to the corresponding Arrow datatype
def arrow_type_from_json(input_type):
arrow_nested_types = {
"list_",
"large_list",
"map_",
"struct",
"dictionary",
# Could be useful for the user to have control over nullability
"field",
}

# In case the type is a simple string
if isinstance(input_type, str):
if input_type in arrow_nested_types:
msg = "Nested types in schema not supported yet"
log.error(msg)
raise ValueError(msg)
return arrow_type_function_lookup(input_type)()

# Alternatively, a type can be encoded as a name:value pair
if not input_type.get("type_name"):
msg = "Schema field type 'type_name' missing"
log.error(msg)
raise ValueError(msg)

type_name = input_type.get("type_name")
args = input_type.get("arguments")
if type_name in arrow_nested_types:
msg = "Nested types in schema not supported yet"
log.error(msg)
raise ValueError(msg)

if args is None:
return arrow_type_function_lookup(type_name)()
if isinstance(args, Mapping):
return arrow_type_function_lookup(type_name)(**args)
elif isinstance(args, list):
log.debug(f"args {args}")
return arrow_type_function_lookup(type_name)(*args)
else: # args is probably a single value
return arrow_type_function_lookup(type_name)(args)
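
A minimal usage sketch (not part of this diff) of how the two accepted encodings map to Arrow types; it assumes the helpers are importable as datalogistik.util:

import pyarrow as pa
from datalogistik.util import arrow_type_from_json

# Plain strings, including aliases such as "float" -> pa.float32()
assert arrow_type_from_json("int64") == pa.int64()
assert arrow_type_from_json("float") == pa.float32()
# Parameterized types pass their arguments as a mapping or a list
assert arrow_type_from_json(
    {"type_name": "timestamp", "arguments": {"unit": "ms"}}
) == pa.timestamp("ms")
assert arrow_type_from_json(
    {"type_name": "decimal", "arguments": [38, 9]}
) == pa.decimal128(38, 9)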


# Convert the given dict to a pyarrow.schema

Collaborator:
If we make all these comments into docstrings they'll pop up nicely for us in editors.

Collaborator (author):
Yeah, I should've done that properly from the start. Maybe we should start a PR that does that and adds type hints.

Collaborator (author):
#59
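
For illustration, a hedged sketch of the docstring-and-type-hints style being discussed (not part of this PR), applied to the convert_arrow_alias helper above:

def convert_arrow_alias(type_name: str) -> str:
    """Map a convenience alias (e.g. "bool", "double") to the name of the
    corresponding pyarrow factory function (e.g. "bool_", "float64")."""
    aliases = {
        "bool": "bool_",
        "halffloat": "float16",
        "float": "float32",
        "double": "float64",
        "decimal": "decimal128",
    }
    # Fall back to the original name when no alias is defined
    return aliases.get(type_name, type_name)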

def get_arrow_schema(input_schema):
log.debug("Converting schema to pyarrow.schema...")
if input_schema is None:
return None
austin3dickey marked this conversation as resolved.
field_list = []
# TODO: a `field()` entry is not a (name, type) tuple
for (field_name, type) in input_schema.items():
log.debug(f"Schema: adding field {field_name}")
arrow_type = arrow_type_from_json(type)
field_list.append(pa.field(field_name, arrow_type))

output_schema = pa.schema(field_list)
return output_schema

Collaborator:
This is super useful. Do we have the reverse (serialize a schema to json) here somewhere as well? (Not sure if it's necessary here at the moment, but I've run into cases where it would've been very handy.) It might be nice to put all this schema stuff in a separate file so when I, um, borrow this code in the future it's in a nice self-contained package.

Collaborator (author):
I know, that would be really handy; the inferred schema was actually a workaround for not having it at first (although after thinking about it, I think it is good to know the difference between a user-specified schema and an inferred one).
The other way around is more difficult, though. It might come down to a large case statement covering all the types with parameters. I remember seeing some Rust code for this at some point, maybe in datafusion, but I couldn't find it anymore.
Let's implement this in the near future.

Side comment: I found this old spec for an Arrow schema JSON representation: https://github.com/apache/arrow/pull/158/files
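
A hypothetical sketch of the reverse direction discussed here (not implemented in this PR): serializing a pyarrow type back into the {"type_name": ..., "arguments": ...} form, with two parameterized types spelled out and a string fallback for the rest:

import pyarrow as pa

def arrow_type_to_json(dtype):
    # Parameterized types need their arguments written out explicitly
    if pa.types.is_timestamp(dtype):
        return {"type_name": "timestamp",
                "arguments": {"unit": dtype.unit, "tz": dtype.tz}}
    if pa.types.is_decimal(dtype):
        return {"type_name": "decimal128",
                "arguments": {"precision": dtype.precision, "scale": dtype.scale}}
    # Non-parameterized types round-trip through their string name, e.g. "int64"
    return str(dtype)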



# Create Arrow Dataset for a given input file
def get_dataset(input_file, dataset_info, table_name=None):
column_list = None # Default
if dataset_info["format"] == "parquet":
# Defaults
column_list = None
schema = None
format = dataset_info["format"]
if format == "parquet":
dataset_read_format = ds.ParquetFileFormat()
if dataset_info["format"] == "csv":
if format == "csv":
# defaults
po = csv.ParseOptions()
ro = csv.ReadOptions() # autogenerate_column_names=True)
ro = csv.ReadOptions()
co = csv.ConvertOptions()
# TODO: Should we autogenerate column names by default?
# Or add a property in the metadata about it?
# or allow a fall-back to read_csv in case schema detection fails?

Collaborator:
We've got both csv datasets with no headers (fanniemae) and with headers (nyctaxi). If we're specifying a schema, we should use the names specified there. If we're not... well, we probably should be.

Collaborator:
But we should be careful not to accidentally skip the first line if there are no headers and we overwrite the names inferred from the first line of data; we need to store metadata about this for each dataset.

Collaborator (author):
I added a property for this! We can now handle both. The default is no header line, so field names will be auto-generated. I had to upgrade our Arrow version for this, though, because I reported a problem with field name auto-generation that was resolved in v9.0.0: https://issues.apache.org/jira/browse/ARROW-16436
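
An illustrative sketch of that header handling (the exact metadata property name used by this PR may differ; "header-line" is assumed here):

from pyarrow import csv

def make_read_options(dataset_info, column_names=None):
    if dataset_info.get("header-line"):
        # First line holds the column names; let the CSV reader consume it
        return csv.ReadOptions()
    if column_names:
        # No header line but a known schema: pass the names explicitly so the
        # first data row is not mistaken for a header
        return csv.ReadOptions(column_names=column_names)
    # No header and no schema: auto-generate f0, f1, ... (needs Arrow >= 9.0.0
    # because of ARROW-16436)
    return csv.ReadOptions(autogenerate_column_names=True)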


if "delim" in dataset_info:
po = csv.ParseOptions(delimiter=dataset_info["delim"])
@@ -312,15 +407,25 @@ def get_dataset(input_file, dataset_info, table_name=None):
column_types_trailed = column_types.copy()
column_types_trailed["trailing_columns"] = pa.string()
ro = csv.ReadOptions(
column_names=column_types_trailed.keys(), encoding="ISO-8859"
column_names=column_types_trailed.keys(),
encoding="iso8859" if dataset_info["name"] == "tpc-ds" else "utf8",
)
co = csv.ConvertOptions(column_types=column_types_trailed)
else: # not a TPC dataset
if dataset_info.get("tables"):
log.debug("Found schema information in metadata")
for table_entry in dataset_info.get("tables"):
if table_name is None or table_entry["table"] == table_name:
schema = get_arrow_schema(table_entry["schema"])
column_names = list(table_entry["schema"].keys())
break
ro = csv.ReadOptions(column_names=column_names)

dataset_read_format = ds.CsvFileFormat(
read_options=ro, parse_options=po, convert_options=co
)

dataset = ds.dataset(input_file, format=dataset_read_format)
dataset = ds.dataset(input_file, schema=schema, format=dataset_read_format)
scanner = dataset.scanner(columns=column_list)
return dataset, scanner

@@ -358,7 +463,7 @@ def convert_dataset(
raise ValueError(msg)

with open(cached_dataset_metadata_file) as f:
dataset_metadata = json.load(f)
dataset_metadata = json.load(f, object_pairs_hook=OrderedDict)

if (dataset_metadata["format"] == new_format) and (old_nrows == new_nrows):
log.info("Conversion not needed.")
@@ -383,10 +488,18 @@
if parquet_compression is None:
parquet_compression = "snappy" # Use snappy by default
write_options = dataset_write_format.make_write_options(
compression=parquet_compression
compression=parquet_compression,
use_deprecated_int96_timestamps=False,
coerce_timestamps="us",

Collaborator:
If you've upgraded to Arrow 9.0.0, this may be unnecessary now; some of the weird behavior around this was part of the old parquet versions, and the default version got bumped with 9.0.0. Fine to leave here for safety, though.

allow_truncated_timestamps=True,
)
if new_format == "csv":
dataset_write_format = ds.CsvFileFormat()
# Don't include header if there's a known schema
if dataset_info.get("tables"):
write_options = dataset_write_format.make_write_options(
include_header=False
)

ds.write_dataset(
scanner,
Expand All @@ -411,22 +524,19 @@ def convert_dataset(

metadata_table_list.append(
{
"table": f"{file_name}.{new_format}",
"table": file_name,
"schema": schema_to_dict(dataset.schema),
}
)

# TODO: The dataset API does a poor job at detecting the schema.
# Would be nice to be able to fall back to read/write_csv etc.
# Another option is to store the schema as metadata in the repo and pass it
# to dataset.
# It would also be nice to detect/provide option whether the first line
# contains column names.

conv_time = time.perf_counter() - conv_start
log.info("Finished conversion.")
log.debug(f"conversion took {conv_time:0.2f} s")
dataset_info["tables"] = metadata_table_list
# Parquet already stores the schema internally
if new_format == "csv":
# Don't overwrite schema if it is already known
if dataset_info.get("tables") is None:
dataset_info["tables"] = metadata_table_list
dataset_info["format"] = new_format
dataset_info["partitioning-nrows"] = new_nrows
if parquet_compression is not None:
@@ -467,18 +577,31 @@ def generate_dataset(dataset_info, argument_info):
out_dir=cached_dataset_path, scale_factor=argument_info.scale_factor
)

metadata_table_list = []
for table in tpc_info.tpc_table_names[dataset_name]:
input_file = pathlib.Path(cached_dataset_path, table + ".csv")
dataset, scanner = get_dataset(input_file, dataset_info, table)
metadata_table_list.append(
{"table": table + ".csv", "schema": schema_to_dict(dataset.schema)}
)
# If the entry in the repo file does not specify the schema, try to detect it
if not dataset_info.get("tables"):
metadata_table_list = []
for table in tpc_info.tpc_table_names[dataset_name]:
input_file = pathlib.Path(cached_dataset_path, table + ".csv")
try:
dataset, scanner = get_dataset(input_file, dataset_info, table)
metadata_table_list.append(
{
"table": table,
"schema": schema_to_dict(dataset.schema),
}
)
except Exception:
log.error(
f"pyarrow.dataset is unable to read schema from generated file {input_file}"
)
clean_cache_dir(cached_dataset_path)
raise

dataset_info["tables"] = metadata_table_list

gen_time = time.perf_counter() - gen_start
log.info("Finished generating.")
log.debug(f"generation took {gen_time:0.2f} s")
dataset_info["tables"] = metadata_table_list
write_metadata(dataset_info, cached_dataset_path)

except Exception:
@@ -534,7 +657,7 @@ def download_dataset(dataset_info, argument_info):
# so something could have gone wrong while downloading/converting previously
if dataset_file_path.exists():
log.debug(f"Removing existing file '{dataset_file_path}'")
dataset_file_path.rmdir()
dataset_file_path.unlink()
url = dataset_info["url"]
try:
http = urllib3.PoolManager()
@@ -557,15 +680,24 @@
dataset_file_name = removesuffix(dataset_file_name, "." + compression)
dataset_file_path = removesuffix(dataset_file_path, "." + compression)

try:
dataset, scanner = get_dataset(dataset_file_path, dataset_info)
dataset_info["tables"] = [
{"table": str(dataset_file_name), "schema": schema_to_dict(dataset.schema)}
]
except Exception:
log.error("pyarrow.dataset is unable to read downloaded file")
clean_cache_dir(cached_dataset_path)
raise
# Parquet already stores the schema internally
if dataset_info["format"] == "csv":
# If the entry in the repo file does not specify the schema, try to detect it
if not dataset_info.get("tables"):
try:
dataset, scanner = get_dataset(dataset_file_path, dataset_info)
dataset_info["tables"] = [
{
"table": str(pathlib.Path(dataset_file_name).stem),
"schema": schema_to_dict(dataset.schema),
}
]
except Exception:
log.error(
"pyarrow.dataset is unable to read schema from downloaded file"
)
clean_cache_dir(cached_dataset_path)
raise

if dataset_info.get("files"):
# In this case, the dataset info contained checksums. Check them
2 changes: 1 addition & 1 deletion repo.json
@@ -7,7 +7,7 @@
"dim" : [22180168, 31],
"format" : "csv",
"file-compression" : "gz",
"tables": [{"table" : "fanniemae_2016Q4.csv", "schema" : {
"tables": [{"table" : "2016Q4", "schema" : {
"LOAN_ID" : "string",
"ACT_PERIOD" : "string",
"SERVICER" : "string",
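
For illustration only (not part of this diff), a schema entry in the repo file can also declare parameterized column types using the type_name/arguments form accepted by arrow_type_from_json; the table and column names below are hypothetical:

{"table": "example_table", "schema": {
    "id": "int64",
    "price": {"type_name": "decimal", "arguments": {"precision": 10, "scale": 2}},
    "created_at": {"type_name": "timestamp", "arguments": {"unit": "ms"}}
}}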