Skip to content

Commit

Permalink
feat: deleteinserts (#1326)
Browse files Browse the repository at this point in the history
Co-authored-by: Derek Roberts <[email protected]>
  • Loading branch information
ronrobb and DerekRoberts authored Jul 3, 2024
1 parent 0e3d4f1 commit 69f57d2
Show file tree
Hide file tree
Showing 14 changed files with 367 additions and 657 deletions.
2 changes: 1 addition & 1 deletion sync/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ RUN pip install -r ./requirements.txt --trusted-host pypi.org --trusted-host pyp
# Start the app
WORKDIR /app/src
USER nonroot
CMD python3 main.py
CMD python3 main2.py
5 changes: 2 additions & 3 deletions sync/config/SQL/SPAR/POSTGRES_SEEDLOT_EXTRACT.sql
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,5 @@ LEFT OUTER JOIN (SELECT DISTINCT
FROM spar.seedlot_orchard so
WHERE so.primary_ind = 'N') sec
ON sec.seedlot_number = s.seedlot_number
FROM spar.seedlot s
WHERE
update_timestamp between %(start_time)s AND % ( end_time ) s order by seedlot_number desc
WHERE s.seedlot_number = %(p_seedlot_number)s
order by seedlot_number desc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ SELECT
, update_timestamp
, revision_count
FROM spar.seedlot_genetic_worth sgw
WHERE sgw.seedlot_number IN
(SELECT soqin.seedlot_number
FROM spar.seedlot_owner_quantity soqin
WHERE soqin.update_timestamp between %(start_time)s AND %(end_time)s )
WHERE sgw.seedlot_number = %(p_seedlot_number)s
ORDER BY seedlot_number
, genetic_worth_code
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@ SELECT
, spar_fund_srce_code
, revision_count
FROM spar.seedlot_owner_quantity soq
WHERE soq.seedlot_number IN
(SELECT soqin.seedlot_number
FROM spar.seedlot_owner_quantity soqin
WHERE soqin.update_timestamp between %(start_time)s AND %(end_time)s )
FROM spar.seedlot_owner_quantity
WHERE
update_timestamp between %(start_time)s AND %(end_time)s
WHERE soq.seedlot_number = %(p_seedlot_number)s
ORDER BY seedlot_number
, owner_client_number
, owner_locn_code
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ SELECT
, spt.total_genetic_worth_contrib
, spt.revision_count
FROM spar.seedlot_parent_tree spt
WHERE spt.seedlot_number IN
(SELECT sptin.seedlot_number
FROM spar.seedlot_parent_tree sptin
WHERE sptin.update_timestamp between %(start_time)s AND %(end_time)s )
WHERE spt.seedlot_number = %(p_seedlot_number)s
ORDER BY spt.seedlot_number
, spt.parent_tree_id
23 changes: 23 additions & 0 deletions sync/config/SQL/SPAR/POSTGRES_SEEDLOT_PARENT_TREE_GEN_QLTY.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Genetic-quality values per parent tree for a single seedlot.
-- The primary orchard (seedlot_orchard.primary_ind = true) is joined to
-- resolve the seed-plan-unit id via active_orchard_spu; both joins are
-- LEFT OUTER so parent-tree rows survive even when no orchard/SPU matches
-- (seed_plan_unit_id is then NULL).
-- %(p_seedlot_number)s is a named driver-side bind parameter (pyformat style).
SELECT
soqgq.seedlot_number
, soqgq.parent_tree_id
, soqgq.genetic_type_code
, soqgq.genetic_worth_code
, soqgq.genetic_quality_value
, soqgq.estimated_ind
, soqgq.untested_ind
, aos.seed_plan_unit_id
, soqgq.revision_count
FROM spar.seedlot_parent_tree_gen_qlty soqgq
LEFT
OUTER
JOIN spar.seedlot_orchard so
ON so.seedlot_number = soqgq.seedlot_number
AND so.primary_ind = true
LEFT
OUTER
JOIN spar.active_orchard_spu aos
ON aos.orchard_id = so.orchard_id
WHERE soqgq.seedlot_number = %(p_seedlot_number)s
ORDER BY soqgq.seedlot_number
, soqgq.parent_tree_id
11 changes: 11 additions & 0 deletions sync/config/SQL/SPAR/POSTGRES_SEEDLOT_PARENT_TREE_SMP_MIX.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- SMP-mix rows of the parent trees belonging to one seedlot.
-- %(p_seedlot_number)s is a named driver-side bind parameter (pyformat style).
SELECT seedlot_number,
       parent_tree_id,
       genetic_type_code,
       genetic_worth_code,
       genetic_quality_value AS smp_mix_value,
       revision_count
  FROM spar.seedlot_parent_tree_smp_mix
 WHERE seedlot_number = %(p_seedlot_number)s
 ORDER BY seedlot_number,
          parent_tree_id
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ SELECT
, slpz.entry_timestamp
, slpz.revision_count
FROM spar.seedlot_seed_plan_zone slpz
WHERE slpz.seedlot_number IN
(SELECT spzin.seedlot_number
FROM spar.seedlot_seed_plan_zone spzin
WHERE spzin.update_timestamp between %(start_time)s AND %(end_time)s )
WHERE slpz.seedlot_number = %(p_seedlot_number)s
ORDER BY slpz.seedlot_number
, slpz.primary_ind
5 changes: 1 addition & 4 deletions sync/config/SQL/SPAR/POSTGRES_SMP_MIX_EXTRACT.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ SELECT
, smp.amount_of_material
, smp.revision_count
FROM spar.smp_mix smp
WHERE smp.seedlot_number IN
(SELECT smpin.seedlot_number
FROM spar.smp_mix smpin
WHERE smpin.update_timestamp between %(start_time)s AND %(end_time)s )
WHERE smp.seedlot_number = %(p_seedlot_number)s
ORDER BY smp.seedlot_number
, smp.parent_tree_id
5 changes: 1 addition & 4 deletions sync/config/SQL/SPAR/POSTGRES_SMP_MIX_GEN_QLTY_EXTRACT.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ SELECT
, smpgq.estimated_ind
, smpgq.revision_count
FROM spar.smp_mix_gen_qlty smpgq
WHERE smpgq.seedlot_number IN
(SELECT smpgqin.seedlot_number
FROM spar.smp_mix_gen_qlty smpgqin
WHERE smpgqin.update_timestamp between %(start_time)s AND %(end_time)s )
WHERE smpgq.seedlot_number = %(p_seedlot_number)s
ORDER BY smpgq.seedlot_number
, smpgq.parent_tree_id
152 changes: 152 additions & 0 deletions sync/src/main2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import json
import module.data_synchronization as data_sync
from logging import config as logging_config, basicConfig as loggingBasicConfig, DEBUG as loggingDEBUG, INFO as loggingINFO
import os
import sys
import yaml

def env_var_is_filled(variable):
    """Return True when the environment variable *variable* is set, else print an error and return False."""
    if os.environ.get(variable) is not None:
        return True
    print(f"Error: {variable} environment variable is None")
    return False

def generate_db_config(type_, schema_, settings):
    """Build a connection-configuration dict for one database.

    type_    -- "ORACLE" or "POSTGRES"; any other value yields an empty dict.
    schema_  -- schema name stored under the "schema" key.
    settings -- mapping that must provide ssl_required, version_column
                and max_rows_upsert (KeyError otherwise).

    Credentials and endpoints are read from environment variables; missing
    ones come back as None rather than raising.
    """
    # Keys identical for both engines, pulled from the settings mapping.
    shared = {
        "schema": schema_,
        "ssl_required": settings["ssl_required"],
        "version_column": settings["version_column"],
        "max_rows_upsert": settings["max_rows_upsert"],
    }
    if type_ == "ORACLE":
        return {
            "type": "ORACLE",
            "username": os.environ.get("ORACLE_USER"),
            "password": os.environ.get("ORACLE_PASSWORD"),
            "host": os.environ.get("ORACLE_HOST"),
            "port": os.environ.get("ORACLE_PORT"),
            "service_name": os.environ.get("ORACLE_SERVICE"),
            "test_query": "SELECT 'SUCCESS' a FROM DUAL",
            **shared,
        }
    if type_ == "POSTGRES":
        return {
            "type": "POSTGRES",
            "username": os.environ.get("POSTGRES_USER"),
            "password": os.environ.get("POSTGRES_PASSWORD"),
            "host": os.environ.get("POSTGRES_HOST"),
            "database": os.environ.get("POSTGRES_DATABASE"),
            "port": os.environ.get("POSTGRES_PORT"),
            "test_query": "SELECT 'SUCCESS' a",
            **shared,
        }
    return {}

def required_variables_exists():
    """Validate that every environment variable the sync engine needs is set.

    Checks each required Oracle/Postgres/runtime variable and raises
    Exception when any is missing; returns None on success.

    Fix: the original chained `not env_var_is_filled(...) or ...` checks,
    which short-circuit after the first missing variable, so only one
    problem was ever reported per run. Evaluating every variable reports
    all missing ones at once.
    """
    required = (
        "TEST_MODE", "EXECUTION_ID",
        "POSTGRES_HOST", "POSTGRES_PORT", "POSTGRES_USER",
        "POSTGRES_PASSWORD", "POSTGRES_DATABASE",
        "ORACLE_PORT", "ORACLE_HOST", "ORACLE_SERVICE",
        "ORACLE_USER", "ORACLE_PASSWORD",
    )

    print("-------------------------------------")
    print("----- ETL Tool: Unit test Execution ")
    print("----- 1. Checking if required variables are defined")
    print("-------------------------------------")

    # List comprehension (not a generator) so env_var_is_filled runs — and
    # logs — for every variable before the verdict is taken.
    results = [env_var_is_filled(variable) for variable in required]

    if all(results):
        print("Required variable tests passed!")
    else:
        raise Exception("Not all required variables to execute an instance of Data Sync Engine exist.")

def testOracleConnection(settings):
    """Smoke-test the Oracle connection with the shared test helper and print the outcome."""
    divider = "-------------------------------------"
    print(divider)
    print("-- 3. Checking if Oracle connection is available and reachable")
    print(divider)
    # Imported lazily so the module is only needed when the test mode runs.
    from module.test_db_connection import test_db_connection
    outcome = test_db_connection.do_test(generate_db_config("ORACLE", "THE", settings))
    print(outcome)

def testPostgresConnection(settings):
    """Smoke-test the Postgres connection with the shared test helper and print the outcome."""
    divider = "-------------------------------------"
    print(divider)
    print("-- 2. Checking if Postgres connection is available and reachable")
    print(divider)
    # Imported lazily so the module is only needed when the test mode runs.
    from module.test_db_connection import test_db_connection
    outcome = test_db_connection.do_test(generate_db_config("POSTGRES", "spar", settings))
    print(outcome)

def read_settings():
    """Load and minimally validate settings.yml located next to this script.

    Returns the parsed dict when the expected oracle/postgres keys exist and
    at least one checked value is truthy; otherwise prints a diagnostic and
    returns None (callers must handle the None case).

    Fixes: explicit file encoding; typo in the KeyError message.
    """
    file = os.path.join(os.path.abspath(os.path.dirname(__file__)), "settings.yml")
    try:
        # utf-8 pinned so the parse does not depend on the platform locale.
        with open(file, 'r', encoding="utf-8") as stream:
            data_loaded = yaml.safe_load(stream)
            # The key accesses double as validation: any missing key raises
            # KeyError, reported below.
            # NOTE(review): this validates "ssl_version" while
            # generate_db_config consumes settings["ssl_required"] — confirm
            # which key settings.yml actually defines.
            if data_loaded["postgres"]["max_rows_upsert"] or data_loaded["postgres"]["version_column"] or \
               data_loaded["oracle"]["max_rows_upsert"] or data_loaded["oracle"]["version_column"] or \
               data_loaded["postgres"]["ssl_version"] or data_loaded["oracle"]["ssl_version"]:
                return data_loaded
    except FileNotFoundError:
        print("Error: settings.yml not found")
    except KeyError:
        print("Error: settings.yml is not well formatted or does not have required settings")
    except Exception as err:
        print(f"A fatal error has occurred when trying to load settings.yml ({type(err)}): {err}")


def main() -> None:
    """Entry point: run connection self-tests or the full ETL, driven by env vars.

    TEST_MODE    -- truthy values ("Y", "YES", "1", "T", "TRUE") run the
                    connection tests only; anything else runs the ETL.
    EXECUTION_ID -- identifies the execution to run; required.

    Fix: read_settings() returns None on any load/validation failure; the
    original then crashed with TypeError on settings["postgres"]. A guard now
    aborts with a clear message instead.
    """
    definition_of_yes = ["Y", "YES", "1", "T", "TRUE"]
    if os.environ.get("TEST_MODE") is None:
        print("Error: test mode variable is None")
    elif os.environ.get("EXECUTION_ID") is None:
        print("Error: EXECUTION_ID is None, no execution defined to be executed in this run.")
    else:
        this_is_a_test = os.environ.get("TEST_MODE")
        settings = read_settings()
        if settings is None:
            # read_settings already printed the specific reason.
            print("Error: settings could not be loaded, aborting execution.")
            return
        print("<------------------ settings ----------------->")
        print(settings)
        print("<------------------ settings ----------------->")
        # NOTE(review): membership test is case-sensitive — "yes" would run
        # the full ETL; confirm that is intended.
        if this_is_a_test in definition_of_yes:
            print("Executing in Test mode")
            required_variables_exists()
            testPostgresConnection(settings["postgres"])
            testOracleConnection(settings["oracle"])
            # Vault disabled
            # testVault()
        else:
            print("-------------------------------------")
            print("Starting ETL main process ")
            print("-------------------------------------")

            dbOracle = generate_db_config("ORACLE", "THE", settings["oracle"])
            dbPostgres = generate_db_config("POSTGRES", "spar", settings["postgres"])

            execute_etl(dbPostgres, dbOracle)

            print("-------------------------------------")
            print("ETL Main process finished ")
            print("-------------------------------------")

# MAIN Execution
def execute_etl(dbPostgres, dbOracle) -> None:
    """Run one data-synchronization instance (Oracle source -> Postgres target)."""
    # Route engine logging (DEBUG and up) to stdout so container logs capture it.
    loggingBasicConfig(level=loggingDEBUG, stream=sys.stdout)
    # The Postgres config is reused as the tracking-database config.
    data_sync.execute_instance( oracle_config = dbOracle, postgres_config = dbPostgres ,track_config = dbPostgres)

if __name__ == '__main__':
    main()

49 changes: 7 additions & 42 deletions sync/src/module/data_sync_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def get_execution_map (track_db_conn: object,
select interface_id , execution_id, execution_order,
source_name, source_file, source_table,source_db_type,
target_name, target_file, target_table,target_primary_key,target_db_type,
truncate_before_run,retry_errors,
run_mode,upsert_with_delete_key,ignore_columns_on_update,retry_errors,
case when execution_parent_id is null then 'ORCHESTRATION' else 'PROCESS' end as process_type
from {database_schema}.etl_execution_map
where (execution_id = {execution_id} or execution_parent_id = {execution_id})
Expand All @@ -30,24 +30,16 @@ def get_execution_map (track_db_conn: object,
records = track_db_conn.select(select_sync_id_stm)
return records.mappings().all()

def get_scheduler(track_db_conn:object, database_schema:str, execution_id:int, interface_id:str) -> list:
#TODO fix dates to 1900-01-01
def get_scheduler(track_db_conn:object, database_schema:str) -> list:
#TODO fix query = where will we get this?
select_sync_id_stm = f"""
select es.last_run_ts as last_start_time,
es.current_run_ts as current_start_time,
CURRENT_TIMESTAMP as current_end_time
from {database_schema}.etl_execution_schedule es
where execution_id = {execution_id} and interface_id = '{interface_id}'
union all
select '2024-06-20'::timestamp as last_start_time,
'2024-06-22'::timestamp as current_start_time,
CURRENT_TIMESTAMP as current_end_time
where not exists(select 1 from {database_schema}.etl_execution_schedule es
where execution_id = {execution_id} and interface_id = '{interface_id}') """
where not exists(select 1 from {database_schema}.etl_execution_schedule es ) """
records = track_db_conn.select(select_sync_id_stm)
return records.mappings().all()


def get_log_hist_schedules_to_process(track_db_conn,database_schema,execution_id,interface_id) -> list:
select_sync_id_stm = f"""
select es.last_run_ts as last_start_time,
Expand All @@ -60,25 +52,6 @@ def get_log_hist_schedules_to_process(track_db_conn,database_schema,execution_id
records = track_db_conn.select(select_sync_id_stm)
return records.mappings().all()


def get_scheduler(track_db_conn:object, database_schema:str, execution_id:int, interface_id:str) -> list:
#TODO fix dates to 1900-01-01
select_sync_id_stm = f"""
select es.last_run_ts as last_start_time,
es.current_run_ts as current_start_time,
CURRENT_TIMESTAMP as current_end_time
from {database_schema}.etl_execution_schedule es
where execution_id = {execution_id} and interface_id = '{interface_id}'
union all
select '2024-06-20'::timestamp as last_start_time,
'2024-06-22'::timestamp as current_start_time,
CURRENT_TIMESTAMP as current_end_time
where not exists(select 1 from {database_schema}.etl_execution_schedule es
where execution_id = {execution_id} and interface_id = '{interface_id}') """
records = track_db_conn.select(select_sync_id_stm)
return records.mappings().all()


"""
Validate if the Execution map data is correct
Expand Down Expand Up @@ -120,12 +93,6 @@ def validate_execution_map (execution_map) -> bool:
if row["target_file"] == "":
print("[EXECUTION MAP ERROR] Target file does not exist in Interface Id " + row["interface_id"])
ret = False
if row["truncate_before_run"] and row["target_table"]=="":
print(f"[EXECUTION MAP ERROR] Target table is not filled for truncate statement in Interface Id {row['interface_id']}")
ret = False
if row["target_db_type"]=="ORACLE" and row["truncate_before_run"] :
print(f"[EXECUTION MAP ERROR] Truncate command is not allowed on Oracle as target database. Update this parameter in Interface Id {row['interface_id']}")
ret = False
return (ret and exist_process)

"""
Expand All @@ -149,9 +116,8 @@ def print_process(process):
print("--------------------------")
print(f"--Process Execution ID: ({process['interface_id']}):")
print(f"--Process Execution order: {process['execution_order']} ")
print(f"-- Retry_errors:{process['truncate_before_run']}." )
print(f"--Process Source: {process['source_name']} (table: {process['source_table']}, file: {process['source_file']})." )
print(f"--Process Target: {process['target_name']} (table: {process['target_table']}, PK: {process['target_primary_key']}).")
print(f"--Process Source: (table: {process['source_table']}, file: {process['source_file']})." )
print(f"--Process Target: (table: {process['target_table']}, PK: {process['target_primary_key']}).")
print("--------------------------")

def get_config(oracle_config, postgres_config, db_type):
Expand All @@ -163,7 +129,7 @@ def get_config(oracle_config, postgres_config, db_type):
return None


def include_process_log_info(stored_metrics, log_message,execution_status, retry):
def include_process_log_info(stored_metrics, log_message,execution_status):
process_log = {}
process_log["source_connect_timedelta"]=stored_metrics['time_conn_source']
process_log["source_extract_timedelta"]=stored_metrics['time_source_extract']
Expand All @@ -176,7 +142,6 @@ def include_process_log_info(stored_metrics, log_message,execution_status, retry
process_log["process_started_at"] =stored_metrics['record_start_time']
process_log["process_finished_at"] =stored_metrics['process_end_time']
process_log["process_timedelta"] =stored_metrics['process_delta']
process_log["retry_process"] =retry
return process_log


Expand Down
Loading

0 comments on commit 69f57d2

Please sign in to comment.