tests passing
turbolytics committed Dec 26, 2024
1 parent ac9485b commit e952a5c
Showing 7 changed files with 62 additions and 28 deletions.
4 changes: 2 additions & 2 deletions benchmark/enrichment.sh
@@ -2,7 +2,7 @@
 docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --delete --topic topic-enrich || true
 
 # Publish benchmark data set
-python3 cmd/publish-test-data.py --num-messages=100000 --topic="topic-enrich"
+python3 cmd/publish-test-data.py --num-messages=200000 --topic="topic-enrich"
 
 # consume until complete
-/usr/bin/time -l python3 cmd/sql-flow.py run $(PWD)/dev/config/examples/enrich.yml --max-msgs-to-process=100000
+/usr/bin/time -l python3 cmd/sql-flow.py run $(PWD)/dev/config/examples/enrich.yml --max-msgs-to-process=200000
3 changes: 1 addition & 2 deletions dev/config/examples/tumbling.window.yml
@@ -29,8 +29,7 @@ pipeline:
 
   input:
     batch_size: 1000
-    topics:
-      - "tumbling-window"
+    topics: [{{ topic|default('tumbling-window') }}]
 
   sql: |
     INSERT INTO agg_cities_count
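The `topics` value is now a template placeholder rather than a hardcoded list. The `|default(...)` filter is Jinja2 syntax, so presumably the config loader renders the YAML through Jinja2 with `setting_overrides` as the context before parsing. A minimal sketch of that rendering, with illustrative variable names:

```python
# Illustrative only: assumes the YAML passes through Jinja2 with
# setting_overrides as the template context before YAML parsing.
from jinja2 import Template

line = "topics: [{{ topic|default('tumbling-window') }}]"
print(Template(line).render())                   # topics: [tumbling-window]
print(Template(line).render(topic='my-topic'))   # topics: [my-topic]
```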
8 changes: 7 additions & 1 deletion sqlflow/lifecycle.py
@@ -74,4 +74,10 @@ def start(conf, max_msgs=None):
         conn,
         handler=h,
     )
-    sflow.consume_loop(max_msgs)
+    stats = sflow.consume_loop(max_msgs)
+
+    # stop all managed tables
+    for table in managed_tables:
+        table.stop()
+
+    return stats
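With `start` now propagating the `Stats` returned by `consume_loop` and stopping managed window tables on the way out, callers can inspect consumption counters once the loop exits. A hypothetical caller (how `conf` is built is not shown in this diff):

```python
# Hypothetical usage: `start` now returns the Stats dataclass after the
# consume loop finishes and managed tables are stopped. `conf` is assumed
# to have been built with the project's config loader.
stats = start(conf, max_msgs=100_000)
print(stats.num_messages_consumed, stats.num_errors,
      stats.total_throughput_per_second)
```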
54 changes: 38 additions & 16 deletions sqlflow/sql.py
@@ -1,6 +1,6 @@
 import duckdb
 import sys
-import threading
+from dataclasses import dataclass
 from datetime import datetime, timezone
 import logging
 import socket
@@ -14,6 +14,14 @@
 logger = logging.getLogger(__name__)
 
 
+@dataclass
+class Stats:
+    start_time: datetime = datetime.now(timezone.utc)
+    num_messages_consumed: int = 0
+    num_errors: int = 0
+    total_throughput_per_second: float = 0
+
+
 class SQLFlow:
     '''
     SQLFlow executes a pipeline as a daemon.
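One caveat with the `Stats` defaults: a class-level default like `datetime.now(timezone.utc)` is evaluated once, at import time, so every `Stats()` instance would share that timestamp. The commit sidesteps this by passing `start_time` explicitly in `__init__` and reassigning it in `_consume_loop`, but a `default_factory` variant (a sketch, not the committed code) avoids the pitfall entirely:

```python
# Sketch: default_factory re-evaluates the default per instance,
# unlike a plain class-level default, which is computed once at import.
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class Stats:
    start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    num_messages_consumed: int = 0
    num_errors: int = 0
    total_throughput_per_second: float = 0
```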
@@ -24,30 +32,44 @@ def __init__(self, input, consumer, handler, output: Writer):
         self.consumer = consumer
         self.output = output
         self.handler = handler
+        self._stats = Stats(
+            num_messages_consumed=0,
+            num_errors=0,
+            start_time=datetime.now(timezone.utc),
+        )
 
     def consume_loop(self, max_msgs=None):
         logger.info('consumer loop starting')
         try:
             self.consumer.subscribe(self.input.topics)
             self._consume_loop(max_msgs)
+
+            now = datetime.now(timezone.utc)
+            diff = (now - self._stats.start_time)
+            self._stats.total_throughput_per_second = self._stats.num_messages_consumed // diff.total_seconds()
         finally:
             self.consumer.close()
+            logger.info(
+                'consumer loop ending: total messages / sec = {}'.format(self._stats.total_throughput_per_second),
+            )
+        return self._stats
 
 
     def _consume_loop(self, max_msgs=None):
-        num_messages = 0
-        start_dt = datetime.now(timezone.utc)
-        total_messages = 0
+        num_batch_messages = 0
+        self._stats.start_time = datetime.now(timezone.utc)
+        self._stats.num_messages_consumed = 0
 
         self.handler.init()
 
         while True:
             msg = self.consumer.poll(timeout=1.0)
             if msg is None:
                 continue
-            total_messages += 1
+            self._stats.num_messages_consumed += 1
             if msg.error():
+                self._stats.num_errors += 1
                 if msg.error().code() == KafkaError._PARTITION_EOF:
                     # End of partition event
                     sys.stderr.write(
                         '%% %s [%d] reached end at offset %d\n' %
                         (msg.topic(), msg.partition(), msg.offset()),
@@ -57,14 +79,16 @@ def _consume_loop(self, max_msgs=None):
                 continue
 
             self.handler.write(msg.value().decode())
-            num_messages += 1
+            num_batch_messages += 1
 
-            if total_messages % 10000 == 0:
+            if self._stats.num_messages_consumed % 10000 == 0:
                 now = datetime.now(timezone.utc)
-                diff = (now - start_dt)
-                logger.debug('{}: reqs / second'.format(total_messages // diff.total_seconds()))
+                diff = (now - self._stats.start_time)
+                logger.debug('{}: reqs / second'.format(
+                    self._stats.num_messages_consumed // diff.total_seconds()),
+                )
 
-            if num_messages == self.input.batch_size:
+            if num_batch_messages == self.input.batch_size:
                 # apply the pipeline
                 batch = self.handler.invoke()
                 for l in batch:
@@ -76,12 +100,10 @@
 
                 # reset the file state
                 self.handler.init()
-                num_messages = 0
+                num_batch_messages = 0
 
-            if max_msgs and max_msgs <= total_messages:
-                now = datetime.now(timezone.utc)
-                diff = (now - start_dt)
-                logger.debug('{}: reqs / second - Total'.format(total_messages // diff.total_seconds()))
+            if max_msgs and max_msgs <= self._stats.num_messages_consumed:
+                logger.info('max messages reached')
                 return
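Note that the throughput figure in `consume_loop` uses floor division (`//`) against `total_seconds()`, so the logged and returned rates are truncated to whole messages per second (as a float, since `total_seconds()` returns a float). A quick worked example of the formula:

```python
# Worked example of the throughput formula used in consume_loop.
from datetime import datetime, timedelta, timezone

start_time = datetime.now(timezone.utc)
now = start_time + timedelta(seconds=7.5)   # pretend 7.5s elapsed
num_messages_consumed = 1_000_000

diff = now - start_time
print(num_messages_consumed // diff.total_seconds())  # 133333.0, not 133333.33...
```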
10 changes: 7 additions & 3 deletions sqlflow/window/handlers.py
@@ -5,7 +5,7 @@
 from datetime import datetime, timezone
 
 from sqlflow.outputs import Writer
-from sqlflow.serde import JSON, Noop
+from sqlflow.serde import JSON
 
 logger = logging.getLogger(__name__)

@@ -29,7 +29,11 @@ def __init__(self, conn, table: Table, size_seconds, writer: Writer):
         self.size_seconds = size_seconds
         self.writer = writer
         self._poll_interval_seconds = 10
-        self.serde = Noop()
+        self.serde = JSON()
+        self._stopped = None
+
+    def stop(self):
+        self._stopped = True
 
     def collect_closed(self) -> [object]:
         # select all data with 'closed' windows.
@@ -106,6 +110,6 @@ def poll(self):
 
     def start(self):
         logger.debug('starting window thread')
-        while True:
+        while not self._stopped:
             self.poll()
             time.sleep(self._poll_interval_seconds)
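The `_stopped` flag lets `stop()` end the window thread, but the loop only rechecks the flag after sleeping out the full poll interval, so shutdown can lag by up to `_poll_interval_seconds`. A `threading.Event` variant (a sketch, not the committed code) wakes immediately:

```python
# Sketch: threading.Event lets stop() interrupt the sleep right away,
# instead of waiting out the remainder of the poll interval.
import threading

class WindowPoller:
    def __init__(self, poll_interval_seconds=10):
        self._poll_interval_seconds = poll_interval_seconds
        self._stopped = threading.Event()

    def stop(self):
        self._stopped.set()

    def start(self):
        while not self._stopped.is_set():
            self.poll()
            # Returns as soon as stop() sets the event, or after the timeout.
            self._stopped.wait(self._poll_interval_seconds)

    def poll(self):
        pass  # placeholder for the real collect/flush work
```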
7 changes: 5 additions & 2 deletions tests/benchmarks/test_benchmark.py
@@ -19,7 +19,7 @@ def bootstrap_server():
 
 
 def test_mem_persistence_window_tumbling(bootstrap_server):
-    num_messages = 100000
+    num_messages = 500000
     topic = 'mem-persistence-tumbling-window'
    admin_client = AdminClient({'bootstrap.servers': bootstrap_server})
    fs = admin_client.delete_topics([topic], operation_timeout=30)
@@ -43,7 +43,10 @@ def test_mem_persistence_window_tumbling(bootstrap_server):
         path=os.path.join(settings.CONF_DIR, 'examples', 'tumbling.window.yml'),
         setting_overrides={
             'kafka_brokers': bootstrap_server,
+            'topic': topic,
         },
     )
 
-    start(conf, max_msgs=num_messages)
+    stats = start(conf, max_msgs=num_messages)
+    assert stats.num_messages_consumed == num_messages
+    print(stats)
4 changes: 2 additions & 2 deletions tests/window/test_tumbling.py
@@ -104,8 +104,8 @@ def test_flush_results(self):
         tw.flush(records)
         self.assertEqual(
             [
-                (None, 'first'),
-                (None, 'second'),
+                (None, '"first"'),
+                (None, '"second"'),
             ],
             writer.writes,
         )
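The expected writes gain literal double quotes, consistent with the handler switching from the Noop serde to the JSON serde in handlers.py above: JSON-encoding a bare string wraps it in quotes.

```python
# Why 'first' became '"first"': JSON encoding quotes bare strings.
import json

print(json.dumps('first'))   # "first"
print(json.dumps('second'))  # "second"
```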
