Skip to content

Commit

Permalink
- adds typed window output
Browse files Browse the repository at this point in the history
- adds basic agg mem invoke test
  • Loading branch information
turbolytics committed Dec 23, 2024
1 parent cc52c30 commit 92fd1f9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
22 changes: 14 additions & 8 deletions sqlflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,19 @@ def new_from_path(path: str, setting_overrides={}):
return new_from_dict(conf)


def new_from_dict(conf):
output_type = conf['pipeline'].get('output', {}).get('type')

def build_output(c: object):
output_type = c.get('type')
if output_type == 'kafka':
output = KafkaOutput(
return KafkaOutput(
type='kafka',
topic=conf['pipeline']['output']['topic'],
topic=c['topic']
)
else:
output = ConsoleOutput(type='console')
return ConsoleOutput(type='console')


def new_from_dict(conf):
output = build_output(conf['pipeline'].get('output', {}))

tables = Tables(
csv=[],
Expand All @@ -121,9 +124,12 @@ def new_from_dict(conf):
for sql_table_conf in conf.get('tables', {}).get('sql', []):
window_conf = sql_table_conf.pop('window')
if window_conf:
w = Window(**window_conf)
output = build_output((window_conf.pop('output')))
s = SQLTable(
window=w,
window=Window(
output=output,
**window_conf,
),
**sql_table_conf,
)
tables.sql.append(s)
Expand Down
18 changes: 16 additions & 2 deletions tests/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


class ExamplesTestCase(unittest.TestCase):
def test_basic_agg(self):
def test_basic_agg_disk(self):
conn = duckdb.connect()
out = invoke(
conn=conn,
Expand All @@ -29,6 +29,18 @@ def test_basic_agg(self):
'{"city":"Baltimore","city_count":28672}',
], out)

def test_basic_agg_mem(self):
conn = duckdb.connect()
out = invoke(
conn=conn,
config=os.path.join(conf_dir, 'examples', 'basic.agg.mem.yml'),
fixture=os.path.join(fixtures_dir, 'simple.json'),
)
self.assertEqual([
'{"city":"New York","city_count":28672}',
'{"city":"Baltimore","city_count":28672}',
], out)

def test_csv_filesystem_join(self):
conn = duckdb.connect()
out = invoke(
Expand Down Expand Up @@ -101,7 +113,9 @@ def test_init_window_success(self):
type='tumbling',
duration_seconds=600,
time_field='time',
output={'type': 'console'},
output=ConsoleOutput(
type='console',
),
),
conf.tables.sql[0].window,
)
Expand Down

0 comments on commit 92fd1f9

Please sign in to comment.