Skip to content

Commit

Permalink
Invoke test and support for managed (windowed) tables
Browse files Browse the repository at this point in the history
  • Loading branch information
turbolytics committed Dec 24, 2024
1 parent dad69c6 commit b25a0d1
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ install-tools:

.PHONY: test-unit
test-unit:
pytest tests
pytest --ignore=tests/benchmarks --ignore=tests/integration tests


.PHONY: start-backing-services
Expand Down
4 changes: 4 additions & 0 deletions dev/fixtures/window.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"timestamp":"2015-12-12T19:11:01.249Z","event":"search","properties":{"city":"New York"},"user":{"id":"123412ds"}}
{"timestamp":"2015-12-12T19:11:01.249Z","event":"search","properties":{"city":"New York"},"user":{"id":"123412ds1"}}
{"timestamp":"2015-12-12T19:11:01.249Z","event":"search","properties":{"city":"Baltimore"},"user":{"id":"123412ds1"}}
{"timestamp":"2015-12-12T19:11:01.249Z","event":"search","properties":{"city":"Baltimore"},"user":{"id":"123412ds1"}}
47 changes: 39 additions & 8 deletions sqlflow/cli/dev/invoke.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,51 @@
from sqlflow.sql import init_tables
from sqlflow.handlers import InferredDiskBatch
from sqlflow.serde import JSON
from sqlflow.sql import init_tables, build_managed_tables
from sqlflow.handlers import InferredDiskBatch, get_class
from sqlflow.config import new_from_path


def invoke(conn, config, fixture, setting_overrides=None, flush_window=False):
    """
    Invoke will initialize config and invoke the configured pipeline against
    the provided fixture.

    :param conn: database connection used for table setup and pipeline SQL.
    :param config: path to the pipeline configuration file.
    :param fixture: path to a newline-delimited fixture file to replay.
    :param setting_overrides: optional dict of settings overriding the config;
        defaults to an empty dict.
    :param flush_window: Flushes the window after the invocation and returns
        the closed-window rows instead of the batch results.
    :return: list of pipeline results, or the closed-window rows when
        flush_window is True.
    """
    # Avoid a mutable default argument: a shared `{}` default would leak
    # state between calls.
    if setting_overrides is None:
        setting_overrides = {}

    conf = new_from_path(config, setting_overrides)

    # Resolve the handler class from the configured pipeline type rather
    # than hard-coding InferredDiskBatch.
    BatchHandler = get_class(conf.pipeline.type)
    h = BatchHandler(
        conf,
        deserializer=JSON(),
        conn=conn,
    ).init()

    init_tables(conn, conf.tables)
    managed_tables = build_managed_tables(
        conn,
        conf.tables.sql,
    )
    if managed_tables:
        assert len(managed_tables) == 1, \
            "only a single managed table is currently supported"

    with open(fixture) as f:
        for line in f:
            cleaned_line = line.strip()
            if cleaned_line:  # skip blank lines in the fixture
                h.write(cleaned_line)

    res = list(h.invoke())
    if not flush_window:
        print(res)
        return res

    # flush_window: return the rows collected from the single managed
    # (windowed) table instead of the batch invocation results.
    res = managed_tables[0].collect_closed()
    print(res)

    return res
13 changes: 11 additions & 2 deletions sqlflow/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
from sqlflow import new_from_path
from sqlflow.handlers import get_class
from sqlflow.serde import JSON
from sqlflow.sql import new_sqlflow_from_conf, init_tables,handle_tables
from sqlflow.sql import (
new_sqlflow_from_conf,
init_tables,
handle_managed_tables,
build_managed_tables,
)


def start(config, max_msgs=None):
Expand All @@ -19,7 +24,11 @@ def start(config, max_msgs=None):
)

init_tables(conn, conf.tables)
handle_tables(conn, conf.tables)
managed_tables = build_managed_tables(
conn,
conf.tables.sql,
)
handle_managed_tables(managed_tables)

sflow = new_sqlflow_from_conf(
conf,
Expand Down
16 changes: 13 additions & 3 deletions sqlflow/handlers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import logging
import json
import os

import duckdb
import pyarrow as pa

logger = logging.getLogger(__name__)


class InferredDiskBatch:
"""
Expand Down Expand Up @@ -89,9 +92,16 @@ def invoke(self):

def _invoke(self):
batch = pa.Table.from_pylist(self.rows)
res = self.conn.sql(
self.conf.pipeline.sql,
)

try:
res = self.conn.sql(
self.conf.pipeline.sql,
)
except duckdb.duckdb.BinderException as e:
logger.error(
'could not execute sql: {}'.format(self.conf.pipeline.sql),
)
raise e

if not res:
return
Expand Down
34 changes: 20 additions & 14 deletions sqlflow/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,13 @@ def init_tables(conn, tables):
conn.sql(sql_table.sql)


def handle_tables(conn, tables):
"""
Starts table management routines. Some tables are managed throughout the
lifetime of the sqlflow process, such as windowed tables.
This routine kicks off that management.
:param conn:
:param tables:
:return:
"""
for table in tables.sql:
def build_managed_tables(conn, table_confs):
managed_tables = []
for table in table_confs:
# windowed tables are the only supported tables currently
if not table.window:
continue

# init table
if table.window.type != 'tumbling':
raise NotImplementedError('only tumbling window is supported')

Expand All @@ -126,9 +117,24 @@ def handle_tables(conn, tables):
size_seconds=table.window.duration_seconds,
writer=ConsoleWriter(),
)
managed_tables.append(h)
return managed_tables


def handle_managed_tables(tables):
    """
    Starts table management routines. Some tables are managed throughout the
    lifetime of the sqlflow process, such as windowed tables.

    This routine kicks off that management by starting one background thread
    per managed table handler.

    :param tables: iterable of managed table handlers, each exposing a
        ``start()`` method to be run in its own thread.
    :return: None
    """
    for handler in tables:
        # NOTE(review): threads are non-daemon, so each handler's start()
        # loop keeps the process alive — confirm this matches the intended
        # shutdown behavior.
        t = threading.Thread(
            target=handler.start,
        )
        t.start()

Expand Down
2 changes: 1 addition & 1 deletion sqlflow/window/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .handlers import Tumbling
from .handlers import Tumbling, Table
Empty file added tests/benchmarks/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions tests/benchmarks/test_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import unittest


class TestCase(unittest.TestCase):
    # Intentionally failing placeholder — presumably here to prove that
    # benchmark tests are excluded from the unit run (the Makefile passes
    # `--ignore=tests/benchmarks` to pytest); confirm before relying on it.
    def test_fail(self):
        """Always fails; benchmarks must be skipped via pytest --ignore."""
        self.fail()
20 changes: 18 additions & 2 deletions tests/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def test_basic_agg_mem(self):
fixture=os.path.join(fixtures_dir, 'simple.json'),
)
self.assertEqual([
'{"city":"New York","city_count":28672}',
'{"city":"Baltimore","city_count":28672}',
'{"city": "New York", "city_count": 28672}',
'{"city": "Baltimore", "city_count": 28672}',
], out)

def test_csv_filesystem_join(self):
Expand Down Expand Up @@ -84,6 +84,22 @@ def test_enrich(self):
'{"event":"search","properties":{"city":"New York"},"user":{"id":"123412ds"},"nested_city":{"something":"New York"},"nested_json":{"":"New York","":1,"":2}}',
], out)

def test_tumbling_window(self):
    """Invoking the tumbling-window config with flush_window returns the
    closed-window aggregates for the fixture events."""
    config_path = os.path.join(conf_dir, 'examples', 'tumbling.window.yml')
    fixture_path = os.path.join(fixtures_dir, 'window.jsonl')
    out = invoke(
        conn=duckdb.connect(),
        config=config_path,
        fixture=fixture_path,
        flush_window=True,
    )
    expected = [
        {'timestamp': 1449878400000, 'city': 'New York', 'count': 2},
        {'timestamp': 1449878400000, 'city': 'Baltimore', 'count': 2},
    ]
    self.assertEqual(expected, out)


class TablesTestCase(unittest.TestCase):
def test_init_window_success(self):
Expand Down

0 comments on commit b25a0d1

Please sign in to comment.