Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
Improve DBCreator (#66, #84, #109) (#110)
Browse files Browse the repository at this point in the history
* Implement fluent interface; fix mypy issues; rename set_up, tear_down as connect, close; update uses; set up default append tables

* Remove metadata tables from env_blank.json

* Remove page_num condition

* Update APPEND_TABLE_NAMES

* Remove close, connect; update var names; make other tweaks

* Update DBCreator references

* Remove conn instance variable

* Remove page_num condition again

* Remove unused imports

* Update default append table names var name
  • Loading branch information
ssciolla authored Apr 28, 2020
1 parent 073ff7e commit 6fe2b94
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 38 deletions.
1 change: 0 additions & 1 deletion config/env_blank.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"password": ""
},
"APPEND_TABLE_NAMES": [
"job_run", "data_source_status",
"mivideo_media_started_hourly"
]
}
2 changes: 0 additions & 2 deletions course_inventory/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,7 @@ def run_course_inventory() -> Sequence[Dict[str, Union[ValidDataSourceName, pd.T
# Empty tables (if any) in database, then migrate
logger.info('Emptying tables in DB')
db_creator_obj = DBCreator(INVENTORY_DB, APPEND_TABLE_NAMES)
db_creator_obj.set_up()
db_creator_obj.drop_records()
db_creator_obj.tear_down()

# Insert gathered data
logger.info(f'Inserting {num_course_records} course records to DB')
Expand Down
2 changes: 1 addition & 1 deletion create_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
logger = logging.getLogger(__name__)

DB_PARAMS = ENV['INVENTORY_DB']
APPEND_TABLE_NAMES = ENV.get('APPEND_TABLE_NAMES', ['job_run'])
APPEND_TABLE_NAMES = ENV.get('APPEND_TABLE_NAMES', [])


# Main Program
Expand Down
59 changes: 31 additions & 28 deletions db/db_creator.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,73 @@
# This is needed for type hinting with fluent interfaces
from __future__ import annotations

# standard libraries
import logging, os
from typing import Dict, Sequence
from typing import Dict, List
from urllib.parse import quote_plus

# third-party libraries
from sqlalchemy.engine import create_engine
from sqlalchemy.engine import create_engine, Engine
from yoyo import get_backend, read_migrations


# Initialize settings and global variables

logger = logging.getLogger(__name__)

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
MIGRATIONS_PATH = os.path.join(ROOT_DIR, 'migrations')
PARENT_PATH = os.path.dirname(os.path.abspath(__file__))
MIGRATIONS_PATH = os.path.join(PARENT_PATH, 'migrations')

# The metadata tables are append tables by default.
DEFAULT_APPEND_TABLE_NAMES = ['job_run', 'data_source_status']


class DBCreator:

def __init__(
self,
db_params: Dict[str, str],
append_table_names: Sequence[str] = []
append_table_names: List[str] = []
) -> None:

self.db_name = db_params['dbname']
self.conn = None
self.conn_str = (
self.db_name: str = db_params['dbname']
self.conn_str: str = (
'mysql+mysqldb' +
f"://{db_params['user']}" +
f":{quote_plus(db_params['password'])}" +
f"@{db_params['host']}" +
f":{db_params['port']}" +
f"/{db_params['dbname']}?charset=utf8&ssl=true"
)
self.engine = create_engine(self.conn_str)
self.append_table_names = append_table_names

def set_up(self) -> None:
logger.debug('set_up')
self.conn = self.engine.connect()
self.engine: Engine = create_engine(self.conn_str)

def tear_down(self) -> None:
logger.debug('tear_down')
self.conn.close()
self.append_table_names: List[str] = append_table_names
self.append_table_names += DEFAULT_APPEND_TABLE_NAMES

def get_table_names(self) -> Sequence[str]:
def get_table_names(self) -> List[str]:
logger.debug('get_table_names')
return self.engine.table_names()

def migrate(self) -> None:
def migrate(self) -> DBCreator:
logger.debug('migrate')
backend = get_backend(self.conn_str)
migrations = read_migrations(MIGRATIONS_PATH)
with backend.lock():
backend.apply_migrations(backend.to_apply(migrations))
return self

def drop_records(self) -> None:
def drop_records(self) -> DBCreator:
logger.debug('drop_records')
self.conn.execute('SET FOREIGN_KEY_CHECKS=0;')
conn = self.engine.connect()
conn.execute('SET FOREIGN_KEY_CHECKS=0;')
for table_name in self.get_table_names():
if 'yoyo' not in table_name and table_name not in self.append_table_names:
logger.debug(f'Table Name: {table_name}')
self.conn.execute(f'DELETE FROM {table_name};')
conn.execute(f'DELETE FROM {table_name};')
logger.info(f'Dropped records in {table_name} in {self.db_name}')
self.conn.execute('SET FOREIGN_KEY_CHECKS=1;')
conn.execute('SET FOREIGN_KEY_CHECKS=1;')
return self

def set_up_database(self) -> None:
self.set_up()
self.drop_records()
self.migrate()
self.tear_down()
def set_up_database(self) -> DBCreator:
self.drop_records().migrate()
return self
7 changes: 3 additions & 4 deletions mivideo/mivideo_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ def __init__(self):
logger.info(f'Connected to BigQuery project: "{self.udpDb.project}"')

dbParams: Dict = ENV['INVENTORY_DB']
appendTableNames: Sequence[str] = ENV.get('APPEND_TABLE_NAMES', [
'job_run', 'data_source_status', 'mivideo_media_started_hourly',
'mivideo_media_creation'])
appendTableNames: Sequence[str] = ENV.get(
'APPEND_TABLE_NAMES', ['mivideo_media_started_hourly']
)

self.appDb: DBCreator = DBCreator(dbParams, appendTableNames)
self.appDb.set_up()

def _readTableLastTime(self, tableName: str, tableColumnName: str) -> Union[datetime, None]:
lastTime: Union[datetime, None]
Expand Down
4 changes: 2 additions & 2 deletions run_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ def run_jobs(self) -> None:
num_loops = 40
for i in range(num_loops + 1):
try:
db_creator_obj.set_up()
db_creator_obj.tear_down()
conn = db_creator_obj.engine.connect()
conn.close()
logger.info('MySQL caught up')
break
except sqlalchemy.exc.OperationalError:
Expand Down

0 comments on commit 6fe2b94

Please sign in to comment.