diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py index 4e57f53..68bf8e1 100644 --- a/mysql_ch_replicator/clickhouse_api.py +++ b/mysql_ch_replicator/clickhouse_api.py @@ -179,46 +179,55 @@ def create_table(self, structure: TableStructure, additional_indexes: list | Non def insert(self, table_name, records, table_structure: TableStructure = None): current_version = self.get_last_used_version(table_name) + 1 - records_to_insert = [] - for record in records: - new_record = [] - for i, e in enumerate(record): + if not records: + return + + if not isinstance(records, list): + records = list(records) + + process_indices = [] + if table_structure: + for i, field in enumerate(table_structure.fields): + if (("DateTime" in field.field_type or "Date32" in field.field_type) and "Nullable" not in field.field_type): + process_indices.append(i) + else: + first_record = records[0] + for i, value in enumerate(first_record): + if isinstance(value, (datetime.date, datetime.datetime)): + process_indices.append(i) + + for j, record in enumerate(records): + if not isinstance(record, list): + record = list(record) + records[j] = record + + for i in process_indices: + e = record[i] + if isinstance(e, datetime.date) and not isinstance(e, datetime.datetime): try: - e = datetime.datetime.combine(e, datetime.time()) + record[i] = datetime.datetime.combine(e, datetime.time()) except ValueError: - e = datetime.datetime(1970, 1, 1) - if isinstance(e, datetime.datetime): + record[i] = datetime.datetime(1970, 1, 1) + elif isinstance(e, datetime.datetime): try: e.timestamp() - except ValueError: - e = datetime.datetime(1970, 1, 1) - if table_structure is not None: - field: TableField = table_structure.fields[i] - is_datetime = ( - ('DateTime' in field.field_type) or - ('Date32' in field.field_type) - ) - if is_datetime and 'Nullable' not in field.field_type: - try: - e.timestamp() - except (ValueError, AttributeError): - e = datetime.datetime(1970, 1, 1) - new_record.append(e) - record = new_record - - records_to_insert.append(tuple(record) + (current_version,)) + except (ValueError, AttributeError): + record[i] = datetime.datetime(1970, 1, 1) + + record.append(current_version) current_version += 1 - full_table_name = f'`table_name`' - if '.' not in full_table_name: + if '.' in table_name: + full_table_name = f'`{table_name}`' + else: full_table_name = f'`{self.database}`.`{table_name}`' duration = 0.0 for attempt in range(ClickhouseApi.MAX_RETRIES): try: t1 = time.time() - self.client.insert(table=full_table_name, data=records_to_insert) + self.client.insert(table=full_table_name, data=records) t2 = time.time() duration += (t2 - t1) break @@ -232,7 +241,7 @@ def insert(self, table_name, records, table_structure: TableStructure = None): table_name=table_name, duration=duration, is_insert=True, - records=len(records_to_insert), + records=len(records), ) self.set_last_used_version(table_name, current_version) diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 9a1ac92..ccd9394 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -6,6 +6,7 @@ from enum import Enum from dataclasses import dataclass from collections import defaultdict +from datetime import date from .config import Settings, MysqlSettings, ClickhouseSettings from .mysql_api import MySQLApi @@ -268,6 +269,30 @@ def perform_initial_replication(self): self.clickhouse_api.database = self.target_database logger.info(f'initial replication - done') + def to_date_if_str(self, value): + if not isinstance(value, str): + return value + + if len(value) == 10 and value[4] == '-' and value[7] == '-': + try: + year = int(value[0:4]) + month = int(value[5:7]) + day = int(value[8:10]) + return date(year, month, day) + except ValueError: + return value + + if len(value) == 12 and value[5] == '-' and value[8] == '-' and ((value[0] == '\'' and value[11] == '\'') or (value[0] == '"' and value[11] == '"')): + try: + year = int(value[1:5]) + month = int(value[6:8]) + day = int(value[9:11]) + return date(year, month, day) + except ValueError: + return value + + return value + def perform_initial_replication_table(self, table_name): logger.info(f'running initial replication for table {table_name}') @@ -329,13 +354,11 @@ def perform_initial_replication_table(self, table_name): if not records: break + self.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure) - for record in records: - record_primary_key = [record[key_idx] for key_idx in primary_key_ids] - if max_primary_key is None: - max_primary_key = record_primary_key - else: - max_primary_key = max(max_primary_key, record_primary_key) + + last_record = records[-1] + max_primary_key = [self.to_date_if_str(last_record[key_idx]) for key_idx in primary_key_ids] self.state.initial_replication_max_primary_key = max_primary_key self.save_state_if_required() diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py index b8b25c3..84bcd6e 100644 --- a/mysql_ch_replicator/mysql_api.py +++ b/mysql_ch_replicator/mysql_api.py @@ -95,12 +95,30 @@ def get_table_create_statement(self, table_name) -> str: def get_records(self, table_name, order_by, limit, start_value=None): self.reconnect_if_required() - order_by = ','.join(order_by) + order_by_str = ','.join(order_by) where = '' + if start_value is not None: - start_value = ','.join(map(str, start_value)) - where = f'WHERE ({order_by}) > ({start_value}) ' - query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by} LIMIT {limit}' + or_clauses = [] + + for i in range(len(order_by)): + eq_parts = [] + + for j in range(i): + eq_parts.append(f"{order_by[j]} = {start_value[j]}") + + gt_part = f"{order_by[i]} > {start_value[i]}" + + if eq_parts: + clause = f"({' AND '.join(eq_parts)} AND {gt_part})" + else: + clause = f"({gt_part})" + + or_clauses.append(clause) + where = f"WHERE {' OR '.join(or_clauses)} " + + query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}' + self.cursor.execute(query) res = self.cursor.fetchall() records = [x for x in res]