From 83eaa87d489dff6769972c9b18376e9ba39d86cf Mon Sep 17 00:00:00 2001
From: Seki_Yoshinori
Date: Wed, 19 Jun 2019 12:20:03 +0000
Subject: [PATCH] Add enrich option

---
Review notes (this section is ignored by git-am):
 - The transactions file handle is bound to `transactions_input` so that the
   `transactions_file` path variable is not rebound to a (closed) file object
   after the `with` block.
 - `if enrich == True` is simplified to `if enrich`.
 - Keyword arguments of EnrichTransactionsJob follow PEP 8 (no spaces
   around `=`).
 - Context indentation was reconstructed from a whitespace-collapsed copy of
   this patch; the post-image blob hash on the second `index` line was not
   regenerated.

 bitcoinetl/cli/export_all.py  |  5 +++--
 bitcoinetl/jobs/export_all.py | 22 +++++++++++++++++++++-
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/bitcoinetl/cli/export_all.py b/bitcoinetl/cli/export_all.py
index 48257ad..1b88071 100644
--- a/bitcoinetl/cli/export_all.py
+++ b/bitcoinetl/cli/export_all.py
@@ -95,7 +95,8 @@ def get_partitions(start, end, partition_batch_size, provider_uri):
 @click.option('-B', '--export-batch-size', default=1, type=int, help='The number of requests in JSON RPC batches.')
 @click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL),
               help='The type of chain')
-def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, chain):
+@click.option('--enrich', default=False, type=bool, help="Enable filling in transactions inputs fields.")
+def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, chain, enrich):
     """Exports all data for a range of blocks."""
     do_export_all(chain, get_partitions(start, end, partition_batch_size, provider_uri),
-                  output_dir, provider_uri, max_workers, export_batch_size)
+                  output_dir, provider_uri, max_workers, export_batch_size, enrich)
diff --git a/bitcoinetl/jobs/export_all.py b/bitcoinetl/jobs/export_all.py
index f228eb3..8a33591 100644
--- a/bitcoinetl/jobs/export_all.py
+++ b/bitcoinetl/jobs/export_all.py
@@ -21,14 +21,17 @@
 # SOFTWARE.
 
 import datetime
+import json
 import logging
 import os
 import shutil
 from time import time
 
 from bitcoinetl.jobs.export_blocks_job import ExportBlocksJob
+from bitcoinetl.jobs.enrich_transactions import EnrichTransactionsJob
 from bitcoinetl.jobs.exporters.blocks_and_transactions_item_exporter import blocks_and_transactions_item_exporter
 from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc
+from blockchainetl.file_utils import smart_open
 from blockchainetl.logging_utils import logging_basic_config
 from blockchainetl.misc_utils import filter_items
 from blockchainetl.thread_local_proxy import ThreadLocalProxy
@@ -37,7 +40,7 @@
 logger = logging.getLogger('export_all')
 
 
-def export_all(chain, partitions, output_dir, provider_uri, max_workers, batch_size):
+def export_all(chain, partitions, output_dir, provider_uri, max_workers, batch_size, enrich):
     for batch_start_block, batch_end_block, partition_dir, *args in partitions:
         # # # start # # #
 
@@ -76,6 +79,10 @@ def export_all(chain, partitions, output_dir, provider_uri, max_workers, batch_s
             transactions_output_dir=transactions_output_dir,
             file_name_suffix=file_name_suffix,
         )
+        enriched_transactions_file = '{transactions_output_dir}/enriched_transactions_{file_name_suffix}.json'.format(
+            transactions_output_dir=transactions_output_dir,
+            file_name_suffix=file_name_suffix,
+        )
         logger.info('Exporting blocks {block_range} to {blocks_file}'.format(
             block_range=block_range,
             blocks_file=blocks_file,
@@ -97,6 +104,19 @@ def export_all(chain, partitions, output_dir, provider_uri, max_workers, batch_s
             export_transactions=transactions_file is not None)
         job.run()
 
+        if enrich:
+            with smart_open(transactions_file, 'r') as transactions_input:
+                job = EnrichTransactionsJob(
+                    transactions_iterable=(json.loads(transaction) for transaction in transactions_input),
+                    batch_size=batch_size,
+                    bitcoin_rpc=ThreadLocalProxy(lambda: BitcoinRpc(provider_uri)),
+                    max_workers=max_workers,
+                    item_exporter=blocks_and_transactions_item_exporter(None, enriched_transactions_file),
+                    chain=chain
+                )
+                job.run()
+
+
         if args is not None and len(args) > 0:
             date = args[0]
             logger.info('Filtering blocks {blocks_file} by date {date}'.format(