fix: raise when join introduces duplicates #1933

Closed · wants to merge 3 commits
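For context on what the new `validate_column_names=True` call sites enforce: each backend's `_from_native_frame` now optionally runs `check_column_names_are_unique` (imported from `narwhals.utils`) on the resulting frame's columns, so joins, renames, unpivots and group-by aggregations that would produce duplicate column names raise eagerly instead of returning an ambiguous frame. A rough sketch of what such a check can look like; this is an illustration, not the actual `narwhals.utils` implementation, and the real error type may differ:

```python
from collections import Counter
from collections.abc import Sequence


def check_column_names_are_unique(columns: Sequence[str]) -> None:
    """Raise if any column name occurs more than once (illustrative sketch)."""
    duplicates = {name: n for name, n in Counter(columns).items() if n > 1}
    if duplicates:
        msg = "Expected unique column names, got duplicates: " + ", ".join(
            f"{name!r} (x{n})" for name, n in sorted(duplicates.items())
        )
        raise ValueError(msg)  # the real helper may use a dedicated error class


check_column_names_are_unique(["a", "b", "c"])  # passes
# check_column_names_are_unique(["a", "b", "a"])  # would raise ValueError
```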
22 changes: 17 additions & 5 deletions narwhals/_arrow/dataframe.py
@@ -21,6 +21,7 @@
from narwhals.utils import Implementation
from narwhals.utils import Version
from narwhals.utils import check_column_exists
from narwhals.utils import check_column_names_are_unique
from narwhals.utils import generate_temporary_column_name
from narwhals.utils import is_sequence_but_not_str
from narwhals.utils import parse_columns_to_drop
@@ -90,10 +91,15 @@ def _change_version(self: Self, version: Version) -> Self:
self._native_frame, backend_version=self._backend_version, version=version
)

def _from_native_frame(self: Self, df: pa.Table) -> Self:
return self.__class__(
def _from_native_frame(
self: Self, df: pa.Table, *, validate_column_names: bool = False
Member Author commented:
πŸ€” maybe the internal default should be true, and then we just opt out of the check when we know it's safe to do so

) -> Self:
result = self.__class__(
df, backend_version=self._backend_version, version=self._version
)
if validate_column_names:
check_column_names_are_unique(result.columns)
return result

@property
def shape(self: Self) -> tuple[int, int]:
@@ -367,6 +373,7 @@ def join(
right_suffix=suffix,
)
.drop([key_token]),
validate_column_names=True,
)

return self._from_native_frame(
@@ -377,6 +384,7 @@
join_type=how_to_join_map[how],
right_suffix=suffix,
),
validate_column_names=True,
)

def join_asof(
@@ -472,7 +480,8 @@ def with_row_index(self: Self, name: str) -> Self:

row_indices = pa.array(range(df.num_rows))
return self._from_native_frame(
df.append_column(name, row_indices).select([name, *cols])
df.append_column(name, row_indices).select([name, *cols]),
validate_column_names=True,
)

def filter(self: Self, *predicates: IntoArrowExpr, **constraints: Any) -> Self:
@@ -634,7 +643,9 @@ def item(self: Self, row: int | None, column: int | str | None) -> Any:
def rename(self: Self, mapping: dict[str, str]) -> Self:
df = self._native_frame
new_cols = [mapping.get(c, c) for c in df.column_names]
return self._from_native_frame(df.rename_columns(new_cols))
return self._from_native_frame(
df.rename_columns(new_cols), validate_column_names=True
)

def write_parquet(self: Self, file: str | Path | BytesIO) -> None:
import pyarrow.parquet as pp
@@ -802,7 +813,8 @@ def unpivot(
for on_col in on_
],
**promote_kwargs,
)
),
validate_column_names=True,
)
# TODO(Unassigned): Even with promote_options="permissive", pyarrow does not
# upcast numeric to non-numeric (e.g. string) datatypes
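One reason the Arrow backend validates at call sites like `rename` and `with_row_index`: pyarrow schemas tolerate duplicate field names, so, as far as I can tell, `Table.rename_columns` will happily produce an ambiguous table rather than raising on its own:

```python
import pyarrow as pa

tbl = pa.table({"a": [1, 2], "b": [3, 4]})
renamed = tbl.rename_columns(["b", "b"])  # pyarrow does not object here
print(renamed.column_names)  # ['b', 'b'] -- ambiguous for downstream column lookups
```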
26 changes: 20 additions & 6 deletions narwhals/_dask/dataframe.py
@@ -17,6 +17,7 @@
from narwhals.typing import CompliantLazyFrame
from narwhals.utils import Implementation
from narwhals.utils import check_column_exists
from narwhals.utils import check_column_names_are_unique
from narwhals.utils import generate_temporary_column_name
from narwhals.utils import parse_columns_to_drop
from narwhals.utils import parse_version
@@ -68,10 +69,15 @@ def _change_version(self: Self, version: Version) -> Self:
self._native_frame, backend_version=self._backend_version, version=version
)

def _from_native_frame(self: Self, df: Any) -> Self:
return self.__class__(
def _from_native_frame(
self: Self, df: Any, *, validate_column_names: bool = False
) -> Self:
result = self.__class__(
df, backend_version=self._backend_version, version=self._version
)
if validate_column_names:
check_column_names_are_unique(result.columns)
return result

def with_columns(self: Self, *exprs: DaskExpr, **named_exprs: DaskExpr) -> Self:
df = self._native_frame
@@ -278,6 +284,7 @@ def join(
suffixes=("", suffix),
)
.drop(columns=key_token),
validate_column_names=True,
)

if how == "anti":
@@ -308,7 +315,8 @@ def join(
right_on=left_on,
)
return self._from_native_frame(
df[df[indicator_token] == "left_only"].drop(columns=[indicator_token])
df[df[indicator_token] == "left_only"].drop(columns=[indicator_token]),
validate_column_names=True,
)

if how == "semi":
@@ -333,7 +341,8 @@ def join(
how="inner",
left_on=left_on,
right_on=left_on,
)
),
validate_column_names=True,
)

if how == "left":
@@ -351,7 +360,9 @@ def join(
extra.append(right_key)
elif right_key != left_key:
extra.append(f"{right_key}_right")
return self._from_native_frame(result_native.drop(columns=extra))
return self._from_native_frame(
result_native.drop(columns=extra), validate_column_names=True
)

return self._from_native_frame(
self._native_frame.merge(
@@ -361,6 +372,7 @@ def join(
how=how,
suffixes=("", suffix),
),
validate_column_names=True,
)

def join_asof(
@@ -386,6 +398,7 @@ def join_asof(
direction=strategy,
suffixes=("", suffix),
),
validate_column_names=True,
)

def group_by(self: Self, *by: str, drop_null_keys: bool) -> DaskLazyGroupBy:
@@ -428,5 +441,6 @@ def unpivot(
value_vars=on,
var_name=variable_name,
value_name=value_name,
)
),
validate_column_names=True,
)
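As an aside on the anti-join branch above, which merges with a named indicator column, keeps the "left_only" rows and drops the helper column: a small pandas sketch of that pattern (pandas stands in for dask here, and `indicator_token` is a hypothetical fixed name where the real code generates a temporary one):

```python
import pandas as pd

left = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
right = pd.DataFrame({"a": [2]})

indicator_token = "_merge_token"  # hypothetical; narwhals generates a temporary name
merged = left.merge(right, on="a", how="left", indicator=indicator_token)
# Keep rows that only exist in `left`, then drop the indicator helper column.
anti = merged[merged[indicator_token] == "left_only"].drop(columns=[indicator_token])
print(anti)  # rows of `left` with no match in `right` (a == 1 and a == 3)
```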
13 changes: 3 additions & 10 deletions narwhals/_dask/group_by.py
@@ -94,14 +94,6 @@ def agg(
self._grouped,
exprs,
self._keys,
self._from_native_frame,
)

def _from_native_frame(self: Self, df: DaskLazyFrame) -> DaskLazyFrame:
from narwhals._dask.dataframe import DaskLazyFrame

return DaskLazyFrame(
df, backend_version=self._df._backend_version, version=self._df._version
)


@@ -110,7 +102,6 @@ def agg_dask(
grouped: Any,
exprs: Sequence[CompliantExpr[dx.Series]],
keys: list[str],
from_dataframe: Callable[[Any], DaskLazyFrame],
) -> DaskLazyFrame:
"""This should be the fastpath, but cuDF is too far behind to use it.

@@ -163,7 +154,9 @@ def agg_dask(
}
)
result_simple = grouped.agg(**simple_aggregations)
return from_dataframe(result_simple.reset_index())
return df._from_native_frame(
result_simple.reset_index(), validate_column_names=True
)

msg = (
"Non-trivial complex aggregation found.\n\n"
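For the named-aggregation fastpath that `agg_dask` builds (a dict of output name to `(column, aggregation)` pairs passed as `grouped.agg(**simple_aggregations)` and then `reset_index()`), a minimal pandas sketch; pandas mirrors the dask groupby API used here:

```python
import pandas as pd

df = pd.DataFrame({"g": ["x", "x", "y"], "v": [1, 2, 3]})
grouped = df.groupby("g")

# Output column name -> (input column, aggregation), as in `simple_aggregations`.
simple_aggregations = {"v_sum": ("v", "sum"), "v_max": ("v", "max")}
result_simple = grouped.agg(**simple_aggregations)
print(result_simple.reset_index())
#    g  v_sum  v_max
# 0  x      3      2
# 1  y      3      3
```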
18 changes: 13 additions & 5 deletions narwhals/_duckdb/dataframe.py
@@ -20,6 +20,7 @@
from narwhals.typing import CompliantDataFrame
from narwhals.utils import Implementation
from narwhals.utils import Version
from narwhals.utils import check_column_names_are_unique
from narwhals.utils import generate_temporary_column_name
from narwhals.utils import import_dtypes_module
from narwhals.utils import parse_columns_to_drop
@@ -252,10 +253,15 @@ def _change_version(self: Self, version: Version) -> Self:
self._native_frame, version=version, backend_version=self._backend_version
)

def _from_native_frame(self: Self, df: duckdb.DuckDBPyRelation) -> Self:
return self.__class__(
def _from_native_frame(
self: Self, df: duckdb.DuckDBPyRelation, *, validate_column_names: bool = False
) -> Self:
result = self.__class__(
df, backend_version=self._backend_version, version=self._version
)
if validate_column_names:
check_column_names_are_unique(result.columns)
return result

def group_by(self: Self, *keys: str, drop_null_keys: bool) -> DuckDBGroupBy:
from narwhals._duckdb.group_by import DuckDBGroupBy
@@ -269,7 +275,9 @@ def rename(self: Self, mapping: dict[str, str]) -> Self:
selection = [
f"{col} as {mapping[col]}" if col in mapping else col for col in df.columns
]
return self._from_native_frame(df.select(", ".join(selection)))
return self._from_native_frame(
df.select(", ".join(selection)), validate_column_names=True
)

def join(
self: Self,
@@ -319,7 +327,7 @@ def join(
select = ["lhs.*"]

res = rel.select(", ".join(select)).set_alias(original_alias)
return self._from_native_frame(res)
return self._from_native_frame(res, validate_column_names=True)

def join_asof(
self: Self,
@@ -518,4 +526,4 @@ def unpivot(
select {cols_to_select}
from unpivot_cte;
""" # noqa: S608
return self._from_native_frame(duckdb.sql(query))
return self._from_native_frame(duckdb.sql(query), validate_column_names=True)
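On why the DuckDB backend opts into validation as well: if I remember correctly, DuckDB accepts duplicate output aliases in a select list, so a rename or a join projection can yield a relation with repeated column names that only fails much later:

```python
import duckdb

rel = duckdb.sql("select 1 as a, 2 as a")  # DuckDB does not reject the duplicate alias
print(rel.columns)  # ['a', 'a']
```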
3 changes: 2 additions & 1 deletion narwhals/_duckdb/group_by.py
@@ -49,5 +49,6 @@ def agg(self: Self, *exprs: DuckDBExpr) -> DuckDBLazyFrame:
)

return self._compliant_frame._from_native_frame(
self._compliant_frame._native_frame.aggregate(agg_columns)
self._compliant_frame._native_frame.aggregate(agg_columns),
validate_column_names=True,
)
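Circling back to the inline review comment on the Arrow `_from_native_frame` (maybe validation should default to on, with explicit opt-out where an operation cannot change column names): a hypothetical toy sketch of that inversion, not part of this PR; the real frames also thread `backend_version` and `version` through their constructors:

```python
from narwhals.utils import check_column_names_are_unique  # the helper this PR relies on


class ToyFrame:
    """Toy stand-in for a backend frame, just to show the opt-out default."""

    def __init__(self, columns: list[str]) -> None:
        self.columns = columns

    def _from_native_frame(
        self, columns: list[str], *, validate_column_names: bool = True
    ) -> "ToyFrame":
        result = ToyFrame(columns)
        if validate_column_names:
            check_column_names_are_unique(result.columns)
        return result

    def head(self) -> "ToyFrame":
        # Row-wise operation: the column names cannot change, so skip the check.
        return self._from_native_frame(self.columns, validate_column_names=False)

    def rename_onto_existing(self, old: str, new: str) -> "ToyFrame":
        # Renaming can collide with an existing name, so keep the default check.
        return self._from_native_frame([new if c == old else c for c in self.columns])
```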