Skip to content

Commit

Permalink
feat: add new fields to Parquet dump (#12)
Browse files Browse the repository at this point in the history
* feat: add `owner_fields` field

* feat: add `ecoscore_data` field
  • Loading branch information
raphael0202 authored Nov 18, 2024
1 parent 3bb202b commit 51ab153
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
49 changes: 47 additions & 2 deletions openfoodfacts_exports/exports/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import orjson
import pyarrow as pa
import pyarrow.parquet as pq
import tqdm
from huggingface_hub import HfApi
from more_itertools import chunked
from openfoodfacts.utils import jsonl_iter
Expand Down Expand Up @@ -99,6 +100,15 @@
)
)

# Arrow datatype for the `owner_fields` column: a list of structs pairing an
# owner-provided field name with an integer timestamp (presumably Unix epoch
# seconds — confirm against the Open Food Facts dump).
OWNER_FIELD_DATATYPE = pa.list_(
    pa.struct([("field_name", pa.string()), ("timestamp", pa.int64())])
)

PRODUCT_SCHEMA = pa.schema(
[
pa.field("additives_n", pa.int32(), nullable=True),
Expand All @@ -123,6 +133,7 @@
pa.field("data_quality_info_tags", pa.list_(pa.string()), nullable=True),
pa.field("data_quality_warnings_tags", pa.list_(pa.string()), nullable=True),
pa.field("data_sources_tags", pa.list_(pa.string()), nullable=True),
pa.field("ecoscore_data", pa.string(), nullable=True),
pa.field("ecoscore_grade", pa.string(), nullable=True),
pa.field("ecoscore_score", pa.int32(), nullable=True),
pa.field("ecoscore_tags", pa.list_(pa.string()), nullable=True),
Expand Down Expand Up @@ -180,6 +191,7 @@
pa.field("obsolete", pa.bool_()),
pa.field("origins_tags", pa.list_(pa.string()), nullable=True),
pa.field("origins", pa.string(), nullable=True),
pa.field("owner_fields", OWNER_FIELD_DATATYPE, nullable=True),
pa.field("owner", pa.string(), nullable=True),
pa.field("packagings_complete", pa.bool_(), nullable=True),
pa.field("packaging_recycling_tags", pa.list_(pa.string()), nullable=True),
Expand Down Expand Up @@ -322,6 +334,11 @@ class PackagingField(BaseModel):
weight_measured: float | None = None


class OwnerField(BaseModel):
    """One entry of a product's `owner_fields` mapping: the name of a field
    set by the product owner and an integer timestamp (presumably the last
    update time — confirm against the upstream data)."""

    field_name: str
    timestamp: int


class Product(BaseModel):
additives_n: int | None = None
additives_tags: list[str] | None = None
Expand All @@ -345,6 +362,7 @@ class Product(BaseModel):
data_quality_info_tags: list[str] | None = None
data_quality_warnings_tags: list[str] | None = None
data_sources_tags: list[str] | None = None
ecoscore_data: dict | None = None
ecoscore_grade: str | None = None
ecoscore_score: int | None = None
ecoscore_tags: list[str] | None = None
Expand Down Expand Up @@ -400,6 +418,7 @@ class Product(BaseModel):
origins_tags: list[str] | None = None
origins: str | None = None
owner: str | None = None
owner_fields: list[OwnerField] | None = None
packagings_complete: bool | None = None
packaging_recycling_tags: list[str] | None = None
packaging_shapes_tags: list[str] | None = None
Expand Down Expand Up @@ -547,6 +566,17 @@ def parse_ecoscore_score(cls, data: dict):

return data

@model_validator(mode="before")
@classmethod
def parse_owner_fields(cls, data: dict):
    """Normalize the raw `owner_fields` value before validation.

    In the source JSONL, `owner_fields` is a mapping of
    ``{field_name: timestamp}``; convert it to a list of
    ``{"field_name": ..., "timestamp": ...}`` dicts matching the
    `OwnerField` model. An empty or missing mapping is dropped
    (the field stays absent/None).

    :param data: the raw product payload.
    :return: the payload with `owner_fields` converted.
    """
    # `mode="before"` validators may receive non-dict payloads; leave
    # anything unexpected untouched so pydantic reports the real error
    # instead of an AttributeError on `.pop`.
    if not isinstance(data, dict):
        return data
    owner_fields = data.pop("owner_fields", None)
    if owner_fields:
        data["owner_fields"] = [
            {"field_name": field_name, "timestamp": timestamp}
            for field_name, timestamp in owner_fields.items()
        ]
    return data

@field_serializer("ingredients")
def serialize_ingredients(
self, ingredients: list[Ingredient] | None, _info
Expand All @@ -558,6 +588,14 @@ def serialize_ingredients(
return None
return orjson.dumps([ing.model_dump() for ing in ingredients]).decode("utf-8")

@field_serializer("ecoscore_data")
def serialize_ecoscore_data(self, ecoscore_data: dict | None, _info) -> str | None:
    """Serialize `ecoscore_data` to a JSON string (or None).

    The structure is deeply nested, so it is kept as a JSON string for
    now rather than mapped to a full Arrow type.
    """
    return (
        None
        if ecoscore_data is None
        else orjson.dumps(ecoscore_data).decode("utf-8")
    )


def export_parquet(dataset_path: Path, output_path: Path) -> None:
"""Convert a JSONL dataset to Parquet format and push it to Hugging Face
Expand Down Expand Up @@ -585,7 +623,8 @@ def convert_jsonl_to_parquet(
dataset_path: Path,
schema: pa.Schema = PRODUCT_SCHEMA,
batch_size: int = 1024,
row_group_size: int = 122_880, # DuckDB default row group size
row_group_size: int = 122_880, # DuckDB default row group size,
use_tqdm: bool = False,
) -> None:
"""Convert the Open Food Facts JSONL dataset to Parquet format.
Expand All @@ -595,14 +634,20 @@ def convert_jsonl_to_parquet(
schema (pa.Schema): The schema of the Parquet file.
batch_size (int, optional): The size of the batches used to convert the
dataset. Defaults to 1024.
use_tqdm (bool, optional): Whether to use tqdm to display a progress
bar. Defaults to False.
"""
writer = None
DTYPE_MAP = {
"images": IMAGES_DATATYPE,
"nutriments": NUTRIMENTS_DATATYPE,
"packagings": PACKAGING_FIELD_DATATYPE,
}
for batch in chunked(jsonl_iter(dataset_path), batch_size):
item_iter = jsonl_iter(dataset_path)
if use_tqdm:
item_iter = tqdm.tqdm(item_iter, desc="JSONL")

for batch in chunked(item_iter, batch_size):
# We use by_alias=True because some fields start with a digit
# (ex: nutriments.100g), and we cannot declare the schema with
# Pydantic without an alias.
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies = [
"rq>=2.0.0",
"sentry-sdk>=2.18.0",
"toml>=0.10.2",
"tqdm>=4.66.6",
"typer>=0.12.5",
]

Expand Down
4 changes: 3 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 51ab153

Please sign in to comment.