Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/kafka transactions #370

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions blockchainetl/jobs/exporters/kafka_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging

from kafka import KafkaProducer
from confluent_kafka import Producer

from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter

Expand All @@ -13,28 +13,41 @@ def __init__(self, output, item_type_to_topic_mapping, converters=()):
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.converter = CompositeItemConverter(converters)
self.connection_url = self.get_connection_url(output)
print(self.connection_url)
self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
self.topic_prefix = self.get_topic_prefix(output)
print(self.connection_url, self.topic_prefix)
self.producer = Producer({
'bootstrap.servers': self.connection_url,
'transactional.id': 'ethereumetl',
'enable.idempotence': True,
})

def get_connection_url(self, output):
try:
return output.split('/')[1]
except KeyError:
raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"')
except IndexError:
raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092" or "kafka/127.0.0.1:9092/<topic-prefix>"')

def get_topic_prefix(self, output):
try:
return output.split('/')[2] + "."
except IndexError:
return ''

def open(self):
pass
self.producer.init_transactions()

def export_items(self, items):
self.producer.begin_transaction()
for item in items:
self.export_item(item)

self.producer.commit_transaction()

def export_item(self, item):
item_type = item.get('type')
if item_type is not None and item_type in self.item_type_to_topic_mapping:
data = json.dumps(item).encode('utf-8')
print(data)
return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
logging.debug(data)
return self.producer.produce(self.topic_prefix + self.item_type_to_topic_mapping[item_type], data)
else:
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

Expand All @@ -43,9 +56,8 @@ def convert_items(self, items):
yield self.converter.convert_item(item)

def close(self):
pass


pass

def group_by_item_type(items):
result = collections.defaultdict(list)
for item in items:
Expand Down
2 changes: 1 addition & 1 deletion docs/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
- For Postgres: `--output=postgresql+pg8000://<user>:<password>@<host>:<port>/<database_name>`,
e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
- For GCS: `--output=gs://<bucket_name>`. Make sure to install and initialize `gcloud` cli.
- For Kafka: `--output=kafka/<host>:<port>`, e.g. `--output=kafka/127.0.0.1:9092`
- For Kafka: `--output=kafka/<host>:<port>/<optional: topic_prefix>`, e.g. `--output=kafka/127.0.0.1:9092` or `--output=kafka/127.0.0.1:9092/crypto_ethereum`.
- Those output types can be combined with a comma e.g. `--output=gs://<bucket_name>,projects/<your-project>/topics/crypto_ethereum`

The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def read(fname):
'timeout-decorator==0.4.1',
'google-cloud-pubsub==2.1.0',
'google-cloud-storage==1.33.0',
'kafka-python==2.0.2',
'confluent-kafka==1.9.0',
'sqlalchemy==1.4',
'pg8000==1.16.6',
# This library is a dependency for google-cloud-pubsub, starting from 0.3.22 it requires Rust,
Expand Down