Skip to content

Commit

Permalink
Restore modin queries (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego authored Apr 11, 2024
1 parent d3c7855 commit c899e0a
Show file tree
Hide file tree
Showing 16 changed files with 935 additions and 5 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ run-pandas: .venv ## Run pandas benchmarks
run-dask: .venv ## Run dask benchmarks
$(VENV_BIN)/python -m queries.dask.executor

.PHONY: run-modin
run-modin: .venv ## Run modin benchmarks
	$(VENV_BIN)/python -m queries.modin.executor

.PHONY: run-pyspark
run-pyspark: .venv ## Run pyspark benchmarks
$(VENV_BIN)/python -m queries.pyspark.executor
Expand Down
Empty file added queries/modin/__init__.py
Empty file.
4 changes: 4 additions & 0 deletions queries/modin/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from queries.common_utils import execute_all

# Entry point: run all modin TPC-H query modules via the shared executor.
if __name__ == "__main__":
    execute_all("modin")
58 changes: 58 additions & 0 deletions queries/modin/q1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import annotations

from datetime import date

import modin.pandas as pd

from queries.modin import utils

Q_NUM = 1


def q() -> None:
    """Run TPC-H query 1 (pricing summary report) on the modin lineitem table."""
    VAR1 = date(1998, 9, 2)

    get_line_item = utils.get_line_item_ds
    # Call the loader once up front so the data is cached, in case the
    # benchmark is configured to exclude IO times from the measurement.
    get_line_item()

    def query() -> pd.DataFrame:
        # Fetch from the (now warm) cache on every invocation. The previous
        # code rebound a `nonlocal` closure variable to the loaded DataFrame,
        # which made `query` single-use: a second call would try to call the
        # DataFrame itself and fail.
        lineitem = get_line_item()

        sel = lineitem.l_shipdate <= VAR1
        lineitem_filtered = lineitem[sel]

        # This is lenient towards pandas as normally an optimizer should decide
        # that this could be computed before the groupby aggregation.
        # Other implementations don't enjoy this benefit.
        lineitem_filtered["disc_price"] = lineitem_filtered.l_extendedprice * (
            1 - lineitem_filtered.l_discount
        )
        lineitem_filtered["charge"] = (
            lineitem_filtered.l_extendedprice
            * (1 - lineitem_filtered.l_discount)
            * (1 + lineitem_filtered.l_tax)
        )
        gb = lineitem_filtered.groupby(["l_returnflag", "l_linestatus"], as_index=False)

        total = gb.agg(
            sum_qty=pd.NamedAgg(column="l_quantity", aggfunc="sum"),
            sum_base_price=pd.NamedAgg(column="l_extendedprice", aggfunc="sum"),
            sum_disc_price=pd.NamedAgg(column="disc_price", aggfunc="sum"),
            sum_charge=pd.NamedAgg(column="charge", aggfunc="sum"),
            avg_qty=pd.NamedAgg(column="l_quantity", aggfunc="mean"),
            avg_price=pd.NamedAgg(column="l_extendedprice", aggfunc="mean"),
            avg_disc=pd.NamedAgg(column="l_discount", aggfunc="mean"),
            count_order=pd.NamedAgg(column="l_orderkey", aggfunc="size"),
        )

        result_df = total.sort_values(["l_returnflag", "l_linestatus"])

        return result_df

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    q()
166 changes: 166 additions & 0 deletions queries/modin/q2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from queries.modin import utils

if TYPE_CHECKING:
import modin.pandas as pd

Q_NUM = 2


def q() -> None:
    """Run TPC-H query 2 (minimum cost supplier) on the modin tables.

    For each part of a given size and type, finds the supplier in a given
    region that offers it at the minimum supply cost.
    """
    var1 = 15
    var2 = "BRASS"
    var3 = "EUROPE"

    get_region = utils.get_region_ds
    get_nation = utils.get_nation_ds
    get_supplier = utils.get_supplier_ds
    get_part = utils.get_part_ds
    get_part_supp = utils.get_part_supp_ds

    # Call each loader once up front so the data is cached, in case the
    # benchmark is configured to exclude IO times from the measurement.
    get_region()
    get_nation()
    get_supplier()
    get_part()
    get_part_supp()

    def query() -> pd.DataFrame:
        # Fetch from the (now warm) cache on every invocation. The previous
        # code rebound `nonlocal` closure variables to the loaded DataFrames,
        # which made `query` single-use: a second call would try to call a
        # DataFrame and fail.
        region_ds = get_region()
        nation_ds = get_nation()
        supplier_ds = get_supplier()
        part_ds = get_part()
        part_supp_ds = get_part_supp()

        nation_filtered = nation_ds.loc[:, ["n_nationkey", "n_name", "n_regionkey"]]
        region_filtered = region_ds[(region_ds["r_name"] == var3)]
        region_filtered = region_filtered.loc[:, ["r_regionkey"]]
        r_n_merged = nation_filtered.merge(
            region_filtered, left_on="n_regionkey", right_on="r_regionkey", how="inner"
        )
        r_n_merged = r_n_merged.loc[:, ["n_nationkey", "n_name"]]
        supplier_filtered = supplier_ds.loc[
            :,
            [
                "s_suppkey",
                "s_name",
                "s_address",
                "s_nationkey",
                "s_phone",
                "s_acctbal",
                "s_comment",
            ],
        ]
        s_r_n_merged = r_n_merged.merge(
            supplier_filtered,
            left_on="n_nationkey",
            right_on="s_nationkey",
            how="inner",
        )
        s_r_n_merged = s_r_n_merged.loc[
            :,
            [
                "n_name",
                "s_suppkey",
                "s_name",
                "s_address",
                "s_phone",
                "s_acctbal",
                "s_comment",
            ],
        ]
        partsupp_filtered = part_supp_ds.loc[
            :, ["ps_partkey", "ps_suppkey", "ps_supplycost"]
        ]
        ps_s_r_n_merged = s_r_n_merged.merge(
            partsupp_filtered, left_on="s_suppkey", right_on="ps_suppkey", how="inner"
        )
        ps_s_r_n_merged = ps_s_r_n_merged.loc[
            :,
            [
                "n_name",
                "s_name",
                "s_address",
                "s_phone",
                "s_acctbal",
                "s_comment",
                "ps_partkey",
                "ps_supplycost",
            ],
        ]
        part_filtered = part_ds.loc[:, ["p_partkey", "p_mfgr", "p_size", "p_type"]]
        part_filtered = part_filtered[
            (part_filtered["p_size"] == var1)
            & (part_filtered["p_type"].str.endswith(var2))
        ]
        part_filtered = part_filtered.loc[:, ["p_partkey", "p_mfgr"]]
        merged_df = part_filtered.merge(
            ps_s_r_n_merged, left_on="p_partkey", right_on="ps_partkey", how="inner"
        )
        merged_df = merged_df.loc[
            :,
            [
                "n_name",
                "s_name",
                "s_address",
                "s_phone",
                "s_acctbal",
                "s_comment",
                "ps_supplycost",
                "p_partkey",
                "p_mfgr",
            ],
        ]
        # Join each part back against its minimum supply cost so only the
        # cheapest supplier rows survive.
        min_values = merged_df.groupby("p_partkey", as_index=False)[
            "ps_supplycost"
        ].min()
        min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"]
        merged_df = merged_df.merge(
            min_values,
            left_on=["p_partkey", "ps_supplycost"],
            right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"],
            how="inner",
        )
        result_df = merged_df.loc[
            :,
            [
                "s_acctbal",
                "s_name",
                "n_name",
                "p_partkey",
                "p_mfgr",
                "s_address",
                "s_phone",
                "s_comment",
            ],
        ]
        result_df = result_df.sort_values(
            by=[
                "s_acctbal",
                "n_name",
                "s_name",
                "p_partkey",
            ],
            ascending=[
                False,
                True,
                True,
                True,
            ],
        ).head(100)

        return result_df

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    q()
68 changes: 68 additions & 0 deletions queries/modin/q3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import annotations

from datetime import date
from typing import TYPE_CHECKING

from queries.modin import utils

if TYPE_CHECKING:
import modin.pandas as pd

Q_NUM = 3


def q() -> None:
    """Run TPC-H query 3 (shipping priority) on the modin tables."""
    var1 = var2 = date(1995, 3, 15)
    var3 = "BUILDING"

    get_customer = utils.get_customer_ds
    get_line_item = utils.get_line_item_ds
    get_orders = utils.get_orders_ds

    # Call each loader once up front so the data is cached, in case the
    # benchmark is configured to exclude IO times from the measurement.
    get_customer()
    get_line_item()
    get_orders()

    def query() -> pd.DataFrame:
        # Fetch from the (now warm) cache on every invocation. The previous
        # code rebound `nonlocal` closure variables to the loaded DataFrames,
        # which made `query` single-use: a second call would try to call a
        # DataFrame and fail.
        customer_ds = get_customer()
        line_item_ds = get_line_item()
        orders_ds = get_orders()

        lineitem_filtered = line_item_ds.loc[
            :, ["l_orderkey", "l_extendedprice", "l_discount", "l_shipdate"]
        ]
        orders_filtered = orders_ds.loc[
            :, ["o_orderkey", "o_custkey", "o_orderdate", "o_shippriority"]
        ]
        customer_filtered = customer_ds.loc[:, ["c_mktsegment", "c_custkey"]]
        lsel = lineitem_filtered.l_shipdate > var1
        osel = orders_filtered.o_orderdate < var2
        csel = customer_filtered.c_mktsegment == var3
        flineitem = lineitem_filtered[lsel]
        forders = orders_filtered[osel]
        fcustomer = customer_filtered[csel]
        jn1 = fcustomer.merge(forders, left_on="c_custkey", right_on="o_custkey")
        jn2 = jn1.merge(flineitem, left_on="o_orderkey", right_on="l_orderkey")
        jn2["revenue"] = jn2.l_extendedprice * (1 - jn2.l_discount)

        total = (
            jn2.groupby(
                ["l_orderkey", "o_orderdate", "o_shippriority"], as_index=False
            )["revenue"]
            .sum()
            .sort_values(["revenue"], ascending=False)
        )
        result_df = total.head(10).loc[
            :, ["l_orderkey", "revenue", "o_orderdate", "o_shippriority"]
        ]
        return result_df

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    q()
48 changes: 48 additions & 0 deletions queries/modin/q4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations

from datetime import date
from typing import TYPE_CHECKING

from queries.modin import utils

if TYPE_CHECKING:
import modin.pandas as pd

Q_NUM = 4


def q() -> None:
    """Run TPC-H query 4 (order priority checking) on the modin tables."""
    date1 = date(1993, 10, 1)
    date2 = date(1993, 7, 1)

    get_line_item = utils.get_line_item_ds
    get_orders = utils.get_orders_ds

    # Call each loader once up front so the data is cached, in case the
    # benchmark is configured to exclude IO times from the measurement.
    get_line_item()
    get_orders()

    def query() -> pd.DataFrame:
        # Fetch from the (now warm) cache on every invocation. The previous
        # code rebound `nonlocal` closure variables to the loaded DataFrames,
        # which made `query` single-use: a second call would try to call a
        # DataFrame and fail.
        line_item_ds = get_line_item()
        orders_ds = get_orders()

        lsel = line_item_ds.l_commitdate < line_item_ds.l_receiptdate
        osel = (orders_ds.o_orderdate < date1) & (orders_ds.o_orderdate >= date2)
        flineitem = line_item_ds[lsel]
        forders = orders_ds[osel]
        # Semi-join: keep only orders that have at least one late lineitem.
        jn = forders[forders["o_orderkey"].isin(flineitem["l_orderkey"])]
        result_df = (
            jn.groupby("o_orderpriority", as_index=False)["o_orderkey"]
            .count()
            .sort_values(["o_orderpriority"])
            .rename(columns={"o_orderkey": "order_count"})
        )
        return result_df

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    q()
Loading

0 comments on commit c899e0a

Please sign in to comment.