From 72661aedf6135415796baa456e5e9756e59f3c5c Mon Sep 17 00:00:00 2001
From: FeSens
Date: Tue, 12 Jul 2022 16:55:51 -0300
Subject: [PATCH 1/7] add prefix to the blockchain exporters

---
 ethereumetl/cli/stream.py                      |  4 ++-
 .../streaming/item_exporter_creator.py         | 30 +++++++++----------
 2 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py
index 1ff978e04..b57553be9 100644
--- a/ethereumetl/cli/stream.py
+++ b/ethereumetl/cli/stream.py
@@ -37,6 +37,8 @@
 @click.option('-p', '--provider-uri', default='https://mainnet.infura.io', show_default=True, type=str,
               help='The URI of the web3 provider e.g. '
                    'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io')
+@click.option('-P', '--prefix', default='', show_default=True, type=str,
+              help='The prefix of the output files. Can be useful when streaming multiple blockchains.')
 @click.option('-o', '--output', type=str,
               help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
                    'or Postgres connection url e.g. postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum; '
@@ -68,7 +70,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
 
     streamer_adapter = EthStreamerAdapter(
         batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
-        item_exporter=create_item_exporters(output),
+        item_exporter=create_item_exporters(output, prefix),
         batch_size=batch_size,
         max_workers=max_workers,
         entity_types=entity_types

diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py
index 400ad7efd..cca205437 100644
--- a/ethereumetl/streaming/item_exporter_creator.py
+++ b/ethereumetl/streaming/item_exporter_creator.py
@@ -24,7 +24,7 @@
 from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter
 
 
-def create_item_exporters(outputs):
+def create_item_exporters(outputs, prefix):
     split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
 
     item_exporters = [create_item_exporter(output) for output in split_outputs]
@@ -38,13 +38,13 @@ def create_item_exporter(output):
         enable_message_ordering = 'sorted' in output or 'ordered' in output
         item_exporter = GooglePubSubItemExporter(
             item_type_to_topic_mapping={
-                'block': output + '.blocks',
-                'transaction': output + '.transactions',
-                'log': output + '.logs',
-                'token_transfer': output + '.token_transfers',
-                'trace': output + '.traces',
-                'contract': output + '.contracts',
-                'token': output + '.tokens',
+                'block': output + f'.{prefix}blocks',
+                'transaction': output + f'.{prefix}transactions',
+                'log': output + f'.{prefix}logs',
+                'token_transfer': output + f'.{prefix}token_transfers',
+                'trace': output + f'.{prefix}traces',
+                'contract': output + f'.{prefix}contracts',
+                'token': output + f'.{prefix}tokens',
             },
             message_attributes=('item_id', 'item_timestamp'),
             batch_max_bytes=1024 * 1024 * 5,
@@ -80,13 +80,13 @@ def create_item_exporter(output):
     elif item_exporter_type == ItemExporterType.KAFKA:
         from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter
         item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={
-            'block': 'blocks',
-            'transaction': 'transactions',
-            'log': 'logs',
-            'token_transfer': 'token_transfers',
-            'trace': 'traces',
-            'contract': 'contracts',
-            'token': 'tokens',
+            'block': f'{prefix}blocks',
+            'transaction': f'{prefix}transactions',
+            'log': f'{prefix}logs',
+            'token_transfer': f'{prefix}token_transfers',
+            'trace': f'{prefix}traces',
+            'contract': f'{prefix}contracts',
+            'token': f'{prefix}tokens',
         })
 
     else:

From e94f8cd3e04af4c216efbbc8b4342dc1f744f5ef Mon Sep 17 00:00:00 2001
From: FeSens
Date: Tue, 12 Jul 2022 17:17:07 -0300
Subject: [PATCH 2/7] revert

---
 ethereumetl/cli/stream.py                      |  4 +--
 .../streaming/item_exporter_creator.py         | 30 +++++++++----------
 2 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py
index b57553be9..1ff978e04 100644
--- a/ethereumetl/cli/stream.py
+++ b/ethereumetl/cli/stream.py
@@ -37,8 +37,6 @@
 @click.option('-p', '--provider-uri', default='https://mainnet.infura.io', show_default=True, type=str,
               help='The URI of the web3 provider e.g. '
                    'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io')
-@click.option('-P', '--prefix', default='', show_default=True, type=str,
-              help='The prefix of the output files. Can be useful when streaming multiple blockchains.')
 @click.option('-o', '--output', type=str,
               help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
                    'or Postgres connection url e.g. postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum; '
@@ -70,7 +68,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
 
     streamer_adapter = EthStreamerAdapter(
         batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
-        item_exporter=create_item_exporters(output, prefix),
+        item_exporter=create_item_exporters(output),
         batch_size=batch_size,
         max_workers=max_workers,
         entity_types=entity_types

diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py
index cca205437..400ad7efd 100644
--- a/ethereumetl/streaming/item_exporter_creator.py
+++ b/ethereumetl/streaming/item_exporter_creator.py
@@ -24,7 +24,7 @@
 from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter
 
 
-def create_item_exporters(outputs, prefix):
+def create_item_exporters(outputs):
     split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
 
     item_exporters = [create_item_exporter(output) for output in split_outputs]
@@ -38,13 +38,13 @@ def create_item_exporter(output):
         enable_message_ordering = 'sorted' in output or 'ordered' in output
         item_exporter = GooglePubSubItemExporter(
             item_type_to_topic_mapping={
-                'block': output + f'.{prefix}blocks',
-                'transaction': output + f'.{prefix}transactions',
-                'log': output + f'.{prefix}logs',
-                'token_transfer': output + f'.{prefix}token_transfers',
-                'trace': output + f'.{prefix}traces',
-                'contract': output + f'.{prefix}contracts',
-                'token': output + f'.{prefix}tokens',
+                'block': output + '.blocks',
+                'transaction': output + '.transactions',
+                'log': output + '.logs',
+                'token_transfer': output + '.token_transfers',
+                'trace': output + '.traces',
+                'contract': output + '.contracts',
+                'token': output + '.tokens',
             },
             message_attributes=('item_id', 'item_timestamp'),
             batch_max_bytes=1024 * 1024 * 5,
@@ -80,13 +80,13 @@ def create_item_exporter(output):
     elif item_exporter_type == ItemExporterType.KAFKA:
         from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter
         item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={
-            'block': f'{prefix}blocks',
-            'transaction': f'{prefix}transactions',
-            'log': f'{prefix}logs',
-            'token_transfer': f'{prefix}token_transfers',
-            'trace': f'{prefix}traces',
-            'contract': f'{prefix}contracts',
-            'token': f'{prefix}tokens',
+            'block': 'blocks',
+            'transaction': 'transactions',
+            'log': 'logs',
+            'token_transfer': 'token_transfers',
+            'trace': 'traces',
+            'contract': 'contracts',
+            'token': 'tokens',
         })
 
     else:

From 091457c0101f9371f2cbddd1874398398443bdaf Mon Sep 17 00:00:00 2001
From: FeSens
Date: Tue, 12 Jul 2022 17:32:36 -0300
Subject: [PATCH 3/7] allow to prefix kafka topics directly in the url

---
 blockchainetl/jobs/exporters/kafka_exporter.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py
index ac75e242b..5e1f8609c 100644
--- a/blockchainetl/jobs/exporters/kafka_exporter.py
+++ b/blockchainetl/jobs/exporters/kafka_exporter.py
@@ -13,14 +13,21 @@ def __init__(self, output, item_type_to_topic_mapping, converters=()):
         self.item_type_to_topic_mapping = item_type_to_topic_mapping
         self.converter = CompositeItemConverter(converters)
         self.connection_url = self.get_connection_url(output)
-        print(self.connection_url)
+        self.topic_prefix = self.get_topic_prefix(output)
+        print(self.connection_url, self.topic_prefix)
         self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
 
     def get_connection_url(self, output):
         try:
             return output.split('/')[1]
         except KeyError:
-            raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"')
+            raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092" or "kafka/127.0.0.1:9092/"')
+
+    def get_topic_prefix(self, output):
+        try:
+            return output.split('/')[2] + "."
+        except KeyError:
+            return ''
 
     def open(self):
         pass
@@ -34,7 +41,7 @@ def export_item(self, item):
         item_type = item.get('type')
         if item_type is not None and item_type in self.item_type_to_topic_mapping:
             data = json.dumps(item).encode('utf-8')
             print(data)
-            return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
+            return self.producer.send(self.topic_prefix + self.item_type_to_topic_mapping[item_type], value=data)
         else:
             logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

From e26e8b6748f8d2be2f96b80ebcb893889a1ca251 Mon Sep 17 00:00:00 2001
From: FeSens
Date: Tue, 12 Jul 2022 17:41:32 -0300
Subject: [PATCH 4/7] add documentation for kafka topics prefix

---
 docs/commands.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/commands.md b/docs/commands.md
index 2be620cda..e8b1124d7 100644
--- a/docs/commands.md
+++ b/docs/commands.md
@@ -213,7 +213,7 @@ e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
  - For Postgres: `--output=postgresql+pg8000://<user>:<password>@<host>:<port>/<database>`, e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
  - For GCS: `--output=gs://<bucket>`. Make sure to install and initialize `gcloud` cli.
- - For Kafka: `--output=kafka/<host>:<port>`, e.g. `--output=kafka/127.0.0.1:9092`
+ - For Kafka: `--output=kafka/<host>:<port>/<topic_prefix>`, e.g. `--output=kafka/127.0.0.1:9092` or `--output=kafka/127.0.0.1:9092/crypto_ethereum`.
  - Those output types can be combined with a comma e.g. `--output=gs://<bucket>,projects/<your-project>/topics/crypto_ethereum`
 
 The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema)
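A note on the URL form documented in PATCH 4: with the helpers added in PATCH 3, everything after the second slash in the Kafka output string becomes a topic prefix, and a dot is appended to it before the per-entity topic names. The small Python sketch below is illustrative only (it is not part of the patches); it just mirrors what get_connection_url and get_topic_prefix do.

def parse_kafka_output(output):
    # Split 'kafka/<host>:<port>[/<prefix>]' into the broker address and an
    # optional 'prefix.' string, mirroring the exporter's two helper methods.
    parts = output.split('/')
    connection_url = parts[1]
    topic_prefix = parts[2] + '.' if len(parts) > 2 else ''
    return connection_url, topic_prefix

# parse_kafka_output('kafka/127.0.0.1:9092')                 -> ('127.0.0.1:9092', '')
# parse_kafka_output('kafka/127.0.0.1:9092/crypto_ethereum') -> ('127.0.0.1:9092', 'crypto_ethereum.')
# so the 'block' item type is published to the 'crypto_ethereum.blocks' topic.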
From 387818a3aa8309116733457160fe798763f82eed Mon Sep 17 00:00:00 2001
From: FeSens
Date: Sat, 16 Jul 2022 15:24:18 -0300
Subject: [PATCH 5/7] fix error type

---
 blockchainetl/jobs/exporters/kafka_exporter.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py
index 5e1f8609c..f4796cd10 100644
--- a/blockchainetl/jobs/exporters/kafka_exporter.py
+++ b/blockchainetl/jobs/exporters/kafka_exporter.py
@@ -10,7 +10,7 @@
 class KafkaItemExporter:
 
     def __init__(self, output, item_type_to_topic_mapping, converters=()):
-        self.item_type_to_topic_mapping = item_type_to_topic_mapping
+        self.item_type_to_topic_mapping = item_type_to_topic_mapping
         self.converter = CompositeItemConverter(converters)
         self.connection_url = self.get_connection_url(output)
         self.topic_prefix = self.get_topic_prefix(output)
@@ -20,13 +20,13 @@ def get_connection_url(self, output):
         try:
             return output.split('/')[1]
-        except KeyError:
+        except IndexError:
             raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092" or "kafka/127.0.0.1:9092/"')
 
     def get_topic_prefix(self, output):
         try:
             return output.split('/')[2] + "."
-        except KeyError:
+        except IndexError:
             return ''
 
     def open(self):
         pass
@@ -52,7 +52,6 @@ def convert_items(self, items):
     def close(self):
         pass
-
 
 def group_by_item_type(items):
     result = collections.defaultdict(list)
     for item in items:

From 1c5d7dddfeaf81ec1be9e493a052eea9632fdad2 Mon Sep 17 00:00:00 2001
From: FeSens
Date: Sat, 16 Jul 2022 15:28:40 -0300
Subject: [PATCH 6/7] insert data in kafka through transactions

---
 .../jobs/exporters/kafka_exporter.py           | 23 +++++++++-------
 setup.py                                       |  2 +-
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py
index f4796cd10..eb0ccf67d 100644
--- a/blockchainetl/jobs/exporters/kafka_exporter.py
+++ b/blockchainetl/jobs/exporters/kafka_exporter.py
@@ -2,7 +2,7 @@
 import json
 import logging
 
-from kafka import KafkaProducer
+from confluent_kafka import Producer
 
 from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter
 
@@ -10,12 +10,15 @@
 class KafkaItemExporter:
 
     def __init__(self, output, item_type_to_topic_mapping, converters=()):
-        self.item_type_to_topic_mapping = item_type_to_topic_mapping
+        self.item_type_to_topic_mapping = item_type_to_topic_mapping
         self.converter = CompositeItemConverter(converters)
         self.connection_url = self.get_connection_url(output)
         self.topic_prefix = self.get_topic_prefix(output)
         print(self.connection_url, self.topic_prefix)
-        self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
+        self.producer = Producer({
+            'bootstrap.servers': self.connection_url,
+            'transactional.id': 'ethereumetl'
+        })
 
     def get_connection_url(self, output):
         try:
@@ -30,18 +33,20 @@ def get_topic_prefix(self, output):
             return ''
 
     def open(self):
-        pass
+        self.producer.init_transactions()
 
     def export_items(self, items):
+        self.producer.begin_transaction()
         for item in items:
             self.export_item(item)
-
+        self.producer.commit_transaction()
+
     def export_item(self, item):
         item_type = item.get('type')
         if item_type is not None and item_type in self.item_type_to_topic_mapping:
             data = json.dumps(item).encode('utf-8')
-            print(data)
-            return self.producer.send(self.topic_prefix + self.item_type_to_topic_mapping[item_type], value=data)
+            logging.debug(data)
+            return self.producer.produce(self.topic_prefix + self.item_type_to_topic_mapping[item_type], data)
         else:
             logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
 
@@ -50,8 +55,8 @@ def convert_items(self, items):
             yield self.converter.convert_item(item)
 
     def close(self):
-        pass
-
+        pass
+
 def group_by_item_type(items):
     result = collections.defaultdict(list)
     for item in items:

diff --git a/setup.py b/setup.py
index 3804545c1..8447f52de 100644
--- a/setup.py
+++ b/setup.py
@@ -46,7 +46,7 @@ def read(fname):
         'timeout-decorator==0.4.1',
         'google-cloud-pubsub==2.1.0',
         'google-cloud-storage==1.33.0',
-        'kafka-python==2.0.2',
+        'confluent-kafka==1.9.0',
         'sqlalchemy==1.4',
         'pg8000==1.16.6',
         # This library is a dependency for google-cloud-pubsub, starting from 0.3.22 it requires Rust,

From 8ae53c9be10d97e037f630338004e4223d26a438 Mon Sep 17 00:00:00 2001
From: FeSens
Date: Fri, 22 Jul 2022 14:41:57 -0300
Subject: [PATCH 7/7] add idempotence

---
 blockchainetl/jobs/exporters/kafka_exporter.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py
index eb0ccf67d..32e6a07ba 100644
--- a/blockchainetl/jobs/exporters/kafka_exporter.py
+++ b/blockchainetl/jobs/exporters/kafka_exporter.py
@@ -17,7 +17,8 @@ def __init__(self, output, item_type_to_topic_mapping, converters=()):
         print(self.connection_url, self.topic_prefix)
         self.producer = Producer({
             'bootstrap.servers': self.connection_url,
-            'transactional.id': 'ethereumetl'
+            'transactional.id': 'ethereumetl',
+            'enable.idempotence': True,
         })
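Taken together, PATCH 6 and PATCH 7 leave the exporter with a transactional, idempotent confluent-kafka producer: transactions are initialized once when the exporter opens, and each batch in export_items() is wrapped in a begin/commit pair. The sketch below is a rough standalone illustration of that lifecycle, not part of the patch series; it assumes confluent-kafka is installed and a broker is reachable, and the broker address and topic are placeholders.

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': '127.0.0.1:9092',   # placeholder broker address
    'transactional.id': 'ethereumetl',
    'enable.idempotence': True,              # added by PATCH 7
})

producer.init_transactions()                 # once, as in open()
producer.begin_transaction()                 # per batch, as in export_items()
producer.produce('crypto_ethereum.blocks', b'{"type": "block"}')  # per item
producer.commit_transaction()                # after the whole batch is produced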