diff --git a/blockchainetl/jobs/exporters/mysql_item_exporter.py b/blockchainetl/jobs/exporters/mysql_item_exporter.py new file mode 100644 index 000000000..275a658e3 --- /dev/null +++ b/blockchainetl/jobs/exporters/mysql_item_exporter.py @@ -0,0 +1,48 @@ +import collections + +from sqlalchemy import create_engine + +from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter + + +class MySQLItemExporter: + + def __init__(self, connection_url, item_type_to_insert_stmt_mapping, converters=(), print_sql=True): + self.connection_url = connection_url + self.item_type_to_insert_stmt_mapping = item_type_to_insert_stmt_mapping + self.converter = CompositeItemConverter(converters) + self.print_sql = print_sql + + self.engine = self.create_engine() + + def open(self): + pass + + def export_items(self, items): + items_grouped_by_type = group_by_item_type(items) + + for item_type, insert_stmt in self.item_type_to_insert_stmt_mapping.items(): + item_group = items_grouped_by_type.get(item_type) + if item_group: + connection = self.engine.connect() + converted_items = list(self.convert_items(item_group)) + connection.execute(insert_stmt, converted_items) + + def convert_items(self, items): + for item in items: + yield self.converter.convert_item(item) + + def create_engine(self): + engine = create_engine(self.connection_url, echo=self.print_sql, pool_recycle=3600) + return engine + + def close(self): + pass + + +def group_by_item_type(items): + result = collections.defaultdict(list) + for item in items: + result[item.get('type')].append(item) + + return result diff --git a/blockchainetl/streaming/mysql_utils.py b/blockchainetl/streaming/mysql_utils.py new file mode 100644 index 000000000..29d330072 --- /dev/null +++ b/blockchainetl/streaming/mysql_utils.py @@ -0,0 +1,7 @@ +from sqlalchemy.dialects.mysql import insert + + +def create_insert_statement_for_table(table): + insert_stmt = insert(table) + + return insert_stmt diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py index 400ad7efd..710931dec 100644 --- a/ethereumetl/streaming/item_exporter_creator.py +++ b/ethereumetl/streaming/item_exporter_creator.py @@ -71,6 +71,27 @@ def create_item_exporter(output): }, converters=[UnixTimestampItemConverter(), IntToDecimalItemConverter(), ListFieldItemConverter('topics', 'topic', fill=4)]) + elif item_exporter_type == ItemExporterType.MYSQL: + from blockchainetl.jobs.exporters.mysql_item_exporter import MySQLItemExporter + from blockchainetl.streaming.mysql_utils import create_insert_statement_for_table + from blockchainetl.jobs.exporters.converters.unix_timestamp_item_converter import UnixTimestampItemConverter + from blockchainetl.jobs.exporters.converters.int_to_decimal_item_converter import IntToDecimalItemConverter + from blockchainetl.jobs.exporters.converters.list_field_item_converter import ListFieldItemConverter + from ethereumetl.streaming.mysql_tables import BLOCKS, TRANSACTIONS, LOGS, TOKEN_TRANSFERS, TRACES, TOKENS, CONTRACTS + + item_exporter = MySQLItemExporter( + output, item_type_to_insert_stmt_mapping={ + 'block': create_insert_statement_for_table(BLOCKS), + 'transaction': create_insert_statement_for_table(TRANSACTIONS), + 'log': create_insert_statement_for_table(LOGS), + 'token_transfer': create_insert_statement_for_table(TOKEN_TRANSFERS), + 'trace': create_insert_statement_for_table(TRACES), + 'token': create_insert_statement_for_table(TOKENS), + 'contract': create_insert_statement_for_table(CONTRACTS), + }, + converters=[UnixTimestampItemConverter(), IntToDecimalItemConverter(), + ListFieldItemConverter('topics', 'topic', fill=4)]) + elif item_exporter_type == ItemExporterType.GCS: from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter bucket, path = get_bucket_and_path_from_gcs_output(output) @@ -113,6 +134,8 @@ def determine_item_exporter_type(output): return ItemExporterType.KAFKA elif output is not None and output.startswith('postgresql'): return ItemExporterType.POSTGRES + elif output is not None and output.startswith('mysql'): + return ItemExporterType.MYSQL elif output is not None and output.startswith('gs://'): return ItemExporterType.GCS elif output is None or output == 'console': @@ -124,6 +147,7 @@ def determine_item_exporter_type(output): class ItemExporterType: PUBSUB = 'pubsub' POSTGRES = 'postgres' + MYSQL = 'mysql' GCS = 'gcs' CONSOLE = 'console' KAFKA = 'kafka' diff --git a/ethereumetl/streaming/mysql_tables.py b/ethereumetl/streaming/mysql_tables.py new file mode 100644 index 000000000..cd577d476 --- /dev/null +++ b/ethereumetl/streaming/mysql_tables.py @@ -0,0 +1,129 @@ + +from sqlalchemy import Table, Column, Integer, BigInteger, Boolean, String, Numeric, \ + MetaData, PrimaryKeyConstraint, VARCHAR, DATETIME + +metadata = MetaData() + +# SQL schema is here https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema + +BLOCKS = Table( + 'blocks', metadata, + Column('timestamp', DATETIME), + Column('number', BigInteger), + Column('hash', VARCHAR, primary_key=True), + Column('parent_hash', VARCHAR), + Column('nonce', VARCHAR), + Column('sha3_uncles', VARCHAR), + Column('logs_bloom', VARCHAR), + Column('transactions_root', VARCHAR), + Column('state_root', VARCHAR), + Column('receipts_root', VARCHAR), + Column('miner', VARCHAR), + Column('difficulty', Numeric(38)), + Column('total_difficulty', Numeric(38)), + Column('size', BigInteger), + Column('extra_data', VARCHAR), + Column('gas_limit', BigInteger), + Column('gas_used', BigInteger), + Column('transaction_count', BigInteger), + Column('base_fee_per_gas', BigInteger), +) + +TRANSACTIONS = Table( + 'transactions', metadata, + Column('hash', VARCHAR, primary_key=True), + Column('nonce', BigInteger), + Column('transaction_index', BigInteger), + Column('from_address', VARCHAR), + Column('to_address', VARCHAR), + Column('value', Numeric(38)), + Column('gas', BigInteger), + Column('gas_price', BigInteger), + Column('input', VARCHAR), + Column('receipt_cumulative_gas_used', BigInteger), + Column('receipt_gas_used', BigInteger), + Column('receipt_contract_address', VARCHAR), + Column('receipt_root', VARCHAR), + Column('receipt_status', BigInteger), + Column('block_timestamp', DATETIME), + Column('block_number', BigInteger), + Column('block_hash', VARCHAR), + Column('max_fee_per_gas', BigInteger), + Column('max_priority_fee_per_gas', BigInteger), + Column('transaction_type', BigInteger), + Column('receipt_effective_gas_price', BigInteger), +) + +LOGS = Table( + 'logs', metadata, + Column('log_index', BigInteger, primary_key=True), + Column('transaction_hash', VARCHAR, primary_key=True), + Column('transaction_index', BigInteger), + Column('address', VARCHAR), + Column('data', VARCHAR), + Column('topic0', VARCHAR), + Column('topic1', VARCHAR), + Column('topic2', VARCHAR), + Column('topic3', VARCHAR), + Column('block_timestamp', DATETIME), + Column('block_number', BigInteger), + Column('block_hash', VARCHAR), +) + +TOKEN_TRANSFERS = Table( + 'token_transfers', metadata, + Column('token_address', VARCHAR), + Column('from_address', VARCHAR), + Column('to_address', VARCHAR), + Column('value', Numeric(18)), + Column('transaction_hash', VARCHAR, primary_key=True), + Column('log_index', BigInteger, primary_key=True), + Column('block_timestamp', DATETIME), + Column('block_number', BigInteger), + Column('block_hash', VARCHAR), +) + +TRACES = Table( + 'traces', metadata, + Column('transaction_hash', VARCHAR), + Column('transaction_index', BigInteger), + Column('from_address', VARCHAR), + Column('to_address', VARCHAR), + Column('value', Numeric(38)), + Column('input', VARCHAR), + Column('output', VARCHAR), + Column('trace_type', VARCHAR), + Column('call_type', VARCHAR), + Column('reward_type', VARCHAR), + Column('gas', BigInteger), + Column('gas_used', BigInteger), + Column('subtraces', BigInteger), + Column('trace_address', VARCHAR), + Column('error', VARCHAR), + Column('status', Integer), + Column('block_timestamp', DATETIME), + Column('block_number', BigInteger), + Column('block_hash', VARCHAR), + Column('trace_id', VARCHAR, primary_key=True), +) + +TOKENS = Table( + 'tokens', metadata, + Column('address', VARCHAR(42), primary_key=True), + Column('name', String), + Column('symbol', String), + Column('decimals', Integer), + Column('function_sighashes', String), + Column('total_supply', Numeric(18)), + Column('block_number', BigInteger, primary_key=True), +) + +CONTRACTS = Table( + 'contracts', metadata, + Column('address', VARCHAR(42), primary_key=True), + Column('bytecode', VARCHAR), + Column('function_sighashes', String), + Column('is_erc20', Boolean), + Column('is_erc721', Boolean), + Column('block_number', BigInteger, primary_key=True), +)