Kafka Publisher #17

Open · wants to merge 11 commits into main
Changes from all commits
3 changes: 2 additions & 1 deletion .gitignore
@@ -142,4 +142,5 @@ output
.DS_Store

.vscode/
pyrightconfig.json
last_synced_block.txt
17 changes: 17 additions & 0 deletions README.md
@@ -21,6 +21,23 @@ Export directory service blocks ([Schema](../docs/schema.md), [Reference](../doc
--output-dir output --provider-uri https://api.zilliqa.com
```

To run locally:
```bash
python3 setup.py sdist bdist_wheel
pip3 install dist/zilliqa-etl-1.0.6.tar.gz
doppler run -- zilliqaetl stream --provider-uri https://api.zilliqa.com -o kafka -t producer-zilliqa -ts hot
```
When setting up on Linux, install the build dependencies first:
```bash
sudo apt-get install build-essential libssl-dev libffi-dev python3-dev
```

Set up Doppler:
```bash
doppler configure set token <token>
```
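
The Kafka exporter reads its broker address and SASL credentials from the environment; Doppler is one way to inject them. A minimal sketch of the variables it expects, based on `zilliqaetl/exporters/kafka_exporter.py` and `start-kafka-streaming.sh` (values are placeholders):
```bash
# Read by the Kafka exporter; values are illustrative placeholders.
export CONFLUENT_BROKER="<bootstrap-servers>"
export KAFKA_PRODUCER_KEY="<sasl-username>"
export KAFKA_PRODUCER_PASSWORD="<sasl-password>"
# Read by start-kafka-streaming.sh
export ZILLIQA_PROVIDER_URI="https://api.zilliqa.com"
```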


Find other commands [here](https://zilliqa-etl.readthedocs.io/en/latest/commands/).

For the latest version, check out the repo and call
46 changes: 46 additions & 0 deletions requirements.txt
@@ -0,0 +1,46 @@
apply-defaults==0.1.6
attrs==21.2.0
blockchain-etl-common==1.6.1
cachetools==4.2.2
certifi==2021.5.30
charset-normalizer==2.0.4
Click==7.0
confluent-kafka==2.2.0
cytoolz==0.11.0
eth-hash==0.3.1
eth-typing==2.2.2
eth-utils==1.10.0
fastecdsa==2.2.1
google-api-core==1.31.2
google-auth==1.35.0
google-cloud==0.34.0
google-cloud-pubsub==2.5.0
googleapis-common-protos==1.53.0
grpc-google-iam-v1==0.12.3
grpcio==1.40.0
idna==3.2
jsonrpcclient==3.3.6
jsonschema==3.2.0
libcst==0.3.20
mypy-extensions==0.4.3
packaging==21.0
proto-plus==1.19.0
protobuf==3.17.3
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycryptodome==3.10.1
pyethash==0.1.27
pyparsing==2.4.7
pyrsistent==0.18.0
python-dateutil==2.7.0
pytz==2021.1
PyYAML==5.4.1
pyzil==1.5.22
requests==2.26.0
rsa==4.7.2
six==1.16.0
timeout-decorator==0.5.0
toolz==0.11.1
typing-extensions==3.10.0.2
typing-inspect==0.7.1
urllib3==1.26.6
5 changes: 3 additions & 2 deletions setup.py
@@ -11,7 +11,7 @@ def read(fname):

setup(
name='zilliqa-etl',
version='1.0.6',
author='Evgeny Medvedev',
author_email='[email protected]',
description='Tools for exporting Zilliqa blockchain data to JSON',
@@ -42,7 +42,8 @@ def read(fname):
extras_require={
'dev': [
'pytest~=4.3.0',
'pytest-timeout~=1.3.3',
'confluent-kafka==2.2.0'
],
},
entry_points={
3 changes: 3 additions & 0 deletions start-kafka-streaming.sh
@@ -0,0 +1,3 @@
#!/usr/bin/bash
source /root/zilliqa-etl/bin/activate
zilliqaetl stream --provider-uri $ZILLIQA_PROVIDER_URI -o kafka -t producer-zilliqa -ts hot -l '/root/zilliqa-etl/last_synced_block.txt'
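
One possible way to invoke this script so that the provider URI and Kafka credentials above are injected, assuming a configured Doppler project:
```bash
doppler run -- bash /root/zilliqa-etl/start-kafka-streaming.sh
```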
21 changes: 13 additions & 8 deletions zilliqaetl/cli/stream.py
@@ -21,7 +21,6 @@
# SOFTWARE.

import logging

import click

@@ -40,8 +39,13 @@
help='The URI of the web3 provider e.g. '
'https://dev-api.zilliqa.com/')
@click.option('-o', '--output', type=str,
help='Google PubSub topic path e.g. projects/your-project/topics/zilliqa_blockchain. '
'If not specified will print to console')
help='pubsub or kafka, if empty defaults to printing to console')
@click.option('-t', '--topic-prefix', default='producer-ethereum', type=str,
help='Google Pub/Sub topic path prefix e.g. projects/your-project/topics/ethereum_blockchain, or '
'Kafka topic prefix; Kafka topics are named {prefix}-{entity}-{suffix}')
@click.option('-ts', '--topic-suffix', default='hot', type=str,
help='Kafka topic suffix, e.g. hot or warm; topics are named {prefix}-{entity}-{suffix}')
@click.option('-s', '--start-block', default=None, type=int, help='Start block')
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_STREAMING), type=str,
help='The list of entity types to export.')
@@ -51,21 +55,22 @@
@click.option('--pid-file', default=None, type=str, help='pid file')
@click.option('-r', '--rate-limit', default=None, show_default=True, type=int,
help='Maximum requests per second for provider in case it has rate limiting')
def stream(last_synced_block_file, lag, provider_uri, output, topic_prefix, topic_suffix, start_block, entity_types,
period_seconds=10, block_batch_size=10, max_workers=5, pid_file=None, rate_limit=20):
"""Streams all data types to console or Google Pub/Sub."""
zilliqa_api = ThreadLocalProxy(lambda: ZilliqaAPI(provider_uri))
if rate_limit is not None and rate_limit > 0:
zilliqa_api = RateLimitingProxy(zilliqa_api, max_per_second=rate_limit)

# entity_types = parse_entity_types(entity_types)

from zilliqaetl.exporters.zilliqa_item_exporter import get_streamer_exporter
from blockchainetl.streaming.streamer import Streamer

logging.info('Using ' + provider_uri)
zil_streamer_adapter = ZilliqaStreamerAdapter(
provider_uri=zilliqa_api, item_exporter=get_streamer_exporter(output, topic_prefix, topic_suffix),
max_workers=max_workers)

streamer = Streamer(
blockchain_streamer_adapter=zil_streamer_adapter,
17 changes: 17 additions & 0 deletions zilliqaetl/exporters/console_item_exporter.py
@@ -0,0 +1,17 @@

import json


class ConsoleItemExporter:
def open(self):
pass

def export_items(self, items):
for item in items:
self.export_item(item)

def export_item(self, item):
print(json.dumps(item))

def close(self):
pass
2 changes: 1 addition & 1 deletion zilliqaetl/exporters/google_pubsub_item_exporter.py
@@ -26,6 +26,7 @@
from google.cloud import pubsub_v1
from timeout_decorator import timeout_decorator


class GooglePubSubItemExporter:

def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id',)):
@@ -64,7 +65,6 @@ def export_item(self, item):
if item_type is not None and item_type in self.item_type_to_topic_mapping:
topic_path = self.item_type_to_topic_mapping.get(item_type)
data = json.dumps(item).encode('utf-8')

message_future = self.publisher.publish(topic_path, data=data, **self.get_message_attributes(item))
return message_future
else:
86 changes: 86 additions & 0 deletions zilliqaetl/exporters/kafka_exporter.py
@@ -0,0 +1,86 @@
import os
from confluent_kafka import Producer, KafkaException
from timeout_decorator import timeout_decorator

import logging

import socket
import json


class KafkaItemExporter:
def __init__(self, item_type_to_topic_mapping, message_attributes=("item_id",)) -> None:
logging.basicConfig(
level=logging.INFO, filename="message-publish.log",
format='{"time" : "%(asctime)s", "level" : "%(levelname)s" , "message" : "%(message)s"}',
)

conf = {
"bootstrap.servers": os.getenv("CONFLUENT_BROKER"),
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"client.id": socket.gethostname(),
"message.max.bytes": 5242880,
"sasl.username": os.getenv("KAFKA_PRODUCER_KEY"),
"sasl.password": os.getenv("KAFKA_PRODUCER_PASSWORD")
}

producer = Producer(conf)
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.producer = producer
self.logging = logging.getLogger(__name__)
self.message_attributes = message_attributes

def open(self):
pass

def export_items(self, items):
try:
self._export_items_with_timeout(items)
except timeout_decorator.TimeoutError as e:
logging.error("Timeout error")
raise e

@timeout_decorator.timeout(300)
def _export_items_with_timeout(self, items):
for item in items:
self.export_item(item)

def export_item(self, item):
item_type = item.pop("type", None)
has_item_type = item_type is not None
if has_item_type and item_type in self.item_type_to_topic_mapping:
topic = self.item_type_to_topic_mapping[item_type]
data = json.dumps(item)
self.write_txns(key=item.get("token_address"), value=data, topic=topic)
else:
logging.error(f'Topic for item type {item_type} is not configured.')

def get_message_attributes(self, item):
attributes = {}

for attr_name in self.message_attributes:
if item.get(attr_name) is not None:
attributes[attr_name] = item.get(attr_name)

return attributes

def close(self):
self.producer.flush()

def write_txns(self, key: str, value: str, topic: str):
# def acked(err, msg):
# if err is not None:
# self.logging.error(f' Message failed delivery, {topic} : {err}\n')

try:
self.producer.produce(topic, key=key, value=value)
except BufferError:
self.logging.error('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
len(self.producer))
except KafkaException as e:
self.logging.error(f"Kafka Exception for topic : {topic} , exception : {e}")
except NotImplementedError as e:
self.logging.error(f"NotImplementedError for topic : {topic} , exception : {e}")
self.producer.poll(0)
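
For orientation, a minimal usage sketch of the exporter above; the topic name and sample item are illustrative, and the `CONFLUENT_BROKER`/`KAFKA_PRODUCER_*` variables must already be set in the environment:
```python
from zilliqaetl.exporters.kafka_exporter import KafkaItemExporter

# Hypothetical mapping; get_streamer_exporter builds the real one as
# {prefix}-{entity}-{suffix}, e.g. producer-zilliqa-transactions-hot.
exporter = KafkaItemExporter(item_type_to_topic_mapping={
    "transaction": "producer-zilliqa-transactions-hot",
})
exporter.open()
# The "type" key is popped from each item and selects the destination topic;
# items whose type has no mapped topic are logged as errors and skipped.
exporter.export_items([{"type": "transaction", "item_id": "tx-1", "block_number": 1}])
exporter.close()  # flush() waits for messages still queued in the producer
```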
31 changes: 20 additions & 11 deletions zilliqaetl/exporters/zilliqa_item_exporter.py
@@ -27,8 +27,11 @@
from blockchainetl_common.atomic_counter import AtomicCounter
from blockchainetl_common.exporters import JsonLinesItemExporter, CsvItemExporter
from blockchainetl_common.file_utils import get_file_handle, close_silently

from zilliqaetl.exporters.kafka_exporter import KafkaItemExporter
from zilliqaetl.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
from zilliqaetl.exporters.console_item_exporter import ConsoleItemExporter


class ZilliqaItemExporter:
def __init__(self, output_dir, item_type_to_filename=None, output_format='json'):
@@ -96,18 +99,24 @@ def get_item_exporter(output_format, file):
ValueError(f'output format {output_format} is not recognized')


def get_streamer_exporter(output, topic_prefix, topic_suffix):
item_exporter = ConsoleItemExporter()
if output == 'gcp':
item_exporter = GooglePubSubItemExporter(item_type_to_topic_mapping={
'transaction': topic_prefix + '.transactions',
'token_transfer': topic_prefix + '.token_transfers',
'trace': topic_prefix + '.traces',
# 'block': topic_prefix + '.blocks',
# 'log': topic_prefix + '.logs',
# 'contract': topic_prefix + '.contracts',
# 'token': topic_prefix + '.tokens',
})
if output == 'kafka':
item_exporter = KafkaItemExporter(item_type_to_topic_mapping={
"transaction": topic_prefix + "-transactions-" + topic_suffix,
"token_transfer": topic_prefix + "-token_transfers-" + topic_suffix,
"transition": topic_prefix + "-transitions-" + topic_suffix,
"tx_block": topic_prefix + "-tx_blocks-" + topic_suffix,
})

return item_exporter
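
With the CLI defaults above, `-o kafka -t producer-zilliqa -ts hot` therefore resolves to topic names like the following (derived from the mapping in `get_streamer_exporter`):
```python
# topic_prefix="producer-zilliqa", topic_suffix="hot"
topics = {
    "transaction": "producer-zilliqa-transactions-hot",
    "token_transfer": "producer-zilliqa-token_transfers-hot",
    "transition": "producer-zilliqa-transitions-hot",
    "tx_block": "producer-zilliqa-tx_blocks-hot",
}
```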