Skip to content

Commit

Permalink
Update modin
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego committed Apr 12, 2024
1 parent 10fb102 commit 0fbfd6c
Show file tree
Hide file tree
Showing 16 changed files with 214 additions and 361 deletions.
33 changes: 14 additions & 19 deletions queries/modin/q1.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,28 @@


def q() -> None:
VAR1 = date(1998, 9, 2)

lineitem = utils.get_line_item_ds
line_item_ds = utils.get_line_item_ds
# first call one time to cache in case we don't include the IO times
lineitem()
line_item_ds()

def query() -> pd.DataFrame:
nonlocal lineitem
lineitem = lineitem()
nonlocal line_item_ds
line_item_ds = line_item_ds()

var1 = date(1998, 9, 2)

sel = lineitem.l_shipdate <= VAR1
lineitem_filtered = lineitem[sel]
filt = line_item_ds[line_item_ds["l_shipdate"] <= var1]

# This is lenient towards pandas as normally an optimizer should decide
# that this could be computed before the groupby aggregation.
# Other implementations don't enjoy this benefit.
lineitem_filtered["disc_price"] = lineitem_filtered.l_extendedprice * (
1 - lineitem_filtered.l_discount
)
lineitem_filtered["charge"] = (
lineitem_filtered.l_extendedprice
* (1 - lineitem_filtered.l_discount)
* (1 + lineitem_filtered.l_tax)
filt["disc_price"] = filt.l_extendedprice * (1.0 - filt.l_discount)
filt["charge"] = (
filt.l_extendedprice * (1.0 - filt.l_discount) * (1.0 + filt.l_tax)
)
gb = lineitem_filtered.groupby(["l_returnflag", "l_linestatus"], as_index=False)

total = gb.agg(
gb = filt.groupby(["l_returnflag", "l_linestatus"], as_index=False)
agg = gb.agg(
sum_qty=pd.NamedAgg(column="l_quantity", aggfunc="sum"),
sum_base_price=pd.NamedAgg(column="l_extendedprice", aggfunc="sum"),
sum_disc_price=pd.NamedAgg(column="disc_price", aggfunc="sum"),
Expand All @@ -47,9 +42,9 @@ def query() -> pd.DataFrame:
count_order=pd.NamedAgg(column="l_orderkey", aggfunc="size"),
)

result_df = total.sort_values(["l_returnflag", "l_linestatus"])
result_df = agg.sort_values(["l_returnflag", "l_linestatus"])

return result_df
return result_df # type: ignore[no-any-return]

utils.run_query(Q_NUM, query)

Expand Down
134 changes: 26 additions & 108 deletions queries/modin/q2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@


def q() -> None:
var1 = 15
var2 = "BRASS"
var3 = "EUROPE"

region_ds = utils.get_region_ds
nation_ds = utils.get_nation_ds
supplier_ds = utils.get_supplier_ds
Expand All @@ -40,96 +36,26 @@ def query() -> pd.DataFrame:
part_ds = part_ds()
part_supp_ds = part_supp_ds()

nation_filtered = nation_ds.loc[:, ["n_nationkey", "n_name", "n_regionkey"]]
region_filtered = region_ds[(region_ds["r_name"] == var3)]
region_filtered = region_filtered.loc[:, ["r_regionkey"]]
r_n_merged = nation_filtered.merge(
region_filtered, left_on="n_regionkey", right_on="r_regionkey", how="inner"
)
r_n_merged = r_n_merged.loc[:, ["n_nationkey", "n_name"]]
supplier_filtered = supplier_ds.loc[
:,
[
"s_suppkey",
"s_name",
"s_address",
"s_nationkey",
"s_phone",
"s_acctbal",
"s_comment",
],
]
s_r_n_merged = r_n_merged.merge(
supplier_filtered,
left_on="n_nationkey",
right_on="s_nationkey",
how="inner",
)
s_r_n_merged = s_r_n_merged.loc[
:,
[
"n_name",
"s_suppkey",
"s_name",
"s_address",
"s_phone",
"s_acctbal",
"s_comment",
],
]
partsupp_filtered = part_supp_ds.loc[
:, ["ps_partkey", "ps_suppkey", "ps_supplycost"]
]
ps_s_r_n_merged = s_r_n_merged.merge(
partsupp_filtered, left_on="s_suppkey", right_on="ps_suppkey", how="inner"
)
ps_s_r_n_merged = ps_s_r_n_merged.loc[
:,
[
"n_name",
"s_name",
"s_address",
"s_phone",
"s_acctbal",
"s_comment",
"ps_partkey",
"ps_supplycost",
],
]
part_filtered = part_ds.loc[:, ["p_partkey", "p_mfgr", "p_size", "p_type"]]
part_filtered = part_filtered[
(part_filtered["p_size"] == var1)
& (part_filtered["p_type"].str.endswith(var2))
]
part_filtered = part_filtered.loc[:, ["p_partkey", "p_mfgr"]]
merged_df = part_filtered.merge(
ps_s_r_n_merged, left_on="p_partkey", right_on="ps_partkey", how="inner"
)
merged_df = merged_df.loc[
:,
[
"n_name",
"s_name",
"s_address",
"s_phone",
"s_acctbal",
"s_comment",
"ps_supplycost",
"p_partkey",
"p_mfgr",
],
]
min_values = merged_df.groupby("p_partkey", as_index=False)[
"ps_supplycost"
].min()
min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"]
merged_df = merged_df.merge(
min_values,
left_on=["p_partkey", "ps_supplycost"],
right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"],
how="inner",
var1 = 15
var2 = "BRASS"
var3 = "EUROPE"

jn = (
part_ds.merge(part_supp_ds, left_on="p_partkey", right_on="ps_partkey")
.merge(supplier_ds, left_on="ps_suppkey", right_on="s_suppkey")
.merge(nation_ds, left_on="s_nationkey", right_on="n_nationkey")
.merge(region_ds, left_on="n_regionkey", right_on="r_regionkey")
)
result_df = merged_df.loc[

jn = jn[jn["p_size"] == var1]
jn = jn[jn["p_type"].str.endswith(var2)]
jn = jn[jn["r_name"] == var3]

gb = jn.groupby("p_partkey", as_index=False)
agg = gb["ps_supplycost"].min()
jn2 = agg.merge(jn, on=["p_partkey", "ps_supplycost"])

sel = jn2.loc[
:,
[
"s_acctbal",
Expand All @@ -142,22 +68,14 @@ def query() -> pd.DataFrame:
"s_comment",
],
]
result_df = result_df.sort_values(
by=[
"s_acctbal",
"n_name",
"s_name",
"p_partkey",
],
ascending=[
False,
True,
True,
True,
],
).head(100)

return result_df
sort = sel.sort_values(
by=["s_acctbal", "n_name", "s_name", "p_partkey"],
ascending=[False, True, True, True],
)
result_df = sort.head(100)

return result_df # type: ignore[no-any-return]

utils.run_query(Q_NUM, query)

Expand Down
49 changes: 21 additions & 28 deletions queries/modin/q3.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@


def q() -> None:
var1 = var2 = date(1995, 3, 15)
var3 = "BUILDING"

customer_ds = utils.get_customer_ds
line_item_ds = utils.get_line_item_ds
orders_ds = utils.get_orders_ds
Expand All @@ -32,34 +29,30 @@ def query() -> pd.DataFrame:
line_item_ds = line_item_ds()
orders_ds = orders_ds()

lineitem_filtered = line_item_ds.loc[
:, ["l_orderkey", "l_extendedprice", "l_discount", "l_shipdate"]
]
orders_filtered = orders_ds.loc[
:, ["o_orderkey", "o_custkey", "o_orderdate", "o_shippriority"]
]
customer_filtered = customer_ds.loc[:, ["c_mktsegment", "c_custkey"]]
lsel = lineitem_filtered.l_shipdate > var1
osel = orders_filtered.o_orderdate < var2
csel = customer_filtered.c_mktsegment == var3
flineitem = lineitem_filtered[lsel]
forders = orders_filtered[osel]
fcustomer = customer_filtered[csel]
jn1 = fcustomer.merge(forders, left_on="c_custkey", right_on="o_custkey")
jn2 = jn1.merge(flineitem, left_on="o_orderkey", right_on="l_orderkey")
var1 = "BUILDING"
var2 = date(1995, 3, 15)

fcustomer = customer_ds[customer_ds["c_mktsegment"] == var1]

jn1 = fcustomer.merge(orders_ds, left_on="c_custkey", right_on="o_custkey")
jn2 = jn1.merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")

jn2 = jn2[jn2["o_orderdate"] < var2]
jn2 = jn2[jn2["l_shipdate"] > var2]
jn2["revenue"] = jn2.l_extendedprice * (1 - jn2.l_discount)

total = (
jn2.groupby(
["l_orderkey", "o_orderdate", "o_shippriority"], as_index=False
)["revenue"]
.sum()
.sort_values(["revenue"], ascending=False)
gb = jn2.groupby(
["o_orderkey", "o_orderdate", "o_shippriority"], as_index=False
)
result_df = total.head(10).loc[
:, ["l_orderkey", "revenue", "o_orderdate", "o_shippriority"]
]
return result_df
agg = gb["revenue"].sum()

sel = agg.loc[:, ["o_orderkey", "revenue", "o_orderdate", "o_shippriority"]]
sel = sel.rename({"o_orderkey": "l_orderkey"}, axis="columns")

sorted = sel.sort_values(by=["revenue", "o_orderdate"], ascending=[False, True])
result_df = sorted.head(10)

return result_df # type: ignore[no-any-return]

utils.run_query(Q_NUM, query)

Expand Down
37 changes: 18 additions & 19 deletions queries/modin/q4.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
from __future__ import annotations

from datetime import date
from typing import TYPE_CHECKING

from queries.modin import utils
import modin.pandas as pd

if TYPE_CHECKING:
import modin.pandas as pd
from queries.modin import utils

Q_NUM = 4


def q() -> None:
date1 = date(1993, 10, 1)
date2 = date(1993, 7, 1)

line_item_ds = utils.get_line_item_ds
orders_ds = utils.get_orders_ds

Expand All @@ -28,18 +23,22 @@ def query() -> pd.DataFrame:
line_item_ds = line_item_ds()
orders_ds = orders_ds()

lsel = line_item_ds.l_commitdate < line_item_ds.l_receiptdate
osel = (orders_ds.o_orderdate < date1) & (orders_ds.o_orderdate >= date2)
flineitem = line_item_ds[lsel]
forders = orders_ds[osel]
jn = forders[forders["o_orderkey"].isin(flineitem["l_orderkey"])]
result_df = (
jn.groupby("o_orderpriority", as_index=False)["o_orderkey"]
.count()
.sort_values(["o_orderpriority"])
.rename(columns={"o_orderkey": "order_count"})
)
return result_df
var1 = date(1993, 7, 1)
var2 = date(1993, 10, 1)

jn = line_item_ds.merge(orders_ds, left_on="l_orderkey", right_on="o_orderkey")

jn = jn[(jn["o_orderdate"] >= var1) & (jn["o_orderdate"] < var2)]
jn = jn[jn["l_commitdate"] < jn["l_receiptdate"]]

jn = jn.drop_duplicates(subset=["o_orderpriority", "l_orderkey"])

gb = jn.groupby("o_orderpriority", as_index=False)
agg = gb.agg(order_count=pd.NamedAgg(column="o_orderkey", aggfunc="count"))

result_df = agg.sort_values(["o_orderpriority"])

return result_df # type: ignore[no-any-return]

utils.run_query(Q_NUM, query)

Expand Down
26 changes: 12 additions & 14 deletions queries/modin/q5.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@


def q() -> None:
date1 = date(1994, 1, 1)
date2 = date(1995, 1, 1)

region_ds = utils.get_region_ds
nation_ds = utils.get_nation_ds
customer_ds = utils.get_customer_ds
Expand All @@ -37,34 +34,35 @@ def query() -> pd.DataFrame:
nonlocal line_item_ds
nonlocal orders_ds
nonlocal supplier_ds

region_ds = region_ds()
nation_ds = nation_ds()
customer_ds = customer_ds()
line_item_ds = line_item_ds()
orders_ds = orders_ds()
supplier_ds = supplier_ds()

var1 = "ASIA"
var2 = date(1994, 1, 1)
var3 = date(1995, 1, 1)

jn1 = region_ds.merge(nation_ds, left_on="r_regionkey", right_on="n_regionkey")
jn2 = jn1.merge(customer_ds, left_on="n_nationkey", right_on="c_nationkey")
jn3 = jn2.merge(orders_ds, left_on="c_custkey", right_on="o_custkey")
jn4 = jn3.merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
jn5 = supplier_ds.merge(
jn4,
left_on=["s_suppkey", "s_nationkey"],
right_on=["l_suppkey", "n_nationkey"],
jn5 = jn4.merge(
supplier_ds,
left_on=["l_suppkey", "n_nationkey"],
right_on=["s_suppkey", "s_nationkey"],
)

jn5 = jn5[jn5["r_name"] == var1]
jn5 = jn5[(jn5["o_orderdate"] >= var2) & (jn5["o_orderdate"] < var3)]
jn5["revenue"] = jn5.l_extendedprice * (1.0 - jn5.l_discount)

jn5 = jn5[
(jn5.o_orderdate >= date1)
& (jn5.o_orderdate < date2)
& (jn5.r_name == "ASIA")
]
gb = jn5.groupby("n_name", as_index=False)["revenue"].sum()
result_df = gb.sort_values("revenue", ascending=False)

return result_df
return result_df # type: ignore[no-any-return]

utils.run_query(Q_NUM, query)

Expand Down
Loading

0 comments on commit 0fbfd6c

Please sign in to comment.