Skip to content

Commit

Permalink
sqlflow.conf -> sqlflow.input
Browse files Browse the repository at this point in the history
  • Loading branch information
turbolytics committed Dec 23, 2024
1 parent 1c3029d commit cc52c30
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
4 changes: 2 additions & 2 deletions benchmark/enrichment.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --delete --topic topic-enrich || true

# Publish benchmark data set
python3 cmd/publish-test-data.py --num-messages=1000000 --topic="topic-enrich"
python3 cmd/publish-test-data.py --num-messages=100000 --topic="topic-enrich"

# consume until complete
/usr/bin/time -l python3 cmd/sql-flow.py run $(PWD)/dev/config/examples/enrich.yml --max-msgs-to-process=1000000
/usr/bin/time -l python3 cmd/sql-flow.py run $(PWD)/dev/config/examples/enrich.yml --max-msgs-to-process=100000
10 changes: 5 additions & 5 deletions sqlflow/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ class SQLFlow:
SQLFlow executes a pipeline as a daemon.
'''

def __init__(self, conf, consumer, handler, output: Writer):
self.conf = conf
def __init__(self, input, consumer, handler, output: Writer):
self.input = input
self.consumer = consumer
self.output = output
self.handler = handler

def consume_loop(self, max_msgs=None):
logger.info('consumer loop starting')
try:
self.consumer.subscribe(self.conf.pipeline.input.topics)
self.consumer.subscribe(self.input.topics)
self._consume_loop(max_msgs)
finally:
self.consumer.close()
Expand Down Expand Up @@ -63,7 +63,7 @@ def _consume_loop(self, max_msgs=None):
diff = (now - start_dt)
logger.debug('{}: reqs / second'.format(total_messages // diff.total_seconds()))

if num_messages == self.conf.pipeline.input.batch_size:
if num_messages == self.input.batch_size:
# apply the pipeline
batch = self.handler.invoke()
for l in batch:
Expand Down Expand Up @@ -155,7 +155,7 @@ def new_sqlflow_from_conf(conf, conn, handler) -> SQLFlow:
)

sflow = SQLFlow(
conf=conf,
input=conf.pipeline.input,
consumer=consumer,
handler=handler,
output=output,
Expand Down

0 comments on commit cc52c30

Please sign in to comment.