-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregator.py
45 lines (36 loc) · 1.05 KB
/
aggregator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import duckdb
import pandas as pd
import time
from memory_profiler import profile
import os
def init():
    """Open the ``sensordb.duckdb`` database and apply its session settings.

    The connection is configured with a single thread and a 125MB memory
    limit before it is handed back to the caller.

    Returns:
        The connection object produced by ``duckdb.connect``.
    """
    connection = duckdb.connect('sensordb.duckdb')
    # Apply the settings one after another, in the same order as before.
    for statement in ('SET threads TO 1', "SET memory_limit = '125MB'"):
        connection.sql(statement)
    return connection
# SQL text for every supported aggregation granularity, keyed by the value of
# the ``by`` argument of ``aggregate``.
# NOTE: the result column alias is spelled "temprature" (sic). It is kept
# unchanged because the alias becomes a column name of the returned frame.
_AGGREGATE_QUERIES = {
    'date': """SELECT
timestamp::DATE as date,
SUM(temp) as temprature
FROM read_parquet('sensor_readings/**/*.parquet', hive_partitioning = true)
GROUP BY 1
ORDER BY 1 DESC""",
    'hour': """
SELECT
timestamp::DATE as date,
date_part('hour', timestamp) as hour,
SUM(temp) as temprature
FROM read_parquet('sensor_readings/**/*.parquet', hive_partitioning = true)
GROUP BY 1,2
ORDER BY 1,2
""",
}


@profile
def aggregate(db, by='date'):
    """Sum the ``temp`` readings of the parquet files under ``sensor_readings/``.

    Args:
        db: Object exposing ``sql(query)`` whose result offers ``fetchdf()``;
            in this script it is the connection returned by ``init()``.
        by: Aggregation granularity.
            ``'date'`` groups by ``timestamp::DATE`` and orders by date,
            newest first. ``'hour'`` groups by date and hour of day and
            orders by date, then hour, ascending.

    Returns:
        Whatever ``fetchdf()`` returns for the selected query, with the
        summed values in a column named ``temprature``.

    Raises:
        ValueError: If ``by`` is not one of the supported granularities.
            Previously such a value fell through both branches and failed
            with an ``UnboundLocalError`` on the ``return`` statement.
    """
    try:
        query = _AGGREGATE_QUERIES[by]
    except KeyError:
        # Fail early with an actionable message instead of running nothing.
        raise ValueError(
            f"unsupported aggregation {by!r}; expected one of: 'date', 'hour'"
        ) from None
    return db.sql(query).fetchdf()
def _main():
    """Print the working directory and its entries, then the hourly aggregate."""
    # Show where the script runs so the relative database and parquet paths
    # can be checked against the directory contents.
    working_dir = os.getcwd()
    print(working_dir)
    entries = os.listdir()
    print(entries)

    connection = init()
    hourly = aggregate(connection, 'hour')
    print(hourly)


if __name__ == '__main__':
    _main()