Skip to content

Commit

Permalink
added benchmark; fixed util reservation indexing issues to support qu…
Browse files Browse the repository at this point in the history
…okka clusters from different reservations; fixed stateful_transform, added covariance computation and word count
  • Loading branch information
marsupialtail committed Dec 1, 2022
1 parent e29da80 commit b4345f4
Show file tree
Hide file tree
Showing 12 changed files with 1,390 additions and 20 deletions.
48 changes: 41 additions & 7 deletions apps/tpc-h/tpch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pyquokka.df import *
from pyquokka.utils import LocalCluster, QuokkaClusterManager
from schema import *
mode = "S3"
mode = "DISK"
format = "csv"
disk_path = "/home/ziheng/tpc-h/"
#disk_path = "s3://yugan/tpc-h-out/"
Expand All @@ -10,6 +10,8 @@

import pyarrow as pa
import pyarrow.compute as compute
import numpy as np
from pyquokka.executors import Executor
import polars

if mode == "DISK":
Expand Down Expand Up @@ -251,7 +253,8 @@ def do_9():

f = d.groupby(["nation", "o_year"]).aggregate(aggregations = {"amount":"sum"})
f.explain()
return f.collect()
result = f.collect()
return result.sort('amount_sum')

def do_10():
d = customer.join(nation, left_on="c_nationkey", right_on="n_nationkey")
Expand Down Expand Up @@ -311,15 +314,45 @@ def word_count():

def udf2(x):
x = x.to_arrow()
da = compute.list_flatten(compute.ascii_split_whitespace(x["text"]))
da = compute.list_flatten(compute.ascii_split_whitespace(x["l_comment"]))
c = da.value_counts().flatten()
return polars.from_arrow(pa.Table.from_arrays([c[0], c[1]], names=["word","count"]))

words = qc.read_csv(disk_path + "random-words.txt",["text"],sep="|")
counted = words.transform( udf2, new_schema = ["word", "count"], required_columns = {"text"}, foldable=True)
# words = qc.read_csv(disk_path + "random-words.txt",["text"],sep="|")
counted = lineitem.transform( udf2, new_schema = ["word", "count"], required_columns = {"l_comment"}, foldable=True)
f = counted.groupby("word").agg({"count":"sum"})
return f.collect()

def covariance():

class AggExecutor(Executor):
def __init__(self) -> None:
self.state = None
def execute(self,batches,stream_id, executor_id):
for batch in batches:
#print(batch)
if self.state is None:
self.state = batch
else:
self.state += batch
def done(self,executor_id):
return self.state

agg_executor = AggExecutor()
def udf2(x):
x = x.select(["l_quantity", "l_extendedprice", "l_discount", "l_tax"]).to_numpy()
product = np.dot(x.transpose(), x)
#print(product)
return polars.from_numpy(product, columns = ["a","b","c","d"])

d = lineitem.select(["l_quantity", "l_extendedprice", "l_discount", "l_tax"])
d = d.transform( udf2, new_schema = ["a","b","c","d"], required_columns = {"l_quantity", "l_extendedprice", "l_discount", "l_tax"}, foldable=True)

d = d.stateful_transform( agg_executor , ["a","b","c","d"], {"a","b","c","d"},
partitioner=BroadcastPartitioner(), placement_strategy = SingleChannelStrategy())

return d.collect()

def sort():

return lineitem.drop("l_comment").sort("l_partkey", 200000000).write_parquet("s3://yugan/tpc-h-out/", output_line_limit = 5000000)
Expand All @@ -334,12 +367,13 @@ def sort():
# print(do_1())
# print(do_3())

print(do_4())
# print(do_4())
# print(do_5())
# print(do_6())
print(do_7())
# print(do_7())
# print(do_8())
# print(do_9())
# print(do_12())

#print(word_count())
print(covariance())
Loading

0 comments on commit b4345f4

Please sign in to comment.