diff --git a/db/clickhouse.py b/db/clickhouse.py
index 18720b4..95de5c6 100644
--- a/db/clickhouse.py
+++ b/db/clickhouse.py
@@ -114,6 +114,15 @@ def create_table(self, table_name: str, fields: List[Tuple[str, str]],
         )
         self._query_clickhouse(q)
 
+    def rename_table(self, from_table_name: str, to_table_name: str):
+        """Rename a table; both names are qualified with self.db_name."""
+        q = '''
+            RENAME TABLE {db}.{from_table} TO {db}.{to_table}
+        '''.format(db=self.db_name,
+                   from_table=from_table_name,
+                   to_table=to_table_name)
+        self._query_clickhouse(q)
+
     def create_merge_table(self, table_name: str,
                            fields: List[Tuple[str, str]],
                            merge_re: str):
diff --git a/db/db.py b/db/db.py
index 0e824ec..f39ba82 100644
--- a/db/db.py
+++ b/db/db.py
@@ -51,6 +51,11 @@ def create_table(self, table_name: str, fields: List[Tuple[str, str]],
                      primary_key_fields: List[str]):
         pass
 
+    @abstractmethod
+    def rename_table(self, from_table_name: str, to_table_name: str):
+        """Rename the table from_table_name to to_table_name."""
+        pass
+
     @abstractmethod
     def create_merge_table(self, table_name: str,
                            fields: List[Tuple[str, str]],
diff --git a/fields/collection.py b/fields/collection.py
index 098fe1c..d502a3b 100644
--- a/fields/collection.py
+++ b/fields/collection.py
@@ -39,8 +39,8 @@ def __init__(self, source: Source):
 
 
 class DbTableDefinition(object):
-    def __init__(self, source: Source):
-        self.table_name = source.db_name
+    def __init__(self, source: Source, name=None):
+        self.table_name = source.db_name if name is None else name
         self.primary_keys = []
         self.column_types = dict()
         self.field_types = dict()
diff --git a/updater/db_controller.py b/updater/db_controller.py
index cb00fb2..725dbe7 100644
--- a/updater/db_controller.py
+++ b/updater/db_controller.py
@@ -16,6 +16,7 @@
 
 from db import Database
 from fields import DbTableDefinition
+from copy import copy
 
 logger = logging.getLogger(__name__)
 
@@ -36,6 +37,7 @@ class DbController(object):
     ARCHIVE_SUFFIX = 'old'
     ALL_SUFFIX = 'all'
     LATEST_SUFFIX = 'latest'
+    TEMP_PREFIX = 'temp'
 
     def __init__(self, db: Database, definition: DbTableDefinition):
         self._db = db
@@ -131,6 +133,31 @@ def recreate_table(self, table_suffix: str):
         self._db.drop_table(table_name)
         self._create_table(table_name)
 
+    def create_temp_table_controller(self):
+        """Return a controller whose tables carry the 'temp__' name prefix.
+
+        The definition is shallow-copied, so renaming the copy leaves the
+        definition used by this controller untouched.
+        """
+        # self.table_name is a method that expects a suffix; the bare table
+        # name is stored on the definition.
+        temp_name = "{}__{}".format(self.TEMP_PREFIX,
+                                    self._definition.table_name)
+        temp_def = copy(self._definition)
+        temp_def.table_name = temp_name
+        return DbController(self._db, temp_def)
+
+    def replace_with(self, table_suffix: str, source: 'DbController'):
+        """Replace this controller's table with the matching table of source.
+
+        NOTE(review): the drop and the rename are issued as two separate
+        calls, so the target table is missing in between and stays missing
+        if the rename fails -- confirm that this is acceptable.
+        """
+        table_name = self.table_name(table_suffix)
+        self._db.drop_table(table_name)
+        self._db.rename_table(source.table_name(table_suffix), table_name)
+
     def ensure_table_created(self, table_suffix: str):
         table_name = self.table_name(table_suffix)
         self._ensure_table_created(table_name)
diff --git a/updater/updater.py b/updater/updater.py
index 0233830..99a71b0 100644
--- a/updater/updater.py
+++ b/updater/updater.py
@@ -80,7 +80,8 @@ def _try_update(self, app_id: str, since: datetime, until: datetime,
                     db_controller: DbController,
                     processing_definition: ProcessingDefinition,
                     loading_definition: LoadingDefinition):
-        db_controller.recreate_table(table_suffix)
+        temp_table_controller = db_controller.create_temp_table_controller()
+        temp_table_controller.recreate_table(table_suffix)
         df_it = self._load(app_id, loading_definition,
                            since, until, LogsApiClient.DATE_DIMENSION_CREATE,
                            parts_count)
@@ -88,7 +89,8 @@ def _try_update(self, app_id: str, since: datetime, until: datetime,
             logger.debug("Start processing data chunk")
             upload_df = self._process_data(app_id, df,
                                            processing_definition)
-            db_controller.insert_data(upload_df, table_suffix)
+            temp_table_controller.insert_data(upload_df, table_suffix)
+        db_controller.replace_with(table_suffix, temp_table_controller)
 
     def update(self, app_id: str, date: Optional[datetime.date],
                table_suffix: str, db_controller: DbController,