Skip to content

Commit

Permalink
Range update do not delete old range at the beginning
Browse files Browse the repository at this point in the history
Old range deleted only after successful downloading updated data
  • Loading branch information
crazyproger committed Jun 28, 2018
1 parent c59fab2 commit 98e3229
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 4 deletions.
8 changes: 8 additions & 0 deletions db/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ def create_table(self, table_name: str, fields: List[Tuple[str, str]],
)
self._query_clickhouse(q)

def rename_table(self, from_table_name: str, to_table_name: str):
q = '''
RENAME TABLE {db}.{from_table} TO {db}.{to_table}
'''.format(db=self.db_name,
from_table=from_table_name,
to_table=to_table_name)
self._query_clickhouse(q)

def create_merge_table(self, table_name: str,
fields: List[Tuple[str, str]],
merge_re: str):
Expand Down
4 changes: 4 additions & 0 deletions db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ def create_table(self, table_name: str, fields: List[Tuple[str, str]],
primary_key_fields: List[str]):
pass

@abstractmethod
def rename_table(self, from_table_name: str, to_table_name: str):
pass

@abstractmethod
def create_merge_table(self, table_name: str,
fields: List[Tuple[str, str]],
Expand Down
4 changes: 2 additions & 2 deletions fields/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def __init__(self, source: Source):


class DbTableDefinition(object):
def __init__(self, source: Source):
self.table_name = source.db_name
def __init__(self, source: Source, name=None):
self.table_name = source.db_name if name is None else name
self.primary_keys = []
self.column_types = dict()
self.field_types = dict()
Expand Down
13 changes: 13 additions & 0 deletions updater/db_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from db import Database
from fields import DbTableDefinition
from copy import copy

logger = logging.getLogger(__name__)

Expand All @@ -36,6 +37,7 @@ class DbController(object):
ARCHIVE_SUFFIX = 'old'
ALL_SUFFIX = 'all'
LATEST_SUFFIX = 'latest'
TEMP_PREFIX = 'temp'

def __init__(self, db: Database, definition: DbTableDefinition):
self._db = db
Expand Down Expand Up @@ -131,6 +133,17 @@ def recreate_table(self, table_suffix: str):
self._db.drop_table(table_name)
self._create_table(table_name)

def create_temp_table_controller(self):
temp_name = "{}__{}".format(self.TEMP_PREFIX, self._definition.table_name)
temp_def = copy(self._definition)
temp_def.table_name = temp_name
return DbController(self._db, temp_def)

def replace_with(self, table_suffix: str, source):
table_name = self.table_name(table_suffix)
self._db.drop_table(table_name)
self._db.rename_table(source.table_name(table_suffix), table_name)

def ensure_table_created(self, table_suffix: str):
table_name = self.table_name(table_suffix)
self._ensure_table_created(table_name)
Expand Down
7 changes: 5 additions & 2 deletions updater/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,18 @@ def _try_update(self, app_id: str, since: datetime, until: datetime,
db_controller: DbController,
processing_definition: ProcessingDefinition,
loading_definition: LoadingDefinition):
db_controller.recreate_table(table_suffix)
temp_table_controller = db_controller.create_temp_table_controller()
temp_table_controller.recreate_table(table_suffix)

df_it = self._load(app_id, loading_definition, since, until,
LogsApiClient.DATE_DIMENSION_CREATE, parts_count)
for df in df_it:
logger.debug("Start processing data chunk")
upload_df = self._process_data(app_id, df,
processing_definition)
db_controller.insert_data(upload_df, table_suffix)
temp_table_controller.insert_data(upload_df, table_suffix)

db_controller.replace_with(table_suffix, temp_table_controller)

def update(self, app_id: str, date: Optional[datetime.date],
table_suffix: str, db_controller: DbController,
Expand Down

0 comments on commit 98e3229

Please sign in to comment.