Skip to content

Commit

Permalink
24051 Tombstone pipeline (first cut) (bcgov#3077)
Browse files Browse the repository at this point in the history
* 24051 - tombstone pipeline (first cut)

Signed-off-by: Hongjing Chen <[email protected]>

* update extract script

Signed-off-by: Hongjing Chen <[email protected]>

* update makefile

Signed-off-by: Hongjing Chen <[email protected]>

* update corp_type_cd in query

Signed-off-by: Hongjing Chen <[email protected]>

* add todo

Signed-off-by: Hongjing Chen <[email protected]>

* seperate batch configs for delete & tombstone flow

Signed-off-by: Hongjing Chen <[email protected]>

---------

Signed-off-by: Hongjing Chen <[email protected]>
  • Loading branch information
chenhongjing authored Nov 18, 2024
1 parent 5260ebb commit cf2fc72
Show file tree
Hide file tree
Showing 11 changed files with 1,114 additions and 67 deletions.
7 changes: 5 additions & 2 deletions data-tool/.corps.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ DATABASE_HOST_AUTH=
DATABASE_PORT_AUTH=

## batch configs, must be positive integer
BATCHES=1
BATCH_SIZE=300
TOMBSTONE_BATCHES=1
TOMBSTONE_BATCH_SIZE=300

DELETE_BATCHES=1
DELETE_BATCH_SIZE=300

## delete corps record in auth db, corp_processing of colin extract
DELETE_AUTH_RECORDS=False
Expand Down
4 changes: 4 additions & 0 deletions data-tool/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ run-corps-delete: ## Run delete flow
. $(VENV_DIR)/bin/activate && \
python flows/batch_delete_flow.py

run-tombstone-migration: ## Run corp tombstone migration flow
. $(VENV_DIR)/bin/activate && \
python flows/corps_tombstone_flow.py


#################################################################################
# Self Documenting Commands #
Expand Down
69 changes: 10 additions & 59 deletions data-tool/flows/batch_delete_flow.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import math
from contextlib import contextmanager

from config import get_named_config
from common.init_utils import auth_init, colin_init, get_config, lear_init
from prefect import flow, task
from prefect.futures import wait
from sqlalchemy import Connection, create_engine, exc, text
from sqlalchemy import Connection, text
from sqlalchemy.engine import Engine


businesses_cnt_query = """
SELECT COUNT(*) FROM businesses
WHERE legal_type IN ('BC', 'ULC', 'CC')
Expand Down Expand Up @@ -36,59 +35,11 @@ def replica_role(conn: Connection):
conn.execute(text("SET session_replication_role = 'origin';"))


@task
def get_config():
config = get_named_config()
return config

@task
def check_db_connection(db_engine: Engine):
with db_engine.connect() as conn:
res = conn.execute(text('SELECT current_database()')).scalar()
if not res:
raise ValueError("Failed to retrieve the current database name.")
print(f'✅ Connected to database: {res}')



@task
def colin_init(config):
try:
engine = create_engine(config.SQLALCHEMY_DATABASE_URI_COLIN_MIGR)
check_db_connection(engine)
return engine
except Exception as e:
raise Exception('Failed to create engine for COLIN DB') from e


@task
def lear_init(config):
try:
engine = create_engine(
config.SQLALCHEMY_DATABASE_URI,
**config.SQLALCHEMY_ENGINE_OPTIONS
)
check_db_connection(engine)
return engine
except Exception as e:
raise Exception('Failed to create engine for LEAR DB') from e


@task
def auth_init(config):
try:
engine = create_engine(config.SQLALCHEMY_DATABASE_URI_AUTH)
check_db_connection(engine)
return engine
except Exception as e:
raise Exception('Failed to create engine for AUTH DB') from e


@task
def get_selected_corps(db_engine: Engine, config):
with db_engine.connect() as conn:
results = conn.execute(text(identifiers_query), {
'batch_size': config.BATCH_SIZE,
'batch_size': config.DELETE_BATCH_SIZE,
'corp_name_suffix': config.CORP_NAME_SUFFIX
})

Expand Down Expand Up @@ -394,7 +345,7 @@ def delete_by_ids(conn: Connection, table_name: str, ids: list, id_name: str = '
def count_corp_num(engine: Engine, config):
with engine.connect() as conn:
res = conn.execute(text(businesses_cnt_query), {
'batch_size': config.BATCH_SIZE,
'batch_size': config.DELETE_BATCH_SIZE,
'corp_name_suffix': config.CORP_NAME_SUFFIX
}).scalar()
return res
Expand All @@ -413,12 +364,12 @@ def batch_delete_flow():
total = count_corp_num(lear_engine, config)

# validate batch config
if config.BATCHES <= 0:
raise ValueError('BATCHES must be explicitly set to a positive integer')
if config.BATCH_SIZE <= 0:
raise ValueError('BATCH_SIZE must be explicitly set to a positive integer')
batch_size = config.BATCH_SIZE
batches = min(math.ceil(total/batch_size), config.BATCHES)
if config.DELETE_BATCHES <= 0:
raise ValueError('DELETE_BATCHES must be explicitly set to a positive integer')
if config.DELETE_BATCH_SIZE <= 0:
raise ValueError('DELETE_BATCH_SIZE must be explicitly set to a positive integer')
batch_size = config.DELETE_BATCH_SIZE
batches = min(math.ceil(total/batch_size), config.DELETE_BATCHES)

print(f'👷 Going to delete {total} businesses with batch size of {batch_size}...')
print(f"👷 Auth delete {'enabled' if config.DELETE_AUTH_RECORDS else 'disabled'}.")
Expand Down
52 changes: 52 additions & 0 deletions data-tool/flows/common/init_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from config import get_named_config
from prefect import task
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine


@task
def get_config():
config = get_named_config()
return config


@task
def check_db_connection(db_engine: Engine):
with db_engine.connect() as conn:
res = conn.execute(text('SELECT current_database()')).scalar()
if not res:
raise ValueError("Failed to retrieve the current database name.")
print(f'✅ Connected to database: {res}')


@task
def colin_init(config):
try:
engine = create_engine(config.SQLALCHEMY_DATABASE_URI_COLIN_MIGR)
check_db_connection(engine)
return engine
except Exception as e:
raise Exception('Failed to create engine for COLIN DB') from e


@task
def lear_init(config):
try:
engine = create_engine(
config.SQLALCHEMY_DATABASE_URI,
**config.SQLALCHEMY_ENGINE_OPTIONS
)
check_db_connection(engine)
return engine
except Exception as e:
raise Exception('Failed to create engine for LEAR DB') from e


@task
def auth_init(config):
try:
engine = create_engine(config.SQLALCHEMY_DATABASE_URI_AUTH)
check_db_connection(engine)
return engine
except Exception as e:
raise Exception('Failed to create engine for AUTH DB') from e
14 changes: 10 additions & 4 deletions data-tool/flows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,20 @@ class _Config(): # pylint: disable=too-few-public-methods
ACCOUNT_SVC_TIMEOUT = os.getenv('ACCOUNT_SVC_TIMEOUT')

# batch delete flow
BATCHES = os.getenv('BATCHES')
BATCHES = int(BATCHES) if BATCHES.isnumeric() else 0
BATCH_SIZE = os.getenv('BATCH_SIZE')
BATCH_SIZE = int(BATCH_SIZE) if BATCH_SIZE.isnumeric() else 0
DELETE_BATCHES = os.getenv('DELETE_BATCHES')
DELETE_BATCHES = int(DELETE_BATCHES) if DELETE_BATCHES.isnumeric() else 0
DELETE_BATCH_SIZE = os.getenv('DELETE_BATCH_SIZE')
DELETE_BATCH_SIZE = int(DELETE_BATCH_SIZE) if DELETE_BATCH_SIZE.isnumeric() else 0

DELETE_AUTH_RECORDS = os.getenv('DELETE_AUTH_RECORDS').lower() == 'true'
DELETE_CORP_PROCESSING_RECORDS = os.getenv('DELETE_CORP_PROCESSING_RECORDS').lower() == 'true'

# tombstone flow
TOMBSTONE_BATCHES = os.getenv('TOMBSTONE_BATCHES')
TOMBSTONE_BATCHES = int(TOMBSTONE_BATCHES) if TOMBSTONE_BATCHES.isnumeric() else 0
TOMBSTONE_BATCH_SIZE = os.getenv('TOMBSTONE_BATCH_SIZE')
TOMBSTONE_BATCH_SIZE = int(TOMBSTONE_BATCH_SIZE) if TOMBSTONE_BATCH_SIZE.isnumeric() else 0

TESTING = False
DEBUG = False

Expand Down
Loading

0 comments on commit cf2fc72

Please sign in to comment.