Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply Aggregation Without Group By #429

Merged
merged 4 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion xql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Running SQL like queries on Xarray Datasets. Consider dataset as a table and dat
* **`aggregate` Functions** - Aggregate functions `AVG()`, `MIN()`, `MAX()`, etc. Only supported on data variables.
* For more checkout the [road-map](https://github.com/google/weather-tools/tree/xql-init/xql#roadmap).
> Note: For now, we support `where` conditions on coordinates only.
> Note: For now, Only a single aggregate function is supported per query.

# Quickstart

Expand Down Expand Up @@ -105,7 +106,7 @@ _Updated on 2024-01-08_
2. [ ] On Variables
3. [x] **Aggregate Functions**: Only `AVG()`, `MIN()`, `MAX()`, `SUM()` are supported.
1. [x] With Group By
2. [ ] Without Group By
2. [x] Without Group By
3. [ ] Multiple Aggregate function in a single query
4. [ ] **Order By**: Only suppoted for coordinates.
5. [ ] **Limit**: Limiting the result to display.
Expand Down
57 changes: 36 additions & 21 deletions xql/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import readline # noqa

import numpy as np
import pandas as pd
import typing as t
import xarray as xr

Expand All @@ -42,10 +43,16 @@
}

aggregate_function_map = {
'avg': lambda x, y: x.mean(dim=y) if y else x.mean(),
'min': lambda x, y: x.min(dim=y) if y else x.min(),
'max': lambda x, y: x.max(dim=y) if y else x.max(),
'sum': lambda x, y: x.sum(dim=y) if y else x.sum(),
'avg': lambda x, y: x.mean(dim=y),
'min': lambda x, y: x.min(dim=y),
'max': lambda x, y: x.max(dim=y),
'sum': lambda x, y: x.sum(dim=y),
}

timestamp_formats = {
'time_date':"%Y-%m-%d",
'time_month':"%Y-%m",
'time_year': "%Y"
}

def parse(a: t.Union[xr.DataArray, str], b: t.Union[xr.DataArray, str]) -> t.Tuple[t.Union[xr.DataArray, str],
Expand Down Expand Up @@ -148,7 +155,7 @@ def apply_order_by(fields: t.List[str], ds: xr.Dataset) -> xr.Dataset:
return ordered_ds


def apply_group_by(fields: t.List[str], ds: xr.Dataset, agg_funcs: t.Dict[str, str]) -> xr.Dataset:
def apply_group_by(group_by: exp.Expression, ds: xr.Dataset, agg_funcs: t.Dict[str, str]) -> xr.Dataset:
"""
Apply group-by and aggregation operations to the dataset based on specified fields and aggregation functions.

Expand All @@ -162,21 +169,23 @@ def apply_group_by(fields: t.List[str], ds: xr.Dataset, agg_funcs: t.Dict[str, s
- xarray.Dataset: The dataset after applying group-by and aggregation operations.
"""

if group_by is None:
return ds, None

fields = [ e.args['this'].args['this'] for e in group_by.args['expressions'] ]

grouped_ds = ds
for field in fields:
if field in ds.coords:
grouped_ds = apply_aggregation(grouped_ds, list(agg_funcs.values())[0], field)
else:
field_parts = field.split("_")
groupby_field = field_parts[0]
if len(field_parts) > 1:
groupby_field = f"{groupby_field}.{field_parts[1]}"
groups = grouped_ds.groupby(groupby_field)
grouped_ds = apply_aggregation(groups, list(agg_funcs.values())[0])
return grouped_ds
time_fields = list(filter(lambda field: "time" in field, fields))
coord_to_squeeze = [ coord for coord in ds.coords if coord not in fields and (coord != "time") ]
if len(time_fields):
groups = grouped_ds.groupby(grouped_ds['time'].dt.strftime(timestamp_formats[time_fields[0]]))
grouped_ds = apply_aggregation(groups, list(agg_funcs.values())[0], None)
grouped_ds = grouped_ds.rename({"strftime" : "time"})

return grouped_ds, coord_to_squeeze


def apply_aggregation(groups: t.Union[xr.Dataset, DatasetGroupBy], fun: str, dim: t.Optional[str] = None) -> xr.Dataset:
def apply_aggregation(groups: t.Union[xr.Dataset, DatasetGroupBy], fun: str, dim: t.List[str] = []) -> xr.Dataset:
"""
Apply aggregation to the groups based on the specified aggregation function.

Expand Down Expand Up @@ -248,9 +257,10 @@ def parse_query(query: str) -> xr.Dataset:
mask = inorder(where, ds)
ds = ds.where(mask, drop=True)

if group_by:
groupby_fields = [ e.args['this'].args['this'] for e in group_by.args['expressions'] ]
ds = apply_group_by(groupby_fields, ds, agg_funcs)
ds, coord_to_squeeze = apply_group_by(group_by, ds, agg_funcs)

if len(agg_funcs):
ds = apply_aggregation(ds, list(agg_funcs.values())[0], coord_to_squeeze)

return ds

Expand Down Expand Up @@ -351,6 +361,11 @@ def display_table_dataset_map(cmd: str) -> None:
result = f"ERROR: {type(e).__name__}: {e.__str__()}."

if isinstance(result, xr.Dataset):
print(result.to_dataframe())
if len(result.coords):
print(result.to_dataframe().reset_index())
else:
result = result.compute().to_dict(data="list")
df = pd.DataFrame({ k: [v['data']] for k, v in result['data_vars'].items() })
print(df)
else:
print(result)
Loading