Commit 7fcfa3e

Added parquet partitioning conversion test
joosthooz authored and austin3dickey committed Aug 30, 2022
1 parent 389a548 commit 7fcfa3e
Showing 1 changed file with 86 additions and 40 deletions.
126 changes: 86 additions & 40 deletions tests/test_datalogistik.py
@@ -13,6 +13,7 @@
# limitations under the License.

import datetime
import decimal
import json
import os
import pathlib
@@ -116,8 +117,8 @@ def create_test_dataset(path):
"""{"type_name": "decimal", "arguments": [7, 3]}""",
),
}
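# Not part of this commit: a minimal sketch of what the JSON spec above is
# expected to describe, assuming the "decimal" type name maps to
# pa.decimal128 (as the "argh" field in the schemas below suggests).
_decimal_spec = json.loads("""{"type_name": "decimal", "arguments": [7, 3]}""")
assert pa.decimal128(*_decimal_spec["arguments"]) == pa.decimal128(7, 3)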
# These 3 should be equivalent
complete_schema_json_input = """{
# These schemas should be equivalent
complete_csv_schema_json_input = """{
"a": "null",
"b": "bool",
"c": "int8",
@@ -138,10 +139,10 @@ def create_test_dataset(path):
"u": "large_string",
"v": {"type_name": "time32", "arguments": "ms"},
"w": {"type_name": "time64", "arguments": "us"},
"x": {"type_name": "timestamp", "arguments": {"unit": "s"}}
"x": {"type_name": "timestamp", "arguments": {"unit": "ms"}}
}"""
complete_schema_json_output = "{'a': 'null', 'b': 'bool', 'c': 'int8', 'd': 'int16', 'e': 'int32', 'f': 'int64', 'g': 'uint8', 'h': 'uint16', 'i': 'uint32', 'j': 'uint64', 'l': 'float', 'm': 'double', 'n': 'date32[day]', 'o': 'date64[ms]', 'q': 'string', 'r': 'string', 't': 'large_string', 'u': 'large_string', 'v': 'time32[ms]', 'w': 'time64[us]', 'x': 'timestamp[s]'}"
complete_schema = pa.schema(
complete_csv_schema_json_output = "{'a': 'null', 'b': 'bool', 'c': 'int8', 'd': 'int16', 'e': 'int32', 'f': 'int64', 'g': 'uint8', 'h': 'uint16', 'i': 'uint32', 'j': 'uint64', 'l': 'float', 'm': 'double', 'n': 'date32[day]', 'o': 'date64[ms]', 'q': 'string', 'r': 'string', 't': 'large_string', 'u': 'large_string', 'v': 'time32[ms]', 'w': 'time64[us]', 'x': 'timestamp[ms]'}"
complete_csv_schema = pa.schema(
[
pa.field("a", pa.null()),
pa.field("b", pa.bool_()),
@@ -158,7 +159,7 @@ def create_test_dataset(path):
pa.field("m", pa.float64()),
pa.field("n", pa.date32()),
pa.field("o", pa.date64()),
# pa.field("p", pa.month_day_nano_interval()), # not supported by csv
# pa.field("p", pa.month_day_nano_interval()), # not supported by parquet and csv
pa.field("q", pa.string()),
pa.field("r", pa.utf8()),
# pa.field("s", pa.large_binary()), # not supported by csv
@@ -167,12 +168,44 @@ def create_test_dataset(path):
# types with arguments
pa.field("v", pa.time32("ms")),
pa.field("w", pa.time64("us")),
pa.field("x", pa.timestamp("s")),
# pa.field("y", pa.duration("ns")), # not supported by csv
pa.field("x", pa.timestamp("ms")),
# pa.field("y", pa.duration("ns")), # not supported by parquet and csv
# pa.field("z", pa.binary(10)), # not supported by csv
# pa.field("argh", pa.decimal128(7, 3)),
]
)
complete_parquet_schema = pa.schema(
[
pa.field("a", pa.null()),
pa.field("b", pa.bool_()),
pa.field("c", pa.int8()),
pa.field("d", pa.int16()),
pa.field("e", pa.int32()),
pa.field("f", pa.int64()),
pa.field("g", pa.uint8()),
pa.field("h", pa.uint16()),
# pa.field("i", pa.uint32()), # not supported by parquet
pa.field("j", pa.uint64()),
# pa.field("k", pa.float16()), # not supported by parquet and csv
pa.field("l", pa.float32()),
pa.field("m", pa.float64()),
pa.field("n", pa.date32()),
# pa.field("o", pa.date64()), # not supported by parquet
# pa.field("p", pa.month_day_nano_interval()), # not supported by parquet and csv
pa.field("q", pa.string()),
pa.field("r", pa.utf8()),
pa.field("s", pa.large_binary()),
pa.field("t", pa.large_string()),
pa.field("u", pa.large_utf8()),
# types with arguments
pa.field("v", pa.time32("ms")),
pa.field("w", pa.time64("us")),
pa.field("x", pa.timestamp("us")),
# pa.field("y", pa.duration("s")), # not supported by parquet and csv
pa.field("z", pa.binary(10)),
pa.field("argh", pa.decimal128(7, 3)),
]
)
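# Not part of this commit: a minimal sketch of how one might probe which
# arrow types survive a parquet round-trip (the reason complete_parquet_schema
# differs from complete_csv_schema above), using only pyarrow APIs.
def _roundtrips_through_parquet(field):
    one_column_table = pa.table({field.name: pa.nulls(1, type=field.type)})
    sink = pa.BufferOutputStream()
    try:
        pq.write_table(one_column_table, sink)
    except (pa.ArrowNotImplementedError, pa.ArrowInvalid):
        return False  # the parquet writer rejects the type outright
    readback = pq.read_table(pa.BufferReader(sink.getvalue()))
    # a changed schema means the type was coerced, e.g. date64 -> date32
    return readback.schema == one_column_table.schema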


def generate_random_string(length):
@@ -182,9 +215,9 @@ def generate_random_string(length):
return random_string


def generate_complete_schema_data(num_rows):
def generate_complete_schema_data(num_rows, format):
k = num_rows
return {
data = {
"a": pa.nulls(k),
"b": random.choices([True, False], k=k),
"c": [random.randint(-(2**8 / 2), 2**8 / 2 - 1) for _ in range(k)],
@@ -193,7 +226,6 @@ def generate_complete_schema_data(num_rows):
"f": [random.randint(-(2**64 / 2), 2**64 / 2 - 1) for _ in range(k)],
"g": [random.randint(0, 2**8 - 1) for _ in range(k)],
"h": [random.randint(0, 2**16 - 1) for _ in range(k)],
"i": [random.randint(0, 2**32 - 1) for _ in range(k)],
"j": [random.randint(0, 2**64 - 1) for _ in range(k)],
# "k": [np.float16(random.random()) for _ in range(k)],
"l": [random.random() for _ in range(k)],
@@ -210,20 +242,9 @@ def generate_complete_schema_data(num_rows):
// datetime.timedelta(days=1)
for _ in range(k)
],
"o": [
datetime.datetime(
random.randint(1970, 2270),
random.randint(1, 12),
random.randint(1, 28),
tzinfo=datetime.timezone.utc,
).timestamp()
* 1000
for _ in range(k)
],
# "p": [pa.MonthDayNano([random.randint(1,12),random.randint(1,28),random.randint(1,999) * 1000]) for _ in range(k)],
"q": [generate_random_string(random.randint(1, 8)) for _ in range(k)],
"r": [generate_random_string(random.randint(1, 8)) for _ in range(k)],
# "s": [random.randbytes(random.randint(1, 64)) for _ in range(k)],
"t": [generate_random_string(random.randint(1, 8)) for _ in range(k)],
"u": [generate_random_string(random.randint(1, 8)) for _ in range(k)],
# types with arguments
@@ -251,16 +272,33 @@ def generate_complete_schema_data(num_rows):
minutes=random.randint(0, 59),
seconds=random.randint(0, 59),
)
// datetime.timedelta(seconds=1)
// datetime.timedelta(milliseconds=1)
for _ in range(k)
],
# "y": [random.randint(0, 10e9) for _ in range(k)],
# "z": [random.randbytes(10) for _ in range(k)],
# "argh": [
# decimal.Decimal(f"{random.randint(0, 9999)}.{random.randint(0,999)}")
# for _ in range(k)
# ],
# "y": [random.randint(0, 10e6) for _ in range(k)],
}
if format == "csv":
data["i"] = [random.randint(0, 2**32 - 1) for _ in range(k)]
data["o"] = [
datetime.datetime(
random.randint(1970, 2270),
random.randint(1, 12),
random.randint(1, 28),
tzinfo=datetime.timezone.utc,
).timestamp()
* 1000
for _ in range(k)
]
elif format == "parquet":
data["s"] = [random.randbytes(random.randint(1, 64)) for _ in range(k)]
data["z"] = [random.randbytes(10) for _ in range(k)]
data["argh"] = [
decimal.Decimal(f"{random.randint(0, 9999)}.{random.randint(0,999)}")
for _ in range(k)
]
else:
raise (f"unsupport format {format}")
return data
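# Not part of this commit: a minimal usage sketch of the format-aware
# generator above, assuming the complete_*_schema fixtures defined earlier.
def _example_tables(num_rows=10):
    csv_table = pa.table(
        generate_complete_schema_data(num_rows, "csv"),
        schema=complete_csv_schema,
    )
    parquet_table = pa.table(
        generate_complete_schema_data(num_rows, "parquet"),
        schema=complete_parquet_schema,
    )
    return csv_table, parquet_table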


def test_arrow_type_function_lookup():
@@ -276,12 +314,14 @@ def test_arrow_type_from_json():


def test_get_arrow_schema():
parsed_schema = util.get_arrow_schema(json.loads(complete_schema_json_input))
assert parsed_schema == complete_schema
parsed_schema = util.get_arrow_schema(json.loads(complete_csv_schema_json_input))
assert parsed_schema == complete_csv_schema


def test_schema_to_dict():
assert str(util.schema_to_dict(complete_schema)) == complete_schema_json_output
assert (
str(util.schema_to_dict(complete_csv_schema)) == complete_csv_schema_json_output
)


def test_get_dataset_with_schema():
@@ -291,15 +331,18 @@ def test_get_dataset_with_schema():
path = util.create_cached_dataset_path(name, None, "csv", 0)
path.mkdir(parents=True)
test_file = path / "complete_data.csv"
data = generate_complete_schema_data(num_rows)
ref_table = pa.table(data, schema=complete_schema)
data = generate_complete_schema_data(num_rows, "csv")
ref_table = pa.table(data, schema=complete_csv_schema)
wo = csv.WriteOptions(include_header=False)
csv.write_csv(ref_table, test_file, write_options=wo)
complete_dataset_info = {
"name": name,
"format": "csv",
"tables": [
{"table": "complete_data", "schema": json.loads(complete_schema_json_input)}
{
"table": "complete_data",
"schema": json.loads(complete_csv_schema_json_input),
}
],
}
util.write_metadata(complete_dataset_info, path)
@@ -423,16 +466,19 @@ def test_convert_dataset_csv_partitioning():
path = util.create_cached_dataset_path(name, None, format, 0)
path.mkdir(parents=True)
test_file = path / file_name
data = generate_complete_schema_data(num_rows)
orig_table = pa.table(data, schema=complete_schema)
data = generate_complete_schema_data(num_rows, "csv")
orig_table = pa.table(data, schema=complete_csv_schema)
wo = csv.WriteOptions(include_header=False)
csv.write_csv(orig_table, test_file, write_options=wo)
complete_dataset_info = {
"name": name,
"url": "http://example.com/complete_data.csv", # needed for filename during conversion
"format": format,
"tables": [
{"table": "complete_data", "schema": json.loads(complete_schema_json_input)}
{
"table": "complete_data",
"schema": json.loads(complete_csv_schema_json_input),
}
],
}
util.write_metadata(complete_dataset_info, path)
@@ -469,8 +515,8 @@ def test_convert_dataset_parquet_partitioning():
path = util.create_cached_dataset_path(name, None, format, 0)
path.mkdir(parents=True)
test_file = path / file_name
data = generate_complete_schema_data(num_rows)
orig_table = pa.table(data, schema=complete_schema)
data = generate_complete_schema_data(num_rows, "parquet")
orig_table = pa.table(data, schema=complete_parquet_schema)
pq.write_table(orig_table, test_file)
complete_dataset_info = {
"name": name,
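
For context, the new test exercises converting a dataset into a partitioned parquet layout. Below is a minimal sketch of hive-style parquet partitioning using plain pyarrow; the `part` column and output directory are illustrative stand-ins, not datalogistik's actual layout.

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"part": ["a", "a", "b"], "value": [1, 2, 3]})
ds.write_dataset(
    table,
    "partitioned_copy",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive"),
)
# Reading back discovers files under partitioned_copy/part=a/ and part=b/
roundtrip = ds.dataset("partitioned_copy", partitioning="hive").to_table()
assert roundtrip.num_rows == table.num_rows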
