add support for ClickHouse as export target #422

Open · wants to merge 1 commit into develop
107 changes: 107 additions & 0 deletions blockchainetl/jobs/exporters/clickhouse_exporter.py
@@ -0,0 +1,107 @@
import collections
import logging
from typing import List
from urllib.parse import urlparse, parse_qs
from dataclasses import dataclass

from clickhouse_connect.driver.exceptions import DatabaseError
from clickhouse_connect.driver.models import ColumnDef

import clickhouse_connect


@dataclass
class Table:
column_names: List[str]
column_types: List[str]


MIN_INSERT_BATCH_SIZE = 1000


class ClickHouseItemExporter:

def __init__(self, connection_url, item_type_to_table_mapping):
parsed = urlparse(connection_url)
self.username = parsed.username
self.password = parsed.password
self.host = parsed.hostname
self.port = parsed.port
        self.database = parsed.path.strip('/').split('/')[0] if parsed.path.strip('/') else 'default'
        # parse_qs returns a list per key; ClickHouse settings expect scalar values, so take the first one
        self.settings = {key: values[0] for key, values in parse_qs(parsed.query).items()}
self.item_type_to_table_mapping = item_type_to_table_mapping
self.connection = self.create_connection()
self.tables = {}
self.cached_batches = {}
        # for each mapped table, grab the schema once up front to avoid fetching the columns on every insert
for table in self.item_type_to_table_mapping.values():
try:
describe_result = self.connection.query(f'DESCRIBE TABLE {self.database}.{table}')
column_defs = [ColumnDef(**row) for row in describe_result.named_results()
if row['default_type'] not in ('ALIAS', 'MATERIALIZED')]
column_names = [cd.name for cd in column_defs]
column_types = [cd.ch_type for cd in column_defs]
self.tables[table] = Table(column_names, column_types)
self.cached_batches[table] = []
except DatabaseError as de:
                # this is not necessarily critical: the user may not be exporting this item type, in which
                # case the table is not expected to exist
                logging.warning('Unable to read columns for table "{}". This table will not be exported.'.format(table))
                logging.debug(de)

def open(self):
pass

def export_items(self, items):
items_grouped_by_table = self.group_items_by_table(items)
for item_type, table in self.item_type_to_table_mapping.items():
table_data = items_grouped_by_table.get(table)
if table_data:
cached = self.cached_batches[table]
batch = cached + table_data
if len(batch) >= MIN_INSERT_BATCH_SIZE:
logging.info('Flushing batch for "{}" with {} items.'.format(item_type, len(batch)))
                    self.connection.insert(table, data=batch, column_names=self.tables[table].column_names,
                                           column_types=self.tables[table].column_types, database=self.database)
                    self.cached_batches[table] = []
else:
# insufficient size, so cache
logging.debug(
'Batch for "{}" is too small to be flushed ({}<{}), caching.'.format(item_type, len(batch),
MIN_INSERT_BATCH_SIZE))
self.cached_batches[table] = batch

    def convert_items(self, items):
        # no item converter is configured for this exporter, so items are passed through unchanged
        for item in items:
            yield item

def create_connection(self):
return clickhouse_connect.create_client(host=self.host, port=self.port, username=self.username,
password=self.password, database=self.database,
settings=self.settings)

def close(self):
# clear the cache
logging.info("Flushing remaining batches")
for table, batch in self.cached_batches.items():
self.connection.insert(table, data=batch, column_names=self.tables[table].column_names,
column_types=self.tables[table].column_types, database=self.database)
self.connection.close()

def group_items_by_table(self, items):
results = collections.defaultdict(list)
        for item in items:
            item_type = item.get('type')
            if item_type in self.item_type_to_table_mapping:
                table = self.item_type_to_table_mapping[item_type]
                if table not in self.tables:
                    logging.error('Table "{}" does not exist. Type "{}" cannot be exported.'.format(table, item_type))
                    continue
                result = []
                # only insert the columns which exist in the database table
                for column in self.tables[table].column_names:
                    result.append(item.get(column))
                results[table].append(result)
            else:
                logging.warning('ClickHouse exporter is ignoring items of type "{}" as the type is not currently supported.'.format(item_type))
return results
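
The exporter derives every connection parameter from a single URL. Below is a minimal sketch of how such a URL decomposes with the same urlparse/parse_qs calls used in __init__; the URL is an illustrative placeholder, and max_execution_time stands in for any per-query ClickHouse setting.

from urllib.parse import urlparse, parse_qs

url = 'clickhouse://default:secret@localhost:8123/ethereum?max_execution_time=300'
parsed = urlparse(url)
print(parsed.username, parsed.password)         # default secret
print(parsed.hostname, parsed.port)             # localhost 8123
print(parsed.path.strip('/').split('/')[0])     # ethereum  (the target database)
print(parse_qs(parsed.query))                   # {'max_execution_time': ['300']} -> forwarded as client settings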
15 changes: 14 additions & 1 deletion ethereumetl/streaming/item_exporter_creator.py
@@ -93,7 +93,17 @@ def create_item_exporter(output):
'contract': 'contracts',
'token': 'tokens',
})

elif item_exporter_type == ItemExporterType.CLICKHOUSE:
from blockchainetl.jobs.exporters.clickhouse_exporter import ClickHouseItemExporter
item_exporter = ClickHouseItemExporter(output, item_type_to_table_mapping={
'block': 'blocks',
'transaction': 'transactions',
'log': 'logs',
'token_transfer': 'token_transfers',
'trace': 'traces',
'contract': 'contracts',
'token': 'tokens',
})
else:
raise ValueError('Unable to determine item exporter type for output ' + output)

@@ -122,6 +132,8 @@ def determine_item_exporter_type(output):
return ItemExporterType.POSTGRES
elif output is not None and output.startswith('gs://'):
return ItemExporterType.GCS
elif output is not None and output.startswith('clickhouse'):
return ItemExporterType.CLICKHOUSE
elif output is None or output == 'console':
return ItemExporterType.CONSOLE
else:
@@ -135,4 +147,5 @@ class ItemExporterType:
GCS = 'gcs'
CONSOLE = 'console'
KAFKA = 'kafka'
CLICKHOUSE = 'clickhouse'
UNKNOWN = 'unknown'
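
With the new prefix check in place, an output URL starting with clickhouse is routed to the ClickHouse exporter with the default table mapping. A small sketch of the dispatch follows; the URL is a placeholder, and creating the exporter opens a connection, so a reachable server is assumed.

from ethereumetl.streaming.item_exporter_creator import (
    ItemExporterType, create_item_exporter, determine_item_exporter_type,
)

output = 'clickhouse://default:secret@localhost:8123/ethereum'
assert determine_item_exporter_type(output) == ItemExporterType.CLICKHOUSE
# builds a ClickHouseItemExporter wired to the blocks/transactions/logs/... tables above
exporter = create_item_exporter(output)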
23 changes: 23 additions & 0 deletions schemas/clickhouse/blocks.sql
@@ -0,0 +1,23 @@
CREATE TABLE IF NOT EXISTS blocks
(
`number` Int64,
`hash` String,
`parent_hash` String,
`nonce` String,
`sha3_uncles` String,
`logs_bloom` String,
`transactions_root` String,
`state_root` String,
`receipts_root` String,
`miner` String,
`difficulty` Decimal(38, 0),
`total_difficulty` Decimal(38, 0),
`size` Int64,
`extra_data` String,
`gas_limit` Int64,
`gas_used` Int64,
`timestamp` Int64,
`transaction_count` Int64,
`base_fee_per_gas` Int64
)
ENGINE = MergeTree() ORDER BY (timestamp)
17 changes: 17 additions & 0 deletions schemas/clickhouse/transactions.sql
@@ -0,0 +1,17 @@
CREATE TABLE IF NOT EXISTS transactions (
`hash` String,
`nonce` Int64,
`block_hash` String,
`block_number` Int64,
`transaction_index` Int64,
`from_address` String,
`to_address` Nullable(String),
`value` Decimal(38, 0),
`gas` Int64,
`gas_price` Int64,
`input` String,
`block_timestamp` Int64,
`max_fee_per_gas` Nullable(Int64),
`max_priority_fee_per_gas` Nullable(Int64),
`transaction_type` Int64
) ENGINE = MergeTree() ORDER BY (block_timestamp)
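
The tables have to exist before streaming starts, because the exporter reads each table's columns at construction time. One way to apply these schema files, sketched with clickhouse-connect; host, credentials and database are placeholders.

import clickhouse_connect

client = clickhouse_connect.create_client(host='localhost', port=8123,
                                          username='default', database='ethereum')
for path in ('schemas/clickhouse/blocks.sql', 'schemas/clickhouse/transactions.sql'):
    with open(path) as f:
        client.command(f.read())  # each file holds a single CREATE TABLE IF NOT EXISTS statement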
1 change: 1 addition & 0 deletions setup.py
@@ -47,6 +47,7 @@ def read(fname):
'google-cloud-pubsub==2.13.0',
'google-cloud-storage==1.33.0',
'kafka-python==2.0.2',
'clickhouse-connect==0.4.7',
'sqlalchemy==1.4',
'pg8000==1.16.6',
# This library is a dependency for google-cloud-pubsub, starting from 0.3.22 it requires Rust,