
Docs on how to transition to using dask-expr -- given that it's now the default for dataframes in the new dask release #968

Open · skrawcz opened this issue Mar 12, 2024 · 4 comments

@skrawcz commented Mar 12, 2024

Describe the issue:
Creating dataframes worked without dask-expr, but now that it is the default in the latest release, the same code fails. I don't see any migration documentation describing what the behavior changes are.

Minimal Complete Verifiable Example:
Run https://github.com/DAGWorks-Inc/hamilton/tree/main/plugin_tests/h_dask with the latest libraries (change conftest.py to use dask-expr query planning).

The code that is failing is this class https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/plugins/h_dask.py#L175 -- it contains some custom logic and assumptions about how dask used to behave.

When I set:

dask.config.set({"dataframe.query-planning": False})

Everything works -- so I'm convinced it's dask-expr that's the issue.
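
For reference, the full opt-out looks roughly like this (a sketch; the dask docs indicate the setting has to be applied before dask.dataframe is imported, since the import picks the implementation):

import dask

# Disable dask-expr query planning; this must run before
# dask.dataframe is imported, or it has no effect.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # legacy (non-expr) implementation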

Anything else we need to know?:
I posted https://dask.discourse.group/t/what-changed-in-the-latest-release-with-the-default-to-use-dask-expr/2597 too.

Environment:

  • Dask version: latest
  • Python version: 3.9
  • Operating System: Linux & macOS
  • Install method (conda, pip, source): pip
@skrawcz skrawcz changed the title Docs on how to transition to using dask-expr -- given that it's now the default for dataframes in the new release Docs on how to transition to using dask-expr -- given that it's now the default for dataframes in the new dask release Mar 12, 2024
@phofl (Collaborator) commented Mar 12, 2024

Hi, thanks for reaching out. Do you have tracebacks that we could look at?

One thing that's incorrect at first glance:

dask.dataframe.core.Scalar

That's a private import, and it changed with dask-expr.
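
If you need the symbol across versions, a version-tolerant import along these lines should work (a sketch, not a supported API; the dask-expr location is an assumption based on where dask_expr defines its collections, dask_expr/_collection.py, and both modules are private, so they may move again):

try:
    # dask >= 2024.3 with query planning enabled (private module)
    from dask_expr._collection import Scalar
except ImportError:
    # legacy dask.dataframe implementation (also private)
    from dask.dataframe.core import Scalar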

@skrawcz (Author) commented Mar 12, 2024

Can you see https://app.circleci.com/pipelines/github/DAGWorks-Inc/hamilton/2705/workflows/eef49863-3c50-4ff2-a68c-ced576de8385/jobs/45410 ?

Yep, it could entirely be that I'm using internal APIs -- hence docs on what changed would be useful.

@edu-gi commented Apr 26, 2024

I'm experiencing what is perhaps the same issue. With dask-expr installed, it happens with release 2024.3.0 and later. I am using dask.dataframe.read_parquet to read from a directory on S3 that contains multiple parquet files. I am running Python 3.10.14 [GCC 10.2.1 20210110] on a Linux machine (and see this with Python 3.11 as well). If I include dask.config.set({"dataframe.query-planning": False}) in my code, the issue goes away, but that's not a good long-term solution, as I presume query planning will eventually be mandatory.

I am running the following (the S3 path, filters, and filesystem below are placeholders standing in for my actual inputs):

import dask.dataframe as dd
import pyarrow.fs

# Placeholders for the real values used in my job
pyarrow_s3_file_system = pyarrow.fs.S3FileSystem()
s3_input_path = "s3://my-bucket/my-prefix/"         # directory of parquet files
filter_expression = [("my_col", "==", "my_value")]  # pyarrow-style filters

ddf = dd.read_parquet(
    path=s3_input_path,
    filters=filter_expression,
    filesystem=pyarrow_s3_file_system,
)
ddf.compute()

Which throws the following on the ddf.compute() call:

File /usr/local/lib/python3.11/site-packages/dask_expr/_collection.py:474, in FrameBase.compute(self, fuse, **kwargs)
    472 if not isinstance(out, Scalar):
    473     out = out.repartition(npartitions=1)
--> 474 out = out.optimize(fuse=fuse)
    475 return DaskMethodsMixin.compute(out, **kwargs)

File /usr/local/lib/python3.11/site-packages/dask_expr/_collection.py:589, in FrameBase.optimize(self, fuse)
    571 def optimize(self, fuse: bool = True):
    572     """Optimizes the DataFrame.
    573
    574     Runs the optimizer with all steps over the DataFrame and wraps the result in a
   (...)
    587         The optimized Dask Dataframe
    588     """
--> 589     return new_collection(self.expr.optimize(fuse=fuse))

File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:94, in Expr.optimize(self, **kwargs)
     93 def optimize(self, **kwargs):
---> 94     return optimize(self, **kwargs)

File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:3009, in optimize(expr, fuse)
   2988 """High level query optimization
   2989
   2990 This leverages three optimization passes:
   (...)
   3005 optimize_blockwise_fusion
   3006 """
   3007 stage: core.OptimizerStage = "fused" if fuse else "simplified-physical"
-> 3009 return optimize_until(expr, stage)

File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:2965, in optimize_until(expr, stage)
   2962     return expr
   2964 # Manipulate Expression to make it more efficient
-> 2965 expr = expr.rewrite(kind="tune")
   2966 if stage == "tuned-logical":
   2967     return expr

File /usr/local/lib/python3.11/site-packages/dask_expr/_core.py:263, in Expr.rewrite(self, kind)
    261 # Allow children to rewrite their parents
    262 for child in expr.dependencies():
--> 263     out = getattr(child, up_name)(expr)
    264     if out is None:
    265         out = expr

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:965, in ReadParquetPyarrowFS._tune_up(self, parent)
    964 def _tune_up(self, parent):
--> 965     if self._fusion_compression_factor >= 1:
    966         return
    967     if isinstance(parent, FusedParquetIO):

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1006, in ReadParquetPyarrowFS._fusion_compression_factor(self)
   1004 @property
   1005 def _fusion_compression_factor(self):
-> 1006     approx_stats = self.approx_statistics()
   1007     total_uncompressed = 0
   1008     after_projection = 0

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:782, in ReadParquetPyarrowFS.approx_statistics(self)
    780 files_to_consider = np.array(self._dataset_info["all_files"])[idxs]
    781 stats = [_STATS_CACHE[tokenize(finfo)] for finfo in files_to_consider]
--> 782 return _combine_stats(stats)

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1859, in _combine_stats(stats)
   1851 """Combine multiple file-level statistics into a single dict of metrics that
   1852 represent the average values of the parquet statistics"""
   1853 agg_cols = {
   1854     "total_compressed_size": statistics.mean,
   1855     "total_uncompressed_size": statistics.mean,
   1856     "path_in_schema": lambda x: set(x).pop(),
   1857 }
   1858 return _agg_dicts(
-> 1859     _aggregate_statistics_to_file(stats),
   1860     {
   1861         "num_rows": statistics.mean,
   1862         "num_row_groups": statistics.mean,
   1863         "serialized_size": statistics.mean,
   1864         "total_byte_size": statistics.mean,
   1865         "columns": partial(_aggregate_columns, agg_cols=agg_cols),
   1866     },
   1867 )

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1822, in _aggregate_statistics_to_file(stats)
   1819     file_stat = file_stat.copy()
   1820     aggregated_stats.append(file_stat)
-> 1822     file_stat.update(_agg_dicts(file_stat.pop("row_groups"), agg_func))
   1823 return aggregated_stats

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
   1777     agg = agg_funcs.get(k)
   1778     if agg:
-> 1779         result2[k] = agg(v)
   1780 return result2

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1796, in _aggregate_columns(cols, agg_cols)
   1794         break
   1795     i += 1
-> 1796 return [_agg_dicts(c, agg_cols) for c in combine]

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1796, in <listcomp>(.0)
   1794         break
   1795     i += 1
-> 1796 return [_agg_dicts(c, agg_cols) for c in combine]

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
   1777     agg = agg_funcs.get(k)
   1778     if agg:
-> 1779         result2[k] = agg(v)
   1780 return result2

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
   1777     agg = agg_funcs.get(k)
   1778     if agg:
-> 1779         result2[k] = agg(v)
   1780 return result2

TypeError: '<' not supported between instances of 'NoneType' and 'NoneType'

It seems the issue might be related to the parquet file statistics: judging from the final frames, agg(v) ends up comparing values that are None, presumably min/max statistics that are missing for some columns. I tried a few variations of calling dd.read_parquet, but no luck.
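
In case it helps reproduce, here is a small diagnostic sketch (using the pyarrow metadata API; the file name is a hypothetical stand-in for one of the input files) to check whether any row-group column chunks are missing min/max statistics, which would explain the None comparisons above:

import pyarrow.parquet as pq

# "part-0.parquet" is a hypothetical stand-in for one input file
md = pq.ParquetFile("part-0.parquet").metadata
for rg in range(md.num_row_groups):
    for col in range(md.num_columns):
        chunk = md.row_group(rg).column(col)
        stats = chunk.statistics
        if stats is None or not stats.has_min_max:
            # Columns without statistics yield None min/max values
            print("missing min/max:", rg, chunk.path_in_schema)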

Thanks in advance for any help

@phofl (Collaborator) commented Apr 27, 2024

This was indeed a bug; I've put up a PR to fix it.
