From 4b4be7360f49f3a7fb0abd2a2a96aaaac39b8dcf Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Sat, 9 Sep 2023 16:06:26 +0530
Subject: [PATCH 01/11] locally tested

---
 README.md                                     |  7 ++
 setup.py                                      |  5 +-
 zilliqaetl/cli/stream.py                      | 16 ++--
 zilliqaetl/exporters/console_item_exporter.py | 17 ++++
 .../exporters/google_pubsub_item_exporter.py  |  2 +-
 zilliqaetl/exporters/kafka_exporter.py        | 82 +++++++++++++++++++
 zilliqaetl/exporters/zilliqa_item_exporter.py | 30 ++++---
 zilliqaetl/streaming/__init__.py              |  0
 8 files changed, 140 insertions(+), 19 deletions(-)
 create mode 100644 zilliqaetl/exporters/console_item_exporter.py
 create mode 100644 zilliqaetl/exporters/kafka_exporter.py
 create mode 100644 zilliqaetl/streaming/__init__.py

diff --git a/README.md b/README.md
index 9f8e239..d782dd7 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,13 @@ Export directory service blocks ([Schema](../docs/schema.md), [Reference](../doc
     --output-dir output --provider-uri https://api.zilliqa.com
 ```
 
+To run locally
+```bash
+python3 setup.py sdist bdist_wheel
+pip3 install dist/zilliqa-etl-1.0.5.tar.gz
+doppler run -- zilliqaetl stream --provider-uri https://api.zilliqa.com -o kafka -t procuder-zilliqa -ts hot
+```
+
 Find other commands [here](https://zilliqa-etl.readthedocs.io/en/latest/commands/).
 
 For the latest version, check out the repo and call
diff --git a/setup.py b/setup.py
index df81c3a..30ab89d 100644
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@ def read(fname):
 
 setup(
     name='zilliqa-etl',
-    version='1.0.5',
+    version='1.0.6',
     author='Evgeny Medvedev',
     author_email='evge.medvedev@gmail.com',
     description='Tools for exporting Zilliqa blockchain data to JSON',
@@ -42,7 +42,8 @@ def read(fname):
     extras_require={
         'dev': [
             'pytest~=4.3.0',
-            'pytest-timeout~=1.3.3'
+            'pytest-timeout~=1.3.3',
+            'confluent-kafka==2.2.0'
         ],
     },
     entry_points={
diff --git a/zilliqaetl/cli/stream.py b/zilliqaetl/cli/stream.py
index e6fa389..ad068fc 100644
--- a/zilliqaetl/cli/stream.py
+++ b/zilliqaetl/cli/stream.py
@@ -40,8 +40,13 @@
               help='The URI of the web3 provider e.g. '
                    'https://dev-api.zilliqa.com/')
 @click.option('-o', '--output', type=str,
-              help='Google PubSub topic path e.g. projects/your-project/topics/zilliqa_blockchain. '
-                   'If not specified will print to console')
+              help='pubsub or kafka, if empty defaults to printing to console')
+@click.option('-t', '--topic-prefix', default='producer-ethereum', type=str,
+              help='Google PubSub topic path e.g. projects/your-project/topics/ethereum_blockchain. OR '
+                   'Kafka topic prefix e.g. {chain}-{facet}-{hot/warm}')
+@click.option('-ts', '--topic-suffix', default='hot', type=str,
+              help='Google PubSub topic path e.g. projects/your-project/topics/ethereum_blockchain. OR '
+                   'Kafka topic suffix e.g. hot/warm')
 @click.option('-s', '--start-block', default=None, type=int, help='Start block')
 @click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_STREAMING), type=str,
               help='The list of entity types to export.')
@@ -51,8 +56,8 @@
 @click.option('--pid-file', default=None, type=str, help='pid file')
 @click.option('-r', '--rate-limit', default=None, show_default=True, type=int,
               help='Maximum requests per second for provider in case it has rate limiting')
-def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
-           period_seconds=10, block_batch_size=10, max_workers=5, pid_file=None, rate_limit=20):
+def stream(last_synced_block_file, lag, provider_uri, output, topic_prefix, topic_suffix, start_block, entity_types,
+           period_seconds=10, block_batch_size=10, max_workers=5, pid_file=None, rate_limit=20):
     """Streams all data types to console or Google Pub/Sub."""
     zilliqa_api = ThreadLocalProxy(lambda: ZilliqaAPI(provider_uri))
     if rate_limit is not None and rate_limit > 0:
@@ -65,7 +70,8 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
     logging.info('Using ' + provider_uri)
 
     zil_streamer_adapter = ZilliqaStreamerAdapter(
-        provider_uri=zilliqa_api, item_exporter=get_streamer_exporter(output), max_workers=max_workers)
+        provider_uri=zilliqa_api, item_exporter=get_streamer_exporter(output, topic_prefix, topic_suffix),
+        max_workers=max_workers)
 
     streamer = Streamer(
         blockchain_streamer_adapter=zil_streamer_adapter,
diff --git a/zilliqaetl/exporters/console_item_exporter.py b/zilliqaetl/exporters/console_item_exporter.py
new file mode 100644
index 0000000..a6a5c60
--- /dev/null
+++ b/zilliqaetl/exporters/console_item_exporter.py
@@ -0,0 +1,17 @@
+
+import json
+
+
+class ConsoleItemExporter:
+    def open(self):
+        pass
+
+    def export_items(self, items):
+        for item in items:
+            self.export_item(item)
+
+    def export_item(self, item):
+        print(json.dumps(item))
+
+    def close(self):
+        pass
diff --git a/zilliqaetl/exporters/google_pubsub_item_exporter.py b/zilliqaetl/exporters/google_pubsub_item_exporter.py
index f05e412..956cf66 100644
--- a/zilliqaetl/exporters/google_pubsub_item_exporter.py
+++ b/zilliqaetl/exporters/google_pubsub_item_exporter.py
@@ -26,6 +26,7 @@
 from google.cloud import pubsub_v1
 from timeout_decorator import timeout_decorator
 
+
 class GooglePubSubItemExporter:
 
     def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id',)):
@@ -64,7 +65,6 @@ def export_item(self, item):
         if item_type is not None and item_type in self.item_type_to_topic_mapping:
             topic_path = self.item_type_to_topic_mapping.get(item_type)
             data = json.dumps(item).encode('utf-8')
-
             message_future = self.publisher.publish(topic_path, data=data, **self.get_message_attributes(item))
             return message_future
         else:
diff --git a/zilliqaetl/exporters/kafka_exporter.py b/zilliqaetl/exporters/kafka_exporter.py
new file mode 100644
index 0000000..829ec51
--- /dev/null
+++ b/zilliqaetl/exporters/kafka_exporter.py
@@ -0,0 +1,82 @@
+import os
+from confluent_kafka import Producer
+from timeout_decorator import timeout_decorator
+
+import logging
+
+import socket
+import json
+
+
+class KafkaItemExporter:
+    def __init__(self, item_type_to_topic_mapping, message_attributes=("item_id",)) -> None:
+        logging.basicConfig(
+            level=logging.INFO, filename="message-publish.log",
+            format='{"time" : "%(asctime)s", "level" : "%(levelname)s" , "message" : "%(message)s"}',
+        )
+
+        conf = {
+            "bootstrap.servers": os.getenv("CONFLUENT_BROKER"),
+            "security.protocol": "SASL_SSL",
+            "sasl.mechanisms": "PLAIN",
+            "client.id": socket.gethostname(),
+            "message.max.bytes": 5242880,
+            "sasl.username": os.getenv("KAFKA_PRODUCER_KEY"),
+            "sasl.password": os.getenv("KAFKA_PRODUCER_PASSWORD")
+        }
+
+        producer = Producer(conf)
+        self.item_type_to_topic_mapping = item_type_to_topic_mapping
+        self.producer = producer
+        self.logging = logging.getLogger(__name__)
+        self.message_attributes = message_attributes
+
+    def open(self):
+        pass
+
+    def export_items(self, items):
+        try:
+            self._export_items_with_timeout(items)
+        except timeout_decorator.TimeoutError as e:
+            logging.error("Timeout error")
+            raise e
+
+    @timeout_decorator.timeout(300)
+    def _export_items_with_timeout(self, items):
+        for item in items:
+            self.export_item(item)
+
+    def export_item(self, item):
+        item_type = item.get("type", None)
+        has_item_type = item_type is not None
+        if has_item_type and item_type in self.item_type_to_topic_mapping:
+            topic = self.item_type_to_topic_mapping[item_type]
+            data = json.dumps(item).encode("utf-8")
+            self.write_txns(key=item.get("token_address"), value=data.decode("utf-8"), topic=topic)
+        else:
+            logging.error('Topic for item type "{item_type}" is not configured.')
+
+    def get_message_attributes(self, item):
+        attributes = {}
+
+        for attr_name in self.message_attributes:
+            if item.get(attr_name) is not None:
+                attributes[attr_name] = item.get(attr_name)
+
+        return attributes
+
+    def close(self):
+        self.producer.flush()
+        pass
+
+    def write_txns(self, key: str, value: str, topic: str):
+        def acked(err, msg):
+            if err is not None:
+                self.logging.error('%% Message failed delivery: %s\n' % err)
+
+        try:
+            self.producer.produce(topic, key=key, value=value, callback=acked)
+        except BufferError:
+            self.logging.error('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
+                               len(self.producer))
+        self.producer.poll(0)
diff --git a/zilliqaetl/exporters/zilliqa_item_exporter.py b/zilliqaetl/exporters/zilliqa_item_exporter.py
index 23f0c69..2dc4f5e 100644
--- a/zilliqaetl/exporters/zilliqa_item_exporter.py
+++ b/zilliqaetl/exporters/zilliqa_item_exporter.py
@@ -27,8 +27,11 @@
 from blockchainetl_common.atomic_counter import AtomicCounter
 from blockchainetl_common.exporters import JsonLinesItemExporter, CsvItemExporter
 from blockchainetl_common.file_utils import get_file_handle, close_silently
+
+from zilliqaetl.exporters.kafka_exporter import KafkaItemExporter
 from zilliqaetl.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
-from blockchainetl.jobs.console_item_exporter import ConsoleItemExporter
+from zilliqaetl.exporters.console_item_exporter import ConsoleItemExporter
+
 
 class ZilliqaItemExporter:
     def __init__(self, output_dir, item_type_to_filename=None, output_format='json'):
@@ -96,18 +99,23 @@ def get_item_exporter(output_format, file):
         ValueError(f'output format {output_format} is not recognized')
 
 
-def get_streamer_exporter(output):
-    if output is not None:
+def get_streamer_exporter(output, topic_prefix, topic_suffix):
+    item_exporter = ConsoleItemExporter()
+    if output == 'gcp':
         item_exporter = GooglePubSubItemExporter(item_type_to_topic_mapping={
-            'transaction': output + '.transactions',
-            'token_transfer': output + '.token_transfers',
-            'trace': output + '.traces',
-            #'block': output + '.blocks',
-            #'log': output + '.logs',
+            'transaction': topic_prefix + '.transactions',
+            'token_transfer': topic_prefix + '.token_transfers',
+            'trace': topic_prefix + '.traces',
+            # 'block': output + '.blocks',
+            # 'log': output + '.logs',
             # 'contract': output + '.contracts',
             # 'token': output + '.tokens',
         })
-    else:
-        item_exporter = ConsoleItemExporter()
+    if output == 'kafka':
+        item_exporter = KafkaItemExporter(item_type_to_topic_mapping={
+            "transaction": topic_prefix + "-transactions-" + topic_suffix,
+            "token_transfer": topic_prefix + "-token_transfers-" + topic_suffix,
+            "trace": topic_prefix + "-traces-" + topic_suffix,
+        })
 
-    return item_exporter
\ No newline at end of file
+    return item_exporter
diff --git a/zilliqaetl/streaming/__init__.py b/zilliqaetl/streaming/__init__.py
new file mode 100644
index 0000000..e69de29

From 01d750b5328a0f707a431c635fe441180dd7ee42 Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 13:26:58 +0530
Subject: [PATCH 02/11] catch errors

---
 README.md                                     |  4 ++--
 zilliqaetl/cli/stream.py                      |  5 ++---
 zilliqaetl/exporters/kafka_exporter.py        | 16 ++++++++++------
 zilliqaetl/exporters/zilliqa_item_exporter.py |  1 +
 4 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/README.md b/README.md
index d782dd7..ac18245 100644
--- a/README.md
+++ b/README.md
@@ -24,8 +24,8 @@ Export directory service blocks ([Schema](../docs/schema.md), [Reference](../doc
 To run locally
 ```bash
 python3 setup.py sdist bdist_wheel
-pip3 install dist/zilliqa-etl-1.0.5.tar.gz
-doppler run -- zilliqaetl stream --provider-uri https://api.zilliqa.com -o kafka -t procuder-zilliqa -ts hot
+pip3 install dist/zilliqa-etl-1.0.6.tar.gz
+doppler run -- zilliqaetl stream --provider-uri https://api.zilliqa.com -o kafka -t producer-zilliqa -ts hot
 ```
 
 Find other commands [here](https://zilliqa-etl.readthedocs.io/en/latest/commands/).
diff --git a/zilliqaetl/cli/stream.py b/zilliqaetl/cli/stream.py
index ad068fc..e97683d 100644
--- a/zilliqaetl/cli/stream.py
+++ b/zilliqaetl/cli/stream.py
@@ -21,7 +21,6 @@
 
 import logging
-import random
 
 import click
 
 
@@ -63,9 +62,9 @@ def stream(last_synced_block_file, lag, provider_uri, output, topic_prefix, topi
     if rate_limit is not None and rate_limit > 0:
         zilliqa_api = RateLimitingProxy(zilliqa_api, max_per_second=rate_limit)
 
-    entity_types = parse_entity_types(entity_types)
+    # entity_types = parse_entity_types(entity_types)
 
-    from zilliqaetl.exporters.zilliqa_item_exporter import get_item_exporter
+    # from zilliqaetl.exporters.zilliqa_item_exporter import get_item_exporter
     from blockchainetl.streaming.streamer import Streamer
 
     logging.info('Using ' + provider_uri)
diff --git a/zilliqaetl/exporters/kafka_exporter.py b/zilliqaetl/exporters/kafka_exporter.py
index 829ec51..b189c48 100644
--- a/zilliqaetl/exporters/kafka_exporter.py
+++ b/zilliqaetl/exporters/kafka_exporter.py
@@ -1,5 +1,5 @@
 import os
-from confluent_kafka import Producer
+from confluent_kafka import Producer, KafkaException
 from timeout_decorator import timeout_decorator
 
 import logging
@@ -54,7 +54,7 @@ def export_item(self, item):
             data = json.dumps(item).encode("utf-8")
             self.write_txns(key=item.get("token_address"), value=data.decode("utf-8"), topic=topic)
         else:
-            logging.error('Topic for item type "{item_type}" is not configured.')
+            logging.error(f'Topic for item type {item_type} is not configured.')
 
     def get_message_attributes(self, item):
         attributes = {}
@@ -70,13 +70,17 @@ def close(self):
         pass
 
     def write_txns(self, key: str, value: str, topic: str):
-        def acked(err, msg):
-            if err is not None:
-                self.logging.error('%% Message failed delivery: %s\n' % err)
+        # def acked(err, msg):
+        #     if err is not None:
+        #         self.logging.error(f' Message failed delivery, {topic} : {err}\n')
 
         try:
-            self.producer.produce(topic, key=key, value=value, callback=acked)
+            self.producer.produce(topic, key=key, value=value)
         except BufferError:
             self.logging.error('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                                len(self.producer))
+        except KafkaException as e:
+            self.logging.error(f"Kafka Exception for topic : {topic} , exception : {e}")
+        except NotImplementedError as e:
+            self.logging.error(f"NotImplementedError for topic : {topic} , exception : {e}")
         self.producer.poll(0)
diff --git a/zilliqaetl/exporters/zilliqa_item_exporter.py b/zilliqaetl/exporters/zilliqa_item_exporter.py
index 2dc4f5e..09a8b06 100644
--- a/zilliqaetl/exporters/zilliqa_item_exporter.py
+++ b/zilliqaetl/exporters/zilliqa_item_exporter.py
@@ -116,6 +116,7 @@ def get_streamer_exporter(output, topic_prefix, topic_suffix):
             "transaction": topic_prefix + "-transactions-" + topic_suffix,
             "token_transfer": topic_prefix + "-token_transfers-" + topic_suffix,
             "trace": topic_prefix + "-traces-" + topic_suffix,
+            "tx_block": topic_prefix + "-tx_blocks-" + topic_suffix,
         })
 
     return item_exporter

From 5524416e4bbf4eb51927b597001ecb4ffc0077f3 Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 15:01:29 +0530
Subject: [PATCH 03/11] start-kafka-streaming script

---
 .gitignore               | 3 ++-
 start-kafka-streaming.sh | 3 +++
 2 files changed, 5 insertions(+), 1 deletion(-)
 create mode 100644 start-kafka-streaming.sh

diff --git a/.gitignore b/.gitignore
index cf4516a..eec571a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -142,4 +142,5 @@ output
 .DS_Store
 
 .vscode/
-pyrightconfig.json
\ No newline at end of file
+pyrightconfig.json
+last_synced_block.txt
\ No newline at end of file
diff --git a/start-kafka-streaming.sh b/start-kafka-streaming.sh
new file mode 100644
index 0000000..e57f69e
--- /dev/null
+++ b/start-kafka-streaming.sh
@@ -0,0 +1,3 @@
+#!/usr/bin/bash
+source /root/zilliqa-etl/env/bin/activate
+doppler run -- zilliqaetl stream --provider-uri $ZILLIQA_PROVIDER_URI -o kafka -t producer-zilliqa -ts hot -l /root/zilliqa-etl/last_synced_block.txt
\ No newline at end of file

From 16e841da7a8c86d6c213d1da960a617748c3b34d Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 15:08:04 +0530
Subject: [PATCH 04/11] requirements.txt

---
 requirements.txt | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)
 create mode 100644 requirements.txt

diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..bacf831
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,46 @@
+apply-defaults==0.1.6
+attrs==21.2.0
+blockchain-etl-common==1.6.1
+cachetools==4.2.2
+certifi==2021.5.30
+charset-normalizer==2.0.4
+Click==7.0
+confluent-kafka==2.2.0
+cytoolz==0.11.0
+eth-hash==0.3.1
+eth-typing==2.2.2
+eth-utils==1.10.0
+fastecdsa==2.2.1
+google-api-core==1.31.2
+google-auth==1.35.0
+google-cloud==0.34.0
+google-cloud-pubsub==2.5.0
+googleapis-common-protos==1.53.0
+grpc-google-iam-v1==0.12.3
+grpcio==1.40.0
+idna==3.2
+jsonrpcclient==3.3.6
+jsonschema==3.2.0
+libcst==0.3.20
+mypy-extensions==0.4.3
+packaging==21.0
+proto-plus==1.19.0
+protobuf==3.17.3
+pyasn1==0.4.8
+pyasn1-modules==0.2.8
+pycryptodome==3.10.1
+pyethash==0.1.27
+pyparsing==2.4.7
+pyrsistent==0.18.0
+python-dateutil==2.7.0
+pytz==2021.1
+PyYAML==5.4.1
+pyzil==1.5.22
+requests==2.26.0
+rsa==4.7.2
+six==1.16.0
+timeout-decorator==0.5.0
+toolz==0.11.1
+typing-extensions==3.10.0.2
+typing-inspect==0.7.1
+urllib3==1.26.6
\ No newline at end of file

From db162d54b3ec0dc1b27108eda186e66bc65b574c Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 15:58:14 +0530
Subject: [PATCH 05/11] setup issues

---
 README.md                | 10 ++++++++++
 start-kafka-streaming.sh |  2 +-
 2 files changed, 11 insertions(+), 1 deletion(-)
 mode change 100644 => 100755 start-kafka-streaming.sh

diff --git a/README.md b/README.md
index ac18245..363c7fd 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,16 @@
 python3 setup.py sdist bdist_wheel
 pip3 install dist/zilliqa-etl-1.0.6.tar.gz
 doppler run -- zilliqaetl stream --provider-uri https://api.zilliqa.com -o kafka -t producer-zilliqa -ts hot
 ```
+While running/setting up on linux do this
+```bash
+sudo apt-get install build-essential libssl-dev libffi-dev python3-dev
+```
+
+Setup doppler
+```
+doppler configure set token
+```
+
 
 Find other commands [here](https://zilliqa-etl.readthedocs.io/en/latest/commands/).
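
> A note on the producer error handling added in PATCH 02 above: `confluent_kafka.Producer.produce()` is asynchronous, so a `BufferError` only signals that the client-side queue is full, while `poll()` is what actually services delivery reports and frees queue space. The patch logs these branches and moves on, which keeps the stream alive at the cost of silently dropping the message. A minimal, self-contained sketch of the alternative retry pattern — the broker address, topic name, and the retry loop are illustrative assumptions, not part of the patches:

```python
from confluent_kafka import Producer, KafkaException

# Illustrative config; the patches read the real broker and SASL credentials
# from Doppler-managed environment variables instead.
producer = Producer({"bootstrap.servers": "localhost:9092"})


def produce_with_retry(topic: str, key: str, value: str, retries: int = 3) -> None:
    """Enqueue one message, retrying when the local producer queue is full."""
    for _ in range(retries):
        try:
            producer.produce(topic, key=key, value=value)
            producer.poll(0)  # serve delivery reports without blocking
            return
        except BufferError:
            # The local queue is full (len(producer) messages awaiting delivery);
            # block briefly so the queue drains, then try again.
            producer.poll(1)
        except KafkaException:
            # Broker-level failures (e.g. unknown topic) are not retryable here.
            raise
    raise RuntimeError(f"local queue still full after {retries} attempts: {topic}")
```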
diff --git a/start-kafka-streaming.sh b/start-kafka-streaming.sh
old mode 100644
new mode 100755
index e57f69e..8fb10c5
--- a/start-kafka-streaming.sh
+++ b/start-kafka-streaming.sh
@@ -1,3 +1,3 @@
 #!/usr/bin/bash
-source /root/zilliqa-etl/env/bin/activate
+source /root/zilliqa-etl/bin/activate
 doppler run -- zilliqaetl stream --provider-uri $ZILLIQA_PROVIDER_URI -o kafka -t producer-zilliqa -ts hot -l /root/zilliqa-etl/last_synced_block.txt
\ No newline at end of file

From 8e5ca0e749d7c286831a6f7cac53a879f617f3a9 Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 16:01:45 +0530
Subject: [PATCH 06/11] doppler token

---
 start-kafka-streaming.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/start-kafka-streaming.sh b/start-kafka-streaming.sh
index 8fb10c5..9429c93 100755
--- a/start-kafka-streaming.sh
+++ b/start-kafka-streaming.sh
@@ -1,3 +1,3 @@
 #!/usr/bin/bash
 source /root/zilliqa-etl/bin/activate
-doppler run -- zilliqaetl stream --provider-uri $ZILLIQA_PROVIDER_URI -o kafka -t producer-zilliqa -ts hot -l /root/zilliqa-etl/last_synced_block.txt
\ No newline at end of file
+doppler run --scope '/root/zilliqa-etl' -- zilliqaetl stream --provider-uri $ZILLIQA_PROVIDER_URI -o kafka -t producer-zilliqa -ts hot -l /root/zilliqa-etl/last_synced_block.txt
\ No newline at end of file

From 5c669980be6afde19b2f877e143592bb3b1f2148 Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 16:17:03 +0530
Subject: [PATCH 07/11] remove doppler run from script

---
 start-kafka-streaming.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/start-kafka-streaming.sh b/start-kafka-streaming.sh
index 9429c93..6708b81 100755
--- a/start-kafka-streaming.sh
+++ b/start-kafka-streaming.sh
@@ -1,3 +1,3 @@
 #!/usr/bin/bash
 source /root/zilliqa-etl/bin/activate
-doppler run --scope '/root/zilliqa-etl' -- zilliqaetl stream --provider-uri $ZILLIQA_PROVIDER_URI -o kafka -t producer-zilliqa -ts hot -l /root/zilliqa-etl/last_synced_block.txt
\ No newline at end of file
+zilliqaetl stream --provider-uri $ZILLIQA_PROVIDER_URI -o kafka -t producer-zilliqa -ts hot -l '/root/zilliqa-etl/last_synced_block.txt'
\ No newline at end of file

From c65cbbde7db1c9988ee9bcd9bc1bb2d04c6bfe8e Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 19:20:49 +0530
Subject: [PATCH 08/11] mapper update

---
 zilliqaetl/exporters/kafka_exporter.py        |  2 +-
 zilliqaetl/exporters/zilliqa_item_exporter.py |  2 +-
 zilliqaetl/jobs/export_tx_blocks_job.py       | 46 ++++++++---------
 zilliqaetl/mappers/ds_block_mapper.py         |  2 +-
 zilliqaetl/mappers/event_log_mapper.py        |  2 +-
 zilliqaetl/mappers/exception_mapper.py        |  2 +-
 zilliqaetl/mappers/transaction_mapper.py      | 34 +++++++------
 zilliqaetl/mappers/transition_mapper.py       | 50 ++++++++++---------
 zilliqaetl/mappers/tx_block_mapper.py         | 29 +++++------
 zilliqaetl/streaming/zil_stream_adapter.py    |  7 +++
 zilliqaetl/utils/zilliqa_utils.py             |  3 +-
 11 files changed, 96 insertions(+), 83 deletions(-)

diff --git a/zilliqaetl/exporters/kafka_exporter.py b/zilliqaetl/exporters/kafka_exporter.py
index b189c48..940fbd1 100644
--- a/zilliqaetl/exporters/kafka_exporter.py
+++ b/zilliqaetl/exporters/kafka_exporter.py
@@ -47,7 +47,7 @@ def _export_items_with_timeout(self, items):
         self.export_item(item)
 
     def export_item(self, item):
-        item_type = item.get("type", None)
+        item_type = item.pop("type", None)
         has_item_type = item_type is not None
         if has_item_type and item_type in self.item_type_to_topic_mapping:
             topic = self.item_type_to_topic_mapping[item_type]
diff --git a/zilliqaetl/exporters/zilliqa_item_exporter.py b/zilliqaetl/exporters/zilliqa_item_exporter.py
index 09a8b06..4c36a5b 100644
--- a/zilliqaetl/exporters/zilliqa_item_exporter.py
+++ b/zilliqaetl/exporters/zilliqa_item_exporter.py
@@ -115,7 +115,7 @@ def get_streamer_exporter(output, topic_prefix, topic_suffix):
         item_exporter = KafkaItemExporter(item_type_to_topic_mapping={
             "transaction": topic_prefix + "-transactions-" + topic_suffix,
             "token_transfer": topic_prefix + "-token_transfers-" + topic_suffix,
-            "trace": topic_prefix + "-traces-" + topic_suffix,
+            "transition": topic_prefix + "-transitions-" + topic_suffix,
             "tx_block": topic_prefix + "-tx_blocks-" + topic_suffix,
         })
 
diff --git a/zilliqaetl/jobs/export_tx_blocks_job.py b/zilliqaetl/jobs/export_tx_blocks_job.py
index 1077847..cdeda46 100644
--- a/zilliqaetl/jobs/export_tx_blocks_job.py
+++ b/zilliqaetl/jobs/export_tx_blocks_job.py
@@ -46,9 +46,9 @@ def __init__(
             export_event_logs=True,
             export_exceptions=True,
             export_transitions=True,
-            export_token_transfers=True,
-            export_traces=True
+            export_token_transfers=True
     ):
+        self.export_token_transfers = export_token_transfers
         validate_range(start_block, end_block)
         self.start_block = start_block
         self.end_block = end_block
@@ -62,15 +62,11 @@ def __init__(
         self.export_event_logs = export_event_logs
         self.export_exceptions = export_exceptions
         self.export_transitions = export_transitions
-        self.export_token_transfers = export_token_transfers
-        self.export_traces = export_traces
 
     def _start(self):
         self.item_exporter.open()
-        pass
 
     def _export(self):
-        """This method is called on job.run()"""
         self.batch_work_executor.execute(
             range(self.start_block, self.end_block + 1),
             self._export_batch,
@@ -81,8 +77,8 @@ def _export_batch(self, block_number_batch):
         items = []
         for number in block_number_batch:
             tx_block = map_tx_block(self.zilliqa_service.get_tx_block(number))
-            num_txns: int = tx_block['num_transactions']
-            txns = list(self.zilliqa_service.get_transactions(number)) if num_txns > 0 else []
+
+            txns = list(self.zilliqa_service.get_transactions(number)) if tx_block.get('num_transactions') > 0 else []
             if self._should_export_transactions():
                 for txn in txns:
                     items.append(map_transaction(tx_block, txn))
@@ -94,25 +90,23 @@ def _export_batch(self, block_number_batch):
                     items.extend(map_transitions(tx_block, txn))
                 if self._should_export_token_transfers(txn):
                     token_transfers = []
-                    token_transfers.extend(map_token_traces(tx_block, txn, txn_type="token_transfer"))
+                    token_transfers.extend(map_token_traces(tx_block, txn))
                     # Since duplicate can happen for combination of "from_address", "to_address", "value",
                     # "call_type", "transaction_hash"
-                    dedup_token_transfers = {token["log_index"]: {"call_type": token["call_type"],
-                                                                  "from_address": token["from_address"],
-                                                                  "to_address": token["to_address"],
-                                                                  "transaction_hash": token["transaction_hash"],
-                                                                  "value": token["value"],
-                                                                  "token_address": token["token_address"]}
-                                             for token in token_transfers}
-                    unique_token_transfers = {}
-                    for key, token_value in dedup_token_transfers.items():
-                        if token_value not in unique_token_transfers.values():
-                            unique_token_transfers[key] = token_value
-                    token_transfers = [token_transfer for token_transfer in token_transfers if
-                                       token_transfer["log_index"] in unique_token_transfers.keys()]
+                    # dedup_token_transfers = {token["log_index"]: {"call_type": token["call_type"],
+                    #                                               "from_address": token["from_address"],
+                    #                                               "to_address": token["to_address"],
+                    #                                               "transaction_hash": token["transaction_hash"],
+                    #                                               "value": token["value"],
+                    #                                               "token_address": token["token_address"]}
+                    #                          for token in token_transfers}
+                    # unique_token_transfers = {}
+                    # for key, token_value in dedup_token_transfers.items():
+                    #     if token_value not in unique_token_transfers.values():
+                    #         unique_token_transfers[key] = token_value
+                    # token_transfers = [token_transfer for token_transfer in token_transfers if
+                    #                    token_transfer["log_index"] in unique_token_transfers.keys()]
                     items.extend(token_transfers)
-                if self._should_export_traces(txn):
-                    items.extend(map_token_traces(tx_block, txn, txn_type="trace"))
 
             tx_block['num_present_transactions'] = len(txns)
             items.append(tx_block)
@@ -134,8 +128,8 @@ def _should_export_transitions(self, txn):
     def _should_export_token_transfers(self, txn):
         return self.export_token_transfers and txn.get('receipt')
 
-    def _should_export_traces(self, txn):
-        return self.export_traces and txn.get('receipt')
+    # def _should_export_traces(self, txn):
+    #     return self.export_traces and txn.get('receipt')
 
     def _end(self):
         self.batch_work_executor.shutdown()
diff --git a/zilliqaetl/mappers/ds_block_mapper.py b/zilliqaetl/mappers/ds_block_mapper.py
index 5c5e970..4319cd7 100644
--- a/zilliqaetl/mappers/ds_block_mapper.py
+++ b/zilliqaetl/mappers/ds_block_mapper.py
@@ -38,4 +38,4 @@ def map_ds_block(raw_block):
         'signature': raw_block.get('signature'),
     }
 
-    return block
+    return block
\ No newline at end of file
diff --git a/zilliqaetl/mappers/event_log_mapper.py b/zilliqaetl/mappers/event_log_mapper.py
index fc9fada..0fc71fe 100644
--- a/zilliqaetl/mappers/event_log_mapper.py
+++ b/zilliqaetl/mappers/event_log_mapper.py
@@ -20,7 +20,7 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 
-from zilliqaetl.utils.zilliqa_utils import to_int, json_dumps, iso_datetime_string, encode_bech32_address
+from zilliqaetl.utils.zilliqa_utils import json_dumps, encode_bech32_address
 
 
 def map_event_logs(tx_block, txn):
diff --git a/zilliqaetl/mappers/exception_mapper.py b/zilliqaetl/mappers/exception_mapper.py
index 9fe462b..a721419 100644
--- a/zilliqaetl/mappers/exception_mapper.py
+++ b/zilliqaetl/mappers/exception_mapper.py
@@ -35,4 +35,4 @@ def map_exceptions(tx_block, txn):
             'index': index,
             'line': exception.get('line'),
             'message': exception.get('message'),
-        }
+        }
\ No newline at end of file
diff --git a/zilliqaetl/mappers/transaction_mapper.py b/zilliqaetl/mappers/transaction_mapper.py
index 4f9e0c9..a05e08f 100644
--- a/zilliqaetl/mappers/transaction_mapper.py
+++ b/zilliqaetl/mappers/transaction_mapper.py
@@ -19,28 +19,32 @@
 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
+from pyzil.account import Account
+from zilliqaetl.utils.zilliqa_utils import to_int, iso_datetime_string, encode_bech32_pub_key, encode_bech32_address
 
-from zilliqaetl.utils.zilliqa_utils import to_int, encode_bech32_pub_key, encode_bech32_address
-
-# Modified acc to MS use case
+
 def map_transaction(tx_block, txn):
     block = {
         'type': 'transaction',
-        'hash': '0x' + txn.get('ID'),
+        'token_address': '0x0000',
+        'id': {txn.get("ID")},
         'block_number': tx_block.get('number'),
         'block_timestamp': tx_block.get('timestamp'),
-        'value': to_int(txn.get('amount')),
-        'gas_price': to_int(txn.get('gasPrice')),
-        'from_address': encode_bech32_pub_key(txn.get('senderPubKey')),
-        'to_address': encode_bech32_address(txn.get('toAddr')),
+        'amount': txn.get('amount'),
+        # 'code': txn.get('code'),
+        # 'data': txn.get('data'),
+        # 'gas_limit': to_int(txn.get('gasLimit')),
+        'gas_price': txn.get('gasPrice'),
+        # 'nonce': to_int(txn.get('nonce')),
+        # 'sender_pub_key': txn.get('senderPubKey'),
+        'sender': encode_bech32_pub_key(txn.get('senderPubKey')),
+        # 'signature': txn.get('signature'),
+        'to_addr': encode_bech32_address(txn.get('toAddr')),
+        # 'version': to_int(txn.get('version')),
         **map_receipt(txn)
     }
-    block["fee"] = block.pop("gas_price") * block.pop("gas_used")
-    if block["receipt_status"] == 0:
-        block["value"] = 0
-    block["hash"]="0x"
+
     return block
 
 
 def map_receipt(txn):
     receipt = txn.get('receipt')
@@ -50,6 +54,8 @@ def map_receipt(txn):
         return None
 
     return {
-        'receipt_status': int(receipt.get('success')),
-        'gas_used': to_int(receipt.get('cumulative_gas'))
+        # 'accepted': receipt.get('accepted'),
+        'success': receipt.get('success'),
+        'cumulative_gas': receipt.get('cumulative_gas'),
+        # 'epoch_num': to_int(receipt.get('epoch_num')),
     }
diff --git a/zilliqaetl/mappers/transition_mapper.py b/zilliqaetl/mappers/transition_mapper.py
index 0f6d892..a24c31e 100644
--- a/zilliqaetl/mappers/transition_mapper.py
+++ b/zilliqaetl/mappers/transition_mapper.py
@@ -19,12 +19,13 @@
 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
-
-from zilliqaetl.utils.zilliqa_utils import to_int, encode_bech32_address
 import json
+
 from pyzil.crypto import zilkey
 
-# The above tag based code would only work with the below XSGD and XIDR address only
+from zilliqaetl.utils.zilliqa_utils import to_int, json_dumps, iso_datetime_string, encode_bech32_address, \
+    encode_bech32_address
+
 SUPPORTED_TOKENS = ["zil1zu72vac254htqpg3mtywdcfm84l3dfd9qzww8t",
                     "zil180v66mlw007ltdv8tq5t240y7upwgf7djklmwh",
                     "zil1cuf78e3p37utekgk0gtcvd3hvkrqcgt06lrnty"]
@@ -54,21 +55,24 @@ def map_transitions(tx_block, txn):
             msg = transition.get('msg')
             yield {
                 'type': 'transition',
+                'token_address': '0x0000',
                 'block_number': tx_block.get('number'),
                 'block_timestamp': tx_block.get('timestamp'),
-                'transaction_hash': '0x' + txn.get('ID'),
-                'log_index': index,
+                'transaction_id': txn.get('ID'),
+                # 'index': index,
+                # 'accepted': receipt.get('accepted'),
                 'addr': encode_bech32_address(transition.get('addr')),
-                'depth': transition.get('depth'),
-                'zill_amount': to_int(msg.get('_amount')),
+                # 'depth': transition.get('depth'),
+                'amount': msg.get('_amount'),
                 'recipient': encode_bech32_address(msg.get('_recipient')),
-                'call_type': msg.get('_tag'),
-                "receipt_status": int(receipt.get("success")),
-                "parameters": params_to_json(msg.get('params'))
+                # 'tag': msg.get('_tag'),
+                # 'params': [json_dumps(param) for param in msg.get('params')],
             }
 
 
-def map_token_traces(tx_block, txn, txn_type):
+def map_token_traces(tx_block, txn):
+    # TODO: Cleanup logic for adding 0x for token_transfers, only for this we have to add rest we are not adding
+    txn_type = 'token_transfer'
     receipt = txn.get('receipt')
     if receipt and receipt.get('transitions'):
         for index, transition in enumerate(receipt.get('transitions')):
@@ -80,15 +84,15 @@ def map_token_traces(tx_block, txn, txn_type):
                 'block_number': tx_block.get('number'),
                 'block_timestamp': tx_block.get('timestamp'),
                 'transaction_hash': '0x' + txn.get('ID'),
-                'log_index': index,
-                "receipt_status": int(receipt.get("success")),
+                # 'log_index': index,
+                # "receipt_status": int(receipt.get("success")),
             }
-            if txn_type == 'trace' and msg.get('_amount', "0") != "0":
-                data["value"] = msg.get('_amount')
-                data["from_address"] = encoded_addr
-                data["to_address"] = encoded_recipient
-                yield data
-            elif txn_type == 'token_transfer' and (msg.get('_amount', "0") == "0") and (
+            # if txn_type == 'trace' and msg.get('_amount', "0") != "0":
+            #     data["value"] = msg.get('_amount')
+            #     data["from_address"] = encoded_addr
+            #     data["to_address"] = encoded_recipient
+            #     yield data
+            if (msg.get('_amount', "0") == "0") and (
                     (encoded_addr in SUPPORTED_TOKENS) or (encoded_recipient in SUPPORTED_TOKENS)):
                 tag = msg.get('_tag')
                 params = params_to_json(msg.get('params'), json_string=False)
@@ -98,26 +102,26 @@ def map_token_traces(tx_block, txn, txn_type):
                     data["to_address"] = params["to"]
                     data["value"] = params["amount"]
                     data["token_address"] = encoded_recipient
-                    data["call_type"] = tag
+                    # data["call_type"] = tag
                     yield data
                 elif (tag == "RecipientAcceptTransfer") and ({"sender", "recipient", "amount"} <= param_keys):
                     data["from_address"] = params["sender"]
                     data["to_address"] = params["recipient"]
                     data["value"] = params["amount"]
                     data["token_address"] = encoded_addr
-                    data["call_type"] = tag
+                    # data["call_type"] = tag
                     yield data
                 elif (tag == "Mint") and ("amount" in param_keys) and \
                         (("to" in param_keys) or ("recipient" in param_keys)):
                     data["to_address"] = params.get("to", params.get("recipient", None))
                     data["value"] = params["amount"]
                     data["token_address"] = encoded_addr if (encoded_addr in SUPPORTED_TOKENS) else encoded_recipient
-                    data["call_type"] = tag
+                    # data["call_type"] = tag
                     yield data
                 elif (tag == "Burn") and ("amount" in param_keys) and \
                         (("to" in param_keys) or ("recipient" in param_keys)):
                     data["to_address"] = "zil1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq9yf6pz"
                     data["value"] = params["amount"]
                     data["token_address"] = encoded_addr if (encoded_addr in SUPPORTED_TOKENS) else encoded_recipient
-                    data["call_type"] = tag
+                    # data["call_type"] = tag
                     yield data
diff --git a/zilliqaetl/mappers/tx_block_mapper.py b/zilliqaetl/mappers/tx_block_mapper.py
index aab06b1..3ff3639 100644
--- a/zilliqaetl/mappers/tx_block_mapper.py
+++ b/zilliqaetl/mappers/tx_block_mapper.py
@@ -28,22 +28,23 @@ def map_tx_block(raw_block):
     body = raw_block.get('body')
     block = {
         'type': 'tx_block',
+        'token_address': '0x0000',
         'number': to_int(header.get('BlockNum')),
-        'ds_block_number': to_int(header.get('DSBlockNum')),
+        # 'ds_block_number': to_int(header.get('DSBlockNum')),
         'timestamp': iso_datetime_string(header.get('Timestamp')),
-        'version': header.get('Version'),
-        'gas_limit': to_int(header.get('GasLimit')),
-        'gas_used': to_int(header.get('GasUsed')),
-        'mb_info_hash': header.get('MbInfoHash'),
-        'tx_leader_pub_key': header.get('MinerPubKey'),
+        # 'version': header.get('Version'),
+        # 'gas_limit': to_int(header.get('GasLimit')),
+        # 'gas_used': to_int(header.get('GasUsed')),
+        # 'mb_info_hash': header.get('MbInfoHash'),
+        # 'tx_leader_pub_key': header.get('MinerPubKey'),
         'tx_leader_address': encode_bech32_pub_key(header.get('MinerPubKey')),
-        'num_micro_blocks': to_int(header.get('NumMicroBlocks')),
-        'num_transactions': to_int(header.get('NumTxns')),
-        'prev_block_hash': header.get('PrevBlockHash'),
-        'rewards': to_int(header.get('Rewards')),
-        'state_delta_hash': header.get('StateDeltaHash'),
-        'state_root_hash': header.get('StateRootHash'),
-        'header_signature': body.get('HeaderSign')
+        # 'num_micro_blocks': to_int(header.get('NumMicroBlocks')),
+        # 'num_transactions': to_int(header.get('NumTxns')),
+        # 'prev_block_hash': header.get('PrevBlockHash'),
+        'rewards': header.get('Rewards'),
+        # 'state_delta_hash': header.get('StateDeltaHash'),
+        # 'state_root_hash': header.get('StateRootHash'),
+        # 'header_signature': body.get('HeaderSign')
     }
 
-    return block
+    return block
\ No newline at end of file
diff --git a/zilliqaetl/streaming/zil_stream_adapter.py b/zilliqaetl/streaming/zil_stream_adapter.py
index 41c3d9b..80af910 100644
--- a/zilliqaetl/streaming/zil_stream_adapter.py
+++ b/zilliqaetl/streaming/zil_stream_adapter.py
@@ -53,6 +53,13 @@ def export_all(self, start_block, end_block):
             export_transitions=False,
             item_exporter=self.item_exporter,
         )
+        job = ExportTxBlocksJob(
+            start_block=start_block,
+            end_block=end_block,
+            zilliqa_api=self.api,
+            max_workers=self.max_workers,
+            item_exporter=self.item_exporter,
+        )
         job.run()
 
     def close(self):
diff --git a/zilliqaetl/utils/zilliqa_utils.py b/zilliqaetl/utils/zilliqa_utils.py
index c74ccda..4ac87d8 100644
--- a/zilliqaetl/utils/zilliqa_utils.py
+++ b/zilliqaetl/utils/zilliqa_utils.py
@@ -19,7 +19,8 @@ def iso_datetime_string(timestamp):
     if isinstance(timestamp, str):
         timestamp = int(timestamp)
 
-    return datetime.utcfromtimestamp(timestamp / 1000000).strftime('%Y-%m-%d %H:%M:%S')
+    return int(timestamp / 1000000)
+    # return datetime.utcfromtimestamp(timestamp / 1000000).strftime('%Y-%m-%d %H:%M:%S')
 
 
 def encode_bech32_pub_key(pub_key):

From 385b34c887a67fd475355a5d63671c16c01c30ef Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 19:32:34 +0530
Subject: [PATCH 09/11] code working now

---
 zilliqaetl/jobs/export_tx_blocks_job.py    | 2 +-
 zilliqaetl/mappers/transaction_mapper.py   | 2 +-
 zilliqaetl/mappers/tx_block_mapper.py      | 4 ++--
 zilliqaetl/streaming/zil_stream_adapter.py | 1 -
 4 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/zilliqaetl/jobs/export_tx_blocks_job.py b/zilliqaetl/jobs/export_tx_blocks_job.py
index cdeda46..640d18f 100644
--- a/zilliqaetl/jobs/export_tx_blocks_job.py
+++ b/zilliqaetl/jobs/export_tx_blocks_job.py
@@ -78,7 +78,7 @@ def _export_batch(self, block_number_batch):
         for number in block_number_batch:
             tx_block = map_tx_block(self.zilliqa_service.get_tx_block(number))
 
-            txns = list(self.zilliqa_service.get_transactions(number)) if tx_block.get('num_transactions') > 0 else []
+            txns = list(self.zilliqa_service.get_transactions(number)) if tx_block.pop('num_transactions') > 0 else []
             if self._should_export_transactions():
                 for txn in txns:
                     items.append(map_transaction(tx_block, txn))
diff --git a/zilliqaetl/mappers/transaction_mapper.py b/zilliqaetl/mappers/transaction_mapper.py
index a05e08f..6aa524b 100644
--- a/zilliqaetl/mappers/transaction_mapper.py
+++ b/zilliqaetl/mappers/transaction_mapper.py
@@ -28,7 +28,7 @@ def map_transaction(tx_block, txn):
     block = {
         'type': 'transaction',
         'token_address': '0x0000',
-        'id': {txn.get("ID")},
+        'id': txn.get("ID"),
        'block_number': tx_block.get('number'),
         'block_timestamp': tx_block.get('timestamp'),
         'amount': txn.get('amount'),
diff --git a/zilliqaetl/mappers/tx_block_mapper.py b/zilliqaetl/mappers/tx_block_mapper.py
index 3ff3639..8dfcf45 100644
--- a/zilliqaetl/mappers/tx_block_mapper.py
+++ b/zilliqaetl/mappers/tx_block_mapper.py
@@ -25,7 +25,7 @@
 
 def map_tx_block(raw_block):
     header = raw_block.get('header')
-    body = raw_block.get('body')
+    # body = raw_block.get('body')
     block = {
         'type': 'tx_block',
         'token_address': '0x0000',
@@ -39,7 +39,7 @@ def map_tx_block(raw_block):
         # 'tx_leader_pub_key': header.get('MinerPubKey'),
         'tx_leader_address': encode_bech32_pub_key(header.get('MinerPubKey')),
         # 'num_micro_blocks': to_int(header.get('NumMicroBlocks')),
-        # 'num_transactions': to_int(header.get('NumTxns')),
+        'num_transactions': to_int(header.get('NumTxns')),
         # 'prev_block_hash': header.get('PrevBlockHash'),
         'rewards': header.get('Rewards'),
         # 'state_delta_hash': header.get('StateDeltaHash'),
diff --git a/zilliqaetl/streaming/zil_stream_adapter.py b/zilliqaetl/streaming/zil_stream_adapter.py
index 80af910..8846b96 100644
--- a/zilliqaetl/streaming/zil_stream_adapter.py
+++ b/zilliqaetl/streaming/zil_stream_adapter.py
@@ -48,7 +48,6 @@ def export_all(self, start_block, end_block):
             export_exceptions=False,
             export_event_logs=False,
             export_transactions=True,
-            export_traces=True,
             export_token_transfers=True,
             export_transitions=False,
             item_exporter=self.item_exporter,

From f1eb0dfd091332e80f13881aa40352bc88ff021e Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Mon, 11 Sep 2023 19:56:35 +0530
Subject: [PATCH 10/11] remove num_present_transactions

---
 zilliqaetl/jobs/export_tx_blocks_job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zilliqaetl/jobs/export_tx_blocks_job.py b/zilliqaetl/jobs/export_tx_blocks_job.py
index 640d18f..bdd8bb8 100644
--- a/zilliqaetl/jobs/export_tx_blocks_job.py
+++ b/zilliqaetl/jobs/export_tx_blocks_job.py
@@ -107,7 +107,7 @@ def _export_batch(self, block_number_batch):
                     # token_transfers = [token_transfer for token_transfer in token_transfers if
                     #                    token_transfer["log_index"] in unique_token_transfers.keys()]
                     items.extend(token_transfers)
-            tx_block['num_present_transactions'] = len(txns)
+            # tx_block['num_present_transactions'] = len(txns)
             items.append(tx_block)
 
             for item in items:

From eb300fd7efad4b3e56da46695e694d0daefec5ea Mon Sep 17 00:00:00 2001
From: Akshay Gupta
Date: Wed, 20 Sep 2023 13:10:16 +0530
Subject: [PATCH 11/11] none params handle

---
 zilliqaetl/mappers/event_log_mapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zilliqaetl/mappers/event_log_mapper.py b/zilliqaetl/mappers/event_log_mapper.py
index 0fc71fe..c432278 100644
--- a/zilliqaetl/mappers/event_log_mapper.py
+++ b/zilliqaetl/mappers/event_log_mapper.py
@@ -35,5 +35,5 @@ def map_event_logs(tx_block, txn):
             'index': index,
             'address': encode_bech32_address(event_log.get('address')),
             'event_name': event_log.get('_eventname'),
-            'params': [json_dumps(param) for param in event_log.get('params')]
+            'params': [json_dumps(param) for param in event_log.get('params', [])]
         }
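
> Taken together, the series wires the CLI (`-o kafka -t <prefix> -ts <suffix>`) through `ZilliqaStreamerAdapter` and `ExportTxBlocksJob` into `KafkaItemExporter`, with topics named `<prefix>-<facet>-<suffix>`. A minimal sketch of exercising the exporter directly — assuming the package built from this series is installed and the same `CONFLUENT_BROKER`, `KAFKA_PRODUCER_KEY` and `KAFKA_PRODUCER_PASSWORD` environment variables are set; the sample item and its values are illustrative:

```python
from zilliqaetl.exporters.zilliqa_item_exporter import get_streamer_exporter

# Resolves to topics like "producer-zilliqa-transactions-hot".
exporter = get_streamer_exporter("kafka", "producer-zilliqa", "hot")
exporter.open()

# Field shape mirrors map_transaction() as of PATCH 09: "type" routes the item
# (KafkaItemExporter pops it before publishing) and "token_address" becomes the
# Kafka message key.
item = {
    "type": "transaction",
    "token_address": "0x0000",
    "id": "example-transaction-id",   # illustrative value
    "block_number": 2400000,
    "block_timestamp": 1694437000,    # epoch seconds, per the PATCH 08 change
    "amount": "0",
}
exporter.export_items([item])
exporter.close()  # flushes any messages still queued in the producer
```

> With the delivery callback commented out in PATCH 02, `produce()` failures surface only in `message-publish.log`, so the `flush()` performed by `close()` is the last point at which still-queued messages can go out.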